diff --git a/internal/db/models/db_node_initializer_test.go b/internal/db/models/db_node_initializer_test.go index 21d7cbd2..2e35a323 100644 --- a/internal/db/models/db_node_initializer_test.go +++ b/internal/db/models/db_node_initializer_test.go @@ -12,13 +12,13 @@ func TestDBNodeInitializer_loop(t *testing.T) { if err != nil { t.Fatal(err) } - t.Log(len(accessLogDBMapping), len(accessLogDAOMapping)) + t.Log(len(accessLogDBMapping), len(httpAccessLogDAOMapping)) } func TestFindAccessLogTable(t *testing.T) { before := time.Now() db := SharedHTTPAccessLogDAO.Instance - tableName, err := findAccessLogTable(db, "20201010", false) + tableName, err := findHTTPAccessLogTable(db, "20201010", false) if err != nil { t.Fatal(err) } @@ -26,7 +26,7 @@ func TestFindAccessLogTable(t *testing.T) { t.Log(time.Since(before).Seconds()*1000, "ms") before = time.Now() - tableName, err = findAccessLogTable(db, "20201010", false) + tableName, err = findHTTPAccessLogTable(db, "20201010", false) if err != nil { t.Fatal(err) @@ -40,6 +40,6 @@ func BenchmarkFindAccessLogTable(b *testing.B) { runtime.GOMAXPROCS(1) for i := 0; i < b.N; i++ { - _, _ = findAccessLogTable(db, "20201010", false) + _, _ = findHTTPAccessLogTable(db, "20201010", false) } } diff --git a/internal/db/models/http_access_log_dao_test.go b/internal/db/models/http_access_log_dao_test.go index 984aa01d..4c989284 100644 --- a/internal/db/models/http_access_log_dao_test.go +++ b/internal/db/models/http_access_log_dao_test.go @@ -24,7 +24,7 @@ func TestCreateHTTPAccessLogs(t *testing.T) { Status: 200, Timestamp: time.Now().Unix(), } - dao := randomAccessLogDAO() + dao := randomHTTPAccessLogDAO() t.Log("dao:", dao) err = SharedHTTPAccessLogDAO.CreateHTTPAccessLogsWithDAO(tx, dao, []*pb.HTTPAccessLog{accessLog}) if err != nil { @@ -41,7 +41,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs(t *testing.T) { t.Fatal(err) } - accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0) + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0, "") if err != nil { t.Fatal(err) } @@ -68,7 +68,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page(t *testing.T) { times := 0 // 防止循环次数太多 for { before := time.Now() - accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0) + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0, "") cost := time.Since(before).Seconds() if err != nil { t.Fatal(err) @@ -99,7 +99,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Reverse(t *testing.T) { } before := time.Now() - accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), 0, true, false, 0, 0, 0, false, 0) + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), 0, true, false, 0, 0, 0, false, 0, "") cost := time.Since(before).Seconds() if err != nil { t.Fatal(err) @@ -124,7 +124,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page_NotExists(t *testing.T) { times := 0 // 防止循环次数太多 for { before := time.Now() - accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), 0, false, false, 0, 0, 0, false, 0) + accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), 0, false, false, 0, 0, 0, false, 0, "") cost := time.Since(before).Seconds() if err != nil { t.Fatal(err) diff --git a/internal/db/models/node_value_dao_test.go b/internal/db/models/node_value_dao_test.go index b43d975c..ddfc73bd 100644 --- a/internal/db/models/node_value_dao_test.go +++ b/internal/db/models/node_value_dao_test.go @@ -1,6 +1,7 @@ package models import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/maps" @@ -13,7 +14,7 @@ func TestNodeValueDAO_CreateValue(t *testing.T) { m := maps.Map{ "hello": "world12344", } - err := dao.CreateValue(nil, NodeRoleNode, 1, "test", m.AsJSON(), time.Now().Unix()) + err := dao.CreateValue(nil, nodeconfigs.NodeRoleNode, 1, "test", m.AsJSON(), time.Now().Unix()) if err != nil { t.Fatal(err) } diff --git a/internal/db/models/server_daily_stat_dao.go b/internal/db/models/server_daily_stat_dao.go index 5f9812a9..71c07ab1 100644 --- a/internal/db/models/server_daily_stat_dao.go +++ b/internal/db/models/server_daily_stat_dao.go @@ -7,6 +7,7 @@ import ( "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/maps" timeutil "github.com/iwind/TeaGo/utils/time" + "regexp" ) type ServerDailyStatDAO dbs.DAO @@ -30,24 +31,33 @@ func init() { }) } -// 提交数据 +// SaveStats 提交数据 func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailyStat) error { for _, stat := range stats { day := timeutil.FormatTime("Ymd", stat.CreatedAt) timeFrom := timeutil.FormatTime("His", stat.CreatedAt) - timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60) // 5分钟 + timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60-1) // 5分钟 _, _, err := this.Query(tx). Param("bytes", stat.Bytes). + Param("cachedBytes", stat.CachedBytes). + Param("countRequests", stat.CountRequests). + Param("countCachedRequests", stat.CountCachedRequests). InsertOrUpdate(maps.Map{ - "serverId": stat.ServerId, - "regionId": stat.RegionId, - "bytes": dbs.SQL("bytes+:bytes"), - "day": day, - "timeFrom": timeFrom, - "timeTo": timeTo, + "serverId": stat.ServerId, + "regionId": stat.RegionId, + "bytes": dbs.SQL("bytes+:bytes"), + "cachedBytes": dbs.SQL("cachedBytes+:cachedBytes"), + "countRequests": dbs.SQL("countRequests+:countRequests"), + "countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"), + "day": day, + "timeFrom": timeFrom, + "timeTo": timeTo, }, maps.Map{ - "bytes": dbs.SQL("bytes+:bytes"), + "bytes": dbs.SQL("bytes+:bytes"), + "cachedBytes": dbs.SQL("cachedBytes+:cachedBytes"), + "countRequests": dbs.SQL("countRequests+:countRequests"), + "countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"), }) if err != nil { return err @@ -56,7 +66,7 @@ func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailySta return nil } -// 根据用户计算某月合计 +// SumUserMonthly 根据用户计算某月合计 // month 格式为YYYYMM func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, regionId int64, month string) (int64, error) { query := this.Query(tx) @@ -69,7 +79,7 @@ func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, regionI SumInt64("bytes", 0) } -// 获取某月带宽峰值 +// SumUserMonthlyPeek 获取某月带宽峰值 // month 格式为YYYYMM func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, regionId int64, month string) (int64, error) { query := this.Query(tx) @@ -86,7 +96,7 @@ func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, reg return int64(max), nil } -// 获取某天流量总和 +// SumUserDaily 获取某天流量总和 // day 格式为YYYYMMDD func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId int64, day string) (int64, error) { query := this.Query(tx) @@ -100,7 +110,7 @@ func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId SumInt64("bytes", 0) } -// 获取某天带宽峰值 +// SumUserDailyPeek 获取某天带宽峰值 // day 格式为YYYYMMDD func (this *ServerDailyStatDAO) SumUserDailyPeek(tx *dbs.Tx, userId int64, regionId int64, day string) (int64, error) { query := this.Query(tx) @@ -117,3 +127,34 @@ func (this *ServerDailyStatDAO) SumUserDailyPeek(tx *dbs.Tx, userId int64, regio } return int64(max), nil } + +// SumHourlyStat 获取 N 小时内的流量 +// hour 格式为YYYYMMDDHH +func (this *ServerDailyStatDAO) SumHourlyStat(tx *dbs.Tx, serverId int64, hour string) (stat *pb.ServerDailyStat, err error) { + stat = &pb.ServerDailyStat{} + + if !regexp.MustCompile(`^\d{10}$`).MatchString(hour) { + return + } + + one, _, err := this.Query(tx). + Result("SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests"). + Attr("serverId", serverId). + Attr("day", hour[:8]). + Gte("timeFrom", hour[8:]+"0000"). + Lte("timeTo", hour[8:]+"5959"). + FindOne() + if err != nil { + return nil, err + } + + if one == nil { + return + } + + stat.Bytes = one.GetInt64("bytes") + stat.CachedBytes = one.GetInt64("cachedBytes") + stat.CountRequests = one.GetInt64("countRequests") + stat.CountCachedRequests = one.GetInt64("countCachedRequests") + return +} diff --git a/internal/db/models/server_daily_stat_dao_test.go b/internal/db/models/server_daily_stat_dao_test.go index f1aaefa5..fead4962 100644 --- a/internal/db/models/server_daily_stat_dao_test.go +++ b/internal/db/models/server_daily_stat_dao_test.go @@ -4,6 +4,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/logs" timeutil "github.com/iwind/TeaGo/utils/time" "testing" ) @@ -51,3 +52,14 @@ func TestServerDailyStatDAO_SumUserMonthly(t *testing.T) { } t.Log("bytes:", bytes) } + +func TestServerDailyStatDAO_SumHourlyRequests(t *testing.T) { + dbs.NotifyReady() + var tx *dbs.Tx + + stats, err := NewServerDailyStatDAO().SumHourlyStats(tx, 23, timeutil.Format("YmdH")) + if err != nil { + t.Fatal(err) + } + logs.PrintAsJSON(stats, t) +} diff --git a/internal/db/models/server_daily_stat_model.go b/internal/db/models/server_daily_stat_model.go index be3e4041..c7319f42 100644 --- a/internal/db/models/server_daily_stat_model.go +++ b/internal/db/models/server_daily_stat_model.go @@ -1,26 +1,32 @@ package models -// 计费流量统计 +// ServerDailyStat 计费流量统计 type ServerDailyStat struct { - Id uint64 `field:"id"` // ID - ServerId uint32 `field:"serverId"` // 服务ID - RegionId uint32 `field:"regionId"` // 区域ID - Bytes uint64 `field:"bytes"` // 流量 - Day string `field:"day"` // 日期YYYYMMDD - TimeFrom string `field:"timeFrom"` // 开始时间HHMMSS - TimeTo string `field:"timeTo"` // 结束时间 - IsCharged uint8 `field:"isCharged"` // 是否已计算费用 + Id uint64 `field:"id"` // ID + ServerId uint32 `field:"serverId"` // 服务ID + RegionId uint32 `field:"regionId"` // 区域ID + Bytes uint64 `field:"bytes"` // 流量 + CachedBytes uint64 `field:"cachedBytes"` // 缓存的流量 + CountRequests uint64 `field:"countRequests"` // 请求数 + CountCachedRequests uint64 `field:"countCachedRequests"` // 缓存的请求数 + Day string `field:"day"` // 日期YYYYMMDD + TimeFrom string `field:"timeFrom"` // 开始时间HHMMSS + TimeTo string `field:"timeTo"` // 结束时间 + IsCharged uint8 `field:"isCharged"` // 是否已计算费用 } type ServerDailyStatOperator struct { - Id interface{} // ID - ServerId interface{} // 服务ID - RegionId interface{} // 区域ID - Bytes interface{} // 流量 - Day interface{} // 日期YYYYMMDD - TimeFrom interface{} // 开始时间HHMMSS - TimeTo interface{} // 结束时间 - IsCharged interface{} // 是否已计算费用 + Id interface{} // ID + ServerId interface{} // 服务ID + RegionId interface{} // 区域ID + Bytes interface{} // 流量 + CachedBytes interface{} // 缓存的流量 + CountRequests interface{} // 请求数 + CountCachedRequests interface{} // 缓存的请求数 + Day interface{} // 日期YYYYMMDD + TimeFrom interface{} // 开始时间HHMMSS + TimeTo interface{} // 结束时间 + IsCharged interface{} // 是否已计算费用 } func NewServerDailyStatOperator() *ServerDailyStatOperator { diff --git a/internal/rpc/services/service_server_daily_stat.go b/internal/rpc/services/service_server_daily_stat.go index 0caf9490..46ddef22 100644 --- a/internal/rpc/services/service_server_daily_stat.go +++ b/internal/rpc/services/service_server_daily_stat.go @@ -6,14 +6,15 @@ import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models/stats" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" timeutil "github.com/iwind/TeaGo/utils/time" + "time" ) -// 服务统计相关服务 +// ServerDailyStatService 服务统计相关服务 type ServerDailyStatService struct { BaseService } -// 上传统计 +// UploadServerDailyStats 上传统计 func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context, req *pb.UploadServerDailyStatsRequest) (*pb.RPCSuccess, error) { nodeId, err := this.ValidateNode(ctx) if err != nil { @@ -65,3 +66,34 @@ func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context, return this.Success() } + +// FindServerHourlyStats 按小时读取统计数据 +func (this *ServerDailyStatService) FindServerHourlyStats(ctx context.Context, req *pb.FindServerHourlyStatsRequest) (*pb.FindServerHourlyStatsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + tx := this.NullTx() + + result := []*pb.FindServerHourlyStatsResponse_HourlyStat{} + if req.Hours > 0 { + for i := int32(0); i < req.Hours; i++ { + hourString := timeutil.Format("YmdH", time.Now().Add(-time.Duration(i)*time.Hour)) + stat, err := models.SharedServerDailyStatDAO.SumHourlyStat(tx, req.ServerId, hourString) + if err != nil { + return nil, err + } + if stat != nil { + result = append(result, &pb.FindServerHourlyStatsResponse_HourlyStat{ + Hour: hourString, + Bytes: stat.Bytes, + CachedBytes: stat.CachedBytes, + CountRequests: stat.CountRequests, + CountCachedRequests: stat.CountCachedRequests, + }) + } + } + } + return &pb.FindServerHourlyStatsResponse{Stats: result}, nil +}