mirror of
https://gitee.com/dromara/mayfly-go
synced 2025-11-03 16:00:25 +08:00
fix: 连接池修复
This commit is contained in:
@@ -1,8 +1,11 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"mayfly-go/pkg/logx"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
@@ -11,12 +14,16 @@ type PoolGroup[T Conn] struct {
|
||||
mu sync.RWMutex
|
||||
poolGroup map[string]Pool[T]
|
||||
createGroup singleflight.Group
|
||||
closingWg sync.WaitGroup
|
||||
closingMu sync.Mutex
|
||||
closingCh chan struct{} // 添加关闭通道
|
||||
}
|
||||
|
||||
func NewPoolGroup[T Conn]() *PoolGroup[T] {
|
||||
return &PoolGroup[T]{
|
||||
poolGroup: make(map[string]Pool[T]),
|
||||
createGroup: singleflight.Group{},
|
||||
closingCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,28 +93,92 @@ func (pg *PoolGroup[T]) Get(key string) (Pool[T], bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// 添加一个异步关闭的辅助函数
|
||||
func (pg *PoolGroup[T]) asyncClose(pool Pool[T], key string) {
|
||||
pg.closingMu.Lock()
|
||||
pg.closingWg.Add(1)
|
||||
pg.closingMu.Unlock()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
pg.closingMu.Lock()
|
||||
pg.closingWg.Done()
|
||||
pg.closingMu.Unlock()
|
||||
}()
|
||||
|
||||
// 设置超时检测
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
pool.Close()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// 等待关闭完成或超时
|
||||
select {
|
||||
case <-done:
|
||||
logx.Infof("pool group - pool closed successfully, key: %s", key)
|
||||
case <-time.After(5 * time.Second):
|
||||
logx.Errorf("pool group - pool close timeout, possible deadlock detected, key: %s", key)
|
||||
// 打印当前 goroutine 的堆栈信息
|
||||
buf := make([]byte, 1<<16)
|
||||
runtime.Stack(buf, true)
|
||||
logx.Errorf("pool group - goroutine stack trace:\n%s", buf)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (pg *PoolGroup[T]) Close(key string) error {
|
||||
pg.mu.Lock()
|
||||
defer pg.mu.Unlock()
|
||||
|
||||
if p, ok := pg.poolGroup[key]; ok {
|
||||
logx.Infof("pool group - close pool, key: %s", key)
|
||||
p.Close()
|
||||
logx.Infof("pool group - closing pool, key: %s", key)
|
||||
pg.createGroup.Forget(key)
|
||||
delete(pg.poolGroup, key)
|
||||
pg.mu.Unlock()
|
||||
pg.asyncClose(p, key)
|
||||
return nil
|
||||
}
|
||||
pg.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pg *PoolGroup[T]) CloseAll() {
|
||||
pg.mu.Lock()
|
||||
defer pg.mu.Unlock()
|
||||
|
||||
for key := range pg.poolGroup {
|
||||
pg.poolGroup[key].Close()
|
||||
pg.createGroup.Forget(key)
|
||||
pools := make(map[string]Pool[T], len(pg.poolGroup))
|
||||
for k, v := range pg.poolGroup {
|
||||
pools[k] = v
|
||||
}
|
||||
pg.poolGroup = make(map[string]Pool[T])
|
||||
pg.mu.Unlock()
|
||||
|
||||
// 异步关闭所有池
|
||||
for key, pool := range pools {
|
||||
pg.asyncClose(pool, key)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加一个用于监控连接池关闭状态的方法
|
||||
func (pg *PoolGroup[T]) WaitForClose(timeout time.Duration) error {
|
||||
// 创建一个新的通道用于通知等待完成
|
||||
done := make(chan struct{})
|
||||
|
||||
// 启动一个 goroutine 来等待所有关闭操作完成
|
||||
go func() {
|
||||
pg.closingWg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// 等待完成或超时
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
case <-time.After(timeout):
|
||||
// 在超时时打印当前状态
|
||||
pg.mu.RLock()
|
||||
remainingPools := len(pg.poolGroup)
|
||||
pg.mu.RUnlock()
|
||||
logx.Errorf("pool group - close timeout, remaining pools: %d", remainingPools)
|
||||
return fmt.Errorf("wait for pool group close timeout after %v", timeout)
|
||||
}
|
||||
}
|
||||
|
||||
func (pg *PoolGroup[T]) AllPool() map[string]Pool[T] {
|
||||
|
||||
@@ -183,11 +183,11 @@ func TestCachePool_Basic(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
conn1, _ := pool.Get(ctx)
|
||||
_ = pool.Put(conn1)
|
||||
conn2, _ := pool.Get(ctx)
|
||||
if conn1 != conn2 {
|
||||
t.Fatal("缓存池应复用同一连接")
|
||||
}
|
||||
_ = pool.Put(conn1)
|
||||
_ = pool.Put(conn2)
|
||||
pool.Close()
|
||||
}
|
||||
@@ -564,6 +564,12 @@ func TestPoolGroup_ConcurrentAccess(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// 等待所有池关闭完成
|
||||
err := group.WaitForClose(10 * time.Second)
|
||||
if err != nil {
|
||||
t.Errorf("等待池关闭超时: %v", err)
|
||||
}
|
||||
|
||||
// 验证所有池都已关闭
|
||||
pools = group.AllPool()
|
||||
if len(pools) != 0 {
|
||||
@@ -597,6 +603,12 @@ func TestPoolGroup_ConcurrentClose(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// 等待所有池关闭完成
|
||||
err := group.WaitForClose(10 * time.Second)
|
||||
if err != nil {
|
||||
t.Errorf("等待池关闭超时: %v", err)
|
||||
}
|
||||
|
||||
// 验证所有池都已关闭
|
||||
pools := group.AllPool()
|
||||
if len(pools) != 0 {
|
||||
|
||||
Reference in New Issue
Block a user