服务列表带宽使用新的算法

This commit is contained in:
GoEdgeLab
2022-08-27 18:39:00 +08:00
parent d72f98c8b1
commit 43158faabb
6 changed files with 111 additions and 115 deletions

View File

@@ -862,49 +862,32 @@ func (this *ServerDAO) ListEnabledServersMatch(tx *dbs.Tx, offset int64, size in
}
// 排序
var day = timeutil.Format("Ymd")
var minute = timeutil.FormatTime("His", time.Now().Unix()/300*300-300)
var selfTable = this.Table
var statTable = SharedServerDailyStatDAO.Table
var hasOnlyIds = false
var timestamp = time.Now().Unix() / 300 * 300
var currentTime = timeutil.FormatTime("YmdHi", timestamp)
var prevTime = timeutil.FormatTime("YmdHi", timestamp-300)
switch order {
case "trafficOutAsc":
query.Result("id")
query.Join(SharedServerDailyStatDAO, dbs.QueryJoinLeft, selfTable+".id="+statTable+".serverId AND "+statTable+".day=:day AND "+statTable+".timeFrom=:minute")
query.Param("day", day)
query.Param("minute", minute)
query.Group(selfTable + ".id")
query.Asc("SUM(" + statTable + ".bytes)").
DescPk()
hasOnlyIds = true
query.Asc("IF(IF(bandwidthTime=:currentTime, bandwidthBytes, 0) > 0, IF(bandwidthTime=:currentTime, bandwidthBytes, 0), IF(bandwidthTime=:prevTime, bandwidthBytes, 0))")
query.Param("currentTime", currentTime)
query.Param("prevTime", prevTime)
query.DescPk()
case "trafficOutDesc":
query.Result("id")
query.Join(SharedServerDailyStatDAO, dbs.QueryJoinLeft, selfTable+".id="+statTable+".serverId AND "+statTable+".day=:day AND "+statTable+".timeFrom=:minute")
query.Param("day", day)
query.Param("minute", minute)
query.Group(selfTable + ".id")
query.Desc("SUM(" + statTable + ".bytes)").
DescPk()
hasOnlyIds = true
query.Desc("IF(IF(bandwidthTime=:currentTime, bandwidthBytes, 0) > 0, IF(bandwidthTime=:currentTime, bandwidthBytes, 0), IF(bandwidthTime=:prevTime, bandwidthBytes, 0))")
query.Param("currentTime", currentTime)
query.Param("prevTime", prevTime)
query.DescPk()
default:
query.DescPk()
}
_, err = query.FindAll()
if hasOnlyIds {
var newResult = []*Server{}
for _, one := range result {
server, err := this.Find(tx, one.Id)
if err != nil {
return nil, err
// 修正带宽统计数据
for _, server := range result {
if len(server.BandwidthTime) > 0 && server.BandwidthBytes > 0 && server.BandwidthTime < prevTime {
server.BandwidthBytes = 0
}
if server == nil {
continue
}
newResult = append(newResult, server.(*Server))
}
result = newResult
}
return
@@ -2578,6 +2561,22 @@ func (this *ServerDAO) FindUserServerClusterIds(tx *dbs.Tx, userId int64) ([]int
return clusterIds, nil
}
// UpdateServerBandwidth 更新服务带宽
// fullTime YYYYMMDDHHII
func (this *ServerDAO) UpdateServerBandwidth(tx *dbs.Tx, serverId int64, fullTime string, bandwidthBytes int64) error {
if serverId <= 0 {
return nil
}
if bandwidthBytes < 0 {
bandwidthBytes = 0
}
return this.Query(tx).
Pk(serverId).
Set("bandwidthTime", fullTime).
Set("bandwidthBytes", bandwidthBytes).
UpdateQuickly()
}
// NotifyUpdate 同步服务所在的集群
func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
// 创建任务

View File

@@ -48,53 +48,57 @@ type Server struct {
UserPlanId uint32 `field:"userPlanId"` // 所属套餐ID
LastUserPlanId uint32 `field:"lastUserPlanId"` // 上一次使用的套餐
Uam dbs.JSON `field:"uam"` // UAM设置
BandwidthTime string `field:"bandwidthTime"` // 带宽更新时间YYYYMMDDHHII
BandwidthBytes uint64 `field:"bandwidthBytes"` // 最近带宽峰值
}
type ServerOperator struct {
Id interface{} // ID
IsOn interface{} // 是否启用
UserId interface{} // 用户ID
AdminId interface{} // 管理员ID
Type interface{} // 服务类型
Name interface{} // 名称
Description interface{} // 描述
PlainServerNames interface{} // 扁平化域名列表
ServerNames interface{} // 域名列表
AuditingAt interface{} // 审核提交时间
AuditingServerNames interface{} // 审核中的域名
IsAuditing interface{} // 是否正在审核
AuditingResult interface{} // 审核结果
Http interface{} // HTTP配置
Https interface{} // HTTPS配置
Tcp interface{} // TCP配置
Tls interface{} // TLS配置
Unix interface{} // Unix配置
Udp interface{} // UDP配置
WebId interface{} // WEB配置
ReverseProxy interface{} // 反向代理配置
GroupIds interface{} // 分组ID列表
Config interface{} // 服务配置,自动生成
ConfigMd5 interface{} // Md5
ClusterId interface{} // 集群ID
IncludeNodes interface{} // 部署条件
ExcludeNodes interface{} // 节点排除条件
Version interface{} // 版本号
CreatedAt interface{} // 创建时间
State interface{} // 状态
DnsName interface{} // DNS名称
TcpPorts interface{} // 所包含TCP端口
UdpPorts interface{} // 所包含UDP端口
SupportCNAME interface{} // 允许CNAME不在域名名单
TrafficLimit interface{} // 流量限制
TrafficDay interface{} // YYYYMMDD
TrafficMonth interface{} // YYYYMM
TotalDailyTraffic interface{} // 日流量
TotalMonthlyTraffic interface{} // 月流量
TrafficLimitStatus interface{} // 流量限制状态
TotalTraffic interface{} // 总流量
UserPlanId interface{} // 所属套餐ID
LastUserPlanId interface{} // 上一次使用的套餐
Uam interface{} // UAM设置
Id any // ID
IsOn any // 是否启用
UserId any // 用户ID
AdminId any // 管理员ID
Type any // 服务类型
Name any // 名称
Description any // 描述
PlainServerNames any // 扁平化域名列表
ServerNames any // 域名列表
AuditingAt any // 审核提交时间
AuditingServerNames any // 审核中的域名
IsAuditing any // 是否正在审核
AuditingResult any // 审核结果
Http any // HTTP配置
Https any // HTTPS配置
Tcp any // TCP配置
Tls any // TLS配置
Unix any // Unix配置
Udp any // UDP配置
WebId any // WEB配置
ReverseProxy any // 反向代理配置
GroupIds any // 分组ID列表
Config any // 服务配置,自动生成
ConfigMd5 any // Md5
ClusterId any // 集群ID
IncludeNodes any // 部署条件
ExcludeNodes any // 节点排除条件
Version any // 版本号
CreatedAt any // 创建时间
State any // 状态
DnsName any // DNS名称
TcpPorts any // 所包含TCP端口
UdpPorts any // 所包含UDP端口
SupportCNAME any // 允许CNAME不在域名名单
TrafficLimit any // 流量限制
TrafficDay any // YYYYMMDD
TrafficMonth any // YYYYMM
TotalDailyTraffic any // 日流量
TotalMonthlyTraffic any // 月流量
TrafficLimitStatus any // 流量限制状态
TotalTraffic any // 总流量
UserPlanId any // 所属套餐ID
LastUserPlanId any // 上一次使用的套餐
Uam any // UAM设置
BandwidthTime any // 带宽更新时间YYYYMMDDHHII
BandwidthBytes any // 最近带宽峰值
}
func NewServerOperator() *ServerOperator {

View File

@@ -782,7 +782,7 @@ func (this *ServerService) ListEnabledServersMatch(ctx context.Context, req *pb.
if err != nil {
return nil, err
}
result := []*pb.Server{}
var result = []*pb.Server{}
for _, server := range servers {
clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(server.ClusterId))
if err != nil {
@@ -790,9 +790,9 @@ func (this *ServerService) ListEnabledServersMatch(ctx context.Context, req *pb.
}
// 分组信息
pbGroups := []*pb.ServerGroup{}
var pbGroups = []*pb.ServerGroup{}
if models.IsNotNull(server.GroupIds) {
groupIds := []int64{}
var groupIds = []int64{}
err = json.Unmarshal(server.GroupIds, &groupIds)
if err != nil {
return nil, err
@@ -827,7 +827,7 @@ func (this *ServerService) ListEnabledServersMatch(ctx context.Context, req *pb.
}
// 审核结果
auditingResult := &pb.ServerNameAuditingResult{}
var auditingResult = &pb.ServerNameAuditingResult{}
if len(server.AuditingResult) > 0 {
err = json.Unmarshal(server.AuditingResult, auditingResult)
if err != nil {
@@ -847,27 +847,6 @@ func (this *ServerService) ListEnabledServersMatch(ctx context.Context, req *pb.
return nil, err
}
// 当前统计
dailyStat, err := models.SharedServerDailyStatDAO.SumCurrentDailyStat(tx, int64(server.Id))
if err != nil {
return nil, err
}
var pbDailyStat *pb.ServerDailyStat
if dailyStat != nil {
pbDailyStat = &pb.ServerDailyStat{
Bytes: int64(dailyStat.Bytes),
CachedBytes: int64(dailyStat.CachedBytes),
AttackBytes: int64(dailyStat.AttackBytes),
CountRequests: int64(dailyStat.CountRequests),
CountCachedRequests: int64(dailyStat.CountCachedRequests),
CountAttackRequests: int64(dailyStat.CountAttackRequests),
Day: dailyStat.Day,
Hour: dailyStat.Hour,
TimeFrom: dailyStat.TimeFrom,
TimeTo: dailyStat.TimeTo,
}
}
result = append(result, &pb.Server{
Id: int64(server.Id),
IsOn: server.IsOn,
@@ -897,7 +876,8 @@ func (this *ServerService) ListEnabledServersMatch(ctx context.Context, req *pb.
},
ServerGroups: pbGroups,
User: pbUser,
LatestServerDailyStat: pbDailyStat,
BandwidthTime: server.BandwidthTime,
BandwidthBytes: int64(server.BandwidthBytes),
})
}

View File

@@ -29,19 +29,36 @@ func init() {
goman.New(func() {
for range ticker.C {
func() {
var tx *dbs.Tx
serverBandwidthStatsLocker.Lock()
var m = serverBandwidthStatsMap
serverBandwidthStatsMap = map[string]*pb.ServerBandwidthStat{}
serverBandwidthStatsLocker.Unlock()
tx, err := models.SharedServerBandwidthStatDAO.Instance.Begin()
if err != nil {
remotelogs.Error("ServerBandwidthStatService", "begin transaction failed: "+err.Error())
return
}
defer func() {
_ = tx.Commit()
}()
for _, stat := range m {
// 更新服务的带宽峰值
if stat.ServerId > 0 {
err := models.SharedServerBandwidthStatDAO.UpdateServerBandwidth(tx, stat.UserId, stat.ServerId, stat.Day, stat.TimeAt, stat.Bytes)
if err != nil {
remotelogs.Error("ServerBandwidthStatService", "dump bandwidth stats failed: "+err.Error())
}
err = models.SharedServerDAO.UpdateServerBandwidth(tx, stat.ServerId, stat.Day+stat.TimeAt, stat.Bytes)
if err != nil {
remotelogs.Error("ServerBandwidthStatService", "update server bandwidth failed: "+err.Error())
}
}
// 更新服务的带宽峰值
if stat.UserId > 0 {
err = models.SharedUserBandwidthStatDAO.UpdateUserBandwidth(tx, stat.UserId, stat.Day, stat.TimeAt, stat.Bytes)
if err != nil {

View File

@@ -424,10 +424,6 @@ func (this *ServerStatBoardService) ComposeServerStatBoard(ctx context.Context,
if err != nil {
return nil, err
}
if bytes == 0 {
// 尝试从缓存中读取
bytes = ServerBandwidthGetCacheBytes(req.ServerId, day, minute)
}
if bytes > 0 {
result.MinutelyPeekBandwidthBytes = bytes

File diff suppressed because one or more lines are too long