增加服务流量统计

This commit is contained in:
GoEdgeLab
2021-06-08 11:24:41 +08:00
parent a29f90beda
commit 9ca60abdc1
11 changed files with 104 additions and 30 deletions

41
internal/caches/errors.go Normal file
View File

@@ -0,0 +1,41 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import "errors"
// 常用的几个错误
var (
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range")
)
// CapacityError 容量错误
// 独立出来是为了可以在有些场合下可以忽略,防止产生没必要的错误提示数量太多
type CapacityError struct {
err string
}
func NewCapacityError(err string) error {
return &CapacityError{err: err}
}
func (this *CapacityError) Error() string {
return this.err
}
// CanIgnoreErr 检查错误是否可以忽略
func CanIgnoreErr(err error) bool {
if err == nil {
return true
}
if err == ErrFileIsWriting {
return true
}
_, ok := err.(*CapacityError)
if ok {
return true
}
return false
}

View File

@@ -0,0 +1,16 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"github.com/iwind/TeaGo/assert"
"testing"
)
func TestCanIgnoreErr(t *testing.T) {
a := assert.NewAssertion(t)
a.IsTrue(CanIgnoreErr(ErrFileIsWriting))
a.IsTrue(CanIgnoreErr(NewCapacityError("over capcity")))
a.IsFalse(CanIgnoreErr(ErrNotFound))
}

View File

@@ -36,12 +36,6 @@ const (
SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength
) )
var (
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrInvalidRange = errors.New("invalid range")
)
// FileStorage 文件缓存 // FileStorage 文件缓存
// 文件结构: // 文件结构:
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
@@ -254,11 +248,11 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
return nil, err return nil, err
} }
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys { if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
return nil, errors.New("write file cache failed: too many keys in cache storage") return nil, NewCapacityError("write file cache failed: too many keys in cache storage")
} }
capacityBytes := this.diskCapacityBytes() capacityBytes := this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize { if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10)) return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
} }
hash := stringutil.Md5(key) hash := stringutil.Md5(key)

View File

@@ -3,7 +3,6 @@ package caches
import ( import (
"fmt" "fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/cespare/xxhash" "github.com/cespare/xxhash"
"strconv" "strconv"
@@ -126,11 +125,11 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (
return nil, err return nil, err
} }
if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys { if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys {
return nil, errors.New("write memory cache failed: too many keys in cache storage") return nil, NewCapacityError("write memory cache failed: too many keys in cache storage")
} }
capacityBytes := this.memoryCapacityBytes() capacityBytes := this.memoryCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize { if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
} }
// 先删除 // 先删除

View File

@@ -61,6 +61,7 @@ type HTTPRequest struct {
rewriteIsExternalURL bool // 重写目标是否为外部URL rewriteIsExternalURL bool // 重写目标是否为外部URL
cacheRef *serverconfigs.HTTPCacheRef // 缓存设置 cacheRef *serverconfigs.HTTPCacheRef // 缓存设置
cacheKey string // 缓存使用的Key cacheKey string // 缓存使用的Key
isCached bool // 是否已经被缓存
// WAF相关 // WAF相关
firewallPolicyId int64 firewallPolicyId int64
@@ -231,7 +232,11 @@ func (this *HTTPRequest) doEnd() {
// 流量统计 // 流量统计
// TODO 增加是否开启开关 // TODO 增加是否开启开关
if this.Server != nil { if this.Server != nil {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes) if this.isCached {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, this.writer.sentBodyBytes, 1, 1)
} else {
stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, 0, 1, 0)
}
} }
} }

View File

@@ -308,5 +308,6 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
} }
this.cacheRef = nil // 终止读取不再往下传递 this.cacheRef = nil // 终止读取不再往下传递
this.isCached = true
return true return true
} }

View File

