优化系统goroutine使用,减少goroutine数量,增加goman查看goroutine数量指令

This commit is contained in:
刘祥超
2021-12-08 15:17:45 +08:00
parent 24fbd740b5
commit 1279f0d394
48 changed files with 469 additions and 146 deletions

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/TeaOSLab/EdgeNode/internal/apps" "github.com/TeaOSLab/EdgeNode/internal/apps"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
@@ -88,6 +89,25 @@ func main() {
} }
} }
}) })
app.On("goman", func() {
var sock = gosock.NewTmpSock(teaconst.ProcessName)
reply, err := sock.Send(&gosock.Command{Code: "goman"})
if err != nil {
fmt.Println("[ERROR]" + err.Error())
} else {
instances, ok := reply.Params["result"]
if ok {
instancesJSON, err := json.MarshalIndent(instances, "", " ")
if err != nil {
fmt.Println("[ERROR]" + err.Error())
} else {
fmt.Println(string(instancesJSON))
}
} else {
fmt.Println("no instances yet.")
}
}
})
app.Run(func() { app.Run(func() {
node := nodes.NewNode() node := nodes.NewNode()
node.Start() node.Start()

View File

@@ -4,6 +4,7 @@ package caches
import ( import (
"database/sql" "database/sql"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache" "github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -568,11 +569,11 @@ func (this *FileList) removeOldTables() error {
} }
if lists.ContainsString(this.oldTables, name) { if lists.ContainsString(this.oldTables, name) {
// 异步执行 // 异步执行
go func() { goman.New(func() {
remotelogs.Println("CACHE", "remove old table '"+name+"' ...") remotelogs.Println("CACHE", "remove old table '"+name+"' ...")
_, _ = this.db.Exec(`DROP TABLE "` + name + `"`) _, _ = this.db.Exec(`DROP TABLE "` + name + `"`)
remotelogs.Println("CACHE", "remove old table '"+name+"' done") remotelogs.Println("CACHE", "remove old table '"+name+"' done")
}() })
} }
} }

View File

@@ -3,6 +3,7 @@
package caches package caches
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
@@ -127,7 +128,7 @@ func TestFileList_Exist_Many_DB(t *testing.T) {
}() }()
for i := 0; i < threads; i++ { for i := 0; i < threads; i++ {
go func() { goman.New(func() {
defer wg.Done() defer wg.Done()
for { for {
@@ -143,7 +144,7 @@ func TestFileList_Exist_Many_DB(t *testing.T) {
return return
} }
} }
}() })
} }
wg.Wait() wg.Wait()
t.Log("left:", count) t.Log("left:", count)

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -562,12 +563,12 @@ func (this *FileStorage) CleanAll() error {
} }
// 重新遍历待删除 // 重新遍历待删除
go func() { goman.New(func() {
err = this.cleanDeletedDirs(dir) err = this.cleanDeletedDirs(dir)
if err != nil { if err != nil {
remotelogs.Warn("CACHE", "delete '*-deleted' dirs failed: "+err.Error()) remotelogs.Warn("CACHE", "delete '*-deleted' dirs failed: "+err.Error())
} }
}() })
return nil return nil
} }
@@ -672,12 +673,12 @@ func (this *FileStorage) initList() error {
} }
// 使用异步防止阻塞主线程 // 使用异步防止阻塞主线程
/**go func() { /**goman.New(func() {
dir := this.dir() dir := this.dir()
// 清除tmp // 清除tmp
// TODO 需要一个更加高效的实现 // TODO 需要一个更加高效的实现
}()**/ })**/
// 启动定时清理任务 // 启动定时清理任务
var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval
@@ -695,26 +696,26 @@ func (this *FileStorage) initList() error {
ticker.Stop() ticker.Stop()
} }
}) })
go func() { goman.New(func() {
for this.purgeTicker.Next() { for this.purgeTicker.Next() {
trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() { trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() {
this.purgeLoop() this.purgeLoop()
}) })
} }
}() })
// 热点处理任务 // 热点处理任务
this.hotTicker = utils.NewTicker(1 * time.Minute) this.hotTicker = utils.NewTicker(1 * time.Minute)
if Tea.IsTesting() { if Tea.IsTesting() {
this.hotTicker = utils.NewTicker(10 * time.Second) this.hotTicker = utils.NewTicker(10 * time.Second)
} }
go func() { goman.New(func() {
for this.hotTicker.Next() { for this.hotTicker.Next() {
trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() { trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() {
this.hotLoop() this.hotLoop()
}) })
} }
}() })
return nil return nil
} }

View File

@@ -3,6 +3,7 @@ package caches
import ( import (
"fmt" "fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -84,20 +85,20 @@ func (this *MemoryStorage) Init() error {
// 启动定时清理任务 // 启动定时清理任务
this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second)
go func() { goman.New(func() {
for this.purgeTicker.Next() { for this.purgeTicker.Next() {
var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP") var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP")
this.purgeLoop() this.purgeLoop()
tr.End() tr.End()
} }
}() })
// 启动定时Flush memory to disk任务 // 启动定时Flush memory to disk任务
go func() { goman.New(func() {
for hash := range this.dirtyChan { for hash := range this.dirtyChan {
this.flushItem(hash) this.flushItem(hash)
} }
}() })
return nil return nil
} }

