各项统计支持单节点多集群

This commit is contained in:
GoEdgeLab
2021-08-01 11:13:46 +08:00
parent 83ae3095cd
commit e459346e36
4 changed files with 26 additions and 8 deletions

View File

@@ -7,6 +7,7 @@ import (
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"time" "time"
) )
@@ -95,10 +96,18 @@ func (this *NodeValueDAO) ListValues(tx *dbs.Tx, role string, nodeId int64, item
func (this *NodeValueDAO) ListValuesWithClusterId(tx *dbs.Tx, role string, clusterId int64, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) { func (this *NodeValueDAO) ListValuesWithClusterId(tx *dbs.Tx, role string, clusterId int64, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
query := this.Query(tx). query := this.Query(tx).
Attr("role", role). Attr("role", role).
Attr("clusterId", clusterId).
Attr("item", item). Attr("item", item).
Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt") Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
switch role {
case nodeconfigs.NodeRoleNode:
query.Where("nodeId IN (SELECT id FROM " + SharedNodeDAO.Table + " WHERE (clusterId=:clusterId OR JSON_CONTAINS(secondaryClusterIds, :clusterIdString)) AND state=1)")
query.Param("clusterId", clusterId).
Param("clusterIdString", types.String(clusterId))
default:
query.Attr("clusterId", clusterId)
}
switch timeRange { switch timeRange {
// TODO 支持更多的时间范围 // TODO 支持更多的时间范围
case nodeconfigs.NodeValueRangeMinute: case nodeconfigs.NodeValueRangeMinute:
@@ -140,7 +149,6 @@ func (this *NodeValueDAO) ListValuesForUserNodes(tx *dbs.Tx, item string, key st
return return
} }
// ListValuesForNSNodes 列出用户节点相关的平均数据 // ListValuesForNSNodes 列出用户节点相关的平均数据
func (this *NodeValueDAO) ListValuesForNSNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) { func (this *NodeValueDAO) ListValuesForNSNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
query := this.Query(tx). query := this.Query(tx).

View File

@@ -787,6 +787,7 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, serverId int64) (*serverc
config := &serverconfigs.ServerConfig{} config := &serverconfigs.ServerConfig{}
config.Id = serverId config.Id = serverId
config.ClusterId = int64(server.ClusterId)
config.Type = server.Type config.Type = server.Type
config.IsOn = server.IsOn == 1 config.IsOn = server.IsOn == 1
config.Name = server.Name config.Name = server.Name

View File

@@ -22,7 +22,7 @@ func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.Up
} }
var tx = this.NullTx() var tx = this.NullTx()
clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) clusterId, err := models.SharedServerDAO.FindServerClusterId(tx, req.ServerId)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -33,11 +33,6 @@ func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context,
var clusterId int64 var clusterId int64
switch role { switch role {
case rpcutils.UserTypeNode:
clusterId, err = models.SharedNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil {
return nil, err
}
case rpcutils.UserTypeDNS: case rpcutils.UserTypeDNS:
clusterId, err = nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId) clusterId, err = nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil { if err != nil {
@@ -48,6 +43,13 @@ func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context,
// 写入其他统计表 // 写入其他统计表
// TODO 将来改成每小时入库一次 // TODO 将来改成每小时入库一次
for _, stat := range req.Stats { for _, stat := range req.Stats {
if role == rpcutils.UserTypeNode {
clusterId, err = models.SharedServerDAO.FindServerClusterId(tx, stat.ServerId)
if err != nil {
return nil, err
}
}
// 总体流量(按天) // 总体流量(按天)
err = stats.SharedTrafficDailyStatDAO.IncreaseDailyStat(tx, timeutil.FormatTime("Ymd", stat.CreatedAt), stat.Bytes, stat.CachedBytes, stat.CountRequests, stat.CountCachedRequests, stat.CountAttackRequests, stat.AttackBytes) err = stats.SharedTrafficDailyStatDAO.IncreaseDailyStat(tx, timeutil.FormatTime("Ymd", stat.CreatedAt), stat.Bytes, stat.CachedBytes, stat.CountRequests, stat.CountCachedRequests, stat.CountAttackRequests, stat.AttackBytes)
if err != nil { if err != nil {
@@ -84,6 +86,13 @@ func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context,
// 域名统计 // 域名统计
for _, stat := range req.DomainStats { for _, stat := range req.DomainStats {
if role == rpcutils.UserTypeNode {
clusterId, err = models.SharedServerDAO.FindServerClusterId(tx, stat.ServerId)
if err != nil {
return nil, err
}
}
err := stats.SharedServerDomainHourlyStatDAO.IncreaseHourlyStat(tx, clusterId, nodeId, stat.ServerId, stat.Domain, timeutil.FormatTime("YmdH", stat.CreatedAt), stat.Bytes, stat.CachedBytes, stat.CountRequests, stat.CountCachedRequests, stat.CountAttackRequests, stat.AttackBytes) err := stats.SharedServerDomainHourlyStatDAO.IncreaseHourlyStat(tx, clusterId, nodeId, stat.ServerId, stat.Domain, timeutil.FormatTime("YmdH", stat.CreatedAt), stat.Bytes, stat.CachedBytes, stat.CountRequests, stat.CountCachedRequests, stat.CountAttackRequests, stat.AttackBytes)
if err != nil { if err != nil {
return nil, err return nil, err