From 3b8d1b4cd85c3951f3ea55b7b6cac4c99be1b9f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Thu, 21 Oct 2021 17:10:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=8D=95=E4=B8=AA=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E5=B8=A6=E5=AE=BD=E9=99=90=E5=88=B6=EF=BC=88?= =?UTF-8?q?=E5=95=86=E4=B8=9A=E7=89=88=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/server_daily_stat_dao.go | 49 ++++- internal/db/models/server_dao.go | 191 +++++++++++++++++- internal/db/models/server_dao_test.go | 24 +++ internal/db/models/server_model.go | 134 ++++++------ internal/rpc/services/service_server.go | 42 ++++ .../rpc/services/service_server_daily_stat.go | 1 + 6 files changed, 375 insertions(+), 66 deletions(-) diff --git a/internal/db/models/server_daily_stat_dao.go b/internal/db/models/server_daily_stat_dao.go index 4bc56a39..1038ffda 100644 --- a/internal/db/models/server_daily_stat_dao.go +++ b/internal/db/models/server_daily_stat_dao.go @@ -1,6 +1,7 @@ package models import ( + "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" _ "github.com/go-sql-driver/mysql" @@ -53,12 +54,14 @@ func init() { // SaveStats 提交数据 func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailyStat) error { var serverUserMap = map[int64]int64{} // serverId => userId + var cacheMap = maps.Map{} for _, stat := range stats { day := timeutil.FormatTime("Ymd", stat.CreatedAt) hour := timeutil.FormatTime("YmdH", stat.CreatedAt) timeFrom := timeutil.FormatTime("His", stat.CreatedAt) timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60-1) // 5分钟 + // 所属用户 serverUserId, ok := serverUserMap[stat.ServerId] if !ok { userId, err := SharedServerDAO.FindServerUserId(tx, stat.ServerId) @@ -100,7 +103,20 @@ func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailySta if err != nil { return err } + + // 更新带宽限制状态 + bandwidthLimit, err := SharedServerDAO.FindServerBandwidthLimitConfig(tx, stat.ServerId, cacheMap) + if err != nil { + return err + } + if bandwidthLimit != nil && bandwidthLimit.IsOn && !bandwidthLimit.IsEmpty() { + err = SharedServerDAO.UpdateServerBandwidthLimitStatus(tx, bandwidthLimit, stat.ServerId, false) + if err != nil { + return err + } + } } + return nil } @@ -233,7 +249,7 @@ func (this *ServerDailyStatDAO) SumDailyStat(tx *dbs.Tx, serverId int64, day str stat = &pb.ServerDailyStat{} if !regexp.MustCompile(`^\d{8}$`).MatchString(day) { - return + return nil, errors.New("invalid day '" + day + "'") } one, _, err := this.Query(tx). @@ -258,6 +274,37 @@ func (this *ServerDailyStatDAO) SumDailyStat(tx *dbs.Tx, serverId int64, day str return } +// SumMonthlyStat 获取某月内的流量 +// month 格式为YYYYMM +func (this *ServerDailyStatDAO) SumMonthlyStat(tx *dbs.Tx, serverId int64, month string) (stat *pb.ServerDailyStat, err error) { + stat = &pb.ServerDailyStat{} + + if !regexp.MustCompile(`^\d{6}$`).MatchString(month) { + return + } + + one, _, err := this.Query(tx). + Result("SUM(bytes) AS bytes, SUM(cachedBytes) AS cachedBytes, SUM(countRequests) AS countRequests, SUM(countCachedRequests) AS countCachedRequests, SUM(countAttackRequests) AS countAttackRequests, SUM(attackBytes) AS attackBytes"). + Attr("serverId", serverId). + Between("day", month+"01", month+"31"). + FindOne() + if err != nil { + return nil, err + } + + if one == nil { + return + } + + stat.Bytes = one.GetInt64("bytes") + stat.CachedBytes = one.GetInt64("cachedBytes") + stat.CountRequests = one.GetInt64("countRequests") + stat.CountCachedRequests = one.GetInt64("countCachedRequests") + stat.CountAttackRequests = one.GetInt64("countAttackRequests") + stat.AttackBytes = one.GetInt64("attackBytes") + return +} + // FindDailyStats 按天统计 func (this *ServerDailyStatDAO) FindDailyStats(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string) (result []*ServerDailyStat, err error) { ones, err := this.Query(tx). diff --git a/internal/db/models/server_dao.go b/internal/db/models/server_dao.go index 63c95bd4..a2ea864d 100644 --- a/internal/db/models/server_dao.go +++ b/internal/db/models/server_dao.go @@ -3,6 +3,7 @@ package models import ( "encoding/json" "errors" + "fmt" "github.com/TeaOSLab/EdgeAPI/internal/db/models/dns" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" @@ -16,6 +17,7 @@ import ( "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" "regexp" "strconv" "strings" @@ -50,7 +52,7 @@ func init() { // Init 初始化 func (this *ServerDAO) Init() { - this.DAOObject.Init() + _ = this.DAOObject.Init() // 这里不处理增删改事件,是为了避免Server修改本身的时候,也要触发别的Server变更 } @@ -1025,6 +1027,29 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap config.HTTPCachePolicyId = httpCachePolicyId } + // bandwidth limit + if len(server.BandwidthLimit) > 0 { + var bandwidthLimitConfig = &serverconfigs.BandwidthLimitConfig{} + err = json.Unmarshal([]byte(server.BandwidthLimit), bandwidthLimitConfig) + if err != nil { + return nil, err + } + config.BandwidthLimit = bandwidthLimitConfig + + if bandwidthLimitConfig.IsOn && !bandwidthLimitConfig.IsEmpty() { + if len(server.BandwidthLimitStatus) > 0 { + var status = &serverconfigs.BandwidthLimitStatus{} + err = json.Unmarshal([]byte(server.BandwidthLimitStatus), status) + if err != nil { + return nil, err + } + if status.IsValid() { + config.BandwidthLimitStatus = status + } + } + } + } + return config, nil } @@ -1738,6 +1763,170 @@ func (this *ServerDAO) NotifyServerPortsUpdate(tx *dbs.Tx, serverId int64) error UpdateQuickly() } +// FindServerBandwidthLimitConfig 查找服务的带宽限制 +func (this *ServerDAO) FindServerBandwidthLimitConfig(tx *dbs.Tx, serverId int64, cacheMap maps.Map) (*serverconfigs.BandwidthLimitConfig, error) { + if cacheMap == nil { + cacheMap = maps.Map{} + } + var cacheKey = this.Table + ":FindServerBandwidthLimitConfig:" + types.String(serverId) + result, ok := cacheMap[cacheKey] + if ok { + return result.(*serverconfigs.BandwidthLimitConfig), nil + } + + bandwidthLimit, err := this.Query(tx). + Pk(serverId). + Result("bandwidthLimit"). + FindStringCol("") + if err != nil { + return nil, err + } + + var limit = &serverconfigs.BandwidthLimitConfig{} + if len(bandwidthLimit) == 0 { + return limit, nil + } + err = json.Unmarshal([]byte(bandwidthLimit), limit) + if err != nil { + return nil, err + } + + cacheMap[cacheKey] = limit + + return limit, nil +} + +// UpdateServerBandwidthLimitConfig 修改服务的带宽限制 +func (this *ServerDAO) UpdateServerBandwidthLimitConfig(tx *dbs.Tx, serverId int64, bandwidthLimitConfig *serverconfigs.BandwidthLimitConfig) error { + if serverId <= 0 { + return errors.New("invalid serverId") + } + limitJSON, err := json.Marshal(bandwidthLimitConfig) + if err != nil { + return err + } + + err = this.Query(tx). + Pk(serverId). + Set("bandwidthLimit", limitJSON). + UpdateQuickly() + if err != nil { + return err + } + + // 更新状态 + return this.UpdateServerBandwidthLimitStatus(tx, bandwidthLimitConfig, serverId, true) +} + +func (this *ServerDAO) UpdateServerBandwidthLimitStatus(tx *dbs.Tx, bandwidthLimitConfig *serverconfigs.BandwidthLimitConfig, serverId int64, isUpdatingConfig bool) error { + if !bandwidthLimitConfig.IsOn { + if isUpdatingConfig { + return this.NotifyUpdate(tx, serverId) + } + return nil + } + + oldStatusString, err := this.Query(tx). + Pk(serverId). + Result("bandwidthLimitStatus"). + FindStringCol("") + if err != nil { + return err + } + var oldStatus = &serverconfigs.BandwidthLimitStatus{} + if len(oldStatusString) > 0 { + err = json.Unmarshal([]byte(oldStatusString), oldStatus) + if err != nil { + return err + } + + // 如果已经达到限制了,而且还在有效期,那就没必要再更新 + if !isUpdatingConfig && oldStatus.IsValid() { + return nil + } + } + + var untilDay = "" + + // daily + if bandwidthLimitConfig.DailyBytes() > 0 { + stat, err := SharedServerDailyStatDAO.SumDailyStat(tx, serverId, timeutil.Format("Ymd")) + if err != nil { + return err + } + if stat != nil && stat.Bytes >= bandwidthLimitConfig.DailyBytes() { + untilDay = timeutil.Format("Ymd") + } + } + + // monthly + if bandwidthLimitConfig.MonthlyBytes() > 0 { + stat, err := SharedServerDailyStatDAO.SumMonthlyStat(tx, serverId, timeutil.Format("Ym")) + if err != nil { + return err + } + if stat != nil && stat.Bytes >= bandwidthLimitConfig.MonthlyBytes() { + untilDay = timeutil.Format("Ym") + fmt.Sprintf("%02d", types.Int(timeutil.Format("t"))) + } + } + + // totally + if bandwidthLimitConfig.TotalBytes() > 0 { + totalBandwidth, err := this.Query(tx). + Pk(serverId). + Result("totalBandwidth"). + FindFloat64Col(0) + if err != nil { + return err + } + + if totalBandwidth >= float64(bandwidthLimitConfig.TotalBytes()) { + untilDay = "20990101" + } + } + + var isChanged = oldStatus.UntilDay != untilDay + if isChanged { + statusJSON, err := json.Marshal(&serverconfigs.BandwidthLimitStatus{UntilDay: untilDay}) + if err != nil { + return err + } + + err = this.Query(tx). + Pk(serverId). + Set("bandwidthLimitStatus", statusJSON). + UpdateQuickly() + if err != nil { + return err + } + return this.NotifyUpdate(tx, serverId) + } + + if isUpdatingConfig { + return this.NotifyUpdate(tx, serverId) + } + return nil +} + +// IncreaseServerTotalBandwidth 增加服务的总带宽 +func (this *ServerDAO) IncreaseServerTotalBandwidth(tx *dbs.Tx, serverId int64, bytes int64) error { + var gb = float64(bytes) / 1024 / 1024 / 1024 + return this.Query(tx). + Pk(serverId). + Set("totalBandwidth", dbs.SQL("totalBandwidth+:bandwidthGB")). + Param("bandwidthGB", gb). + UpdateQuickly() + +} + +// ResetServerTotalBandwidth 重置服务总带宽 +func (this *ServerDAO) ResetServerTotalBandwidth(tx *dbs.Tx, serverId int64) error { + return this.Query(tx). + Pk(serverId). + Set("totalBandwidth", 0). + UpdateQuickly() +} + // NotifyUpdate 同步集群 func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error { // 创建任务 diff --git a/internal/db/models/server_dao_test.go b/internal/db/models/server_dao_test.go index 9ebab308..46c619f5 100644 --- a/internal/db/models/server_dao_test.go +++ b/internal/db/models/server_dao_test.go @@ -3,10 +3,13 @@ package models import ( "crypto/md5" "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/logs" "testing" + "time" ) func TestServerDAO_ComposeServerConfig(t *testing.T) { @@ -161,6 +164,27 @@ func TestServerDAO_FindAllEnabledServersWithDomain(t *testing.T) { } } +func TestServerDAO_UpdateServerBandwidthLimitStatus(t *testing.T) { + dbs.NotifyReady() + + var tx *dbs.Tx + before := time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + err := NewServerDAO().UpdateServerBandwidthLimitStatus(tx, &serverconfigs.BandwidthLimitConfig{ + IsOn: true, + DailySize: &shared.SizeCapacity{Count: 1, Unit: "mb"}, + MonthlySize: &shared.SizeCapacity{Count: 10, Unit: "mb"}, + TotalSize: nil, + NoticePageBody: "", + }, 23) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + func BenchmarkServerDAO_CountAllEnabledServers(b *testing.B) { SharedServerDAO = NewServerDAO() diff --git a/internal/db/models/server_model.go b/internal/db/models/server_model.go index 709e9e7c..dde8da31 100644 --- a/internal/db/models/server_model.go +++ b/internal/db/models/server_model.go @@ -2,73 +2,79 @@ package models // Server 服务 type Server struct { - Id uint32 `field:"id"` // ID - IsOn uint8 `field:"isOn"` // 是否启用 - UserId uint32 `field:"userId"` // 用户ID - AdminId uint32 `field:"adminId"` // 管理员ID - Type string `field:"type"` // 服务类型 - Name string `field:"name"` // 名称 - Description string `field:"description"` // 描述 - ServerNames string `field:"serverNames"` // 域名列表 - AuditingServerNames string `field:"auditingServerNames"` // 审核中的域名 - IsAuditing uint8 `field:"isAuditing"` // 是否正在审核 - AuditingResult string `field:"auditingResult"` // 审核结果 - Http string `field:"http"` // HTTP配置 - Https string `field:"https"` // HTTPS配置 - Tcp string `field:"tcp"` // TCP配置 - Tls string `field:"tls"` // TLS配置 - Unix string `field:"unix"` // Unix配置 - Udp string `field:"udp"` // UDP配置 - WebId uint32 `field:"webId"` // WEB配置 - ReverseProxy string `field:"reverseProxy"` // 反向代理配置 - GroupIds string `field:"groupIds"` // 分组ID列表 - Config string `field:"config"` // 服务配置,自动生成 - ConfigMd5 string `field:"configMd5"` // Md5 - ClusterId uint32 `field:"clusterId"` // 集群ID - IncludeNodes string `field:"includeNodes"` // 部署条件 - ExcludeNodes string `field:"excludeNodes"` // 节点排除条件 - Version uint32 `field:"version"` // 版本号 - CreatedAt uint64 `field:"createdAt"` // 创建时间 - State uint8 `field:"state"` // 状态 - DnsName string `field:"dnsName"` // DNS名称 - TcpPorts string `field:"tcpPorts"` // 所包含TCP端口 - UdpPorts string `field:"udpPorts"` // 所包含UDP端口 - SupportCNAME uint8 `field:"supportCNAME"` // 允许CNAME不在域名名单 + Id uint32 `field:"id"` // ID + IsOn uint8 `field:"isOn"` // 是否启用 + UserId uint32 `field:"userId"` // 用户ID + AdminId uint32 `field:"adminId"` // 管理员ID + Type string `field:"type"` // 服务类型 + Name string `field:"name"` // 名称 + Description string `field:"description"` // 描述 + ServerNames string `field:"serverNames"` // 域名列表 + AuditingServerNames string `field:"auditingServerNames"` // 审核中的域名 + IsAuditing uint8 `field:"isAuditing"` // 是否正在审核 + AuditingResult string `field:"auditingResult"` // 审核结果 + Http string `field:"http"` // HTTP配置 + Https string `field:"https"` // HTTPS配置 + Tcp string `field:"tcp"` // TCP配置 + Tls string `field:"tls"` // TLS配置 + Unix string `field:"unix"` // Unix配置 + Udp string `field:"udp"` // UDP配置 + WebId uint32 `field:"webId"` // WEB配置 + ReverseProxy string `field:"reverseProxy"` // 反向代理配置 + GroupIds string `field:"groupIds"` // 分组ID列表 + Config string `field:"config"` // 服务配置,自动生成 + ConfigMd5 string `field:"configMd5"` // Md5 + ClusterId uint32 `field:"clusterId"` // 集群ID + IncludeNodes string `field:"includeNodes"` // 部署条件 + ExcludeNodes string `field:"excludeNodes"` // 节点排除条件 + Version uint32 `field:"version"` // 版本号 + CreatedAt uint64 `field:"createdAt"` // 创建时间 + State uint8 `field:"state"` // 状态 + DnsName string `field:"dnsName"` // DNS名称 + TcpPorts string `field:"tcpPorts"` // 所包含TCP端口 + UdpPorts string `field:"udpPorts"` // 所包含UDP端口 + SupportCNAME uint8 `field:"supportCNAME"` // 允许CNAME不在域名名单 + BandwidthLimit string `field:"bandwidthLimit"` // 带宽限制 + TotalBandwidth float64 `field:"totalBandwidth"` // 总带宽用量(单位GB) + BandwidthLimitStatus string `field:"bandwidthLimitStatus"` // 带宽限制状态 } type ServerOperator struct { - Id interface{} // ID - IsOn interface{} // 是否启用 - UserId interface{} // 用户ID - AdminId interface{} // 管理员ID - Type interface{} // 服务类型 - Name interface{} // 名称 - Description interface{} // 描述 - ServerNames interface{} // 域名列表 - AuditingServerNames interface{} // 审核中的域名 - IsAuditing interface{} // 是否正在审核 - AuditingResult interface{} // 审核结果 - Http interface{} // HTTP配置 - Https interface{} // HTTPS配置 - Tcp interface{} // TCP配置 - Tls interface{} // TLS配置 - Unix interface{} // Unix配置 - Udp interface{} // UDP配置 - WebId interface{} // WEB配置 - ReverseProxy interface{} // 反向代理配置 - GroupIds interface{} // 分组ID列表 - Config interface{} // 服务配置,自动生成 - ConfigMd5 interface{} // Md5 - ClusterId interface{} // 集群ID - IncludeNodes interface{} // 部署条件 - ExcludeNodes interface{} // 节点排除条件 - Version interface{} // 版本号 - CreatedAt interface{} // 创建时间 - State interface{} // 状态 - DnsName interface{} // DNS名称 - TcpPorts interface{} // 所包含TCP端口 - UdpPorts interface{} // 所包含UDP端口 - SupportCNAME interface{} // 允许CNAME不在域名名单 + Id interface{} // ID + IsOn interface{} // 是否启用 + UserId interface{} // 用户ID + AdminId interface{} // 管理员ID + Type interface{} // 服务类型 + Name interface{} // 名称 + Description interface{} // 描述 + ServerNames interface{} // 域名列表 + AuditingServerNames interface{} // 审核中的域名 + IsAuditing interface{} // 是否正在审核 + AuditingResult interface{} // 审核结果 + Http interface{} // HTTP配置 + Https interface{} // HTTPS配置 + Tcp interface{} // TCP配置 + Tls interface{} // TLS配置 + Unix interface{} // Unix配置 + Udp interface{} // UDP配置 + WebId interface{} // WEB配置 + ReverseProxy interface{} // 反向代理配置 + GroupIds interface{} // 分组ID列表 + Config interface{} // 服务配置,自动生成 + ConfigMd5 interface{} // Md5 + ClusterId interface{} // 集群ID + IncludeNodes interface{} // 部署条件 + ExcludeNodes interface{} // 节点排除条件 + Version interface{} // 版本号 + CreatedAt interface{} // 创建时间 + State interface{} // 状态 + DnsName interface{} // DNS名称 + TcpPorts interface{} // 所包含TCP端口 + UdpPorts interface{} // 所包含UDP端口 + SupportCNAME interface{} // 允许CNAME不在域名名单 + BandwidthLimit interface{} // 带宽限制 + TotalBandwidth interface{} // 总带宽用量(单位GB) + BandwidthLimitStatus interface{} // 带宽限制状态 } func NewServerOperator() *ServerOperator { diff --git a/internal/rpc/services/service_server.go b/internal/rpc/services/service_server.go index b4222c82..4bf1a2ab 100644 --- a/internal/rpc/services/service_server.go +++ b/internal/rpc/services/service_server.go @@ -1652,3 +1652,45 @@ func (this *ServerService) PurgeServerCache(ctx context.Context, req *pb.PurgeSe return purgeResponse, nil } + +// FindEnabledServerBandwidthLimit 查找带宽限制 +func (this *ServerService) FindEnabledServerBandwidthLimit(ctx context.Context, req *pb.FindEnabledServerBandwidthLimitRequest) (*pb.FindEnabledServerBandwidthLimitResponse, error) { + _, _, err := this.ValidateAdminAndUser(ctx, 0, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + limitConfig, err := models.SharedServerDAO.FindServerBandwidthLimitConfig(tx, req.ServerId, nil) + if err != nil { + return nil, err + } + limitConfigJSON, err := json.Marshal(limitConfig) + if err != nil { + return nil, err + } + return &pb.FindEnabledServerBandwidthLimitResponse{ + BandwidthLimitJSON: limitConfigJSON, + }, nil +} + +// UpdateServerBandwidthLimit 设置带宽限制 +func (this *ServerService) UpdateServerBandwidthLimit(ctx context.Context, req *pb.UpdateServerBandwidthLimitRequest) (*pb.RPCSuccess, error) { + _, _, err := this.ValidateAdminAndUser(ctx, 0, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + var config = &serverconfigs.BandwidthLimitConfig{} + err = json.Unmarshal(req.BandwidthLimitJSON, config) + if err != nil { + return nil, err + } + + err = models.SharedServerDAO.UpdateServerBandwidthLimitConfig(tx, req.ServerId, config) + if err != nil { + return nil, err + } + return this.Success() +} diff --git a/internal/rpc/services/service_server_daily_stat.go b/internal/rpc/services/service_server_daily_stat.go index a1880e7d..4238bc50 100644 --- a/internal/rpc/services/service_server_daily_stat.go +++ b/internal/rpc/services/service_server_daily_stat.go @@ -25,6 +25,7 @@ func (this *ServerDailyStatService) UploadServerDailyStats(ctx context.Context, tx := this.NullTx() + // 保存统计数据 err = models.SharedServerDailyStatDAO.SaveStats(tx, req.Stats) if err != nil { return nil, err