实现单个服务的带宽限制(商业版)

This commit is contained in:
刘祥超
2021-10-21 17:10:53 +08:00
parent f86180b93c
commit 3b8d1b4cd8
6 changed files with 375 additions and 66 deletions

View File

@@ -3,6 +3,7 @@ package models
import (
"encoding/json"
"errors"
"fmt"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
@@ -16,6 +17,7 @@ import (
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"regexp"
"strconv"
"strings"
@@ -50,7 +52,7 @@ func init() {
// Init 初始化
func (this *ServerDAO) Init() {
this.DAOObject.Init()
_ = this.DAOObject.Init()
// 这里不处理增删改事件是为了避免Server修改本身的时候也要触发别的Server变更
}
@@ -1025,6 +1027,29 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, cacheMap
config.HTTPCachePolicyId = httpCachePolicyId
}
// bandwidth limit
if len(server.BandwidthLimit) > 0 {
var bandwidthLimitConfig = &serverconfigs.BandwidthLimitConfig{}
err = json.Unmarshal([]byte(server.BandwidthLimit), bandwidthLimitConfig)
if err != nil {
return nil, err
}
config.BandwidthLimit = bandwidthLimitConfig
if bandwidthLimitConfig.IsOn && !bandwidthLimitConfig.IsEmpty() {
if len(server.BandwidthLimitStatus) > 0 {
var status = &serverconfigs.BandwidthLimitStatus{}
err = json.Unmarshal([]byte(server.BandwidthLimitStatus), status)
if err != nil {
return nil, err
}
if status.IsValid() {
config.BandwidthLimitStatus = status
}
}
}
}
return config, nil
}
@@ -1738,6 +1763,170 @@ func (this *ServerDAO) NotifyServerPortsUpdate(tx *dbs.Tx, serverId int64) error
UpdateQuickly()
}
// FindServerBandwidthLimitConfig 查找服务的带宽限制
func (this *ServerDAO) FindServerBandwidthLimitConfig(tx *dbs.Tx, serverId int64, cacheMap maps.Map) (*serverconfigs.BandwidthLimitConfig, error) {
if cacheMap == nil {
cacheMap = maps.Map{}
}
var cacheKey = this.Table + ":FindServerBandwidthLimitConfig:" + types.String(serverId)
result, ok := cacheMap[cacheKey]
if ok {
return result.(*serverconfigs.BandwidthLimitConfig), nil
}
bandwidthLimit, err := this.Query(tx).
Pk(serverId).
Result("bandwidthLimit").
FindStringCol("")
if err != nil {
return nil, err
}
var limit = &serverconfigs.BandwidthLimitConfig{}
if len(bandwidthLimit) == 0 {
return limit, nil
}
err = json.Unmarshal([]byte(bandwidthLimit), limit)
if err != nil {
return nil, err
}
cacheMap[cacheKey] = limit
return limit, nil
}
// UpdateServerBandwidthLimitConfig 修改服务的带宽限制
func (this *ServerDAO) UpdateServerBandwidthLimitConfig(tx *dbs.Tx, serverId int64, bandwidthLimitConfig *serverconfigs.BandwidthLimitConfig) error {
if serverId <= 0 {
return errors.New("invalid serverId")
}
limitJSON, err := json.Marshal(bandwidthLimitConfig)
if err != nil {
return err
}
err = this.Query(tx).
Pk(serverId).
Set("bandwidthLimit", limitJSON).
UpdateQuickly()
if err != nil {
return err
}
// 更新状态
return this.UpdateServerBandwidthLimitStatus(tx, bandwidthLimitConfig, serverId, true)
}
func (this *ServerDAO) UpdateServerBandwidthLimitStatus(tx *dbs.Tx, bandwidthLimitConfig *serverconfigs.BandwidthLimitConfig, serverId int64, isUpdatingConfig bool) error {
if !bandwidthLimitConfig.IsOn {
if isUpdatingConfig {
return this.NotifyUpdate(tx, serverId)
}
return nil
}
oldStatusString, err := this.Query(tx).
Pk(serverId).
Result("bandwidthLimitStatus").
FindStringCol("")
if err != nil {
return err
}
var oldStatus = &serverconfigs.BandwidthLimitStatus{}
if len(oldStatusString) > 0 {
err = json.Unmarshal([]byte(oldStatusString), oldStatus)
if err != nil {
return err
}
// 如果已经达到限制了,而且还在有效期,那就没必要再更新
if !isUpdatingConfig && oldStatus.IsValid() {
return nil
}
}
var untilDay = ""
// daily
if bandwidthLimitConfig.DailyBytes() > 0 {
stat, err := SharedServerDailyStatDAO.SumDailyStat(tx, serverId, timeutil.Format("Ymd"))
if err != nil {
return err
}
if stat != nil && stat.Bytes >= bandwidthLimitConfig.DailyBytes() {
untilDay = timeutil.Format("Ymd")
}
}
// monthly
if bandwidthLimitConfig.MonthlyBytes() > 0 {
stat, err := SharedServerDailyStatDAO.SumMonthlyStat(tx, serverId, timeutil.Format("Ym"))
if err != nil {
return err
}
if stat != nil && stat.Bytes >= bandwidthLimitConfig.MonthlyBytes() {
untilDay = timeutil.Format("Ym") + fmt.Sprintf("%02d", types.Int(timeutil.Format("t")))
}
}
// totally
if bandwidthLimitConfig.TotalBytes() > 0 {
totalBandwidth, err := this.Query(tx).
Pk(serverId).
Result("totalBandwidth").
FindFloat64Col(0)
if err != nil {
return err
}
if totalBandwidth >= float64(bandwidthLimitConfig.TotalBytes()) {
untilDay = "20990101"
}
}
var isChanged = oldStatus.UntilDay != untilDay
if isChanged {
statusJSON, err := json.Marshal(&serverconfigs.BandwidthLimitStatus{UntilDay: untilDay})
if err != nil {
return err
}
err = this.Query(tx).
Pk(serverId).
Set("bandwidthLimitStatus", statusJSON).
UpdateQuickly()
if err != nil {
return err
}
return this.NotifyUpdate(tx, serverId)
}
if isUpdatingConfig {
return this.NotifyUpdate(tx, serverId)
}
return nil
}
// IncreaseServerTotalBandwidth 增加服务的总带宽
func (this *ServerDAO) IncreaseServerTotalBandwidth(tx *dbs.Tx, serverId int64, bytes int64) error {
var gb = float64(bytes) / 1024 / 1024 / 1024
return this.Query(tx).
Pk(serverId).
Set("totalBandwidth", dbs.SQL("totalBandwidth+:bandwidthGB")).
Param("bandwidthGB", gb).
UpdateQuickly()
}
// ResetServerTotalBandwidth 重置服务总带宽
func (this *ServerDAO) ResetServerTotalBandwidth(tx *dbs.Tx, serverId int64) error {
return this.Query(tx).
Pk(serverId).
Set("totalBandwidth", 0).
UpdateQuickly()
}
// NotifyUpdate 同步集群
func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
// 创建任务