diff --git a/internal/caches/errors.go b/internal/caches/errors.go new file mode 100644 index 0000000..6a158e2 --- /dev/null +++ b/internal/caches/errors.go @@ -0,0 +1,41 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import "errors" + +// 常用的几个错误 +var ( + ErrNotFound = errors.New("cache not found") + ErrFileIsWriting = errors.New("the file is writing") + ErrInvalidRange = errors.New("invalid range") +) + +// CapacityError 容量错误 +// 独立出来是为了可以在有些场合下可以忽略,防止产生没必要的错误提示数量太多 +type CapacityError struct { + err string +} + +func NewCapacityError(err string) error { + return &CapacityError{err: err} +} + +func (this *CapacityError) Error() string { + return this.err +} + +// CanIgnoreErr 检查错误是否可以忽略 +func CanIgnoreErr(err error) bool { + if err == nil { + return true + } + if err == ErrFileIsWriting { + return true + } + _, ok := err.(*CapacityError) + if ok { + return true + } + return false +} diff --git a/internal/caches/errros_test.go b/internal/caches/errros_test.go new file mode 100644 index 0000000..92bcbc4 --- /dev/null +++ b/internal/caches/errros_test.go @@ -0,0 +1,16 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package caches + +import ( + "github.com/iwind/TeaGo/assert" + "testing" +) + +func TestCanIgnoreErr(t *testing.T) { + a := assert.NewAssertion(t) + + a.IsTrue(CanIgnoreErr(ErrFileIsWriting)) + a.IsTrue(CanIgnoreErr(NewCapacityError("over capcity"))) + a.IsFalse(CanIgnoreErr(ErrNotFound)) +} diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index cf9b541..94dc6e6 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -36,12 +36,6 @@ const ( SizeMeta = SizeExpiresAt + SizeStatus + SizeURLLength + SizeHeaderLength + SizeBodyLength ) -var ( - ErrNotFound = errors.New("cache not found") - ErrFileIsWriting = errors.New("the file is writing") - ErrInvalidRange = errors.New("invalid range") -) - // FileStorage 文件缓存 // 文件结构: // [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data] @@ -254,11 +248,11 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr return nil, err } if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys { - return nil, errors.New("write file cache failed: too many keys in cache storage") + return nil, NewCapacityError("write file cache failed: too many keys in cache storage") } capacityBytes := this.diskCapacityBytes() if capacityBytes > 0 && capacityBytes <= this.totalSize { - return nil, errors.New("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10)) + return nil, NewCapacityError("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10)) } hash := stringutil.Md5(key) diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index 67f110b..804a0b7 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -3,7 +3,6 @@ package caches import ( "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/cespare/xxhash" "strconv" @@ -126,11 +125,11 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) ( return nil, err } if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys { - return nil, errors.New("write memory cache failed: too many keys in cache storage") + return nil, NewCapacityError("write memory cache failed: too many keys in cache storage") } capacityBytes := this.memoryCapacityBytes() if capacityBytes > 0 && capacityBytes <= this.totalSize { - return nil, errors.New("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") + return nil, NewCapacityError("write memory cache failed: over memory size: " + strconv.FormatInt(capacityBytes, 10) + ", current size: " + strconv.FormatInt(this.totalSize, 10) + " bytes") } // 先删除 diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 1702c84..13a0596 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -61,6 +61,7 @@ type HTTPRequest struct { rewriteIsExternalURL bool // 重写目标是否为外部URL cacheRef *serverconfigs.HTTPCacheRef // 缓存设置 cacheKey string // 缓存使用的Key + isCached bool // 是否已经被缓存 // WAF相关 firewallPolicyId int64 @@ -231,7 +232,11 @@ func (this *HTTPRequest) doEnd() { // 流量统计 // TODO 增加是否开启开关 if this.Server != nil { - stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes) + if this.isCached { + stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, this.writer.sentBodyBytes, 1, 1) + } else { + stats.SharedTrafficStatManager.Add(this.Server.Id, this.writer.sentBodyBytes, 0, 1, 0) + } } } diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index f82242e..80dd4fc 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -308,5 +308,6 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { } this.cacheRef = nil // 终止读取不再往下传递 + this.isCached = true return true } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 36ba65f..1034eb4 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -387,7 +387,7 @@ func (this *HTTPWriter) prepareCache(size int64) { expiredAt := utils.UnixTime() + life cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode()) if err != nil { - if err != caches.ErrFileIsWriting { + if !caches.CanIgnoreErr(err) { remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error()) } return diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 3ab6b32..235e31e 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -80,7 +80,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error { } // 记录流量 - stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n)) + stats.SharedTrafficStatManager.Add(firstServer.Id, int64(n), 0, 0, 0) } if err != nil { closer() diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index eb2c6e2..f2bdc85 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -164,7 +164,7 @@ func NewUDPConn(serverId int64, addr net.Addr, proxyConn *net.UDPConn, serverCon } // 记录流量 - stats.SharedTrafficStatManager.Add(serverId, int64(n)) + stats.SharedTrafficStatManager.Add(serverId, int64(n), 0, 0, 0) } if err != nil { conn.isOk = false diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index 19bf3ee..af50616 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -15,9 +15,16 @@ import ( var SharedTrafficStatManager = NewTrafficStatManager() +type TrafficItem struct { + Bytes int64 + CachedBytes int64 + CountRequests int64 + CountCachedRequests int64 +} + // TrafficStatManager 区域流量统计 type TrafficStatManager struct { - m map[string]int64 // [timestamp serverId] => bytes + itemMap map[string]*TrafficItem // [timestamp serverId] => bytes locker sync.Mutex configFunc func() *nodeconfigs.NodeConfig } @@ -25,7 +32,7 @@ type TrafficStatManager struct { // NewTrafficStatManager 获取新对象 func NewTrafficStatManager() *TrafficStatManager { manager := &TrafficStatManager{ - m: map[string]int64{}, + itemMap: map[string]*TrafficItem{}, } return manager @@ -55,7 +62,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) } // Add 添加流量 -func (this *TrafficStatManager) Add(serverId int64, bytes int64) { +func (this *TrafficStatManager) Add(serverId int64, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64) { if bytes == 0 { return } @@ -64,7 +71,15 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64) { key := strconv.FormatInt(timestamp, 10) + strconv.FormatInt(serverId, 10) this.locker.Lock() - this.m[key] += bytes + item, ok := this.itemMap[key] + if !ok { + item = &TrafficItem{} + this.itemMap[key] = item + } + item.Bytes += bytes + item.CachedBytes += cachedBytes + item.CountRequests += countRequests + item.CountCachedRequests += countCachedRequests this.locker.Unlock() } @@ -81,12 +96,12 @@ func (this *TrafficStatManager) Upload() error { } this.locker.Lock() - m := this.m - this.m = map[string]int64{} + m := this.itemMap + this.itemMap = map[string]*TrafficItem{} this.locker.Unlock() pbStats := []*pb.ServerDailyStat{} - for key, bytes := range m { + for key, item := range m { timestamp, err := strconv.ParseInt(key[:10], 10, 64) if err != nil { return err @@ -97,10 +112,13 @@ func (this *TrafficStatManager) Upload() error { } pbStats = append(pbStats, &pb.ServerDailyStat{ - ServerId: serverId, - RegionId: config.RegionId, - Bytes: bytes, - CreatedAt: timestamp, + ServerId: serverId, + RegionId: config.RegionId, + Bytes: item.Bytes, + CachedBytes: item.CachedBytes, + CountRequests: item.CountRequests, + CountCachedRequests: item.CountCachedRequests, + CreatedAt: timestamp, }) } if len(pbStats) == 0 { diff --git a/internal/stats/traffic_stat_manager_test.go b/internal/stats/traffic_stat_manager_test.go index a635d39..f45328a 100644 --- a/internal/stats/traffic_stat_manager_test.go +++ b/internal/stats/traffic_stat_manager_test.go @@ -8,15 +8,15 @@ import ( func TestTrafficStatManager_Add(t *testing.T) { manager := NewTrafficStatManager() for i := 0; i < 100; i++ { - manager.Add(1, 10) + manager.Add(1, 10, 1, 0) } - t.Log(manager.m) + t.Log(manager.itemMap) } func TestTrafficStatManager_Upload(t *testing.T) { manager := NewTrafficStatManager() for i := 0; i < 100; i++ { - manager.Add(1, 10) + manager.Add(1, 10, 1, 0) } err := manager.Upload() if err != nil { @@ -30,6 +30,6 @@ func BenchmarkTrafficStatManager_Add(b *testing.B) { manager := NewTrafficStatManager() for i := 0; i < b.N; i++ { - manager.Add(1, 1024) + manager.Add(1, 1024, 1, 0) } }