From 5ef307273497dba8a0fb117b73f175f75a060103 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 12 Dec 2021 11:48:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E8=AF=B7=E6=B1=82=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=95=B0=E7=AD=89=E9=99=90=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/edge-node/main.go | 14 +++ internal/nodes/client_conn.go | 57 +++------ internal/nodes/client_conn_base.go | 38 ++++++ internal/nodes/client_conn_closer.go | 7 -- internal/nodes/client_conn_interface.go | 14 +++ internal/nodes/client_conn_limiter.go | 130 +++++++++++++++++++++ internal/nodes/client_conn_limiter_test.go | 38 ++++++ internal/nodes/client_conn_traffic.go | 40 +++++++ internal/nodes/client_conn_utils.go | 2 +- internal/nodes/client_tls_conn.go | 14 +-- internal/nodes/http_request.go | 13 +++ internal/nodes/http_request_error.go | 10 ++ internal/nodes/http_request_limit.go | 32 +++++ internal/nodes/http_writer.go | 24 ++-- internal/nodes/http_writer_rate.go | 90 ++++++++++++++ internal/nodes/node.go | 9 ++ internal/utils/expires/id_key_map.go | 64 ++++++++++ internal/utils/expires/id_key_map_test.go | 46 ++++++++ internal/utils/expires/list.go | 12 +- internal/utils/expires/list_test.go | 64 ++++++++-- 20 files changed, 632 insertions(+), 86 deletions(-) create mode 100644 internal/nodes/client_conn_base.go delete mode 100644 internal/nodes/client_conn_closer.go create mode 100644 internal/nodes/client_conn_interface.go create mode 100644 internal/nodes/client_conn_limiter.go create mode 100644 internal/nodes/client_conn_limiter_test.go create mode 100644 internal/nodes/client_conn_traffic.go create mode 100644 internal/nodes/http_request_limit.go create mode 100644 internal/nodes/http_writer_rate.go create mode 100644 internal/utils/expires/id_key_map.go create mode 100644 internal/utils/expires/id_key_map_test.go diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index 715d240..b92dd9e 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -108,6 +108,20 @@ func main() { } } }) + app.On("conns", func() { + var sock = gosock.NewTmpSock(teaconst.ProcessName) + reply, err := sock.Send(&gosock.Command{Code: "conns"}) + if err != nil { + fmt.Println("[ERROR]" + err.Error()) + } else { + resultJSON, err := json.MarshalIndent(reply.Params, "", " ") + if err != nil { + fmt.Println("[ERROR]" + err.Error()) + } else { + fmt.Println(string(resultJSON)) + } + } + }) app.Run(func() { node := nodes.NewNode() node.Start() diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index a28c13c..5ce86c9 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -5,63 +5,32 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" - "github.com/TeaOSLab/EdgeNode/internal/events" - "github.com/TeaOSLab/EdgeNode/internal/goman" - "github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/ratelimit" - "github.com/iwind/TeaGo/maps" "net" "sync" "sync/atomic" "time" ) -// 发送监控流量 -func init() { - events.On(events.EventStart, func() { - ticker := time.NewTicker(1 * time.Minute) - goman.New(func() { - for range ticker.C { - // 加入到数据队列中 - if teaconst.InTrafficBytes > 0 { - monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficIn, maps.Map{ - "total": teaconst.InTrafficBytes, - }) - } - if teaconst.OutTrafficBytes > 0 { - monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficOut, maps.Map{ - "total": teaconst.OutTrafficBytes, - }) - } - - // 重置数据 - atomic.StoreUint64(&teaconst.InTrafficBytes, 0) - atomic.StoreUint64(&teaconst.OutTrafficBytes, 0) - } - }) - }) -} - // ClientConn 客户端连接 type ClientConn struct { - rawConn net.Conn - isClosed bool + once sync.Once + globalLimiter *ratelimit.Counter - once sync.Once - limiter *ratelimit.Counter + BaseClientConn } -func NewClientConn(conn net.Conn, quickClose bool, limiter *ratelimit.Counter) net.Conn { +func NewClientConn(conn net.Conn, quickClose bool, globalLimiter *ratelimit.Counter) net.Conn { if quickClose { // TCP tcpConn, ok := conn.(*net.TCPConn) if ok { - // TODO 可以设置此值 + // TODO 可以在配置中设置此值 _ = tcpConn.SetLinger(nodeconfigs.DefaultTCPLinger) } } - return &ClientConn{rawConn: conn, limiter: limiter} + return &ClientConn{BaseClientConn: BaseClientConn{rawConn: conn}, globalLimiter: globalLimiter} } func (this *ClientConn) Read(b []byte) (n int, err error) { @@ -82,11 +51,17 @@ func (this *ClientConn) Write(b []byte) (n int, err error) { func (this *ClientConn) Close() error { this.isClosed = true + + // 全局并发数限制 this.once.Do(func() { - if this.limiter != nil { - this.limiter.Release() + if this.globalLimiter != nil { + this.globalLimiter.Release() } }) + + // 单个服务并发数限制 + sharedClientConnLimiter.Remove(this.rawConn.RemoteAddr().String()) + return this.rawConn.Close() } @@ -109,7 +84,3 @@ func (this *ClientConn) SetReadDeadline(t time.Time) error { func (this *ClientConn) SetWriteDeadline(t time.Time) error { return this.rawConn.SetWriteDeadline(t) } - -func (this *ClientConn) IsClosed() bool { - return this.isClosed -} diff --git a/internal/nodes/client_conn_base.go b/internal/nodes/client_conn_base.go new file mode 100644 index 0000000..aad784e --- /dev/null +++ b/internal/nodes/client_conn_base.go @@ -0,0 +1,38 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import "net" + +type BaseClientConn struct { + rawConn net.Conn + + isBound bool + serverId int64 + remoteAddr string + + isClosed bool +} + +func (this *BaseClientConn) IsClosed() bool { + return this.isClosed +} + +// IsBound 是否已绑定服务 +func (this *BaseClientConn) IsBound() bool { + return this.isBound +} + +// Bind 绑定服务 +func (this *BaseClientConn) Bind(serverId int64, remoteAddr string, maxConnsPerServer int, maxConnsPerIP int) bool { + if this.isBound { + return true + } + this.isBound = true + this.serverId = serverId + this.remoteAddr = remoteAddr + + // 检查是否可以连接 + return sharedClientConnLimiter.Add(this.rawConn.RemoteAddr().String(), serverId, remoteAddr, maxConnsPerServer, maxConnsPerIP) +} + diff --git a/internal/nodes/client_conn_closer.go b/internal/nodes/client_conn_closer.go deleted file mode 100644 index 3eaec10..0000000 --- a/internal/nodes/client_conn_closer.go +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. - -package nodes - -type ClientConnCloser interface { - IsClosed() bool -} diff --git a/internal/nodes/client_conn_interface.go b/internal/nodes/client_conn_interface.go new file mode 100644 index 0000000..34f0b20 --- /dev/null +++ b/internal/nodes/client_conn_interface.go @@ -0,0 +1,14 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +type ClientConnInterface interface { + // IsClosed 是否已关闭 + IsClosed() bool + + // IsBound 是否已绑定服务 + IsBound() bool + + // Bind 绑定服务 + Bind(serverId int64, remoteAddr string, maxConnsPerServer int, maxConnsPerIP int) bool +} diff --git a/internal/nodes/client_conn_limiter.go b/internal/nodes/client_conn_limiter.go new file mode 100644 index 0000000..e3dd0c1 --- /dev/null +++ b/internal/nodes/client_conn_limiter.go @@ -0,0 +1,130 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + "github.com/TeaOSLab/EdgeNode/internal/zero" + "sync" +) + +var sharedClientConnLimiter = NewClientConnLimiter() + +// ClientConnRemoteAddr 客户端地址定义 +type ClientConnRemoteAddr struct { + remoteAddr string + serverId int64 +} + +// ClientConnLimiter 客户端连接数限制 +type ClientConnLimiter struct { + remoteAddrMap map[string]*ClientConnRemoteAddr // raw remote addr => remoteAddr + ipConns map[string]map[string]zero.Zero // remoteAddr => { raw remote addr => Zero } + serverConns map[int64]map[string]zero.Zero // serverId => { remoteAddr => Zero } + + locker sync.Mutex +} + +func NewClientConnLimiter() *ClientConnLimiter { + return &ClientConnLimiter{ + remoteAddrMap: map[string]*ClientConnRemoteAddr{}, + ipConns: map[string]map[string]zero.Zero{}, + serverConns: map[int64]map[string]zero.Zero{}, + } +} + +// Add 添加新连接 +// 返回值为true的时候表示允许添加;否则表示不允许添加 +func (this *ClientConnLimiter) Add(rawRemoteAddr string, serverId int64, remoteAddr string, maxConnsPerServer int, maxConnsPerIP int) bool { + if maxConnsPerServer <= 0 || maxConnsPerIP <= 0 || len(remoteAddr) == 0 || serverId <= 0 { + return true + } + + this.locker.Lock() + defer this.locker.Unlock() + + // 检查服务连接数 + var serverMap = this.serverConns[serverId] + if maxConnsPerServer > 0 { + if serverMap == nil { + serverMap = map[string]zero.Zero{} + this.serverConns[serverId] = serverMap + } + + if maxConnsPerServer <= len(serverMap) { + return false + } + } + + // 检查IP连接数 + var ipMap = this.ipConns[remoteAddr] + if maxConnsPerIP > 0 { + if ipMap == nil { + ipMap = map[string]zero.Zero{} + this.ipConns[remoteAddr] = ipMap + } + if maxConnsPerIP > 0 && maxConnsPerIP <= len(ipMap) { + return false + } + } + + this.remoteAddrMap[rawRemoteAddr] = &ClientConnRemoteAddr{ + remoteAddr: remoteAddr, + serverId: serverId, + } + + if maxConnsPerServer > 0 { + serverMap[rawRemoteAddr] = zero.New() + } + if maxConnsPerIP > 0 { + ipMap[rawRemoteAddr] = zero.New() + } + + return true +} + +// Remove 删除连接 +func (this *ClientConnLimiter) Remove(rawRemoteAddr string) { + this.locker.Lock() + defer this.locker.Unlock() + + addr, ok := this.remoteAddrMap[rawRemoteAddr] + if !ok { + return + } + + delete(this.remoteAddrMap, rawRemoteAddr) + delete(this.ipConns[addr.remoteAddr], rawRemoteAddr) + delete(this.serverConns[addr.serverId], rawRemoteAddr) + + if len(this.ipConns[addr.remoteAddr]) == 0 { + delete(this.ipConns, addr.remoteAddr) + } + + if len(this.serverConns[addr.serverId]) == 0 { + delete(this.serverConns, addr.serverId) + } +} + +// Conns 获取连接信息 +// 用于调试 +func (this *ClientConnLimiter) Conns() (ipConns map[string][]string, serverConns map[int64][]string) { + this.locker.Lock() + defer this.locker.Unlock() + + ipConns = map[string][]string{} // ip => [addr1, addr2, ...] + serverConns = map[int64][]string{} // serverId => [addr1, addr2, ...] + + for ip, m := range this.ipConns { + for addr := range m { + ipConns[ip] = append(ipConns[ip], addr) + } + } + + for serverId, m := range this.serverConns { + for addr := range m { + serverConns[serverId] = append(serverConns[serverId], addr) + } + } + + return +} diff --git a/internal/nodes/client_conn_limiter_test.go b/internal/nodes/client_conn_limiter_test.go new file mode 100644 index 0000000..145d7b8 --- /dev/null +++ b/internal/nodes/client_conn_limiter_test.go @@ -0,0 +1,38 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + "github.com/iwind/TeaGo/logs" + "testing" +) + +func TestClientConnLimiter_Add(t *testing.T) { + var limiter = NewClientConnLimiter() + { + b := limiter.Add("127.0.0.1:1234", 1, "192.168.1.100", 10, 5) + t.Log(b) + } + { + b := limiter.Add("127.0.0.1:1235", 1, "192.168.1.100", 10, 5) + t.Log(b) + } + { + b := limiter.Add("127.0.0.1:1236", 1, "192.168.1.100", 10, 5) + t.Log(b) + } + { + b := limiter.Add("127.0.0.1:1237", 1, "192.168.1.101", 10, 5) + t.Log(b) + } + { + b := limiter.Add("127.0.0.1:1238", 1, "192.168.1.100", 5, 5) + t.Log(b) + } + limiter.Remove("127.0.0.1:1238") + limiter.Remove("127.0.0.1:1239") + limiter.Remove("127.0.0.1:1237") + logs.PrintAsJSON(limiter.remoteAddrMap, t) + logs.PrintAsJSON(limiter.ipConns, t) + logs.PrintAsJSON(limiter.serverConns, t) +} diff --git a/internal/nodes/client_conn_traffic.go b/internal/nodes/client_conn_traffic.go new file mode 100644 index 0000000..fca2d73 --- /dev/null +++ b/internal/nodes/client_conn_traffic.go @@ -0,0 +1,40 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/monitor" + "github.com/iwind/TeaGo/maps" + "sync/atomic" + "time" +) + +// 发送监控流量 +func init() { + events.On(events.EventStart, func() { + ticker := time.NewTicker(1 * time.Minute) + goman.New(func() { + for range ticker.C { + // 加入到数据队列中 + if teaconst.InTrafficBytes > 0 { + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficIn, maps.Map{ + "total": teaconst.InTrafficBytes, + }) + } + if teaconst.OutTrafficBytes > 0 { + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficOut, maps.Map{ + "total": teaconst.OutTrafficBytes, + }) + } + + // 重置数据 + atomic.StoreUint64(&teaconst.InTrafficBytes, 0) + atomic.StoreUint64(&teaconst.OutTrafficBytes, 0) + } + }) + }) +} diff --git a/internal/nodes/client_conn_utils.go b/internal/nodes/client_conn_utils.go index 6a5e782..674e7a4 100644 --- a/internal/nodes/client_conn_utils.go +++ b/internal/nodes/client_conn_utils.go @@ -11,7 +11,7 @@ func isClientConnClosed(conn net.Conn) bool { if conn == nil { return true } - clientConn, ok := conn.(ClientConnCloser) + clientConn, ok := conn.(ClientConnInterface) if ok { return clientConn.IsClosed() } diff --git a/internal/nodes/client_tls_conn.go b/internal/nodes/client_tls_conn.go index 1597ee5..f0ee400 100644 --- a/internal/nodes/client_tls_conn.go +++ b/internal/nodes/client_tls_conn.go @@ -8,13 +8,13 @@ import ( "time" ) +// ClientTLSConn TLS连接封装 type ClientTLSConn struct { - rawConn *tls.Conn - isClosed bool + BaseClientConn } func NewClientTLSConn(conn *tls.Conn) net.Conn { - return &ClientTLSConn{rawConn: conn} + return &ClientTLSConn{BaseClientConn{rawConn: conn}} } func (this *ClientTLSConn) Read(b []byte) (n int, err error) { @@ -29,6 +29,10 @@ func (this *ClientTLSConn) Write(b []byte) (n int, err error) { func (this *ClientTLSConn) Close() error { this.isClosed = true + + // 单个服务并发数限制 + sharedClientConnLimiter.Remove(this.rawConn.RemoteAddr().String()) + return this.rawConn.Close() } @@ -51,7 +55,3 @@ func (this *ClientTLSConn) SetReadDeadline(t time.Time) error { func (this *ClientTLSConn) SetWriteDeadline(t time.Time) error { return this.rawConn.SetWriteDeadline(t) } - -func (this *ClientTLSConn) IsClosed() bool { - return this.isClosed -} diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 2f98395..277e2b9 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -207,6 +207,14 @@ func (this *HTTPRequest) Do() { // 开始调用 func (this *HTTPRequest) doBegin() { + // 处理request limit + if this.web.RequestLimit != nil && + this.web.RequestLimit.IsOn { + if this.doRequestLimit() { + return + } + } + // 处理requestBody if this.RawReq.ContentLength > 0 && this.web.AccessLogRef != nil && @@ -441,6 +449,11 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo this.web.Auth = web.Auth } + // request limit + if web.RequestLimit != nil && (web.RequestLimit.IsPrior || isTop) { + this.web.RequestLimit = web.RequestLimit + } + // 重写规则 if len(web.RewriteRefs) > 0 { for index, ref := range web.RewriteRefs { diff --git a/internal/nodes/http_request_error.go b/internal/nodes/http_request_error.go index 6e27696..753478e 100644 --- a/internal/nodes/http_request_error.go +++ b/internal/nodes/http_request_error.go @@ -15,6 +15,16 @@ func (this *HTTPRequest) write404() { _, _ = this.writer.Write([]byte("404 page not found: '" + this.requestFullURL() + "'" + " (Request Id: " + this.requestId + ")")) } +func (this *HTTPRequest) writeCode(code int) { + if this.doPage(code) { + return + } + + this.processResponseHeaders(code) + this.writer.WriteHeader(code) + _, _ = this.writer.Write([]byte(types.String(code) + " " + http.StatusText(code) + ": '" + this.requestFullURL() + "'" + " (Request Id: " + this.requestId + ")")) +} + func (this *HTTPRequest) write50x(err error, statusCode int) { if err != nil { this.addError(err) diff --git a/internal/nodes/http_request_limit.go b/internal/nodes/http_request_limit.go new file mode 100644 index 0000000..9aeeea8 --- /dev/null +++ b/internal/nodes/http_request_limit.go @@ -0,0 +1,32 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import "net/http" + +func (this *HTTPRequest) doRequestLimit() (shouldStop bool) { + // 检查请求Body尺寸 + // TODO 处理分片提交的内容 + if this.web.RequestLimit.MaxBodyBytes() > 0 && + this.RawReq.ContentLength > this.web.RequestLimit.MaxBodyBytes() { + this.writeCode(http.StatusRequestEntityTooLarge) + return true + } + + // 设置连接相关参数 + if this.web.RequestLimit.MaxConns > 0 || this.web.RequestLimit.MaxConnsPerIP > 0 { + requestConn := this.RawReq.Context().Value(HTTPConnContextKey) + if requestConn != nil { + clientConn, ok := requestConn.(ClientConnInterface) + if ok && !clientConn.IsBound() { + if !clientConn.Bind(this.Server.Id, this.requestRemoteAddr(true), this.web.RequestLimit.MaxConns, this.web.RequestLimit.MaxConnsPerIP) { + this.writeCode(http.StatusTooManyRequests) + this.closeConn() + return true + } + } + } + } + + return false +} diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index 617677e..b989212 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -76,22 +76,6 @@ func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HT } } -// Reset 重置 -func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) { - this.writer = httpResponseWriter - - this.compressionConfig = nil - this.compressionWriter = nil - - this.statusCode = 0 - this.sentBodyBytes = 0 - - this.bodyCopying = false - this.body = nil - this.compressionBodyBuffer = nil - this.compressionBodyWriter = nil -} - // SetCompression 设置内容压缩配置 func (this *HTTPWriter) SetCompression(config *serverconfigs.HTTPCompressionConfig) { this.compressionConfig = config @@ -118,6 +102,14 @@ func (this *HTTPWriter) Prepare(size int64, status int) (delayHeaders bool) { this.PrepareCompression(size) } + // 是否限速写入 + if this.req.web != nil && + this.req.web.RequestLimit != nil && + this.req.web.RequestLimit.IsOn && + this.req.web.RequestLimit.OutBandwidthPerConnBytes() > 0 { + this.writer = NewHTTPRateWriter(this.writer, this.req.web.RequestLimit.OutBandwidthPerConnBytes()) + } + return } diff --git a/internal/nodes/http_writer_rate.go b/internal/nodes/http_writer_rate.go new file mode 100644 index 0000000..3479f68 --- /dev/null +++ b/internal/nodes/http_writer_rate.go @@ -0,0 +1,90 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package nodes + +import ( + "bufio" + "github.com/iwind/TeaGo/types" + "net" + "net/http" + "time" +) + +// HTTPRateWriter 限速写入 +type HTTPRateWriter struct { + parentWriter http.ResponseWriter + + rateBytes int + lastBytes int + timeCost time.Duration +} + +func NewHTTPRateWriter(writer http.ResponseWriter, rateBytes int64) http.ResponseWriter { + return &HTTPRateWriter{ + parentWriter: writer, + rateBytes: types.Int(rateBytes), + } +} + +func (this *HTTPRateWriter) Header() http.Header { + return this.parentWriter.Header() +} + +func (this *HTTPRateWriter) Write(data []byte) (int, error) { + if len(data) == 0 { + return 0, nil + } + + var left = this.rateBytes - this.lastBytes + + if left <= 0 { + if this.timeCost > 0 && this.timeCost < 1*time.Second { + time.Sleep(1*time.Second - this.timeCost) + } + + this.lastBytes = 0 + this.timeCost = 0 + return this.Write(data) + } + + var n = len(data) + + // n <= left + if n <= left { + this.lastBytes += n + + var before = time.Now() + defer func() { + this.timeCost += time.Since(before) + }() + return this.parentWriter.Write(data) + } + + // n > left + var before = time.Now() + result, err := this.parentWriter.Write(data[:left]) + this.timeCost += time.Since(before) + + if err != nil { + return result, err + } + this.lastBytes += left + + return this.Write(data[left:]) +} + +func (this *HTTPRateWriter) WriteHeader(statusCode int) { + this.parentWriter.WriteHeader(statusCode) +} + +// Hijack Hijack +func (this *HTTPRateWriter) Hijack() (conn net.Conn, buf *bufio.ReadWriter, err error) { + if this.parentWriter == nil { + return + } + hijack, ok := this.parentWriter.(http.Hijacker) + if ok { + return hijack.Hijack() + } + return +} diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 01fe7ed..de0bff5 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -621,6 +621,15 @@ func (this *Node) listenSock() error { "result": result, }, }) + case "conns": + ipConns, serverConns := sharedClientConnLimiter.Conns() + + _ = cmd.Reply(&gosock.Command{ + Params: map[string]interface{}{ + "ipConns": ipConns, + "serverConns": serverConns, + }, + }) } }) diff --git a/internal/utils/expires/id_key_map.go b/internal/utils/expires/id_key_map.go new file mode 100644 index 0000000..6448acd --- /dev/null +++ b/internal/utils/expires/id_key_map.go @@ -0,0 +1,64 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package expires + +import "sync" + +type IdKeyMap struct { + idKeys map[int64]string // id => key + keyIds map[string]int64 // key => id + + locker sync.Mutex +} + +func NewIdKeyMap() *IdKeyMap { + return &IdKeyMap{ + idKeys: map[int64]string{}, + keyIds: map[string]int64{}, + } +} + +func (this *IdKeyMap) Add(id int64, key string) { + oldKey, ok := this.idKeys[id] + if ok { + delete(this.keyIds, oldKey) + } + + oldId, ok := this.keyIds[key] + if ok { + delete(this.idKeys, oldId) + } + + this.idKeys[id] = key + this.keyIds[key] = id +} + +func (this *IdKeyMap) Key(id int64) (key string, ok bool) { + key, ok = this.idKeys[id] + return +} + +func (this *IdKeyMap) Id(key string) (id int64, ok bool) { + id, ok = this.keyIds[key] + return +} + +func (this *IdKeyMap) DeleteId(id int64) { + key, ok := this.idKeys[id] + if ok { + delete(this.keyIds, key) + } + delete(this.idKeys, id) +} + +func (this *IdKeyMap) DeleteKey(key string) { + id, ok := this.keyIds[key] + if ok { + delete(this.idKeys, id) + } + delete(this.keyIds, key) +} + +func (this *IdKeyMap) Len() int { + return len(this.idKeys) +} diff --git a/internal/utils/expires/id_key_map_test.go b/internal/utils/expires/id_key_map_test.go new file mode 100644 index 0000000..4fa6060 --- /dev/null +++ b/internal/utils/expires/id_key_map_test.go @@ -0,0 +1,46 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package expires + +import ( + "github.com/iwind/TeaGo/assert" + "github.com/iwind/TeaGo/logs" + "testing" +) + +func TestNewIdKeyMap(t *testing.T) { + var a = assert.NewAssertion(t) + + var m = NewIdKeyMap() + m.Add(1, "1") + m.Add(1, "2") + m.Add(100, "100") + logs.PrintAsJSON(m.idKeys, t) + logs.PrintAsJSON(m.keyIds, t) + + { + k, ok := m.Key(1) + a.IsTrue(ok) + a.IsTrue(k == "2") + } + + { + _, ok := m.Key(2) + a.IsFalse(ok) + } + + m.DeleteKey("2") + + { + _, ok := m.Key(1) + a.IsFalse(ok) + } + + logs.PrintAsJSON(m.idKeys, t) + logs.PrintAsJSON(m.keyIds, t) + + m.DeleteId(100) + + logs.PrintAsJSON(m.idKeys, t) + logs.PrintAsJSON(m.keyIds, t) +} diff --git a/internal/utils/expires/list.go b/internal/utils/expires/list.go index 5dfea5e..560e751 100644 --- a/internal/utils/expires/list.go +++ b/internal/utils/expires/list.go @@ -27,14 +27,19 @@ func NewList() *List { return list } +// Add 添加条目 +// 如果条目已经存在,则覆盖 func (this *List) Add(itemId int64, expiresAt int64) { this.locker.Lock() defer this.locker.Unlock() // 是否已经存在 - _, ok := this.itemsMap[itemId] + oldExpiresAt, ok := this.itemsMap[itemId] if ok { - this.removeItem(itemId) + if oldExpiresAt == expiresAt { + return + } + delete(this.expireMap, oldExpiresAt) } expireItemMap, ok := this.expireMap[expiresAt] @@ -68,8 +73,9 @@ func (this *List) GC(timestamp int64, callback func(itemId int64)) { } } -func (this *List) OnGC(callback func(itemId int64)) { +func (this *List) OnGC(callback func(itemId int64)) *List { this.gcCallback = callback + return this } func (this *List) removeItem(itemId int64) { diff --git a/internal/utils/expires/list_test.go b/internal/utils/expires/list_test.go index fb96549..e491ec4 100644 --- a/internal/utils/expires/list_test.go +++ b/internal/utils/expires/list_test.go @@ -1,9 +1,11 @@ package expires import ( + "github.com/iwind/TeaGo/assert" "github.com/iwind/TeaGo/logs" timeutil "github.com/iwind/TeaGo/utils/time" "math" + "runtime" "testing" "time" ) @@ -24,12 +26,19 @@ func TestList_Add(t *testing.T) { } func TestList_Add_Overwrite(t *testing.T) { + var timestamp = time.Now().Unix() + list := NewList() - list.Add(1, time.Now().Unix()+1) - list.Add(1, time.Now().Unix()+1) - list.Add(1, time.Now().Unix()+2) + list.Add(1, timestamp+1) + list.Add(1, timestamp+1) + list.Add(1, timestamp+2) logs.PrintAsJSON(list.expireMap, t) logs.PrintAsJSON(list.itemsMap, t) + + var a = assert.NewAssertion(t) + a.IsTrue(len(list.itemsMap) == 1) + a.IsTrue(len(list.expireMap) == 1) + a.IsTrue(list.itemsMap[1] == timestamp+2) } func TestList_Remove(t *testing.T) { @@ -77,7 +86,10 @@ func TestList_Start_GC(t *testing.T) { func TestList_ManyItems(t *testing.T) { list := NewList() - for i := 0; i < 100_000; i++ { + for i := 0; i < 1_000; i++ { + list.Add(int64(i), time.Now().Unix()) + } + for i := 0; i < 1_000; i++ { list.Add(int64(i), time.Now().Unix()+1) } @@ -87,35 +99,69 @@ func TestList_ManyItems(t *testing.T) { count++ }) t.Log("gc", count, "items") - t.Log(time.Since(now).Seconds()*1000, "ms") + t.Log(time.Now().Sub(now)) } func TestList_Map_Performance(t *testing.T) { t.Log("max uint32", math.MaxUint32) + var timestamp = time.Now().Unix() + { m := map[int64]int64{} for i := 0; i < 1_000_000; i++ { - m[int64(i)] = time.Now().Unix() + m[int64(i)] = timestamp } now := time.Now() for i := 0; i < 100_000; i++ { delete(m, int64(i)) } - t.Log(time.Since(now).Seconds()*1000, "ms") + t.Log(time.Now().Sub(now)) + } + + { + m := map[uint64]int64{} + for i := 0; i < 1_000_000; i++ { + m[uint64(i)] = timestamp + } + + now := time.Now() + for i := 0; i < 100_000; i++ { + delete(m, uint64(i)) + } + t.Log(time.Now().Sub(now)) } { m := map[uint32]int64{} for i := 0; i < 1_000_000; i++ { - m[uint32(i)] = time.Now().Unix() + m[uint32(i)] = timestamp } now := time.Now() for i := 0; i < 100_000; i++ { delete(m, uint32(i)) } - t.Log(time.Since(now).Seconds()*1000, "ms") + t.Log(time.Now().Sub(now)) + } +} + +func Benchmark_Map_Uint64(b *testing.B) { + runtime.GOMAXPROCS(1) + var timestamp = uint64(time.Now().Unix()) + + var i uint64 + var count uint64 = 1_000_000 + + m := map[uint64]uint64{} + for i = 0; i < count; i++ { + m[i] = timestamp + } + + for n := 0; n < b.N; n++ { + for i = 0; i < count; i++ { + _ = m[i] + } } }