View File

@@ -0,0 +1,12 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package goman
import "time"
type Instance struct {
Id uint64
CreatedTime time.Time
File string
Line int
}

53
internal/goman/lib.go Normal file
View File

@@ -0,0 +1,53 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package goman
import (
"runtime"
"sync"
"time"
)
var locker = &sync.Mutex{}
var instanceMap = map[uint64]*Instance{} // id => *Instance
var instanceId = uint64(0)
// New 新创建goroutine
func New(f func()) {
_, file, line, _ := runtime.Caller(1)
go func() {
locker.Lock()
instanceId++
var instance = &Instance{
Id: instanceId,
CreatedTime: time.Now(),
}
instance.File = file
instance.Line = line
instanceMap[instanceId] = instance
locker.Unlock()
// run function
f()
locker.Lock()
delete(instanceMap, instanceId)
locker.Unlock()
}()
}
// List 列出所有正在运行goroutine
func List() []*Instance {
locker.Lock()
defer locker.Unlock()
var result = []*Instance{}
for _, instance := range instanceMap {
result = append(result, instance)
}
return result
}

View File

@@ -0,0 +1,21 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package goman
import (
"testing"
"time"
)
func TestNew(t *testing.T) {
New(func() {
t.Log("Hello")
t.Log(List())
})
time.Sleep(1 * time.Second)
t.Log(List())
time.Sleep(1 * time.Second)
}

View File

@@ -29,11 +29,9 @@ func NewIPList() *IPList {
} }
expireList := expires.NewList() expireList := expires.NewList()
go func() { expireList.OnGC(func(itemId int64) {
expireList.StartGC(func(itemId int64) {
list.Delete(itemId) list.Delete(itemId)
}) })
}()
list.expireList = expireList list.expireList = expireList
return list return list
} }

View File

