[统计]记录流量(以5分钟作为间隔)

This commit is contained in:
GoEdgeLab
2020-12-11 17:28:20 +08:00
parent 806cd069b5
commit d2f72d7e87
9 changed files with 135 additions and 44 deletions

View File

@@ -476,6 +476,7 @@ func (this *NodeDAO) ComposeNodeConfig(nodeId int64) (*nodeconfigs.NodeConfig, e
Version: int64(node.Version), Version: int64(node.Version),
Name: node.Name, Name: node.Name,
MaxCPU: types.Int32(node.MaxCPU), MaxCPU: types.Int32(node.MaxCPU),
RegionId: int64(node.RegionId),
} }
// 获取所有的服务 // 获取所有的服务

View File

@@ -0,0 +1,57 @@
package models
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
timeutil "github.com/iwind/TeaGo/utils/time"
)
type ServerDailyStatDAO dbs.DAO
func NewServerDailyStatDAO() *ServerDailyStatDAO {
return dbs.NewDAO(&ServerDailyStatDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeServerDailyStats",
Model: new(ServerDailyStat),
PkName: "id",
},
}).(*ServerDailyStatDAO)
}
var SharedServerDailyStatDAO *ServerDailyStatDAO
func init() {
dbs.OnReady(func() {
SharedServerDailyStatDAO = NewServerDailyStatDAO()
})
}
// 提交数据
func (this *ServerDailyStatDAO) SaveStats(stats []*pb.ServerDailyStat) error {
for _, stat := range stats {
day := timeutil.FormatTime("Ymd", stat.CreatedAt)
timeFrom := timeutil.FormatTime("His", stat.CreatedAt)
timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60) // 5分钟
_, _, err := this.Query().
Param("bytes", stat.Bytes).
InsertOrUpdate(maps.Map{
"serverId": stat.ServerId,
"regionId": stat.RegionId,
"bytes": dbs.SQL("bytes+:bytes"),
"day": day,
"timeFrom": timeFrom,
"timeTo": timeTo,
}, maps.Map{
"bytes": dbs.SQL("bytes+:bytes"),
})
if err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,39 @@
package models
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/go-sql-driver/mysql"
"testing"
)
func TestServerDailyStatDAO_SaveStats(t *testing.T) {
stats := []*pb.ServerDailyStat{
{
ServerId: 1,
RegionId: 2,
Bytes: 1,
CreatedAt: 1607671488,
},
}
err := NewServerDailyStatDAO().SaveStats(stats)
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestServerDailyStatDAO_SaveStats2(t *testing.T) {
stats := []*pb.ServerDailyStat{
{
ServerId: 1,
RegionId: 3,
Bytes: 1,
CreatedAt: 1607671488,
},
}
err := NewServerDailyStatDAO().SaveStats(stats)
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}

View File

@@ -1,7 +1,7 @@
package models package models
// 计费流量统计 // 计费流量统计
type ServerStat struct { type ServerDailyStat struct {
Id uint64 `field:"id"` // ID Id uint64 `field:"id"` // ID
ServerId uint32 `field:"serverId"` // 服务ID ServerId uint32 `field:"serverId"` // 服务ID
RegionId uint32 `field:"regionId"` // 区域ID RegionId uint32 `field:"regionId"` // 区域ID
@@ -11,7 +11,7 @@ type ServerStat struct {
TimeTo string `field:"timeTo"` // 结束时间 TimeTo string `field:"timeTo"` // 结束时间
} }
type ServerStatOperator struct { type ServerDailyStatOperator struct {
Id interface{} // ID Id interface{} // ID
ServerId interface{} // 服务ID ServerId interface{} // 服务ID
RegionId interface{} // 区域ID RegionId interface{} // 区域ID
@@ -21,6 +21,6 @@ type ServerStatOperator struct {
TimeTo interface{} // 结束时间 TimeTo interface{} // 结束时间
} }
func NewServerStatOperator() *ServerStatOperator { func NewServerDailyStatOperator() *ServerDailyStatOperator {
return &ServerStatOperator{} return &ServerDailyStatOperator{}
} }

View File

@@ -1,28 +0,0 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
)
type ServerStatDAO dbs.DAO
func NewServerStatDAO() *ServerStatDAO {
return dbs.NewDAO(&ServerStatDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeServerStats",
Model: new(ServerStat),
PkName: "id",
},
}).(*ServerStatDAO)
}
var SharedServerStatDAO *ServerStatDAO
func init() {
dbs.OnReady(func() {
SharedServerStatDAO = NewServerStatDAO()
})
}

View File

@@ -1,5 +0,0 @@
package models
import (
_ "github.com/go-sql-driver/mysql"
)

View File

@@ -203,6 +203,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err
pb.RegisterACMETaskServiceServer(rpcServer, &services.ACMETaskService{}) pb.RegisterACMETaskServiceServer(rpcServer, &services.ACMETaskService{})
pb.RegisterACMEAuthenticationServiceServer(rpcServer, &services.ACMEAuthenticationService{}) pb.RegisterACMEAuthenticationServiceServer(rpcServer, &services.ACMEAuthenticationService{})
pb.RegisterUserServiceServer(rpcServer, &services.UserService{}) pb.RegisterUserServiceServer(rpcServer, &services.UserService{})
pb.RegisterServerDailyStatServiceServer(rpcServer, &services.ServerDailyStatService{})
err := rpcServer.Serve(listener) err := rpcServer.Serve(listener)
if err != nil { if err != nil {
return errors.New("[API_NODE]start rpc failed: " + err.Error()) return errors.New("[API_NODE]start rpc failed: " + err.Error())

View File

@@ -0,0 +1,26 @@
package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// 服务统计相关服务
type ServerDailyStatService struct {
BaseService
}
// 上传统计
func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context, req *pb.UploadServerDailyStatsRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateNode(ctx)
if err != nil {
return nil, err
}
err = models.SharedServerDailyStatDAO.SaveStats(req.Stats)
if err != nil {
return nil, err
}
return this.Success()
}