mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 07:50:25 +08:00 
			
		
		
		
	增加服务流量统计
This commit is contained in:
		@@ -12,13 +12,13 @@ func TestDBNodeInitializer_loop(t *testing.T) {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	t.Log(len(accessLogDBMapping), len(accessLogDAOMapping))
 | 
			
		||||
	t.Log(len(accessLogDBMapping), len(httpAccessLogDAOMapping))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFindAccessLogTable(t *testing.T) {
 | 
			
		||||
	before := time.Now()
 | 
			
		||||
	db := SharedHTTPAccessLogDAO.Instance
 | 
			
		||||
	tableName, err := findAccessLogTable(db, "20201010", false)
 | 
			
		||||
	tableName, err := findHTTPAccessLogTable(db, "20201010", false)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -26,7 +26,7 @@ func TestFindAccessLogTable(t *testing.T) {
 | 
			
		||||
	t.Log(time.Since(before).Seconds()*1000, "ms")
 | 
			
		||||
 | 
			
		||||
	before = time.Now()
 | 
			
		||||
	tableName, err = findAccessLogTable(db, "20201010", false)
 | 
			
		||||
	tableName, err = findHTTPAccessLogTable(db, "20201010", false)
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
@@ -40,6 +40,6 @@ func BenchmarkFindAccessLogTable(b *testing.B) {
 | 
			
		||||
 | 
			
		||||
	runtime.GOMAXPROCS(1)
 | 
			
		||||
	for i := 0; i < b.N; i++ {
 | 
			
		||||
		_, _ = findAccessLogTable(db, "20201010", false)
 | 
			
		||||
		_, _ = findHTTPAccessLogTable(db, "20201010", false)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -24,7 +24,7 @@ func TestCreateHTTPAccessLogs(t *testing.T) {
 | 
			
		||||
		Status:    200,
 | 
			
		||||
		Timestamp: time.Now().Unix(),
 | 
			
		||||
	}
 | 
			
		||||
	dao := randomAccessLogDAO()
 | 
			
		||||
	dao := randomHTTPAccessLogDAO()
 | 
			
		||||
	t.Log("dao:", dao)
 | 
			
		||||
	err = SharedHTTPAccessLogDAO.CreateHTTPAccessLogsWithDAO(tx, dao, []*pb.HTTPAccessLog{accessLog})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -41,7 +41,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs(t *testing.T) {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0)
 | 
			
		||||
	accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0, "")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -68,7 +68,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page(t *testing.T) {
 | 
			
		||||
	times := 0 // 防止循环次数太多
 | 
			
		||||
	for {
 | 
			
		||||
		before := time.Now()
 | 
			
		||||
		accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0)
 | 
			
		||||
		accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0, "")
 | 
			
		||||
		cost := time.Since(before).Seconds()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
@@ -99,7 +99,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Reverse(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	before := time.Now()
 | 
			
		||||
	accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), 0, true, false, 0, 0, 0, false, 0)
 | 
			
		||||
	accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), 0, true, false, 0, 0, 0, false, 0, "")
 | 
			
		||||
	cost := time.Since(before).Seconds()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
@@ -124,7 +124,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page_NotExists(t *testing.T) {
 | 
			
		||||
	times := 0 // 防止循环次数太多
 | 
			
		||||
	for {
 | 
			
		||||
		before := time.Now()
 | 
			
		||||
		accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), 0, false, false, 0, 0, 0, false, 0)
 | 
			
		||||
		accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), 0, false, false, 0, 0, 0, false, 0, "")
 | 
			
		||||
		cost := time.Since(before).Seconds()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
 
 | 
			
		||||
@@ -1,6 +1,7 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
 | 
			
		||||
	_ "github.com/go-sql-driver/mysql"
 | 
			
		||||
	_ "github.com/iwind/TeaGo/bootstrap"
 | 
			
		||||
	"github.com/iwind/TeaGo/maps"
 | 
			
		||||
