diff --git a/go.mod b/go.mod index 2e6b17e..6ff4379 100644 --- a/go.mod +++ b/go.mod @@ -14,11 +14,8 @@ require ( github.com/golang/protobuf v1.5.2 github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060 github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 - github.com/json-iterator/go v1.1.11 // indirect github.com/lionsoul2014/ip2region v2.2.0-release+incompatible - github.com/mattn/go-sqlite3 v1.14.7 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/mattn/go-sqlite3 v2.0.3+incompatible github.com/mssola/user_agent v0.5.2 github.com/shirou/gopsutil v3.21.5+incompatible github.com/tklauser/go-sysconf v0.3.6 // indirect diff --git a/go.sum b/go.sum index 3ac5bc0..9abf335 100644 --- a/go.sum +++ b/go.sum @@ -48,7 +48,6 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= -github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= @@ -57,20 +56,17 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f h1:r2O8PONj/KiuZjJHVHn7KlCePUIjNtgAmvLfgRafQ8o= -github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060 h1:qdLtK4PDXxk2vMKkTWl5Fl9xqYuRCukzWAgJbLHdfOo= github.com/iwind/TeaGo v0.0.0-20210628135026-38575a4ab060/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 h1:DaQjoWZhLNxjhIXedVg4/vFEtHkZhK4IjIwsWdyzBLg= github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -78,13 +74,11 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lionsoul2014/ip2region v2.2.0-release+incompatible h1:1qp9iks+69h7IGLazAplzS9Ca14HAxuD5c0rbFdPGy4= github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBoh5gG6/y0ZQ85vJDBe21WnfbRrQQwTfliJJI= -github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA= -github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U= +github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mssola/user_agent v0.5.2 h1:CZkTUahjL1+OcZ5zv3kZr8QiJ8jy2H08vZIEkBeRbxo= github.com/mssola/user_agent v0.5.2/go.mod h1:TTPno8LPY3wAIEKRpAtkdMT0f8SE24pLRGPahjCH4uw= @@ -102,8 +96,6 @@ github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mo github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/shirou/gopsutil v2.20.9+incompatible h1:msXs2frUV+O/JLva9EDLpuJ84PrFsdCTCQex8PUdtkQ= -github.com/shirou/gopsutil v2.20.9+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.5+incompatible h1:OloQyEerMi7JUrXiNzy8wQ5XN+baemxSl12QgIzt0jc= github.com/shirou/gopsutil v3.21.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= @@ -142,7 +134,6 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= @@ -163,10 +154,8 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa h1:ZYxPR6aca/uhfRJyaOAtflSHjJYiktO7QnJC5ut7iY4= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -175,7 +164,6 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 h1:RqytpXGR1iVNX7psjB3ff8y7s golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -192,15 +180,14 @@ golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced h1:c5geK1iMU3cDKtFrCVQIcjR3W+JOZMuhIyICMCTbtus= google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= @@ -209,7 +196,6 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0= google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= @@ -221,7 +207,6 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= diff --git a/internal/metrics/stat.go b/internal/metrics/stat.go index f3ea3ce..698e958 100644 --- a/internal/metrics/stat.go +++ b/internal/metrics/stat.go @@ -17,6 +17,6 @@ type Stat struct { keysData []byte } -func (this *Stat) Sum(version int, itemId int64) { - this.Hash = strconv.FormatUint(xxhash.Sum64String(strconv.FormatInt(this.ServerId, 10)+"@"+string(this.keysData)+"@"+this.Time+"@"+strconv.Itoa(version)+"@"+strconv.FormatInt(itemId, 10)), 10) +func (this *Stat) Sum(version int32, itemId int64) { + this.Hash = strconv.FormatUint(xxhash.Sum64String(strconv.FormatInt(this.ServerId, 10)+"@"+string(this.keysData)+"@"+this.Time+"@"+strconv.Itoa(int(version))+"@"+strconv.FormatInt(itemId, 10)), 10) } diff --git a/internal/metrics/task.go b/internal/metrics/task.go index b441da3..6dfb993 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -41,14 +41,16 @@ type Task struct { cleanTicker *utils.Ticker uploadTicker *utils.Ticker - cleanVersion int + cleanVersion int32 insertStatStmt *sql.Stmt deleteByVersionStmt *sql.Stmt deleteByExpiresTimeStmt *sql.Stmt selectTopStmt *sql.Stmt + sumStmt *sql.Stmt - serverIdMap map[int64]bool // 所有的服务Ids + serverIdMap map[int64]bool // 所有的服务Ids + timeMap map[string]bool // time => bool serverIdMapLocker sync.Mutex } @@ -58,6 +60,7 @@ func NewTask(item *serverconfigs.MetricItemConfig) *Task { item: item, statsChan: make(chan *Stat, 40960), serverIdMap: map[int64]bool{}, + timeMap: map[string]bool{}, } } @@ -128,7 +131,13 @@ ON "` + this.statTableName + `" ( } // select topN stmt - this.selectTopStmt, err = db.Prepare(`SELECT "id", "serverId", "hash", "keys", "value", "time", "version", "isUploaded" FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? ORDER BY "value" DESC LIMIT 100`) + this.selectTopStmt, err = db.Prepare(`SELECT "id", "hash", "keys", "value", "isUploaded" FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=? ORDER BY "value" DESC LIMIT 100`) + if err != nil { + return err + } + + // sum stmt + this.sumStmt, err = db.Prepare(`SELECT COUNT(*), IFNULL(SUM(value), 0) FROM "` + this.statTableName + `" WHERE "serverId"=? AND "version"=? AND time=?`) if err != nil { return err } @@ -162,7 +171,7 @@ func (this *Task) Start() error { // 清理 this.cleanTicker = utils.NewTicker(24 * time.Hour) go func() { - if this.cleanTicker.Next() { + for this.cleanTicker.Next() { err := this.CleanExpired() if err != nil { remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error()) @@ -173,7 +182,7 @@ func (this *Task) Start() error { // 上传 this.uploadTicker = utils.NewTicker(this.item.UploadDuration()) go func() { - if this.uploadTicker.Next() { + for this.uploadTicker.Next() { err := this.Upload(1 * time.Second) if err != nil { remotelogs.Error("METRIC", "upload stats failed: "+err.Error()) @@ -230,6 +239,7 @@ func (this *Task) Stop() error { _ = this.deleteByVersionStmt.Close() _ = this.deleteByExpiresTimeStmt.Close() _ = this.selectTopStmt.Close() + _ = this.sumStmt.Close() if this.statsChan != nil { go func() { @@ -257,6 +267,7 @@ func (this *Task) InsertStat(stat *Stat) error { this.serverIdMapLocker.Lock() this.serverIdMap[stat.ServerId] = true + this.timeMap[stat.Time] = true this.serverIdMapLocker.Unlock() keyData, err := json.Marshal(stat.Keys) @@ -289,7 +300,7 @@ func (this *Task) CleanExpired() error { } // 清除过期的数据 - _, err := this.deleteByExpiresTimeStmt.Exec(this.item.ExpiresTime()) + _, err := this.deleteByExpiresTimeStmt.Exec(this.item.LocalExpiresTime()) if err != nil { return err } @@ -304,11 +315,21 @@ func (this *Task) Upload(pauseDuration time.Duration) error { } this.serverIdMapLocker.Lock() + + // 服务IDs var serverIds []int64 for serverId := range this.serverIdMap { serverIds = append(serverIds, serverId) } this.serverIdMap = map[int64]bool{} // 清空数据 + + // 时间 + var times = []string{} + for t := range this.timeMap { + times = append(times, t) + } + this.timeMap = map[string]bool{} // 清空数据 + this.serverIdMapLocker.Unlock() rpcClient, err := rpc.SharedRPC() @@ -317,72 +338,87 @@ func (this *Task) Upload(pauseDuration time.Duration) error { } for _, serverId := range serverIds { - idStrings, err := func(serverId int64) (ids []string, err error) { - rows, err := this.selectTopStmt.Query(serverId, this.item.Version) - if err != nil { - return nil, err - } - var isClosed bool - defer func() { - if isClosed { - return - } - _ = rows.Close() - }() - - var pbStats []*pb.MetricStat - for rows.Next() { - var pbStat = &pb.MetricStat{ - ItemId: this.item.Id, - } - // "id", "serverId", "hash", "keys", "value", "time", "version", "isUploaded" - var isUploaded int - var keysData []byte - err = rows.Scan(&pbStat.Id, &pbStat.ServerId, &pbStat.Hash, &keysData, &pbStat.Value, &pbStat.Time, &pbStat.Version, &isUploaded) + for _, currentTime := range times { + idStrings, err := func(serverId int64, currentTime string) (ids []string, err error) { + rows, err := this.selectTopStmt.Query(serverId, this.item.Version, currentTime) if err != nil { return nil, err } - if isUploaded == 1 { - continue + var isClosed bool + defer func() { + if isClosed { + return + } + _ = rows.Close() + }() + + var pbStats []*pb.UploadingMetricStat + for rows.Next() { + var pbStat = &pb.UploadingMetricStat{ + } + // "id", "hash", "keys", "value", "isUploaded" + var isUploaded int + var keysData []byte + err = rows.Scan(&pbStat.Id, &pbStat.Hash, &keysData, &pbStat.Value, &isUploaded) + if err != nil { + return nil, err + } + if isUploaded == 1 { + continue + } + if len(keysData) > 0 { + err = json.Unmarshal(keysData, &pbStat.Keys) + if err != nil { + return nil, err + } + } + pbStats = append(pbStats, pbStat) + ids = append(ids, strconv.FormatInt(pbStat.Id, 10)) } - if len(keysData) > 0 { - err = json.Unmarshal(keysData, &pbStat.Keys) + + // 提前关闭 + _ = rows.Close() + isClosed = true + + // 上传 + if len(pbStats) > 0 { + // 计算总和 + count, total, err := this.sum(serverId, currentTime) + if err != nil { + return nil, err + } + + _, err = rpcClient.MetricStatRPC().UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{ + MetricStats: pbStats, + Time: currentTime, + ServerId: serverId, + ItemId: this.item.Id, + Version: this.item.Version, + Count: count, + Total: float32(total), + }) if err != nil { return nil, err } } - pbStats = append(pbStats, pbStat) - ids = append(ids, strconv.FormatInt(pbStat.Id, 10)) - } - // 提前关闭 - _ = rows.Close() - isClosed = true - - // 上传 - if len(pbStats) > 0 { - _, err = rpcClient.MetricStatRPC().UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{MetricStats: pbStats}) - if err != nil { - return nil, err - } - } - - return - }(serverId) - if err != nil { - return err - } - - if len(idStrings) > 0 { - // 设置为已上传 - _, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) + return + }(serverId, currentTime) if err != nil { return err } + + if len(idStrings) > 0 { + // 设置为已上传 + _, err = this.db.Exec(`UPDATE "` + this.statTableName + `" SET isUploaded=1 WHERE id IN (` + strings.Join(idStrings, ",") + `)`) + if err != nil { + return err + } + } } // 休息一下,防止短时间内上传数据过多 - if pauseDuration > 0 && len(idStrings) > 0 { + if pauseDuration > 0 { time.Sleep(pauseDuration) } } @@ -392,23 +428,65 @@ func (this *Task) Upload(pauseDuration time.Duration) error { // 加载服务ID func (this *Task) loadServerIdMap() error { - rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version) + { + rows, err := this.db.Query(`SELECT DISTINCT "serverId" FROM `+this.statTableName+" WHERE version=?", this.item.Version) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + + var serverId int64 + for rows.Next() { + err = rows.Scan(&serverId) + if err != nil { + return err + } + this.serverIdMapLocker.Lock() + this.serverIdMap[serverId] = true + this.serverIdMapLocker.Unlock() + } + } + + { + rows, err := this.db.Query(`SELECT DISTINCT "time" FROM `+this.statTableName+" WHERE version=?", this.item.Version) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + + var timeString string + for rows.Next() { + err = rows.Scan(&timeString) + if err != nil { + return err + } + this.serverIdMapLocker.Lock() + this.timeMap[timeString] = true + this.serverIdMapLocker.Unlock() + } + } + + return nil +} + +// 计算数量和综合 +func (this *Task) sum(serverId int64, time string) (count int64, total float64, err error) { + rows, err := this.sumStmt.Query(serverId, this.item.Version, time) if err != nil { - return err + return 0, 0, err } defer func() { _ = rows.Close() }() - - var serverId int64 - for rows.Next() { - err = rows.Scan(&serverId) + if rows.Next() { + err = rows.Scan(&count, &total) if err != nil { - return err + return 0, 0, err } - this.serverIdMapLocker.Lock() - this.serverIdMap[serverId] = true - this.serverIdMapLocker.Unlock() } - return nil + return } diff --git a/internal/metrics/task_test.go b/internal/metrics/task_test.go index bb51623..f5b88af 100644 --- a/internal/metrics/task_test.go +++ b/internal/metrics/task_test.go @@ -193,6 +193,7 @@ func TestTask_Upload(t *testing.T) { if err != nil { t.Fatal(err) } + err = task.Start() if err != nil { t.Fatal(err)