@@ -6,7 +6,9 @@ import (
"github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/rands"
"runtime" "runtime"
"runtime/debug"
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
) )
@@ -281,6 +283,22 @@ func TestGC(t *testing.T) {
logs.PrintAsJSON(list.sortedItems, t) logs.PrintAsJSON(list.sortedItems, t)
} }
func TestTooManyLists(t *testing.T) {
debug.SetMaxThreads(20)
var lists = []*IPList{}
var locker = &sync.Mutex{}
for i := 0; i < 1000; i++ {
locker.Lock()
lists = append(lists, NewIPList())
locker.Unlock()
}
time.Sleep(1 * time.Second)
t.Log(runtime.NumGoroutine())
t.Log(len(lists), "lists")
}
func BenchmarkIPList_Contains(b *testing.B) { func BenchmarkIPList_Contains(b *testing.B) {
runtime.GOMAXPROCS(1) runtime.GOMAXPROCS(1)

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -21,7 +22,9 @@ var SharedCountryManager = NewCountryManager()
func init() { func init() {
events.On(events.EventLoaded, func() { events.On(events.EventLoaded, func() {
go SharedCountryManager.Start() goman.New(func() {
SharedCountryManager.Start()
})
}) })
} }

View File

@@ -3,6 +3,7 @@ package iplibrary
import ( import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -17,7 +18,9 @@ var IPListUpdateNotify = make(chan bool, 1)
func init() { func init() {
events.On(events.EventLoaded, func() { events.On(events.EventLoaded, func() {
go SharedIPListManager.Start() goman.New(func() {
SharedIPListManager.Start()
})
}) })
} }

View File

@@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -25,7 +26,9 @@ var SharedProvinceManager = NewProvinceManager()
func init() { func init() {
events.On(events.EventLoaded, func() { events.On(events.EventLoaded, func() {
go SharedProvinceManager.Start() goman.New(func() {
SharedProvinceManager.Start()
})
}) })
} }

View File

@@ -7,6 +7,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
@@ -34,14 +35,14 @@ func NewUpdater() *Updater {
func (this *Updater) Start() { func (this *Updater) Start() {
// 这里不需要太频繁检查更新因为通常不需要更新IP库 // 这里不需要太频繁检查更新因为通常不需要更新IP库
ticker := time.NewTicker(1 * time.Hour) ticker := time.NewTicker(1 * time.Hour)
go func() { goman.New(func() {
for range ticker.C { for range ticker.C {
err := this.loop() err := this.loop()
if err != nil { if err != nil {
remotelogs.ErrorObject("IP_LIBRARY", err) remotelogs.ErrorObject("IP_LIBRARY", err)
} }
} }
}() })
} }
// 单次任务 // 单次任务

View File

@@ -7,6 +7,7 @@ import (
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
@@ -163,7 +164,7 @@ ON "` + this.statTableName + `" (
func (this *Task) Start() error { func (this *Task) Start() error {
// 读取数据 // 读取数据
this.statsTicker = utils.NewTicker(1 * time.Minute) this.statsTicker = utils.NewTicker(1 * time.Minute)
go func() { goman.New(func() {
for this.statsTicker.Next() { for this.statsTicker.Next() {
var tr = trackers.Begin("[METRIC]DUMP_STATS_TO_LOCAL_DATABASE") var tr = trackers.Begin("[METRIC]DUMP_STATS_TO_LOCAL_DATABASE")
@@ -181,11 +182,11 @@ func (this *Task) Start() error {
tr.End() tr.End()
} }
}() })
// 清理 // 清理
this.cleanTicker = utils.NewTicker(24 * time.Hour) this.cleanTicker = utils.NewTicker(24 * time.Hour)
go func() { goman.New(func() {
for this.cleanTicker.Next() { for this.cleanTicker.Next() {
var tr = trackers.Begin("[METRIC]CLEAN_EXPIRED") var tr = trackers.Begin("[METRIC]CLEAN_EXPIRED")
err := this.CleanExpired() err := this.CleanExpired()
@@ -194,11 +195,11 @@ func (this *Task) Start() error {
remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error()) remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error())
} }
} }
}() })
// 上传 // 上传
this.uploadTicker = utils.NewTicker(this.item.UploadDuration()) this.uploadTicker = utils.NewTicker(this.item.UploadDuration())
go func() { goman.New(func() {
for this.uploadTicker.Next() { for this.uploadTicker.Next() {
var tr = trackers.Begin("[METRIC]UPLOAD_STATS") var tr = trackers.Begin("[METRIC]UPLOAD_STATS")
err := this.Upload(1 * time.Second) err := this.Upload(1 * time.Second)
@@ -207,7 +208,7 @@ func (this *Task) Start() error {
remotelogs.Error("METRIC", "upload stats failed: "+err.Error()) remotelogs.Error("METRIC", "upload stats failed: "+err.Error())
} }
} }
}() })
return nil return nil
} }

View File

@@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
@@ -16,7 +17,9 @@ var SharedValueQueue = NewValueQueue()
func init() { func init() {
events.On(events.EventLoaded, func() { events.On(events.EventLoaded, func() {
go SharedValueQueue.Start() goman.New(func() {
SharedValueQueue.Start()
})
}) })
} }

View File

@@ -13,6 +13,7 @@ import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -607,7 +608,7 @@ func (this *APIStream) handleChangeAPINode(message *pb.NodeStreamMessage) error
this.replyOk(message.RequestId, "") this.replyOk(message.RequestId, "")
go func() { goman.New(func() {
// 延后生效防止变更前的API无法读取到状态 // 延后生效防止变更前的API无法读取到状态
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@@ -629,7 +630,7 @@ func (this *APIStream) handleChangeAPINode(message *pb.NodeStreamMessage) error
remotelogs.Println("API_STREAM", "change rpc endpoint to '"+ remotelogs.Println("API_STREAM", "change rpc endpoint to '"+
messageData.Addr+"' successfully") messageData.Addr+"' successfully")
}() })
return nil return nil
} }

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
"net" "net"
@@ -17,7 +18,7 @@ import (
func init() { func init() {
events.On(events.EventStart, func() { events.On(events.EventStart, func() {
ticker := time.NewTicker(1 * time.Minute) ticker := time.NewTicker(1 * time.Minute)
go func() { goman.New(func() {
for range ticker.C { for range ticker.C {
// 加入到数据队列中 // 加入到数据队列中
if teaconst.InTrafficBytes > 0 { if teaconst.InTrafficBytes > 0 {
@@ -35,7 +36,7 @@ func init() {
atomic.StoreUint64(&teaconst.InTrafficBytes, 0) atomic.StoreUint64(&teaconst.InTrafficBytes, 0)
atomic.StoreUint64(&teaconst.OutTrafficBytes, 0) atomic.StoreUint64(&teaconst.OutTrafficBytes, 0)
} }
}() })
}) })
} }

View File

@@ -2,6 +2,7 @@ package nodes
import ( import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"reflect" "reflect"
@@ -26,7 +27,9 @@ func NewHTTPAccessLogQueue() *HTTPAccessLogQueue {
queue := &HTTPAccessLogQueue{ queue := &HTTPAccessLogQueue{
queue: make(chan *pb.HTTPAccessLog, maxSize), queue: make(chan *pb.HTTPAccessLog, maxSize),
} }
go queue.Start() goman.New(func() {
queue.Start()
})
return queue return queue
} }

View File

@@ -5,6 +5,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/pires/go-proxyproto" "github.com/pires/go-proxyproto"
"net" "net"
@@ -33,7 +34,9 @@ func NewHTTPClientPool() *HTTPClientPool {
clientsMap: map[string]*HTTPClient{}, clientsMap: map[string]*HTTPClient{},
} }
go pool.cleanClients() goman.New(func() {
pool.cleanClients()
})
return pool return pool
} }

View File

@@ -952,7 +952,7 @@ func (this *HTTPRequest) requestRemotePort() int {
return 0 return 0
} }
// 情趣的URI中的参数部分 // 获取的URI中的参数部分
func (this *HTTPRequest) requestQueryString() string { func (this *HTTPRequest) requestQueryString() string {
uri, err := url.ParseRequestURI(this.uri) uri, err := url.ParseRequestURI(this.uri)
if err != nil { if err != nil {

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -128,7 +129,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error()) remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error())
} }
go func() { goman.New(func() {
rpcClient, err := rpc.SharedRPC() rpcClient, err := rpc.SharedRPC()
if err == nil { if err == nil {
for _, rpcServerService := range rpcClient.ServerRPCList() { for _, rpcServerService := range rpcClient.ServerRPCList() {
@@ -142,7 +143,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
} }
} }
} }
}() })
return true return true
} }

