diff --git a/internal/const/vars.go b/internal/const/vars.go index f2b6fcc..c7b5b3c 100644 --- a/internal/const/vars.go +++ b/internal/const/vars.go @@ -8,5 +8,6 @@ var ( InTrafficBytes = uint64(0) OutTrafficBytes = uint64(0) - NodeId int64 = 0 + NodeId int64 = 0 + NodeIdString = "" ) diff --git a/internal/nodes/http_access_log_queue.go b/internal/nodes/http_access_log_queue.go index 9328b6f..4bf83b8 100644 --- a/internal/nodes/http_access_log_queue.go +++ b/internal/nodes/http_access_log_queue.go @@ -4,9 +4,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" - "github.com/TeaOSLab/EdgeNode/internal/utils" "reflect" - "strconv" "strings" "time" ) @@ -57,24 +55,11 @@ func (this *HTTPAccessLogQueue) Push(accessLog *pb.HTTPAccessLog) { func (this *HTTPAccessLogQueue) loop() error { var accessLogs = []*pb.HTTPAccessLog{} var count = 0 - var timestamp int64 - var requestId = 1_000_000 Loop: for { select { case accessLog := <-this.queue: - var unixTime = utils.UnixTime() - if unixTime > timestamp { - requestId = 1_000_000 - timestamp = unixTime - } else { - requestId++ - } - - // timestamp + requestId + nodeId - accessLog.RequestId = strconv.FormatInt(unixTime, 10) + strconv.Itoa(requestId) + strconv.FormatInt(accessLog.NodeId, 10) - accessLogs = append(accessLogs, accessLog) count++ diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index c251010..701c37f 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -36,6 +36,8 @@ var errWritingToClient = errors.New("writing to client error") // HTTPRequest HTTP请求 type HTTPRequest struct { + requestId string + // 外部参数 RawReq *http.Request RawWriter http.ResponseWriter @@ -107,12 +109,14 @@ func (this *HTTPRequest) init() { this.varMapping = map[string]string{ // 缓存相关初始化 "cache.status": "BYPASS", + "cache.age": "0", "cache.policy.name": "", "cache.policy.id": "0", "cache.policy.type": "", } this.logAttrs = map[string]string{} this.requestFromTime = time.Now() + this.requestId = httpRequestNextId() } // Do 执行请求 @@ -556,6 +560,8 @@ func (this *HTTPRequest) Format(source string) string { return strconv.Itoa(this.requestRemotePort()) case "remoteUser": return this.requestRemoteUser() + case "requestId": + return this.requestId case "requestURI", "requestUri": return this.rawURI case "requestURL": diff --git a/internal/nodes/http_request_log.go b/internal/nodes/http_request_log.go index 0dfc3a2..b4bec5a 100644 --- a/internal/nodes/http_request_log.go +++ b/internal/nodes/http_request_log.go @@ -86,7 +86,7 @@ func (this *HTTPRequest) log() { } accessLog := &pb.HTTPAccessLog{ - RequestId: "", + RequestId: this.requestId, NodeId: sharedNodeConfig.Id, ServerId: this.Server.Id, RemoteAddr: this.requestRemoteAddr(true), diff --git a/internal/nodes/http_request_utils.go b/internal/nodes/http_request_utils.go index 7f0935d..8687260 100644 --- a/internal/nodes/http_request_utils.go +++ b/internal/nodes/http_request_utils.go @@ -3,9 +3,12 @@ package nodes import ( "crypto/rand" "fmt" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/utils" "io" "strconv" "strings" + "sync/atomic" ) // 分解Range @@ -125,3 +128,18 @@ func httpRequestGenBoundary() string { } return fmt.Sprintf("%x", buf[:]) } + +// 生成请求ID +var httpRequestTimestamp int64 +var httpRequestId int32 = 1_000_000 + +func httpRequestNextId() string { + var unixTime = utils.UnixTimeMilli() + if unixTime > httpRequestTimestamp { + atomic.StoreInt32(&httpRequestId, 1_000_000) + httpRequestTimestamp = unixTime + } + + // timestamp + requestId + nodeId + return strconv.FormatInt(unixTime, 10) + strconv.Itoa(int(atomic.AddInt32(&httpRequestId, 1))) + teaconst.NodeIdString +} diff --git a/internal/nodes/http_request_utils_test.go b/internal/nodes/http_request_utils_test.go index 1c0d816..455bb7f 100644 --- a/internal/nodes/http_request_utils_test.go +++ b/internal/nodes/http_request_utils_test.go @@ -1,8 +1,12 @@ package nodes import ( + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/iwind/TeaGo/assert" + "runtime" + "sync" "testing" + "time" ) func TestHTTPRequest_httpRequestParseContentRange(t *testing.T) { @@ -53,3 +57,60 @@ func TestHTTPRequest_httpRequestParseContentRange(t *testing.T) { t.Log(set) } } + +func TestHTTPRequest_httpRequestNextId(t *testing.T) { + teaconst.NodeId = 123 + teaconst.NodeIdString = "123" + t.Log(httpRequestNextId()) + t.Log(httpRequestNextId()) + t.Log(httpRequestNextId()) + time.Sleep(1 * time.Second) + t.Log(httpRequestNextId()) + t.Log(httpRequestNextId()) + time.Sleep(1 * time.Second) + t.Log(httpRequestNextId()) +} + +func TestHTTPRequest_httpRequestNextId_Concurrent(t *testing.T) { + var m = map[string]bool{} + var locker = sync.Mutex{} + + var count = 4000 + var wg = &sync.WaitGroup{} + wg.Add(count) + + var countDuplicated = 0 + for i := 0; i < count; i++ { + go func() { + defer wg.Done() + + var requestId = httpRequestNextId() + + locker.Lock() + + _, ok := m[requestId] + if ok { + t.Log("duplicated:", requestId) + countDuplicated++ + } + + m[requestId] = true + locker.Unlock() + }() + } + wg.Wait() + t.Log("ok", countDuplicated, "duplicated") + + var a = assert.NewAssertion(t) + a.IsTrue(countDuplicated == 0) +} + +func BenchmarkHTTPRequest_httpRequestNextId(b *testing.B) { + runtime.GOMAXPROCS(1) + + teaconst.NodeIdString = "123" + + for i := 0; i < b.N; i++ { + _ = httpRequestNextId() + } +} diff --git a/internal/nodes/node.go b/internal/nodes/node.go index d661735..0b8ff66 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -24,6 +24,7 @@ import ( "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/maps" + "github.com/iwind/TeaGo/types" "github.com/iwind/gosock/pkg/gosock" "io/ioutil" "log" @@ -132,6 +133,7 @@ func (this *Node) Start() { return } teaconst.NodeId = nodeConfig.Id + teaconst.NodeIdString = types.String(teaconst.NodeId) err, serverErrors := nodeConfig.Init() if err != nil { remotelogs.Error("NODE", "init node config failed: "+err.Error()) @@ -370,6 +372,7 @@ func (this *Node) syncConfig(taskVersion int64) error { return errors.New("decode config failed: " + err.Error()) } teaconst.NodeId = nodeConfig.Id + teaconst.NodeIdString = types.String(teaconst.NodeId) // 写入到文件中 err = nodeConfig.Save() diff --git a/internal/utils/time.go b/internal/utils/time.go index aa0595e..57cf7a0 100644 --- a/internal/utils/time.go +++ b/internal/utils/time.go @@ -5,22 +5,24 @@ import ( ) var unixTime = time.Now().Unix() -var unixTimerIsReady = false +var unixTimeMilli = time.Now().UnixMilli() func init() { - ticker := time.NewTicker(500 * time.Millisecond) + ticker := time.NewTicker(200 * time.Millisecond) go func() { for range ticker.C { - unixTimerIsReady = true unixTime = time.Now().Unix() + unixTimeMilli = time.Now().UnixMilli() } }() } -// 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景 +// UnixTime 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景 func UnixTime() int64 { - if unixTimerIsReady { - return unixTime - } - return time.Now().Unix() + return unixTime +} + +// UnixTimeMilli 获取时间戳,精确到毫秒 +func UnixTimeMilli() int64 { + return unixTimeMilli }