增加${requestId}变量

This commit is contained in:
GoEdgeLab
2021-12-02 11:30:47 +08:00
parent dab6efb424
commit cf4d80bd0c
8 changed files with 101 additions and 25 deletions

View File

@@ -8,5 +8,6 @@ var (
InTrafficBytes = uint64(0) InTrafficBytes = uint64(0)
OutTrafficBytes = uint64(0) OutTrafficBytes = uint64(0)
NodeId int64 = 0 NodeId int64 = 0
NodeIdString = ""
) )

View File

@@ -4,9 +4,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"reflect" "reflect"
"strconv"
"strings" "strings"
"time" "time"
) )
@@ -57,24 +55,11 @@ func (this *HTTPAccessLogQueue) Push(accessLog *pb.HTTPAccessLog) {
func (this *HTTPAccessLogQueue) loop() error { func (this *HTTPAccessLogQueue) loop() error {
var accessLogs = []*pb.HTTPAccessLog{} var accessLogs = []*pb.HTTPAccessLog{}
var count = 0 var count = 0
var timestamp int64
var requestId = 1_000_000
Loop: Loop:
for { for {
select { select {
case accessLog := <-this.queue: case accessLog := <-this.queue:
var unixTime = utils.UnixTime()
if unixTime > timestamp {
requestId = 1_000_000
timestamp = unixTime
} else {
requestId++
}
// timestamp + requestId + nodeId
accessLog.RequestId = strconv.FormatInt(unixTime, 10) + strconv.Itoa(requestId) + strconv.FormatInt(accessLog.NodeId, 10)
accessLogs = append(accessLogs, accessLog) accessLogs = append(accessLogs, accessLog)
count++ count++

View File

@@ -36,6 +36,8 @@ var errWritingToClient = errors.New("writing to client error")
// HTTPRequest HTTP请求 // HTTPRequest HTTP请求
type HTTPRequest struct { type HTTPRequest struct {
requestId string
// 外部参数 // 外部参数
RawReq *http.Request RawReq *http.Request
RawWriter http.ResponseWriter RawWriter http.ResponseWriter
@@ -107,12 +109,14 @@ func (this *HTTPRequest) init() {
this.varMapping = map[string]string{ this.varMapping = map[string]string{
// 缓存相关初始化 // 缓存相关初始化
"cache.status": "BYPASS", "cache.status": "BYPASS",
"cache.age": "0",
"cache.policy.name": "", "cache.policy.name": "",
"cache.policy.id": "0", "cache.policy.id": "0",
"cache.policy.type": "", "cache.policy.type": "",
} }
this.logAttrs = map[string]string{} this.logAttrs = map[string]string{}
this.requestFromTime = time.Now() this.requestFromTime = time.Now()
this.requestId = httpRequestNextId()
} }
// Do 执行请求 // Do 执行请求
@@ -556,6 +560,8 @@ func (this *HTTPRequest) Format(source string) string {
return strconv.Itoa(this.requestRemotePort()) return strconv.Itoa(this.requestRemotePort())
case "remoteUser": case "remoteUser":
return this.requestRemoteUser() return this.requestRemoteUser()
case "requestId":
return this.requestId
case "requestURI", "requestUri": case "requestURI", "requestUri":
return this.rawURI return this.rawURI
case "requestURL": case "requestURL":

View File

@@ -86,7 +86,7 @@ func (this *HTTPRequest) log() {
} }
accessLog := &pb.HTTPAccessLog{ accessLog := &pb.HTTPAccessLog{
RequestId: "", RequestId: this.requestId,
NodeId: sharedNodeConfig.Id, NodeId: sharedNodeConfig.Id,
ServerId: this.Server.Id, ServerId: this.Server.Id,
RemoteAddr: this.requestRemoteAddr(true), RemoteAddr: this.requestRemoteAddr(true),

View File

@@ -3,9 +3,12 @@ package nodes
import ( import (
"crypto/rand" "crypto/rand"
"fmt" "fmt"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"io" "io"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
) )
// 分解Range // 分解Range
@@ -125,3 +128,18 @@ func httpRequestGenBoundary() string {
} }
return fmt.Sprintf("%x", buf[:]) return fmt.Sprintf("%x", buf[:])
} }
// 生成请求ID
var httpRequestTimestamp int64
var httpRequestId int32 = 1_000_000
func httpRequestNextId() string {
var unixTime = utils.UnixTimeMilli()
if unixTime > httpRequestTimestamp {
atomic.StoreInt32(&httpRequestId, 1_000_000)
httpRequestTimestamp = unixTime
}
// timestamp + requestId + nodeId
return strconv.FormatInt(unixTime, 10) + strconv.Itoa(int(atomic.AddInt32(&httpRequestId, 1))) + teaconst.NodeIdString
}

View File

@@ -1,8 +1,12 @@
package nodes package nodes
import ( import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/iwind/TeaGo/assert" "github.com/iwind/TeaGo/assert"
"runtime"
"sync"
"testing" "testing"
"time"
) )
func TestHTTPRequest_httpRequestParseContentRange(t *testing.T) { func TestHTTPRequest_httpRequestParseContentRange(t *testing.T) {
@@ -53,3 +57,60 @@ func TestHTTPRequest_httpRequestParseContentRange(t *testing.T) {
t.Log(set) t.Log(set)
} }
} }
func TestHTTPRequest_httpRequestNextId(t *testing.T) {
teaconst.NodeId = 123
teaconst.NodeIdString = "123"
t.Log(httpRequestNextId())
t.Log(httpRequestNextId())
t.Log(httpRequestNextId())
time.Sleep(1 * time.Second)
t.Log(httpRequestNextId())
t.Log(httpRequestNextId())
time.Sleep(1 * time.Second)
t.Log(httpRequestNextId())
}
func TestHTTPRequest_httpRequestNextId_Concurrent(t *testing.T) {
var m = map[string]bool{}
var locker = sync.Mutex{}
var count = 4000
var wg = &sync.WaitGroup{}
wg.Add(count)
var countDuplicated = 0
for i := 0; i < count; i++ {
go func() {
defer wg.Done()
var requestId = httpRequestNextId()
locker.Lock()
_, ok := m[requestId]
if ok {
t.Log("duplicated:", requestId)
countDuplicated++
}
m[requestId] = true
locker.Unlock()
}()
}
wg.Wait()
t.Log("ok", countDuplicated, "duplicated")
var a = assert.NewAssertion(t)
a.IsTrue(countDuplicated == 0)
}
func BenchmarkHTTPRequest_httpRequestNextId(b *testing.B) {
runtime.GOMAXPROCS(1)
teaconst.NodeIdString = "123"
for i := 0; i < b.N; i++ {
_ = httpRequestNextId()
}
}

View File

@@ -24,6 +24,7 @@ import (
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
"github.com/iwind/gosock/pkg/gosock" "github.com/iwind/gosock/pkg/gosock"
"io/ioutil" "io/ioutil"
"log" "log"
@@ -132,6 +133,7 @@ func (this *Node) Start() {
return return
} }
teaconst.NodeId = nodeConfig.Id teaconst.NodeId = nodeConfig.Id
teaconst.NodeIdString = types.String(teaconst.NodeId)
err, serverErrors := nodeConfig.Init() err, serverErrors := nodeConfig.Init()
if err != nil { if err != nil {
remotelogs.Error("NODE", "init node config failed: "+err.Error()) remotelogs.Error("NODE", "init node config failed: "+err.Error())
@@ -370,6 +372,7 @@ func (this *Node) syncConfig(taskVersion int64) error {
return errors.New("decode config failed: " + err.Error()) return errors.New("decode config failed: " + err.Error())
} }
teaconst.NodeId = nodeConfig.Id teaconst.NodeId = nodeConfig.Id
teaconst.NodeIdString = types.String(teaconst.NodeId)
// 写入到文件中 // 写入到文件中
err = nodeConfig.Save() err = nodeConfig.Save()

View File

@@ -5,22 +5,24 @@ import (
) )
var unixTime = time.Now().Unix() var unixTime = time.Now().Unix()
var unixTimerIsReady = false var unixTimeMilli = time.Now().UnixMilli()
func init() { func init() {
ticker := time.NewTicker(500 * time.Millisecond) ticker := time.NewTicker(200 * time.Millisecond)
go func() { go func() {
for range ticker.C { for range ticker.C {
unixTimerIsReady = true
unixTime = time.Now().Unix() unixTime = time.Now().Unix()
unixTimeMilli = time.Now().UnixMilli()
} }
}() }()
} }
// 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景 // UnixTime 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景
func UnixTime() int64 { func UnixTime() int64 {
if unixTimerIsReady { return unixTime
return unixTime }
}
return time.Now().Unix() // UnixTimeMilli 获取时间戳,精确到毫秒
func UnixTimeMilli() int64 {
return unixTimeMilli
} }