diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index 2e2215f..c971478 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -39,7 +39,7 @@ func NewClientConn(conn net.Conn, isTLS bool, quickClose bool, globalLimiter *ra func (this *ClientConn) Read(b []byte) (n int, err error) { if this.isTLS { if !this.hasRead { - _ = this.rawConn.SetReadDeadline(time.Now().Add(5 * time.Second)) // TODO 握手超时时间可以设置 + _ = this.rawConn.SetReadDeadline(time.Now().Add(time.Duration(nodeconfigs.DefaultTLSHandshakeTimeout) * time.Second)) // TODO 握手超时时间可以设置 this.hasRead = true defer func() { _ = this.rawConn.SetReadDeadline(time.Time{}) @@ -48,7 +48,6 @@ func (this *ClientConn) Read(b []byte) (n int, err error) { } n, err = this.rawConn.Read(b) - if n > 0 { atomic.AddUint64(&teaconst.InTrafficBytes, uint64(n)) } @@ -66,6 +65,8 @@ func (this *ClientConn) Write(b []byte) (n int, err error) { func (this *ClientConn) Close() error { this.isClosed = true + err := this.rawConn.Close() + // 全局并发数限制 this.once.Do(func() { if this.globalLimiter != nil { @@ -76,7 +77,7 @@ func (this *ClientConn) Close() error { // 单个服务并发数限制 sharedClientConnLimiter.Remove(this.rawConn.RemoteAddr().String()) - return this.rawConn.Close() + return err } func (this *ClientConn) LocalAddr() net.Addr { diff --git a/internal/nodes/client_listener.go b/internal/nodes/client_listener.go index b11017e..1368a30 100644 --- a/internal/nodes/client_listener.go +++ b/internal/nodes/client_listener.go @@ -20,7 +20,7 @@ type ClientListener struct { quickClose bool } -func NewClientListener1(listener net.Listener, quickClose bool) *ClientListener { +func NewClientListener(listener net.Listener, quickClose bool) *ClientListener { return &ClientListener{ rawListener: listener, quickClose: quickClose, diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index b74dc81..d3904cd 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -60,7 +60,7 @@ func (this *Listener) listenTCP() error { if err != nil { return err } - var netListener = NewClientListener1(tcpListener, protocol.IsHTTPFamily() || protocol.IsHTTPSFamily()) + var netListener = NewClientListener(tcpListener, protocol.IsHTTPFamily() || protocol.IsHTTPSFamily()) events.On(events.EventQuit, func() { remotelogs.Println("LISTENER", "quit "+this.group.FullAddr()) _ = netListener.Close() diff --git a/internal/ttlcache/cache.go b/internal/ttlcache/cache.go index 29c3429..d36d83f 100644 --- a/internal/ttlcache/cache.go +++ b/internal/ttlcache/cache.go @@ -21,12 +21,18 @@ type Cache struct { } func NewCache(opt ...OptionInterface) *Cache { - countPieces := 128 - maxItems := 2_000_000 + var countPieces = 128 + var maxItems = 2_000_000 - var delta = systemMemoryGB() / 8 - if delta > 0 { - maxItems *= delta + var totalMemory = systemMemoryGB() + if totalMemory < 2 { + // 我们限制内存过小的服务能够使用的数量 + maxItems = 1_000_000 + } else { + var delta = totalMemory / 8 + if delta > 0 { + maxItems *= delta + } } for _, option := range opt { diff --git a/internal/ttlcache/cache_test.go b/internal/ttlcache/cache_test.go index 7bfcf63..9e5c1dc 100644 --- a/internal/ttlcache/cache_test.go +++ b/internal/ttlcache/cache_test.go @@ -29,12 +29,19 @@ func TestNewCache(t *testing.T) { } func TestCache_Memory(t *testing.T) { + var stat1 = &runtime.MemStats{} + runtime.ReadMemStats(stat1) + cache := NewCache() - for i := 0; i < 20_000_000; i++ { + for i := 0; i < 10_000_000; i++ { cache.Write("a"+strconv.Itoa(i), 1, time.Now().Unix()+3600) } t.Log("waiting ...") - time.Sleep(10 * time.Second) + + var stat2 = &runtime.MemStats{} + runtime.ReadMemStats(stat2) + + t.Log((stat2.HeapInuse-stat1.HeapInuse)/1024/1024, "MB") } func BenchmarkCache_Add(b *testing.B) { diff --git a/internal/ttlcache/piece.go b/internal/ttlcache/piece.go index a6904c0..6370c9f 100644 --- a/internal/ttlcache/piece.go +++ b/internal/ttlcache/piece.go @@ -20,8 +20,14 @@ func NewPiece(maxItems int) *Piece { func (this *Piece) Add(key uint64, item *Item) (ok bool) { this.locker.Lock() if len(this.m) >= this.maxItems { - this.locker.Unlock() - return + // 尝试先删除过期的 + this.gcWithoutLocker() + + // 仍然是满的就跳过 + if len(this.m) >= this.maxItems { + this.locker.Unlock() + return + } } this.m[key] = item this.locker.Unlock() @@ -74,12 +80,7 @@ func (this *Piece) Count() (count int) { func (this *Piece) GC() { this.locker.Lock() - timestamp := time.Now().Unix() - for k, item := range this.m { - if item.expiredAt <= timestamp { - delete(this.m, k) - } - } + this.gcWithoutLocker() this.locker.Unlock() } @@ -94,3 +95,13 @@ func (this *Piece) Destroy() { this.m = nil this.locker.Unlock() } + +// 不加锁的gc +func (this *Piece) gcWithoutLocker() { + timestamp := time.Now().Unix() + for k, item := range this.m { + if item.expiredAt <= timestamp { + delete(this.m, k) + } + } +}