mirror of
https://gitee.com/dromara/mayfly-go
synced 2025-11-03 16:00:25 +08:00
reafctor: pool
This commit is contained in:
209
server/pkg/pool/cache_pool.go
Normal file
209
server/pkg/pool/cache_pool.go
Normal file
@@ -0,0 +1,209 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"mayfly-go/pkg/logx"
|
||||
"mayfly-go/pkg/utils/stringx"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var CachePoolDefaultConfig = PoolConfig{
|
||||
MaxConns: 1,
|
||||
IdleTimeout: 60 * time.Minute,
|
||||
WaitTimeout: 10 * time.Second,
|
||||
HealthCheckInterval: 10 * time.Minute,
|
||||
}
|
||||
|
||||
type cacheEntry[T Conn] struct {
|
||||
conn T
|
||||
lastActive time.Time
|
||||
}
|
||||
|
||||
type CachePool[T Conn] struct {
|
||||
factory func() (T, error)
|
||||
mu sync.RWMutex
|
||||
cache map[string]*cacheEntry[T] // 使用字符串键的缓存
|
||||
config PoolConfig
|
||||
closeCh chan struct{}
|
||||
closed bool
|
||||
}
|
||||
|
||||
func NewCachePool[T Conn](factory func() (T, error), opts ...Option) *CachePool[T] {
|
||||
config := CachePoolDefaultConfig
|
||||
for _, opt := range opts {
|
||||
opt(&config)
|
||||
}
|
||||
|
||||
p := &CachePool[T]{
|
||||
factory: factory,
|
||||
cache: make(map[string]*cacheEntry[T]),
|
||||
config: config,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
go p.backgroundMaintenance()
|
||||
return p
|
||||
}
|
||||
|
||||
// Get 获取连接(自动创建或复用缓存连接)
|
||||
func (p *CachePool[T]) Get(ctx context.Context) (T, error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
var zero T
|
||||
|
||||
if p.closed {
|
||||
return zero, ErrPoolClosed
|
||||
}
|
||||
|
||||
// 1. 尝试从缓存中获取可用连接
|
||||
for key, entry := range p.cache {
|
||||
if time.Since(entry.lastActive) <= p.config.IdleTimeout {
|
||||
entry.lastActive = time.Now() // 更新活跃时间
|
||||
return entry.conn, nil
|
||||
}
|
||||
// 自动清理闲置超时的连接
|
||||
entry.conn.Close()
|
||||
delete(p.cache, key)
|
||||
}
|
||||
|
||||
// 2. 创建新连接并缓存
|
||||
conn, err := p.factory()
|
||||
if err != nil {
|
||||
return zero, err
|
||||
}
|
||||
|
||||
p.cache[generateCacheKey()] = &cacheEntry[T]{
|
||||
conn: conn,
|
||||
lastActive: time.Now(),
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// Put 将连接放回缓存
|
||||
func (p *CachePool[T]) Put(conn T) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.closed {
|
||||
return conn.Close()
|
||||
}
|
||||
|
||||
p.cache[generateCacheKey()] = &cacheEntry[T]{
|
||||
conn: conn,
|
||||
lastActive: time.Now(),
|
||||
}
|
||||
|
||||
// 如果超出最大连接数,清理最久未使用的
|
||||
if len(p.cache) > p.config.MaxConns {
|
||||
p.removeOldest()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 移除最久未使用的连接
|
||||
func (p *CachePool[T]) removeOldest() {
|
||||
var oldestKey string
|
||||
var oldestTime time.Time
|
||||
|
||||
for key, entry := range p.cache {
|
||||
if oldestKey == "" || entry.lastActive.Before(oldestTime) {
|
||||
oldestKey = key
|
||||
oldestTime = entry.lastActive
|
||||
}
|
||||
}
|
||||
|
||||
if oldestKey != "" {
|
||||
p.cache[oldestKey].conn.Close()
|
||||
delete(p.cache, oldestKey)
|
||||
}
|
||||
}
|
||||
|
||||
// Close 关闭连接池
|
||||
func (p *CachePool[T]) Close() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.closed {
|
||||
return
|
||||
}
|
||||
|
||||
p.closed = true
|
||||
close(p.closeCh)
|
||||
|
||||
for _, entry := range p.cache {
|
||||
if err := entry.conn.Close(); err != nil {
|
||||
logx.Errorf("cache pool - error closing connection: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 触发关闭回调
|
||||
if p.config.OnPoolClose != nil {
|
||||
p.config.OnPoolClose()
|
||||
}
|
||||
|
||||
p.cache = make(map[string]*cacheEntry[T])
|
||||
}
|
||||
|
||||
// Resize 动态调整大小
|
||||
func (p *CachePool[T]) Resize(newSize int) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.closed || newSize == p.config.MaxConns {
|
||||
return
|
||||
}
|
||||
|
||||
p.config.MaxConns = newSize
|
||||
|
||||
// 如果新大小小于当前缓存数量,清理多余的连接
|
||||
for len(p.cache) > newSize {
|
||||
p.removeOldest()
|
||||
}
|
||||
}
|
||||
|
||||
// Stats 获取统计信息
|
||||
func (p *CachePool[T]) Stats() PoolStats {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
return PoolStats{
|
||||
TotalConns: int32(len(p.cache)),
|
||||
}
|
||||
}
|
||||
|
||||
// 后台维护协程
|
||||
func (p *CachePool[T]) backgroundMaintenance() {
|
||||
ticker := time.NewTicker(p.config.HealthCheckInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
p.cleanupIdle()
|
||||
case <-p.closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 清理闲置超时的连接
|
||||
func (p *CachePool[T]) cleanupIdle() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
cutoff := time.Now().Add(-p.config.IdleTimeout)
|
||||
for key, entry := range p.cache {
|
||||
if entry.lastActive.Before(cutoff) {
|
||||
entry.conn.Close()
|
||||
delete(p.cache, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 生成缓存键
|
||||
func generateCacheKey() string {
|
||||
return stringx.RandUUID()
|
||||
}
|
||||
366
server/pkg/pool/chan_pool.go
Normal file
366
server/pkg/pool/chan_pool.go
Normal file
@@ -0,0 +1,366 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"mayfly-go/pkg/logx"
|
||||
"mayfly-go/pkg/utils/anyx"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ChanPoolDefaultConfig = PoolConfig{
|
||||
MaxConns: 5,
|
||||
IdleTimeout: 60 * time.Minute,
|
||||
WaitTimeout: 10 * time.Second,
|
||||
HealthCheckInterval: 10 * time.Minute,
|
||||
}
|
||||
|
||||
// ConnWrapper 封装连接及其元数据
|
||||
type ConnWrapper[T Conn] struct {
|
||||
conn T
|
||||
lastActive time.Time // 最后活跃时间
|
||||
isValid bool // 连接是否有效
|
||||
}
|
||||
|
||||
func (w *ConnWrapper[T]) Ping() error {
|
||||
if !w.isValid {
|
||||
return errors.New("connection marked invalid")
|
||||
}
|
||||
return w.conn.Ping()
|
||||
}
|
||||
|
||||
func (w *ConnWrapper[T]) Close() error {
|
||||
w.isValid = false
|
||||
return w.conn.Close()
|
||||
}
|
||||
|
||||
// ChanPool 连接池结构
|
||||
type ChanPool[T Conn] struct {
|
||||
mu sync.RWMutex
|
||||
factory func() (T, error)
|
||||
idleConns chan *ConnWrapper[T]
|
||||
config PoolConfig
|
||||
currentConns int32
|
||||
stats PoolStats
|
||||
closeChan chan struct{} // 用于关闭健康检查 goroutine
|
||||
closed bool // 关闭状态标识
|
||||
}
|
||||
|
||||
// PoolStats 统计信息
|
||||
type PoolStats struct {
|
||||
TotalConns int32 // 总连接数
|
||||
IdleConns int32 // 空闲连接数
|
||||
ActiveConns int32 // 活跃连接数
|
||||
WaitCount int64 // 等待连接次数
|
||||
}
|
||||
|
||||
func NewChannelPool[T Conn](factory func() (T, error), opts ...Option) *ChanPool[T] {
|
||||
// 1. 初始化配置(使用默认值 + Option 覆盖)
|
||||
config := ChanPoolDefaultConfig
|
||||
for _, opt := range opts {
|
||||
opt(&config)
|
||||
}
|
||||
|
||||
// 2. 创建连接池
|
||||
p := &ChanPool[T]{
|
||||
factory: factory,
|
||||
idleConns: make(chan *ConnWrapper[T], config.MaxConns),
|
||||
config: config,
|
||||
closeChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
// 3. 启动健康检查
|
||||
go p.healthCheck()
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) Get(ctx context.Context) (T, error) {
|
||||
connChan := make(chan T, 1)
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
conn, err := p.get()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
} else {
|
||||
connChan <- conn
|
||||
}
|
||||
}()
|
||||
|
||||
var zero T
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return zero, ctx.Err()
|
||||
case err := <-errChan:
|
||||
return zero, err
|
||||
case conn := <-connChan:
|
||||
// 启动监控协程
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
// 上下文被取消后,将连接放回连接池
|
||||
if err := p.Put(conn); err != nil {
|
||||
logx.Errorf("Failed to return leaked connection: %v", err)
|
||||
conn.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
}
|
||||
}()
|
||||
return conn, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) get() (T, error) {
|
||||
// 优先从 channel 获取空闲连接(无锁)
|
||||
select {
|
||||
case wrapper := <-p.idleConns:
|
||||
atomic.AddInt32(&p.stats.IdleConns, -1)
|
||||
atomic.AddInt32(&p.stats.ActiveConns, 1)
|
||||
wrapper.lastActive = time.Now()
|
||||
return wrapper.conn, nil
|
||||
default:
|
||||
return p.createConn()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) createConn() (T, error) {
|
||||
var zero T
|
||||
|
||||
// 使用CAS保证原子性
|
||||
for {
|
||||
current := atomic.LoadInt32(&p.currentConns)
|
||||
if current >= int32(p.config.MaxConns) {
|
||||
if p.config.WaitTimeout > 0 {
|
||||
return p.waitForConn()
|
||||
}
|
||||
return zero, errors.New("connection pool exhausted")
|
||||
}
|
||||
|
||||
if atomic.CompareAndSwapInt32(&p.currentConns, current, current+1) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// 直接创建新连接
|
||||
conn, err := p.factory()
|
||||
if err != nil {
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
return zero, err
|
||||
}
|
||||
|
||||
// 更新状态
|
||||
atomic.AddInt32(&p.stats.ActiveConns, 1)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// 新增等待连接方法
|
||||
func (p *ChanPool[T]) waitForConn() (T, error) {
|
||||
var zero T
|
||||
timeout := time.NewTimer(p.config.WaitTimeout)
|
||||
defer timeout.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case wrapper := <-p.idleConns:
|
||||
if wrapper.isValid && wrapper.Ping() == nil {
|
||||
atomic.AddInt32(&p.stats.IdleConns, -1)
|
||||
atomic.AddInt32(&p.stats.ActiveConns, 1)
|
||||
wrapper.lastActive = time.Now()
|
||||
return wrapper.conn, nil
|
||||
}
|
||||
wrapper.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
case <-timeout.C:
|
||||
atomic.AddInt64(&p.stats.WaitCount, 1)
|
||||
return zero, errors.New("connection pool wait timeout")
|
||||
default:
|
||||
// 非阻塞检查后短暂休眠避免CPU空转
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) Put(conn T) error {
|
||||
if anyx.IsBlank(conn) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 快速路径
|
||||
select {
|
||||
case p.idleConns <- &ConnWrapper[T]{conn: conn, lastActive: time.Now(), isValid: true}:
|
||||
atomic.AddInt32(&p.stats.IdleConns, 1)
|
||||
atomic.AddInt32(&p.stats.ActiveConns, -1)
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// 慢速路径
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
// 检查是否超过最大连接数
|
||||
if atomic.LoadInt32(&p.currentConns) > int32(p.config.MaxConns) {
|
||||
conn.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
} else {
|
||||
// 直接放入空闲队列
|
||||
select {
|
||||
case p.idleConns <- &ConnWrapper[T]{conn: conn, lastActive: time.Now(), isValid: true}:
|
||||
default:
|
||||
conn.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
}
|
||||
}
|
||||
atomic.AddInt32(&p.stats.ActiveConns, -1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) Close() {
|
||||
p.mu.Lock()
|
||||
if p.closed {
|
||||
p.mu.Unlock()
|
||||
return
|
||||
}
|
||||
p.closed = true
|
||||
|
||||
// 1. 停止健康检查
|
||||
close(p.closeChan)
|
||||
|
||||
// 2. 临时转移空闲连接
|
||||
idle := make([]*ConnWrapper[T], 0, len(p.idleConns))
|
||||
for len(p.idleConns) > 0 {
|
||||
idle = append(idle, <-p.idleConns)
|
||||
}
|
||||
close(p.idleConns) // 安全关闭通道
|
||||
|
||||
p.mu.Unlock() // 提前释放锁,避免阻塞其他操作
|
||||
|
||||
// 3. 关闭所有连接(无需持有锁)
|
||||
for _, wrapper := range idle {
|
||||
wrapper.Close()
|
||||
}
|
||||
|
||||
// 4. 触发关闭回调
|
||||
if p.config.OnPoolClose != nil {
|
||||
p.config.OnPoolClose()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) healthCheck() {
|
||||
ticker := time.NewTicker(p.config.HealthCheckInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
p.checkIdleConns()
|
||||
case <-p.closeChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) checkIdleConns() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.closed {
|
||||
return
|
||||
}
|
||||
|
||||
idle := make([]*ConnWrapper[T], 0, len(p.idleConns))
|
||||
for len(p.idleConns) > 0 {
|
||||
idle = append(idle, <-p.idleConns)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for _, wrapper := range idle {
|
||||
if now.Sub(wrapper.lastActive) > p.config.IdleTimeout || wrapper.Ping() != nil {
|
||||
wrapper.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
} else {
|
||||
select {
|
||||
case p.idleConns <- wrapper:
|
||||
default:
|
||||
wrapper.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) Resize(newMaxConns int) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
oldMax := p.config.MaxConns
|
||||
p.config.MaxConns = newMaxConns
|
||||
|
||||
// 缩小连接池:关闭多余的空闲连接
|
||||
if newMaxConns < oldMax {
|
||||
toClose := oldMax - newMaxConns
|
||||
closed := 0
|
||||
|
||||
// 非阻塞取出待关闭的连接
|
||||
var wrappers []*ConnWrapper[T]
|
||||
for len(p.idleConns) > 0 && closed < toClose {
|
||||
wrappers = append(wrappers, <-p.idleConns)
|
||||
closed++
|
||||
}
|
||||
|
||||
// 关闭连接并更新计数
|
||||
for _, wrapper := range wrappers {
|
||||
wrapper.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
atomic.AddInt32(&p.stats.IdleConns, -1)
|
||||
}
|
||||
}
|
||||
|
||||
// 重建空闲连接通道(无需迁移连接,因 channel 本身无状态)
|
||||
p.idleConns = make(chan *ConnWrapper[T], newMaxConns)
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) CheckLeaks() []T {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
var leaks []T
|
||||
now := time.Now()
|
||||
|
||||
// 检查所有空闲连接
|
||||
idle := make([]*ConnWrapper[T], 0, len(p.idleConns))
|
||||
for len(p.idleConns) > 0 {
|
||||
idle = append(idle, <-p.idleConns)
|
||||
}
|
||||
|
||||
for _, wrapper := range idle {
|
||||
// 判定泄漏条件:长期未使用且未被标记为活跃
|
||||
if now.Sub(wrapper.lastActive) > 10*p.config.IdleTimeout {
|
||||
leaks = append(leaks, wrapper.conn)
|
||||
wrapper.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
atomic.AddInt32(&p.stats.IdleConns, -1)
|
||||
} else {
|
||||
// 放回空闲池
|
||||
select {
|
||||
case p.idleConns <- wrapper:
|
||||
default:
|
||||
wrapper.Close()
|
||||
atomic.AddInt32(&p.currentConns, -1)
|
||||
}
|
||||
}
|
||||
}
|
||||
return leaks
|
||||
}
|
||||
|
||||
func (p *ChanPool[T]) Stats() PoolStats {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return PoolStats{
|
||||
TotalConns: atomic.LoadInt32(&p.currentConns),
|
||||
IdleConns: int32(len(p.idleConns)), // 直接读取通道长度
|
||||
ActiveConns: atomic.LoadInt32(&p.stats.ActiveConns),
|
||||
WaitCount: atomic.LoadInt64(&p.stats.WaitCount),
|
||||
}
|
||||
}
|
||||
@@ -1,216 +0,0 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"mayfly-go/pkg/logx"
|
||||
"sync"
|
||||
"time"
|
||||
//"reflect"
|
||||
)
|
||||
|
||||
var (
|
||||
//ErrMaxActiveConnReached 连接池超限
|
||||
ErrMaxActiveConnReached = errors.New("MaxActiveConnReached")
|
||||
)
|
||||
|
||||
// Config 连接池相关配置
|
||||
type Config struct {
|
||||
//连接池中拥有的最小连接数
|
||||
InitialCap int
|
||||
//最大并发存活连接数
|
||||
MaxCap int
|
||||
//最大空闲连接
|
||||
MaxIdle int
|
||||
//生成连接的方法
|
||||
Factory func() (interface{}, error)
|
||||
//关闭连接的方法
|
||||
Close func(interface{}) error
|
||||
//检查连接是否有效的方法
|
||||
Ping func(interface{}) error
|
||||
//连接最大空闲时间,超过该事件则将失效
|
||||
IdleTimeout time.Duration
|
||||
}
|
||||
|
||||
// channelPool 存放连接信息
|
||||
type channelPool struct {
|
||||
mu sync.RWMutex
|
||||
conns chan *idleConn
|
||||
factory func() (interface{}, error)
|
||||
close func(interface{}) error
|
||||
ping func(interface{}) error
|
||||
idleTimeout, waitTimeOut time.Duration
|
||||
maxActive int
|
||||
openingConns int
|
||||
}
|
||||
|
||||
type idleConn struct {
|
||||
conn interface{}
|
||||
t time.Time
|
||||
}
|
||||
|
||||
// NewChannelPool 初始化连接
|
||||
func NewChannelPool(poolConfig *Config) (Pool, error) {
|
||||
if !(poolConfig.InitialCap <= poolConfig.MaxIdle && poolConfig.MaxCap >= poolConfig.MaxIdle && poolConfig.InitialCap >= 0) {
|
||||
return nil, errors.New("invalid capacity settings")
|
||||
}
|
||||
if poolConfig.Factory == nil {
|
||||
return nil, errors.New("invalid factory func settings")
|
||||
}
|
||||
if poolConfig.Close == nil {
|
||||
return nil, errors.New("invalid close func settings")
|
||||
}
|
||||
|
||||
c := &channelPool{
|
||||
conns: make(chan *idleConn, poolConfig.MaxIdle),
|
||||
factory: poolConfig.Factory,
|
||||
close: poolConfig.Close,
|
||||
idleTimeout: poolConfig.IdleTimeout,
|
||||
maxActive: poolConfig.MaxCap,
|
||||
openingConns: poolConfig.InitialCap,
|
||||
}
|
||||
|
||||
if poolConfig.Ping != nil {
|
||||
c.ping = poolConfig.Ping
|
||||
}
|
||||
|
||||
for i := 0; i < poolConfig.InitialCap; i++ {
|
||||
conn, err := c.factory()
|
||||
if err != nil {
|
||||
c.Release()
|
||||
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
|
||||
}
|
||||
c.conns <- &idleConn{conn: conn, t: time.Now()}
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// getConns 获取所有连接
|
||||
func (c *channelPool) getConns() chan *idleConn {
|
||||
c.mu.Lock()
|
||||
conns := c.conns
|
||||
c.mu.Unlock()
|
||||
return conns
|
||||
}
|
||||
|
||||
// Get 从pool中取一个连接
|
||||
func (c *channelPool) Get() (interface{}, error) {
|
||||
conns := c.getConns()
|
||||
if conns == nil {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case wrapConn := <-conns:
|
||||
if wrapConn == nil {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
//判断是否超时,超时则丢弃
|
||||
if timeout := c.idleTimeout; timeout > 0 {
|
||||
if wrapConn.t.Add(timeout).Before(time.Now()) {
|
||||
//丢弃并关闭该连接
|
||||
c.Close(wrapConn.conn)
|
||||
continue
|
||||
}
|
||||
}
|
||||
//判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查
|
||||
if c.ping != nil {
|
||||
if err := c.Ping(wrapConn.conn); err != nil {
|
||||
c.Close(wrapConn.conn)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return wrapConn.conn, nil
|
||||
default:
|
||||
c.mu.Lock()
|
||||
logx.Debugf("openConn %v %v", c.openingConns, c.maxActive)
|
||||
defer c.mu.Unlock()
|
||||
if c.openingConns >= c.maxActive {
|
||||
return nil, ErrMaxActiveConnReached
|
||||
}
|
||||
if c.factory == nil {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
conn, err := c.factory()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.openingConns++
|
||||
return conn, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Put 将连接放回pool中
|
||||
func (c *channelPool) Put(conn interface{}) error {
|
||||
if conn == nil {
|
||||
return errors.New("connection is nil. rejecting")
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
|
||||
if c.conns == nil {
|
||||
c.mu.Unlock()
|
||||
return c.Close(conn)
|
||||
}
|
||||
|
||||
select {
|
||||
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
default:
|
||||
c.mu.Unlock()
|
||||
//连接池已满,直接关闭该连接
|
||||
return c.Close(conn)
|
||||
}
|
||||
}
|
||||
|
||||
// Close 关闭单条连接
|
||||
func (c *channelPool) Close(conn interface{}) error {
|
||||
if conn == nil {
|
||||
return errors.New("connection is nil. rejecting")
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.close == nil {
|
||||
return nil
|
||||
}
|
||||
c.openingConns--
|
||||
return c.close(conn)
|
||||
}
|
||||
|
||||
// Ping 检查单条连接是否有效
|
||||
func (c *channelPool) Ping(conn interface{}) error {
|
||||
if conn == nil {
|
||||
return errors.New("connection is nil. rejecting")
|
||||
}
|
||||
return c.ping(conn)
|
||||
}
|
||||
|
||||
// Release 释放连接池中所有连接
|
||||
func (c *channelPool) Release() {
|
||||
c.mu.Lock()
|
||||
conns := c.conns
|
||||
c.conns = nil
|
||||
c.factory = nil
|
||||
c.ping = nil
|
||||
closeFun := c.close
|
||||
c.close = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
if conns == nil {
|
||||
return
|
||||
}
|
||||
|
||||
close(conns)
|
||||
for wrapConn := range conns {
|
||||
//log.Printf("Type %v\n",reflect.TypeOf(wrapConn.conn))
|
||||
_ = closeFun(wrapConn.conn)
|
||||
}
|
||||
}
|
||||
|
||||
// Len 连接池中已有的连接
|
||||
func (c *channelPool) Len() int {
|
||||
return len(c.getConns())
|
||||
}
|
||||
55
server/pkg/pool/config.go
Normal file
55
server/pkg/pool/config.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ErrPoolClosed = errors.New("pool is closed")
|
||||
|
||||
// PoolConfig 连接池配置
|
||||
type PoolConfig struct {
|
||||
MaxConns int // 最大连接数
|
||||
IdleTimeout time.Duration // 空闲连接超时时间
|
||||
WaitTimeout time.Duration // 获取连接超时时间
|
||||
HealthCheckInterval time.Duration // 健康检查间隔
|
||||
OnPoolClose func() error // 连接池关闭时的回调
|
||||
}
|
||||
|
||||
// Option 函数类型,用于配置 Pool
|
||||
type Option func(*PoolConfig)
|
||||
|
||||
// WithMaxConns 设置最大连接数
|
||||
func WithMaxConns(maxConns int) Option {
|
||||
return func(c *PoolConfig) {
|
||||
c.MaxConns = maxConns
|
||||
}
|
||||
}
|
||||
|
||||
// WithIdleTimeout 设置空闲超时
|
||||
func WithIdleTimeout(timeout time.Duration) Option {
|
||||
return func(c *PoolConfig) {
|
||||
c.IdleTimeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
// WithWaitTimeout 设置等待超时
|
||||
func WithWaitTimeout(timeout time.Duration) Option {
|
||||
return func(c *PoolConfig) {
|
||||
c.WaitTimeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
// WithHealthCheckInterval 设置健康检查间隔
|
||||
func WithHealthCheckInterval(interval time.Duration) Option {
|
||||
return func(c *PoolConfig) {
|
||||
c.HealthCheckInterval = interval
|
||||
}
|
||||
}
|
||||
|
||||
// WithOnPoolClose 设置连接池关闭回调
|
||||
func WithOnPoolClose(fn func() error) Option {
|
||||
return func(c *PoolConfig) {
|
||||
c.OnPoolClose = fn
|
||||
}
|
||||
}
|
||||
76
server/pkg/pool/group.go
Normal file
76
server/pkg/pool/group.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"mayfly-go/pkg/logx"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
type PoolGroup[T Conn] struct {
|
||||
poolGroup map[string]Pool[T]
|
||||
createGroup singleflight.Group
|
||||
}
|
||||
|
||||
func NewPoolGroup[T Conn]() *PoolGroup[T] {
|
||||
return &PoolGroup[T]{
|
||||
poolGroup: make(map[string]Pool[T]),
|
||||
createGroup: singleflight.Group{},
|
||||
}
|
||||
}
|
||||
|
||||
func (pg *PoolGroup[T]) GetOrCreate(
|
||||
key string,
|
||||
poolFactory func() Pool[T],
|
||||
opts ...Option,
|
||||
) (Pool[T], error) {
|
||||
if p, ok := pg.poolGroup[key]; ok {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
v, err, _ := pg.createGroup.Do(key, func() (any, error) {
|
||||
logx.Infof("pool group - create pool, key: %s", key)
|
||||
p := poolFactory()
|
||||
pg.poolGroup[key] = p
|
||||
return p, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return v.(Pool[T]), nil
|
||||
}
|
||||
|
||||
// GetChanPool 获取或创建 ChannelPool 类型连接池
|
||||
func (pg *PoolGroup[T]) GetChanPool(key string, factory func() (T, error), opts ...Option) (Pool[T], error) {
|
||||
return pg.GetOrCreate(key, func() Pool[T] {
|
||||
return NewChannelPool(factory, opts...)
|
||||
}, opts...)
|
||||
}
|
||||
|
||||
// GetCachePool 获取或创建 CachePool 类型连接池
|
||||
func (pg *PoolGroup[T]) GetCachePool(key string, factory func() (T, error), opts ...Option) (Pool[T], error) {
|
||||
return pg.GetOrCreate(key, func() Pool[T] {
|
||||
return NewCachePool(factory, opts...)
|
||||
}, opts...)
|
||||
}
|
||||
|
||||
func (pg *PoolGroup[T]) Close(key string) error {
|
||||
if p, ok := pg.poolGroup[key]; ok {
|
||||
logx.Infof("pool group - close pool, key: %s", key)
|
||||
p.Close()
|
||||
pg.createGroup.Forget(key)
|
||||
delete(pg.poolGroup, key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pg *PoolGroup[T]) CloseAll() {
|
||||
for key := range pg.poolGroup {
|
||||
pg.Close(key)
|
||||
}
|
||||
}
|
||||
|
||||
func (pg *PoolGroup[T]) AllPool() map[string]Pool[T] {
|
||||
return pg.poolGroup
|
||||
}
|
||||
@@ -1,21 +1,27 @@
|
||||
package pool
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
//ErrClosed 连接池已经关闭Error
|
||||
ErrClosed = errors.New("pool is closed")
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// Pool 基本方法
|
||||
type Pool interface {
|
||||
Get() (interface{}, error)
|
||||
// Conn 连接接口
|
||||
// 连接池的连接必须实现 Conn 接口
|
||||
type Conn interface {
|
||||
// Close 关闭连接
|
||||
Close() error
|
||||
|
||||
Put(interface{}) error
|
||||
|
||||
Close(interface{}) error
|
||||
|
||||
Release()
|
||||
|
||||
Len() int
|
||||
// Ping 检查连接是否有效
|
||||
Ping() error
|
||||
}
|
||||
|
||||
// Pool 连接池接口
|
||||
type Pool[T Conn] interface {
|
||||
// 核心方法
|
||||
Get(ctx context.Context) (T, error)
|
||||
Put(T) error
|
||||
Close()
|
||||
|
||||
// 管理方法
|
||||
Resize(int) // 动态调整大小
|
||||
Stats() PoolStats // 获取统计信息
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user