@@ -387,7 +387,7 @@ func (this *HTTPWriter) prepareCache(size int64) {
expiredAt := utils.UnixTime() + life expiredAt := utils.UnixTime() + life
cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode()) cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode())
if err != nil { if err != nil {
if err != caches.ErrFileIsWriting { if !caches.CanIgnoreErr(err) {
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
} }
return return

View File

@@ -80,7 +80,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
} }
// 记录流量 // 记录流量
stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n)) stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n), 0, 0, 0)
} }
if err != nil { if err != nil {
closer() closer()

View File

@@ -164,7 +164,7 @@ func NewUDPConn(serverId int64, addr net.Addr, proxyConn *net.UDPConn, serverCon
} }
// 记录流量 // 记录流量
stats.SharedTrafficStatManager.Add(serverId, int64(n)) stats.SharedTrafficStatManager.Add(serverId, int64(n), 0, 0, 0)
} }
if err != nil { if err != nil {
conn.isOk = false conn.isOk = false

View File

@@ -15,9 +15,16 @@ import (
var SharedTrafficStatManager = NewTrafficStatManager() var SharedTrafficStatManager = NewTrafficStatManager()
type TrafficItem struct {
Bytes int64
CachedBytes int64
CountRequests int64
CountCachedRequests int64
}
// TrafficStatManager 区域流量统计 // TrafficStatManager 区域流量统计
type TrafficStatManager struct { type TrafficStatManager struct {
m map[string]int64 // [timestamp serverId] => bytes itemMap map[string]*TrafficItem // [timestamp serverId] => bytes
locker sync.Mutex locker sync.Mutex
configFunc func() *nodeconfigs.NodeConfig configFunc func() *nodeconfigs.NodeConfig
} }
@@ -25,7 +32,7 @@ type TrafficStatManager struct {
// NewTrafficStatManager 获取新对象 // NewTrafficStatManager 获取新对象
func NewTrafficStatManager() *TrafficStatManager { func NewTrafficStatManager() *TrafficStatManager {
manager := &TrafficStatManager{ manager := &TrafficStatManager{
m: map[string]int64{}, itemMap: map[string]*TrafficItem{},
} }
return manager return manager
@@ -55,7 +62,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
} }
// Add 添加流量 // Add 添加流量
func (this *TrafficStatManager) Add(serverId int64, bytes int64) { func (this *TrafficStatManager) Add(serverId int64, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64) {
if bytes == 0 { if bytes == 0 {
return return
} }
@@ -64,7 +71,15 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
key := strconv.FormatInt(timestamp, 10) + strconv.FormatInt(serverId, 10) key := strconv.FormatInt(timestamp, 10) + strconv.FormatInt(serverId, 10)
this.locker.Lock() this.locker.Lock()
this.m[key] += bytes item, ok := this.itemMap[key]
if !ok {
item = &TrafficItem{}
this.itemMap[key] = item
}
item.Bytes += bytes
item.CachedBytes += cachedBytes
item.CountRequests += countRequests
item.CountCachedRequests += countCachedRequests
this.locker.Unlock() this.locker.Unlock()
} }
@@ -81,12 +96,12 @@ func (this *TrafficStatManager) Upload() error {
} }
this.locker.Lock() this.locker.Lock()
m := this.m m := this.itemMap
this.m = map[string]int64{} this.itemMap = map[string]*TrafficItem{}
this.locker.Unlock() this.locker.Unlock()
pbStats := []*pb.ServerDailyStat{} pbStats := []*pb.ServerDailyStat{}
for key, bytes := range m { for key, item := range m {
timestamp, err := strconv.ParseInt(key[:10], 10, 64) timestamp, err := strconv.ParseInt(key[:10], 10, 64)
if err != nil { if err != nil {
return err return err
@@ -97,10 +112,13 @@ func (this *TrafficStatManager) Upload() error {
} }
pbStats = append(pbStats, &pb.ServerDailyStat{ pbStats = append(pbStats, &pb.ServerDailyStat{
ServerId: serverId, ServerId: serverId,
RegionId: config.RegionId, RegionId: config.RegionId,
Bytes: bytes, Bytes: item.Bytes,
CreatedAt: timestamp, CachedBytes: item.CachedBytes,
CountRequests: item.CountRequests,
CountCachedRequests: item.CountCachedRequests,
CreatedAt: timestamp,
}) })
} }
if len(pbStats) == 0 { if len(pbStats) == 0 {

View File

@@ -8,15 +8,15 @@ import (
func TestTrafficStatManager_Add(t *testing.T) { func TestTrafficStatManager_Add(t *testing.T) {
manager := NewTrafficStatManager() manager := NewTrafficStatManager()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
manager.Add(1, 10) manager.Add(1, 10, 1, 0)
} }
t.Log(manager.m) t.Log(manager.itemMap)
} }
func TestTrafficStatManager_Upload(t *testing.T) { func TestTrafficStatManager_Upload(t *testing.T) {
manager := NewTrafficStatManager() manager := NewTrafficStatManager()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
manager.Add(1, 10) manager.Add(1, 10, 1, 0)
} }
err := manager.Upload() err := manager.Upload()
if err != nil { if err != nil {
@@ -30,6 +30,6 @@ func BenchmarkTrafficStatManager_Add(b *testing.B) {
manager := NewTrafficStatManager() manager := NewTrafficStatManager()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
manager.Add(1, 1024) manager.Add(1, 1024, 1, 0)
} }
} }