View File

@@ -2,6 +2,7 @@ package nodes
import ( import (
"errors" "errors"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
@@ -64,7 +65,7 @@ func (this *HTTPRequest) doWebsocket() {
_ = clientConn.Close() _ = clientConn.Close()
}() }()
go func() { goman.New(func() {
buf := make([]byte, 4*1024) // TODO 使用内存池 buf := make([]byte, 4*1024) // TODO 使用内存池
for { for {
n, err := originConn.Read(buf) n, err := originConn.Read(buf)
@@ -81,6 +82,6 @@ func (this *HTTPRequest) doWebsocket() {
} }
_ = clientConn.Close() _ = clientConn.Close()
_ = originConn.Close() _ = originConn.Close()
}() })
_, _ = io.Copy(originConn, clientConn) _, _ = io.Copy(originConn, clientConn)
} }

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"net" "net"
"sync" "sync"
@@ -97,7 +98,7 @@ func (this *Listener) listenTCP() error {
this.listener.Init() this.listener.Init()
go func() { goman.New(func() {
err := this.listener.Serve() err := this.listener.Serve()
if err != nil { if err != nil {
// 在这里屏蔽accept错误防止在优雅关闭的时候有多余的提示 // 在这里屏蔽accept错误防止在优雅关闭的时候有多余的提示
@@ -109,7 +110,7 @@ func (this *Listener) listenTCP() error {
// 打印其他错误 // 打印其他错误
remotelogs.Error("LISTENER", err.Error()) remotelogs.Error("LISTENER", err.Error())
} }
}() })
return nil return nil
} }
@@ -129,12 +130,12 @@ func (this *Listener) listenUDP() error {
Listener: listener, Listener: listener,
} }
go func() { goman.New(func() {
err := this.listener.Serve() err := this.listener.Serve()
if err != nil { if err != nil {
remotelogs.Error("LISTENER", err.Error()) remotelogs.Error("LISTENER", err.Error())
} }
}() })
return nil return nil
} }

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
@@ -43,11 +44,11 @@ func NewListenerManager() *ListenerManager {
manager.ticker = time.NewTicker(5 * time.Second) manager.ticker = time.NewTicker(5 * time.Second)
} }
go func() { goman.New(func() {
for range manager.ticker.C { for range manager.ticker.C {
manager.retryListeners() manager.retryListeners()
} }
}() })
return manager return manager
} }

View File

@@ -4,6 +4,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/pires/go-proxyproto" "github.com/pires/go-proxyproto"
@@ -110,7 +111,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
} }
// 从源站读取 // 从源站读取
go func() { goman.New(func() {
originBuffer := bytePool32k.Get() originBuffer := bytePool32k.Get()
defer func() { defer func() {
bytePool32k.Put(originBuffer) bytePool32k.Put(originBuffer)
@@ -134,7 +135,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
break break
} }
} }
}() })
// 从客户端读取 // 从客户端读取
clientBuffer := bytePool32k.Get() clientBuffer := bytePool32k.Get()

View File

@@ -3,6 +3,7 @@ package nodes
import ( import (
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -37,11 +38,11 @@ func (this *UDPListener) Serve() error {
this.connMap = map[string]*UDPConn{} this.connMap = map[string]*UDPConn{}
this.connTicker = utils.NewTicker(1 * time.Minute) this.connTicker = utils.NewTicker(1 * time.Minute)
go func() { goman.New(func() {
for this.connTicker.Next() { for this.connTicker.Next() {
this.gcConns() this.gcConns()
} }
}() })
var buffer = make([]byte, 4*1024) var buffer = make([]byte, 4*1024)
for { for {
@@ -188,7 +189,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyConn *ne
stats.SharedTrafficStatManager.Add(server.Id, "", 0, 0, 1, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) stats.SharedTrafficStatManager.Add(server.Id, "", 0, 0, 1, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
} }
go func() { goman.New(func() {
buffer := bytePool32k.Get() buffer := bytePool32k.Get()
defer func() { defer func() {
bytePool32k.Put(buffer) bytePool32k.Put(buffer)
@@ -214,7 +215,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyConn *ne
break break
} }
} }
}() })
return conn return conn
} }

View File