@@ -13,7 +14,7 @@ func TestNodeValueDAO_CreateValue(t *testing.T) {
 | 
			
		||||
	m := maps.Map{
 | 
			
		||||
		"hello": "world12344",
 | 
			
		||||
	}
 | 
			
		||||
	err := dao.CreateValue(nil, NodeRoleNode, 1, "test", m.AsJSON(), time.Now().Unix())
 | 
			
		||||
	err := dao.CreateValue(nil, nodeconfigs.NodeRoleNode, 1, "test", m.AsJSON(), time.Now().Unix())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -7,6 +7,7 @@ import (
 | 
			
		||||
	"github.com/iwind/TeaGo/dbs"
 | 
			
		||||
	"github.com/iwind/TeaGo/maps"
 | 
			
		||||
	timeutil "github.com/iwind/TeaGo/utils/time"
 | 
			
		||||
	"regexp"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ServerDailyStatDAO dbs.DAO
 | 
			
		||||
@@ -30,24 +31,33 @@ func init() {
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 提交数据
 | 
			
		||||
// SaveStats 提交数据
 | 
			
		||||
func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailyStat) error {
 | 
			
		||||
	for _, stat := range stats {
 | 
			
		||||
		day := timeutil.FormatTime("Ymd", stat.CreatedAt)
 | 
			
		||||
		timeFrom := timeutil.FormatTime("His", stat.CreatedAt)
 | 
			
		||||
		timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60) // 5分钟
 | 
			
		||||
		timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60-1) // 5分钟
 | 
			
		||||
 | 
			
		||||
		_, _, err := this.Query(tx).
 | 
			
		||||
			Param("bytes", stat.Bytes).
 | 
			
		||||
			Param("cachedBytes", stat.CachedBytes).
 | 
			
		||||
			Param("countRequests", stat.CountRequests).
 | 
			
		||||
			Param("countCachedRequests", stat.CountCachedRequests).
 | 
			
		||||
			InsertOrUpdate(maps.Map{
 | 
			
		||||
				"serverId":            stat.ServerId,
 | 
			
		||||
				"regionId":            stat.RegionId,
 | 
			
		||||
				"bytes":               dbs.SQL("bytes+:bytes"),
 | 
			
		||||
				"cachedBytes":         dbs.SQL("cachedBytes+:cachedBytes"),
 | 
			
		||||
				"countRequests":       dbs.SQL("countRequests+:countRequests"),
 | 
			
		||||
				"countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"),
 | 
			
		||||
				"day":                 day,
 | 
			
		||||
				"timeFrom":            timeFrom,
 | 
			
		||||
				"timeTo":              timeTo,
 | 
			
		||||
			}, maps.Map{
 | 
			
		||||
				"bytes":               dbs.SQL("bytes+:bytes"),
 | 
			
		||||
				"cachedBytes":         dbs.SQL("cachedBytes+:cachedBytes"),
 | 
			
		||||
				"countRequests":       dbs.SQL("countRequests+:countRequests"),
 | 
			
		||||
				"countCachedRequests": dbs.SQL("countCachedRequests+:countCachedRequests"),
 | 
			
		||||
			})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
@@ -56,7 +66,7 @@ func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailySta
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 根据用户计算某月合计
 | 
			
		||||
// SumUserMonthly 根据用户计算某月合计
 | 
			
		||||
// month 格式为YYYYMM
 | 
			
		||||
func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, regionId int64, month string) (int64, error) {
 | 
			
		||||
	query := this.Query(tx)
 | 
			
		||||
@@ -69,7 +79,7 @@ func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, regionI
 | 
			
		||||
		SumInt64("bytes", 0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 获取某月带宽峰值
 | 
			
		||||
// SumUserMonthlyPeek 获取某月带宽峰值
 | 
			
		||||
// month 格式为YYYYMM
 | 
			
		||||
func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, regionId int64, month string) (int64, error) {
 | 
			
		||||
	query := this.Query(tx)
 | 
			
		||||
@@ -86,7 +96,7 @@ func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, reg
 | 
			
		||||
	return int64(max), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 获取某天流量总和
 | 
			
		||||
// SumUserDaily 获取某天流量总和
 | 
			
		||||
// day 格式为YYYYMMDD
 | 
			
		||||
func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId int64, day string) (int64, error) {
 | 
			
		||||
	query := this.Query(tx)
 | 
			
		||||
@@ -100,7 +110,7 @@ func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId
 | 
			
		||||
		SumInt64("bytes", 0)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 获取某天带宽峰值
 | 
			
		||||
// SumUserDailyPeek 获取某天带宽峰值
 | 
			
		||||
// day 格式为YYYYMMDD
 | 
			
		||||
func (this *ServerDailyStatDAO) SumUserDailyPeek(tx *dbs.Tx, userId int64, regionId int64, day string) (int64, error) {
 | 
			
		||||
	query := this.Query(tx)
 | 
			
		||||
@@ -117,3 +127,34 @@ func (this *ServerDailyStatDAO) SumUserDailyPeek(tx *dbs.Tx, userId int64, regio
 | 
			
		||||
	}
 | 
			
		||||
	return int64(max), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SumHourlyStat 获取 N 小时内的流量
 | 
			
		||||
// hour 格式为YYYYMMDDHH
 | 
			
		||||
func (this *ServerDailyStatDAO) SumHourlyStat(tx *dbs.Tx, serverId int64, hour string) (stat *pb.ServerDailyStat, err error) {
 | 
			
		||||
	stat = &pb.ServerDailyStat{}
 | 
			
		||||
 | 
			
		||||
	if !regexp.MustCompile(`^\d{10}$`).MatchString(hour) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	one, _, err := this.Query(tx).
 | 
			
		||||
		Result("SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests").
 | 
			
		||||
		Attr("serverId", serverId).
 | 
			
		||||
		Attr("day", hour[:8]).
 | 
			
		||||
		Gte("timeFrom", hour[8:]+"0000").
 | 
			
		||||
		Lte("timeTo", hour[8:]+"5959").
 | 
			
		||||
		FindOne()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if one == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	stat.Bytes = one.GetInt64("bytes")
 | 
			
		||||
	stat.CachedBytes = one.GetInt64("cachedBytes")
 | 
			
		||||
	stat.CountRequests = one.GetInt64("countRequests")
 | 
			
		||||
	stat.CountCachedRequests = one.GetInt64("countCachedRequests")
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -4,6 +4,7 @@ import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	_ "github.com/go-sql-driver/mysql"
 | 
			
		||||
	"github.com/iwind/TeaGo/dbs"
 | 
			
		||||
	"github.com/iwind/TeaGo/logs"
 | 
			
		||||
	timeutil "github.com/iwind/TeaGo/utils/time"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
@@ -51,3 +52,14 @@ func TestServerDailyStatDAO_SumUserMonthly(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
	t.Log("bytes:", bytes)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestServerDailyStatDAO_SumHourlyRequests(t *testing.T) {
 | 
			
		||||
	dbs.NotifyReady()
 | 
			
		||||
	var tx *dbs.Tx
 | 
			
		||||
 | 
			
		||||
	stats, err := NewServerDailyStatDAO().SumHourlyStats(tx, 23, timeutil.Format("YmdH"))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	logs.PrintAsJSON(stats, t)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,11 +1,14 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
// 计费流量统计
 | 
			
		||||
// ServerDailyStat 计费流量统计
 | 
			
		||||
type ServerDailyStat struct {
 | 
			
		||||
	Id                  uint64 `field:"id"`                  // ID
 | 
			
		||||
	ServerId            uint32 `field:"serverId"`            // 服务ID
 | 
			
		||||
	RegionId            uint32 `field:"regionId"`            // 区域ID
 | 
			
		||||
	Bytes               uint64 `field:"bytes"`               // 流量
 | 
			
		||||
	CachedBytes         uint64 `field:"cachedBytes"`         // 缓存的流量
 | 
			
		||||
	CountRequests       uint64 `field:"countRequests"`       // 请求数
 | 
			
		||||
	CountCachedRequests uint64 `field:"countCachedRequests"` // 缓存的请求数
 | 
			
		||||
	Day                 string `field:"day"`                 // 日期YYYYMMDD
 | 
			
		||||
	TimeFrom            string `field:"timeFrom"`            // 开始时间HHMMSS
 | 
			
		||||
	TimeTo              string `field:"timeTo"`              // 结束时间
 | 
			
		||||
@@ -17,6 +20,9 @@ type ServerDailyStatOperator struct {
 | 
			
		||||
	ServerId            interface{} // 服务ID
 | 
			
		||||
	RegionId            interface{} // 区域ID
 | 
			
		||||
	Bytes               interface{} // 流量
 | 
			
		||||
	CachedBytes         interface{} // 缓存的流量
 | 
			
		||||
	CountRequests       interface{} // 请求数
 | 
			
		||||
	CountCachedRequests interface{} // 缓存的请求数
 | 
			
		||||
	Day                 interface{} // 日期YYYYMMDD
 | 
			
		||||
	TimeFrom            interface{} // 开始时间HHMMSS
 | 
			
		||||
	TimeTo              interface{} // 结束时间
 | 
			
		||||
 
 | 
			
		||||
@@ -6,14 +6,15 @@ import (
 | 
			
		||||
	"github.com/TeaOSLab/EdgeAPI/internal/db/models/stats"
 | 
			
		||||
	"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
 | 
			
		||||
	timeutil "github.com/iwind/TeaGo/utils/time"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// 服务统计相关服务
 | 
			
		||||
// ServerDailyStatService 服务统计相关服务
 | 
			
		||||
type ServerDailyStatService struct {
 | 
			
		||||
	BaseService
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 上传统计
 | 
			
		||||
// UploadServerDailyStats 上传统计
 | 
			
		||||
func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context, req *pb.UploadServerDailyStatsRequest) (*pb.RPCSuccess, error) {
 | 
			
		||||
	nodeId, err := this.ValidateNode(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -65,3 +66,34 @@ func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context,
 | 
			
		||||
 | 
			
		||||
	return this.Success()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// FindServerHourlyStats 按小时读取统计数据
 | 
			
		||||
func (this *ServerDailyStatService) FindServerHourlyStats(ctx context.Context, req *pb.FindServerHourlyStatsRequest) (*pb.FindServerHourlyStatsResponse, error) {
 | 
			
		||||
	_, err := this.ValidateAdmin(ctx, 0)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tx := this.NullTx()
 | 
			
		||||
 | 
			
		||||
	result := []*pb.FindServerHourlyStatsResponse_HourlyStat{}
 | 
			
		||||
	if req.Hours > 0 {
 | 
			
		||||
		for i := int32(0); i < req.Hours; i++ {
 | 
			
		||||
			hourString := timeutil.Format("YmdH", time.Now().Add(-time.Duration(i)*time.Hour))
 | 
			
		||||
			stat, err := models.SharedServerDailyStatDAO.SumHourlyStat(tx, req.ServerId, hourString)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			if stat != nil {
 | 
			
		||||
				result = append(result, &pb.FindServerHourlyStatsResponse_HourlyStat{
 | 
			
		||||
					Hour:                hourString,
 | 
			
		||||
					Bytes:               stat.Bytes,
 | 
			
		||||
					CachedBytes:         stat.CachedBytes,
 | 
			
		||||
					CountRequests:       stat.CountRequests,
 | 
			
		||||
					CountCachedRequests: stat.CountCachedRequests,
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return &pb.FindServerHourlyStatsResponse{Stats: result}, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user