@@ -11,6 +11,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/configs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary" "github.com/TeaOSLab/EdgeNode/internal/iplibrary"
"github.com/TeaOSLab/EdgeNode/internal/metrics" "github.com/TeaOSLab/EdgeNode/internal/metrics"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -31,6 +32,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"runtime" "runtime"
"sort"
"sync" "sync"
"time" "time"
) )
@@ -124,7 +126,9 @@ func (this *Node) Start() {
this.startSyncTimer() this.startSyncTimer()
// 状态变更计时器 // 状态变更计时器
go NewNodeStatusExecutor().Listen() goman.New(func() {
NewNodeStatusExecutor().Listen()
})
// 读取配置 // 读取配置
nodeConfig, err := nodeconfigs.SharedNodeConfig() nodeConfig, err := nodeconfigs.SharedNodeConfig()
@@ -153,13 +157,17 @@ func (this *Node) Start() {
_ = utils.SetRLimit(1024 * 1024) _ = utils.SetRLimit(1024 * 1024)
// 连接API // 连接API
go NewAPIStream().Start() goman.New(func() {
NewAPIStream().Start()
})
// 统计 // 统计
go stats.SharedTrafficStatManager.Start(func() *nodeconfigs.NodeConfig { go stats.SharedTrafficStatManager.Start(func() *nodeconfigs.NodeConfig {
return sharedNodeConfig return sharedNodeConfig
}) })
go stats.SharedHTTPRequestStatManager.Start() goman.New(func() {
stats.SharedHTTPRequestStatManager.Start()
})
// 启动端口 // 启动端口
err = sharedListenerManager.Start(nodeConfig) err = sharedListenerManager.Start(nodeConfig)
@@ -297,7 +305,9 @@ func (this *Node) loop() error {
return err return err
} }
case "nodeVersionChanged": case "nodeVersionChanged":
go sharedUpgradeManager.Start() goman.New(func() {
sharedUpgradeManager.Start()
})
} }
} }
@@ -439,7 +449,7 @@ func (this *Node) startSyncTimer() {
remotelogs.Println("NODE", "quit sync timer") remotelogs.Println("NODE", "quit sync timer")
ticker.Stop() ticker.Stop()
}) })
go func() { goman.New(func() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
@@ -462,7 +472,7 @@ func (this *Node) startSyncTimer() {
} }
} }
} }
}() })
} }
// 检查集群设置 // 检查集群设置
@@ -530,7 +540,7 @@ func (this *Node) listenSock() error {
} }
// 启动监听 // 启动监听
go func() { goman.New(func() {
this.sock.OnCommand(func(cmd *gosock.Command) { this.sock.OnCommand(func(cmd *gosock.Command) {
switch cmd.Code { switch cmd.Code {
case "pid": case "pid":
@@ -563,7 +573,7 @@ func (this *Node) listenSock() error {
events.Notify(events.EventQuit) events.Notify(events.EventQuit)
// 监控连接数如果连接数为0则退出进程 // 监控连接数如果连接数为0则退出进程
go func() { goman.New(func() {
for { for {
countActiveConnections := sharedListenerManager.TotalActiveConnections() countActiveConnections := sharedListenerManager.TotalActiveConnections()
if countActiveConnections <= 0 { if countActiveConnections <= 0 {
@@ -572,13 +582,43 @@ func (this *Node) listenSock() error {
} }
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
}() })
case "trackers": case "trackers":
_ = cmd.Reply(&gosock.Command{ _ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{ Params: map[string]interface{}{
"labels": trackers.SharedManager.Labels(), "labels": trackers.SharedManager.Labels(),
}, },
}) })
case "goman":
var posMap = map[string]maps.Map{} // file#line => Map
for _, instance := range goman.List() {
var pos = instance.File + "#" + types.String(instance.Line)
m, ok := posMap[pos]
if ok {
m["count"] = m["count"].(int) + 1
} else {
m = maps.Map{
"pos": pos,
"count": 1,
}
posMap[pos] = m
}
}
var result = []maps.Map{}
for _, m := range posMap {
result = append(result, m)
}
sort.Slice(result, func(i, j int) bool {
return result[i]["count"].(int) > result[j]["count"].(int)
})
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
"result": result,
},
})
} }
}) })
@@ -586,7 +626,7 @@ func (this *Node) listenSock() error {
if err != nil { if err != nil {
logs.Println("NODE", err.Error()) logs.Println("NODE", err.Error())
} }
}() })
events.On(events.EventQuit, func() { events.On(events.EventQuit, func() {
logs.Println("NODE", "quit unix sock") logs.Println("NODE", "quit unix sock")

View File

@@ -5,6 +5,7 @@ package nodes
import ( import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
@@ -16,7 +17,9 @@ var SharedOriginStateManager = NewOriginStateManager()
func init() { func init() {
events.On(events.EventLoaded, func() { events.On(events.EventLoaded, func() {
go SharedOriginStateManager.Start() goman.New(func() {
SharedOriginStateManager.Start()
})
}) })
} }

View File

@@ -7,6 +7,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
@@ -96,10 +97,10 @@ func (this *SystemServiceManager) setupSystemd(params maps.Map) error {
} }
// 启动Service // 启动Service
go func() { goman.New(func() {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
_ = exec.Command(systemctl, "start", teaconst.SystemdServiceName).Start() _ = exec.Command(systemctl, "start", teaconst.SystemdServiceName).Start()
}() })
if output == "enabled" { if output == "enabled" {
// 检查文件路径是否变化 // 检查文件路径是否变化

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
@@ -23,7 +24,9 @@ import (
func init() { func init() {
events.On(events.EventStart, func() { events.On(events.EventStart, func() {
task := NewSyncAPINodesTask() task := NewSyncAPINodesTask()
go task.Start() goman.New(func() {
task.Start()
})
}) })
} }

View File

@@ -3,6 +3,7 @@ package nodes
import ( import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"net" "net"
@@ -67,7 +68,9 @@ func (this *TOAManager) Run(config *nodeconfigs.TOAConfig) error {
} }
this.pid = cmd.Process.Pid this.pid = cmd.Process.Pid
go func() { _ = cmd.Wait() }() goman.New(func() {
_ = cmd.Wait()
})
return nil return nil
} }

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -60,12 +61,12 @@ func (this *UpgradeManager) Start() {
remotelogs.Println("UPGRADE_MANAGER", "upgrade successfully") remotelogs.Println("UPGRADE_MANAGER", "upgrade successfully")
go func() { goman.New(func() {
err = this.restart() err = this.restart()
if err != nil { if err != nil {
logs.Println("UPGRADE_MANAGER", err.Error()) logs.Println("UPGRADE_MANAGER", err.Error())
} }
}() })
} }
func (this *UpgradeManager) install() error { func (this *UpgradeManager) install() error {

View File

@@ -5,6 +5,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/cespare/xxhash" "github.com/cespare/xxhash"
@@ -24,7 +25,7 @@ func init() {
if Tea.IsTesting() { if Tea.IsTesting() {
ticker = time.NewTicker(10 * time.Second) ticker = time.NewTicker(10 * time.Second)
} }
go func() { goman.New(func() {
for range ticker.C { for range ticker.C {
var tr = trackers.Begin("UPLOAD_REMOTE_LOGS") var tr = trackers.Begin("UPLOAD_REMOTE_LOGS")
err := uploadLogs() err := uploadLogs()
@@ -33,7 +34,7 @@ func init() {
logs.Println("[LOG]" + err.Error()) logs.Println("[LOG]" + err.Error())
} }
} }
}() })
} }
// Println 打印普通信息 // Println 打印普通信息

View File

@@ -4,6 +4,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary" "github.com/TeaOSLab/EdgeNode/internal/iplibrary"
"github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
@@ -63,17 +64,17 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager {
// Start 启动 // Start 启动
func (this *HTTPRequestStatManager) Start() { func (this *HTTPRequestStatManager) Start() {
// 上传请求总数 // 上传请求总数
go func() { goman.New(func() {
ticker := time.NewTicker(1 * time.Minute) ticker := time.NewTicker(1 * time.Minute)
go func() { goman.New(func() {
for range ticker.C { for range ticker.C {
if this.totalAttackRequests > 0 { if this.totalAttackRequests > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAttackRequests, maps.Map{"total": this.totalAttackRequests}) monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAttackRequests, maps.Map{"total": this.totalAttackRequests})
this.totalAttackRequests = 0 this.totalAttackRequests = 0
} }
} }
}() })
}() })
loopTicker := time.NewTicker(1 * time.Second) loopTicker := time.NewTicker(1 * time.Second)
uploadTicker := time.NewTicker(30 * time.Minute) uploadTicker := time.NewTicker(30 * time.Minute)

View File

@@ -4,6 +4,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
@@ -55,17 +56,17 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
this.configFunc = configFunc this.configFunc = configFunc
// 上传请求总数 // 上传请求总数
go func() { goman.New(func() {
ticker := time.NewTicker(1 * time.Minute) ticker := time.NewTicker(1 * time.Minute)
go func() { goman.New(func() {
for range ticker.C { for range ticker.C {
if this.totalRequests > 0 { if this.totalRequests > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemRequests, maps.Map{"total": this.totalRequests}) monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemRequests, maps.Map{"total": this.totalRequests})
this.totalRequests = 0 this.totalRequests = 0
} }
} }
}() })
}() })
// 上传统计数据 // 上传统计数据
duration := 5 * time.Minute duration := 5 * time.Minute

View File

@@ -1,7 +1,6 @@
package ttlcache package ttlcache
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"time" "time"
) )
@@ -19,7 +18,6 @@ type Cache struct {
maxItems int maxItems int
gcPieceIndex int gcPieceIndex int
ticker *utils.Ticker
} }
func NewCache(opt ...OptionInterface) *Cache { func NewCache(opt ...OptionInterface) *Cache {
@@ -56,13 +54,8 @@ func NewCache(opt ...OptionInterface) *Cache {
cache.pieces = append(cache.pieces, NewPiece(maxItems/countPieces)) cache.pieces = append(cache.pieces, NewPiece(maxItems/countPieces))
} }
// start timer // Add to manager
go func() { SharedManager.Add(cache)
cache.ticker = utils.NewTicker(5 * time.Second)
for cache.ticker.Next() {
cache.GC()
}
}()
return cache return cache
} }
@@ -149,12 +142,10 @@ func (this *Cache) Clean() {
} }
func (this *Cache) Destroy() { func (this *Cache) Destroy() {
SharedManager.Remove(this)
this.isDestroyed = true this.isDestroyed = true
if this.ticker != nil {
this.ticker.Stop()
this.ticker = nil
}
for _, piece := range this.pieces { for _, piece := range this.pieces {
piece.Destroy() piece.Destroy()
} }

View File

@@ -123,13 +123,18 @@ func TestCache_GC(t *testing.T) {
func TestCache_GC2(t *testing.T) { func TestCache_GC2(t *testing.T) {
runtime.GOMAXPROCS(1) runtime.GOMAXPROCS(1)
cache := NewCache() cache1 := NewCache(NewPiecesOption(32))
for i := 0; i < 1_000_000; i++ { for i := 0; i < 1_000_000; i++ {
cache.Write(strconv.Itoa(i), i, time.Now().Unix()+int64(rands.Int(0, 100))) cache1.Write(strconv.Itoa(i), i, time.Now().Unix()+int64(rands.Int(0, 10)))
}
cache2 := NewCache(NewPiecesOption(5))
for i := 0; i < 1_000_000; i++ {
cache2.Write(strconv.Itoa(i), i, time.Now().Unix()+int64(rands.Int(0, 10)))
} }
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
t.Log(cache.Count(), "items") t.Log(cache1.Count(), "items", cache2.Count(), "items")
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }

View File

@@ -0,0 +1,53 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package ttlcache
import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"sync"
"time"
)
var SharedManager = NewManager()
type Manager struct {
ticker *time.Ticker
locker sync.Mutex
cacheMap map[*Cache]bool
}
func NewManager() *Manager {
var manager = &Manager{
ticker: time.NewTicker(3 * time.Second),
cacheMap: map[*Cache]bool{},
}
goman.New(func() {
manager.init()
})
return manager
}
func (this *Manager) init() {
for range this.ticker.C {
this.locker.Lock()
for cache := range this.cacheMap {
cache.GC()
}
this.locker.Unlock()
}
}
func (this *Manager) Add(cache *Cache) {
this.locker.Lock()
this.cacheMap[cache] = true
this.locker.Unlock()
}
func (this *Manager) Remove(cache *Cache) {
this.locker.Lock()
delete(this.cacheMap, cache)
this.locker.Unlock()
}

View File

@@ -2,7 +2,6 @@ package expires
import ( import (
"sync" "sync"
"time"
) )
type ItemMap = map[int64]bool type ItemMap = map[int64]bool
@@ -12,14 +11,19 @@ type List struct {
itemsMap map[int64]int64 // itemId => timestamp itemsMap map[int64]int64 // itemId => timestamp
locker sync.Mutex locker sync.Mutex
ticker *time.Ticker
gcCallback func(itemId int64)
} }
func NewList() *List { func NewList() *List {
return &List{ var list = &List{
expireMap: map[int64]ItemMap{}, expireMap: map[int64]ItemMap{},
itemsMap: map[int64]int64{}, itemsMap: map[int64]int64{},
} }
SharedManager.Add(list)
return list
} }
func (this *List) Add(itemId int64, expiresAt int64) { func (this *List) Add(itemId int64, expiresAt int64) {
@@ -56,33 +60,15 @@ func (this *List) GC(timestamp int64, callback func(itemId int64)) {
itemMap := this.gcItems(timestamp) itemMap := this.gcItems(timestamp)
this.locker.Unlock() this.locker.Unlock()
if callback != nil {
for itemId := range itemMap { for itemId := range itemMap {
callback(itemId) callback(itemId)
} }
}
} }
func (this *List) StartGC(callback func(itemId int64)) { func (this *List) OnGC(callback func(itemId int64)) {
this.ticker = time.NewTicker(1 * time.Second) this.gcCallback = callback
lastTimestamp := int64(0)
for range this.ticker.C {
timestamp := time.Now().Unix()
if lastTimestamp == 0 {
lastTimestamp = timestamp - 3600
}
if timestamp >= lastTimestamp {
for i := lastTimestamp; i <= timestamp; i++ {
this.GC(i, callback)
}
} else {
for i := timestamp; i <= lastTimestamp; i++ {
this.GC(i, callback)
}
}
// 这样做是为了防止系统时钟突变
lastTimestamp = timestamp
}
} }
func (this *List) removeItem(itemId int64) { func (this *List) removeItem(itemId int64) {

View File

@@ -63,11 +63,13 @@ func TestList_Start_GC(t *testing.T) {
list.Add(7, time.Now().Unix()+6) list.Add(7, time.Now().Unix()+6)
list.Add(8, time.Now().Unix()+6) list.Add(8, time.Now().Unix()+6)
go func() { list.OnGC(func(itemId int64) {
list.StartGC(func(itemId int64) {
t.Log("gc:", itemId, timeutil.Format("H:i:s")) t.Log("gc:", itemId, timeutil.Format("H:i:s"))
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
}) })
go func() {
SharedManager.Add(list)
}() }()
time.Sleep(20 * time.Second) time.Sleep(20 * time.Second)

View File

@@ -0,0 +1,71 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package expires
import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"sync"
"time"
)
var SharedManager = NewManager()
type Manager struct {
listMap map[*List]bool
locker sync.Mutex
ticker *time.Ticker
}
func NewManager() *Manager {
var manager = &Manager{
listMap: map[*List]bool{},
ticker: time.NewTicker(1 * time.Second),
}
goman.New(func() {
manager.init()
})
return manager
}
func (this *Manager) init() {
var lastTimestamp = int64(0)
for range this.ticker.C {
timestamp := time.Now().Unix()
if lastTimestamp == 0 {
lastTimestamp = timestamp - 3600
}
if timestamp >= lastTimestamp {
for i := lastTimestamp; i <= timestamp; i++ {
this.locker.Lock()
for list := range this.listMap {
list.GC(i, list.gcCallback)
}
this.locker.Unlock()
}
} else {
for i := timestamp; i <= lastTimestamp; i++ {
this.locker.Lock()
for list := range this.listMap {
list.GC(i, list.gcCallback)
}
this.locker.Unlock()
}
}
// 这样做是为了防止系统时钟突变
lastTimestamp = timestamp
}
}
func (this *Manager) Add(list *List) {
this.locker.Lock()
this.listMap[list] = true
this.locker.Unlock()
}
func (this *Manager) Remove(list *List) {
this.locker.Lock()
delete(this.listMap, list)
this.locker.Unlock()
}

View File

@@ -5,6 +5,7 @@ package utils
import ( import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -15,7 +16,9 @@ var SharedFreeHoursManager = NewFreeHoursManager()
func init() { func init() {
events.On(events.EventLoaded, func() { events.On(events.EventLoaded, func() {
go SharedFreeHoursManager.Start() goman.New(func() {
SharedFreeHoursManager.Start()
})
}) })
} }

View File

@@ -1,15 +1,18 @@
package utils package utils
import "time" import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"time"
)
// 定时运行某个函数 // Every 定时运行某个函数
func Every(duration time.Duration, f func(ticker *Ticker)) *Ticker { func Every(duration time.Duration, f func(ticker *Ticker)) *Ticker {
ticker := NewTicker(duration) ticker := NewTicker(duration)
go func() { goman.New(func() {
for ticker.Next() { for ticker.Next() {
f(ticker) f(ticker)
} }
}() })
return ticker return ticker
} }

View File

@@ -1,6 +1,7 @@
package utils package utils
import ( import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"time" "time"
) )
@@ -9,12 +10,12 @@ var unixTimeMilli = time.Now().UnixMilli()
func init() { func init() {
ticker := time.NewTicker(200 * time.Millisecond) ticker := time.NewTicker(200 * time.Millisecond)
go func() { goman.New(func() {
for range ticker.C { for range ticker.C {
unixTime = time.Now().Unix() unixTime = time.Now().Unix()
unixTimeMilli = time.Now().UnixMilli() unixTimeMilli = time.Now().UnixMilli()
} }
}() })
} }
// UnixTime 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景 // UnixTime 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景

View File

@@ -5,6 +5,7 @@ package waf
import ( import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests" "github.com/TeaOSLab/EdgeNode/internal/waf/requests"
@@ -25,7 +26,7 @@ var notifyChan = make(chan *notifyTask, 128)
func init() { func init() {
events.On(events.EventLoaded, func() { events.On(events.EventLoaded, func() {
go func() { goman.New(func() {
rpcClient, err := rpc.SharedRPC() rpcClient, err := rpc.SharedRPC()
if err != nil { if err != nil {
remotelogs.Error("WAF_NOTIFY_ACTION", "create rpc client failed: "+err.Error()) remotelogs.Error("WAF_NOTIFY_ACTION", "create rpc client failed: "+err.Error())
@@ -44,7 +45,7 @@ func init() {
remotelogs.Error("WAF_NOTIFY_ACTION", "notify failed: "+err.Error()) remotelogs.Error("WAF_NOTIFY_ACTION", "notify failed: "+err.Error())
} }
} }
}() })
}) })
} }

View File

@@ -5,6 +5,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests" "github.com/TeaOSLab/EdgeNode/internal/waf/requests"
@@ -31,7 +32,7 @@ var recordIPTaskChan = make(chan *recordIPTask, 1024)
func init() { func init() {
events.On(events.EventLoaded, func() { events.On(events.EventLoaded, func() {
go func() { goman.New(func() {
rpcClient, err := rpc.SharedRPC() rpcClient, err := rpc.SharedRPC()
if err != nil { if err != nil {
remotelogs.Error("WAF_RECORD_IP_ACTION", "create rpc client failed: "+err.Error()) remotelogs.Error("WAF_RECORD_IP_ACTION", "create rpc client failed: "+err.Error())
@@ -62,7 +63,7 @@ func init() {
remotelogs.Error("WAF_RECORD_IP_ACTION", "create ip item failed: "+err.Error()) remotelogs.Error("WAF_RECORD_IP_ACTION", "create ip item failed: "+err.Error())
} }
} }
}() })
}) })
} }

View File

@@ -44,11 +44,9 @@ func NewIPList(listType IPListType) *IPList {
e := expires.NewList() e := expires.NewList()
list.expireList = e list.expireList = e
go func() { e.OnGC(func(itemId int64) {
e.StartGC(func(itemId int64) {
list.remove(itemId) list.remove(itemId)
}) })
}()
return list return list
} }