diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index 734908f..715d240 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "fmt" "github.com/TeaOSLab/EdgeNode/internal/apps" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" @@ -88,6 +89,25 @@ func main() { } } }) + app.On("goman", func() { + var sock = gosock.NewTmpSock(teaconst.ProcessName) + reply, err := sock.Send(&gosock.Command{Code: "goman"}) + if err != nil { + fmt.Println("[ERROR]" + err.Error()) + } else { + instances, ok := reply.Params["result"] + if ok { + instancesJSON, err := json.MarshalIndent(instances, "", " ") + if err != nil { + fmt.Println("[ERROR]" + err.Error()) + } else { + fmt.Println(string(instancesJSON)) + } + } else { + fmt.Println("no instances yet.") + } + } + }) app.Run(func() { node := nodes.NewNode() node.Start() diff --git a/internal/caches/list_file.go b/internal/caches/list_file.go index 7896e88..f2f156a 100644 --- a/internal/caches/list_file.go +++ b/internal/caches/list_file.go @@ -4,6 +4,7 @@ package caches import ( "database/sql" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/ttlcache" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -568,11 +569,11 @@ func (this *FileList) removeOldTables() error { } if lists.ContainsString(this.oldTables, name) { // 异步执行 - go func() { + goman.New(func() { remotelogs.Println("CACHE", "remove old table '"+name+"' ...") _, _ = this.db.Exec(`DROP TABLE "` + name + `"`) remotelogs.Println("CACHE", "remove old table '"+name+"' done") - }() + }) } } diff --git a/internal/caches/list_file_test.go b/internal/caches/list_file_test.go index 1d25e86..8628b8d 100644 --- a/internal/caches/list_file_test.go +++ b/internal/caches/list_file_test.go @@ -3,6 +3,7 @@ package caches import ( + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/rands" "github.com/iwind/TeaGo/types" @@ -127,7 +128,7 @@ func TestFileList_Exist_Many_DB(t *testing.T) { }() for i := 0; i < threads; i++ { - go func() { + goman.New(func() { defer wg.Done() for { @@ -143,7 +144,7 @@ func TestFileList_Exist_Many_DB(t *testing.T) { return } } - }() + }) } wg.Wait() t.Log("left:", count) diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index e51b599..0edeb82 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -562,12 +563,12 @@ func (this *FileStorage) CleanAll() error { } // 重新遍历待删除 - go func() { + goman.New(func() { err = this.cleanDeletedDirs(dir) if err != nil { remotelogs.Warn("CACHE", "delete '*-deleted' dirs failed: "+err.Error()) } - }() + }) return nil } @@ -672,12 +673,12 @@ func (this *FileStorage) initList() error { } // 使用异步防止阻塞主线程 - /**go func() { + /**goman.New(func() { dir := this.dir() // 清除tmp // TODO 需要一个更加高效的实现 - }()**/ + })**/ // 启动定时清理任务 var autoPurgeInterval = this.policy.PersistenceAutoPurgeInterval @@ -695,26 +696,26 @@ func (this *FileStorage) initList() error { ticker.Stop() } }) - go func() { + goman.New(func() { for this.purgeTicker.Next() { trackers.Run("FILE_CACHE_STORAGE_PURGE_LOOP", func() { this.purgeLoop() }) } - }() + }) // 热点处理任务 this.hotTicker = utils.NewTicker(1 * time.Minute) if Tea.IsTesting() { this.hotTicker = utils.NewTicker(10 * time.Second) } - go func() { + goman.New(func() { for this.hotTicker.Next() { trackers.Run("FILE_CACHE_STORAGE_HOT_LOOP", func() { this.hotLoop() }) } - }() + }) return nil } diff --git a/internal/caches/storage_memory.go b/internal/caches/storage_memory.go index bf4cf89..ddd6d5e 100644 --- a/internal/caches/storage_memory.go +++ b/internal/caches/storage_memory.go @@ -3,6 +3,7 @@ package caches import ( "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -84,20 +85,20 @@ func (this *MemoryStorage) Init() error { // 启动定时清理任务 this.purgeTicker = utils.NewTicker(time.Duration(autoPurgeInterval) * time.Second) - go func() { + goman.New(func() { for this.purgeTicker.Next() { var tr = trackers.Begin("MEMORY_CACHE_STORAGE_PURGE_LOOP") this.purgeLoop() tr.End() } - }() + }) // 启动定时Flush memory to disk任务 - go func() { + goman.New(func() { for hash := range this.dirtyChan { this.flushItem(hash) } - }() + }) return nil } diff --git a/internal/goman/instance.go b/internal/goman/instance.go new file mode 100644 index 0000000..f765924 --- /dev/null +++ b/internal/goman/instance.go @@ -0,0 +1,12 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package goman + +import "time" + +type Instance struct { + Id uint64 + CreatedTime time.Time + File string + Line int +} diff --git a/internal/goman/lib.go b/internal/goman/lib.go new file mode 100644 index 0000000..fa0dbbb --- /dev/null +++ b/internal/goman/lib.go @@ -0,0 +1,53 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package goman + +import ( + "runtime" + "sync" + "time" +) + +var locker = &sync.Mutex{} +var instanceMap = map[uint64]*Instance{} // id => *Instance +var instanceId = uint64(0) + +// New 新创建goroutine +func New(f func()) { + _, file, line, _ := runtime.Caller(1) + + go func() { + locker.Lock() + instanceId++ + + var instance = &Instance{ + Id: instanceId, + CreatedTime: time.Now(), + } + + instance.File = file + instance.Line = line + + instanceMap[instanceId] = instance + locker.Unlock() + + // run function + f() + + locker.Lock() + delete(instanceMap, instanceId) + locker.Unlock() + }() +} + +// List 列出所有正在运行goroutine +func List() []*Instance { + locker.Lock() + defer locker.Unlock() + + var result = []*Instance{} + for _, instance := range instanceMap { + result = append(result, instance) + } + return result +} diff --git a/internal/goman/lib_test.go b/internal/goman/lib_test.go new file mode 100644 index 0000000..622a91f --- /dev/null +++ b/internal/goman/lib_test.go @@ -0,0 +1,21 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package goman + +import ( + "testing" + "time" +) + +func TestNew(t *testing.T) { + New(func() { + t.Log("Hello") + + t.Log(List()) + }) + + time.Sleep(1 * time.Second) + t.Log(List()) + + time.Sleep(1 * time.Second) +} diff --git a/internal/iplibrary/ip_list.go b/internal/iplibrary/ip_list.go index ddeba5a..529ba96 100644 --- a/internal/iplibrary/ip_list.go +++ b/internal/iplibrary/ip_list.go @@ -29,11 +29,9 @@ func NewIPList() *IPList { } expireList := expires.NewList() - go func() { - expireList.StartGC(func(itemId int64) { - list.Delete(itemId) - }) - }() + expireList.OnGC(func(itemId int64) { + list.Delete(itemId) + }) list.expireList = expireList return list } diff --git a/internal/iplibrary/ip_list_test.go b/internal/iplibrary/ip_list_test.go index 009f0b9..467dfa5 100644 --- a/internal/iplibrary/ip_list_test.go +++ b/internal/iplibrary/ip_list_test.go @@ -6,7 +6,9 @@ import ( "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/rands" "runtime" + "runtime/debug" "strconv" + "sync" "testing" "time" ) @@ -281,6 +283,22 @@ func TestGC(t *testing.T) { logs.PrintAsJSON(list.sortedItems, t) } +func TestTooManyLists(t *testing.T) { + debug.SetMaxThreads(20) + + var lists = []*IPList{} + var locker = &sync.Mutex{} + for i := 0; i < 1000; i++ { + locker.Lock() + lists = append(lists, NewIPList()) + locker.Unlock() + } + + time.Sleep(1 * time.Second) + t.Log(runtime.NumGoroutine()) + t.Log(len(lists), "lists") +} + func BenchmarkIPList_Contains(b *testing.B) { runtime.GOMAXPROCS(1) diff --git a/internal/iplibrary/manager_country.go b/internal/iplibrary/manager_country.go index 46dbe14..f17e4bd 100644 --- a/internal/iplibrary/manager_country.go +++ b/internal/iplibrary/manager_country.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -21,7 +22,9 @@ var SharedCountryManager = NewCountryManager() func init() { events.On(events.EventLoaded, func() { - go SharedCountryManager.Start() + goman.New(func() { + SharedCountryManager.Start() + }) }) } diff --git a/internal/iplibrary/manager_ip_list.go b/internal/iplibrary/manager_ip_list.go index 0137622..c9be440 100644 --- a/internal/iplibrary/manager_ip_list.go +++ b/internal/iplibrary/manager_ip_list.go @@ -3,6 +3,7 @@ package iplibrary import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -17,7 +18,9 @@ var IPListUpdateNotify = make(chan bool, 1) func init() { events.On(events.EventLoaded, func() { - go SharedIPListManager.Start() + goman.New(func() { + SharedIPListManager.Start() + }) }) } diff --git a/internal/iplibrary/manager_province.go b/internal/iplibrary/manager_province.go index a0de6b2..0fa76c5 100644 --- a/internal/iplibrary/manager_province.go +++ b/internal/iplibrary/manager_province.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -25,7 +26,9 @@ var SharedProvinceManager = NewProvinceManager() func init() { events.On(events.EventLoaded, func() { - go SharedProvinceManager.Start() + goman.New(func() { + SharedProvinceManager.Start() + }) }) } diff --git a/internal/iplibrary/updater.go b/internal/iplibrary/updater.go index 1c471af..840e62f 100644 --- a/internal/iplibrary/updater.go +++ b/internal/iplibrary/updater.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/iwind/TeaGo/Tea" @@ -34,14 +35,14 @@ func NewUpdater() *Updater { func (this *Updater) Start() { // 这里不需要太频繁检查更新,因为通常不需要更新IP库 ticker := time.NewTicker(1 * time.Hour) - go func() { + goman.New(func() { for range ticker.C { err := this.loop() if err != nil { remotelogs.ErrorObject("IP_LIBRARY", err) } } - }() + }) } // 单次任务 diff --git a/internal/metrics/task.go b/internal/metrics/task.go index 66b2682..285c80c 100644 --- a/internal/metrics/task.go +++ b/internal/metrics/task.go @@ -7,6 +7,7 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/trackers" @@ -163,7 +164,7 @@ ON "` + this.statTableName + `" ( func (this *Task) Start() error { // 读取数据 this.statsTicker = utils.NewTicker(1 * time.Minute) - go func() { + goman.New(func() { for this.statsTicker.Next() { var tr = trackers.Begin("[METRIC]DUMP_STATS_TO_LOCAL_DATABASE") @@ -181,11 +182,11 @@ func (this *Task) Start() error { tr.End() } - }() + }) // 清理 this.cleanTicker = utils.NewTicker(24 * time.Hour) - go func() { + goman.New(func() { for this.cleanTicker.Next() { var tr = trackers.Begin("[METRIC]CLEAN_EXPIRED") err := this.CleanExpired() @@ -194,11 +195,11 @@ func (this *Task) Start() error { remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error()) } } - }() + }) // 上传 this.uploadTicker = utils.NewTicker(this.item.UploadDuration()) - go func() { + goman.New(func() { for this.uploadTicker.Next() { var tr = trackers.Begin("[METRIC]UPLOAD_STATS") err := this.Upload(1 * time.Second) @@ -207,7 +208,7 @@ func (this *Task) Start() error { remotelogs.Error("METRIC", "upload stats failed: "+err.Error()) } } - }() + }) return nil } diff --git a/internal/monitor/value_queue.go b/internal/monitor/value_queue.go index eac7472..52b59e6 100644 --- a/internal/monitor/value_queue.go +++ b/internal/monitor/value_queue.go @@ -6,6 +6,7 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/iwind/TeaGo/maps" @@ -16,7 +17,9 @@ var SharedValueQueue = NewValueQueue() func init() { events.On(events.EventLoaded, func() { - go SharedValueQueue.Start() + goman.New(func() { + SharedValueQueue.Start() + }) }) } diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index cdccf10..7dad1fb 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -13,6 +13,7 @@ import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -607,7 +608,7 @@ func (this *APIStream) handleChangeAPINode(message *pb.NodeStreamMessage) error this.replyOk(message.RequestId, "") - go func() { + goman.New(func() { // 延后生效,防止变更前的API无法读取到状态 time.Sleep(1 * time.Second) @@ -629,7 +630,7 @@ func (this *APIStream) handleChangeAPINode(message *pb.NodeStreamMessage) error remotelogs.Println("API_STREAM", "change rpc endpoint to '"+ messageData.Addr+"' successfully") - }() + }) return nil } diff --git a/internal/nodes/client_conn.go b/internal/nodes/client_conn.go index e474dcb..639f73e 100644 --- a/internal/nodes/client_conn.go +++ b/internal/nodes/client_conn.go @@ -6,6 +6,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/iwind/TeaGo/maps" "net" @@ -17,7 +18,7 @@ import ( func init() { events.On(events.EventStart, func() { ticker := time.NewTicker(1 * time.Minute) - go func() { + goman.New(func() { for range ticker.C { // 加入到数据队列中 if teaconst.InTrafficBytes > 0 { @@ -35,7 +36,7 @@ func init() { atomic.StoreUint64(&teaconst.InTrafficBytes, 0) atomic.StoreUint64(&teaconst.OutTrafficBytes, 0) } - }() + }) }) } diff --git a/internal/nodes/http_access_log_queue.go b/internal/nodes/http_access_log_queue.go index 4bf83b8..3abd264 100644 --- a/internal/nodes/http_access_log_queue.go +++ b/internal/nodes/http_access_log_queue.go @@ -2,6 +2,7 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "reflect" @@ -26,7 +27,9 @@ func NewHTTPAccessLogQueue() *HTTPAccessLogQueue { queue := &HTTPAccessLogQueue{ queue: make(chan *pb.HTTPAccessLog, maxSize), } - go queue.Start() + goman.New(func() { + queue.Start() + }) return queue } diff --git a/internal/nodes/http_client_pool.go b/internal/nodes/http_client_pool.go index db92a45..8b442b3 100644 --- a/internal/nodes/http_client_pool.go +++ b/internal/nodes/http_client_pool.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/pires/go-proxyproto" "net" @@ -33,7 +34,9 @@ func NewHTTPClientPool() *HTTPClientPool { clientsMap: map[string]*HTTPClient{}, } - go pool.cleanClients() + goman.New(func() { + pool.cleanClients() + }) return pool } diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index 3018a3c..2f98395 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -952,7 +952,7 @@ func (this *HTTPRequest) requestRemotePort() int { return 0 } -// 情趣的URI中的参数部分 +// 获取的URI中的参数部分 func (this *HTTPRequest) requestQueryString() string { uri, err := url.ParseRequestURI(this.uri) if err != nil { diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index 7c2c443..daa2e3e 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -128,7 +129,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { remotelogs.Error("HTTP_REQUEST_CACHE", "purge failed: "+err.Error()) } - go func() { + goman.New(func() { rpcClient, err := rpc.SharedRPC() if err == nil { for _, rpcServerService := range rpcClient.ServerRPCList() { @@ -142,7 +143,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { } } } - }() + }) return true } diff --git a/internal/nodes/http_request_websocket.go b/internal/nodes/http_request_websocket.go index 91de38f..39d9855 100644 --- a/internal/nodes/http_request_websocket.go +++ b/internal/nodes/http_request_websocket.go @@ -2,6 +2,7 @@ package nodes import ( "errors" + "github.com/TeaOSLab/EdgeNode/internal/goman" "io" "net/http" "net/url" @@ -64,7 +65,7 @@ func (this *HTTPRequest) doWebsocket() { _ = clientConn.Close() }() - go func() { + goman.New(func() { buf := make([]byte, 4*1024) // TODO 使用内存池 for { n, err := originConn.Read(buf) @@ -81,6 +82,6 @@ func (this *HTTPRequest) doWebsocket() { } _ = clientConn.Close() _ = originConn.Close() - }() + }) _, _ = io.Copy(originConn, clientConn) } diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index 5967e84..9d57f81 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "net" "sync" @@ -97,7 +98,7 @@ func (this *Listener) listenTCP() error { this.listener.Init() - go func() { + goman.New(func() { err := this.listener.Serve() if err != nil { // 在这里屏蔽accept错误,防止在优雅关闭的时候有多余的提示 @@ -109,7 +110,7 @@ func (this *Listener) listenTCP() error { // 打印其他错误 remotelogs.Error("LISTENER", err.Error()) } - }() + }) return nil } @@ -129,12 +130,12 @@ func (this *Listener) listenUDP() error { Listener: listener, } - go func() { + goman.New(func() { err := this.listener.Serve() if err != nil { remotelogs.Error("LISTENER", err.Error()) } - }() + }) return nil } diff --git a/internal/nodes/listener_manager.go b/internal/nodes/listener_manager.go index a6c17cf..40d76aa 100644 --- a/internal/nodes/listener_manager.go +++ b/internal/nodes/listener_manager.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/lists" @@ -43,11 +44,11 @@ func NewListenerManager() *ListenerManager { manager.ticker = time.NewTicker(5 * time.Second) } - go func() { + goman.New(func() { for range manager.ticker.C { manager.retryListeners() } - }() + }) return manager } diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 6bcc311..6f43087 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/pires/go-proxyproto" @@ -110,7 +111,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error { } // 从源站读取 - go func() { + goman.New(func() { originBuffer := bytePool32k.Get() defer func() { bytePool32k.Put(originBuffer) @@ -134,7 +135,7 @@ func (this *TCPListener) handleConn(conn net.Conn) error { break } } - }() + }) // 从客户端读取 clientBuffer := bytePool32k.Get() diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index fa24457..448738c 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -3,6 +3,7 @@ package nodes import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/stats" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -37,11 +38,11 @@ func (this *UDPListener) Serve() error { this.connMap = map[string]*UDPConn{} this.connTicker = utils.NewTicker(1 * time.Minute) - go func() { + goman.New(func() { for this.connTicker.Next() { this.gcConns() } - }() + }) var buffer = make([]byte, 4*1024) for { @@ -188,7 +189,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyConn *ne stats.SharedTrafficStatManager.Add(server.Id, "", 0, 0, 1, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId()) } - go func() { + goman.New(func() { buffer := bytePool32k.Get() defer func() { bytePool32k.Put(buffer) @@ -214,7 +215,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyConn *ne break } } - }() + }) return conn } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 0b8ff66..7fd2fc8 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -11,6 +11,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/configs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/iplibrary" "github.com/TeaOSLab/EdgeNode/internal/metrics" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" @@ -31,6 +32,7 @@ import ( "os" "os/exec" "runtime" + "sort" "sync" "time" ) @@ -124,7 +126,9 @@ func (this *Node) Start() { this.startSyncTimer() // 状态变更计时器 - go NewNodeStatusExecutor().Listen() + goman.New(func() { + NewNodeStatusExecutor().Listen() + }) // 读取配置 nodeConfig, err := nodeconfigs.SharedNodeConfig() @@ -153,13 +157,17 @@ func (this *Node) Start() { _ = utils.SetRLimit(1024 * 1024) // 连接API - go NewAPIStream().Start() + goman.New(func() { + NewAPIStream().Start() + }) // 统计 go stats.SharedTrafficStatManager.Start(func() *nodeconfigs.NodeConfig { return sharedNodeConfig }) - go stats.SharedHTTPRequestStatManager.Start() + goman.New(func() { + stats.SharedHTTPRequestStatManager.Start() + }) // 启动端口 err = sharedListenerManager.Start(nodeConfig) @@ -297,7 +305,9 @@ func (this *Node) loop() error { return err } case "nodeVersionChanged": - go sharedUpgradeManager.Start() + goman.New(func() { + sharedUpgradeManager.Start() + }) } } @@ -439,7 +449,7 @@ func (this *Node) startSyncTimer() { remotelogs.Println("NODE", "quit sync timer") ticker.Stop() }) - go func() { + goman.New(func() { for { select { case <-ticker.C: @@ -462,7 +472,7 @@ func (this *Node) startSyncTimer() { } } } - }() + }) } // 检查集群设置 @@ -530,7 +540,7 @@ func (this *Node) listenSock() error { } // 启动监听 - go func() { + goman.New(func() { this.sock.OnCommand(func(cmd *gosock.Command) { switch cmd.Code { case "pid": @@ -563,7 +573,7 @@ func (this *Node) listenSock() error { events.Notify(events.EventQuit) // 监控连接数,如果连接数为0,则退出进程 - go func() { + goman.New(func() { for { countActiveConnections := sharedListenerManager.TotalActiveConnections() if countActiveConnections <= 0 { @@ -572,13 +582,43 @@ func (this *Node) listenSock() error { } time.Sleep(1 * time.Second) } - }() + }) case "trackers": _ = cmd.Reply(&gosock.Command{ Params: map[string]interface{}{ "labels": trackers.SharedManager.Labels(), }, }) + case "goman": + var posMap = map[string]maps.Map{} // file#line => Map + for _, instance := range goman.List() { + var pos = instance.File + "#" + types.String(instance.Line) + m, ok := posMap[pos] + if ok { + m["count"] = m["count"].(int) + 1 + } else { + m = maps.Map{ + "pos": pos, + "count": 1, + } + posMap[pos] = m + } + } + + var result = []maps.Map{} + for _, m := range posMap { + result = append(result, m) + } + + sort.Slice(result, func(i, j int) bool { + return result[i]["count"].(int) > result[j]["count"].(int) + }) + + _ = cmd.Reply(&gosock.Command{ + Params: map[string]interface{}{ + "result": result, + }, + }) } }) @@ -586,7 +626,7 @@ func (this *Node) listenSock() error { if err != nil { logs.Println("NODE", err.Error()) } - }() + }) events.On(events.EventQuit, func() { logs.Println("NODE", "quit unix sock") diff --git a/internal/nodes/origin_state_manager.go b/internal/nodes/origin_state_manager.go index a52d3bd..a15218b 100644 --- a/internal/nodes/origin_state_manager.go +++ b/internal/nodes/origin_state_manager.go @@ -5,6 +5,7 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/iwind/TeaGo/Tea" @@ -16,7 +17,9 @@ var SharedOriginStateManager = NewOriginStateManager() func init() { events.On(events.EventLoaded, func() { - go SharedOriginStateManager.Start() + goman.New(func() { + SharedOriginStateManager.Start() + }) }) } diff --git a/internal/nodes/system_services.go b/internal/nodes/system_services.go index 31df900..e4150e7 100644 --- a/internal/nodes/system_services.go +++ b/internal/nodes/system_services.go @@ -7,6 +7,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/maps" @@ -96,10 +97,10 @@ func (this *SystemServiceManager) setupSystemd(params maps.Map) error { } // 启动Service - go func() { + goman.New(func() { time.Sleep(5 * time.Second) _ = exec.Command(systemctl, "start", teaconst.SystemdServiceName).Start() - }() + }) if output == "enabled" { // 检查文件路径是否变化 diff --git a/internal/nodes/task_sync_api_nodes.go b/internal/nodes/task_sync_api_nodes.go index 3e19536..4ce8c52 100644 --- a/internal/nodes/task_sync_api_nodes.go +++ b/internal/nodes/task_sync_api_nodes.go @@ -6,6 +6,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/trackers" @@ -23,7 +24,9 @@ import ( func init() { events.On(events.EventStart, func() { task := NewSyncAPINodesTask() - go task.Start() + goman.New(func() { + task.Start() + }) }) } diff --git a/internal/nodes/toa_manager.go b/internal/nodes/toa_manager.go index f7c037e..5f64a08 100644 --- a/internal/nodes/toa_manager.go +++ b/internal/nodes/toa_manager.go @@ -3,6 +3,7 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/iwind/TeaGo/Tea" "net" @@ -67,7 +68,9 @@ func (this *TOAManager) Run(config *nodeconfigs.TOAConfig) error { } this.pid = cmd.Process.Pid - go func() { _ = cmd.Wait() }() + goman.New(func() { + _ = cmd.Wait() + }) return nil } diff --git a/internal/nodes/upgrade_manager.go b/internal/nodes/upgrade_manager.go index ff88b6d..f26409d 100644 --- a/internal/nodes/upgrade_manager.go +++ b/internal/nodes/upgrade_manager.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -60,12 +61,12 @@ func (this *UpgradeManager) Start() { remotelogs.Println("UPGRADE_MANAGER", "upgrade successfully") - go func() { + goman.New(func() { err = this.restart() if err != nil { logs.Println("UPGRADE_MANAGER", err.Error()) } - }() + }) } func (this *UpgradeManager) install() error { diff --git a/internal/remotelogs/utils.go b/internal/remotelogs/utils.go index 26681f1..7b42d85 100644 --- a/internal/remotelogs/utils.go +++ b/internal/remotelogs/utils.go @@ -5,6 +5,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/cespare/xxhash" @@ -24,7 +25,7 @@ func init() { if Tea.IsTesting() { ticker = time.NewTicker(10 * time.Second) } - go func() { + goman.New(func() { for range ticker.C { var tr = trackers.Begin("UPLOAD_REMOTE_LOGS") err := uploadLogs() @@ -33,7 +34,7 @@ func init() { logs.Println("[LOG]" + err.Error()) } } - }() + }) } // Println 打印普通信息 diff --git a/internal/stats/http_request_stat_manager.go b/internal/stats/http_request_stat_manager.go index 918f02d..17345a3 100644 --- a/internal/stats/http_request_stat_manager.go +++ b/internal/stats/http_request_stat_manager.go @@ -4,6 +4,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/iplibrary" "github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" @@ -63,17 +64,17 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager { // Start 启动 func (this *HTTPRequestStatManager) Start() { // 上传请求总数 - go func() { + goman.New(func() { ticker := time.NewTicker(1 * time.Minute) - go func() { + goman.New(func() { for range ticker.C { if this.totalAttackRequests > 0 { monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAttackRequests, maps.Map{"total": this.totalAttackRequests}) this.totalAttackRequests = 0 } } - }() - }() + }) + }) loopTicker := time.NewTicker(1 * time.Second) uploadTicker := time.NewTicker(30 * time.Minute) diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index de22123..6ac5746 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -4,6 +4,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/monitor" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" @@ -55,17 +56,17 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) this.configFunc = configFunc // 上传请求总数 - go func() { + goman.New(func() { ticker := time.NewTicker(1 * time.Minute) - go func() { + goman.New(func() { for range ticker.C { if this.totalRequests > 0 { monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemRequests, maps.Map{"total": this.totalRequests}) this.totalRequests = 0 } } - }() - }() + }) + }) // 上传统计数据 duration := 5 * time.Minute diff --git a/internal/ttlcache/cache.go b/internal/ttlcache/cache.go index 668a52d..29c3429 100644 --- a/internal/ttlcache/cache.go +++ b/internal/ttlcache/cache.go @@ -1,7 +1,6 @@ package ttlcache import ( - "github.com/TeaOSLab/EdgeNode/internal/utils" "time" ) @@ -19,7 +18,6 @@ type Cache struct { maxItems int gcPieceIndex int - ticker *utils.Ticker } func NewCache(opt ...OptionInterface) *Cache { @@ -56,13 +54,8 @@ func NewCache(opt ...OptionInterface) *Cache { cache.pieces = append(cache.pieces, NewPiece(maxItems/countPieces)) } - // start timer - go func() { - cache.ticker = utils.NewTicker(5 * time.Second) - for cache.ticker.Next() { - cache.GC() - } - }() + // Add to manager + SharedManager.Add(cache) return cache } @@ -149,12 +142,10 @@ func (this *Cache) Clean() { } func (this *Cache) Destroy() { + SharedManager.Remove(this) + this.isDestroyed = true - if this.ticker != nil { - this.ticker.Stop() - this.ticker = nil - } for _, piece := range this.pieces { piece.Destroy() } diff --git a/internal/ttlcache/cache_test.go b/internal/ttlcache/cache_test.go index ffd11c2..7bfcf63 100644 --- a/internal/ttlcache/cache_test.go +++ b/internal/ttlcache/cache_test.go @@ -123,13 +123,18 @@ func TestCache_GC(t *testing.T) { func TestCache_GC2(t *testing.T) { runtime.GOMAXPROCS(1) - cache := NewCache() + cache1 := NewCache(NewPiecesOption(32)) for i := 0; i < 1_000_000; i++ { - cache.Write(strconv.Itoa(i), i, time.Now().Unix()+int64(rands.Int(0, 100))) + cache1.Write(strconv.Itoa(i), i, time.Now().Unix()+int64(rands.Int(0, 10))) + } + + cache2 := NewCache(NewPiecesOption(5)) + for i := 0; i < 1_000_000; i++ { + cache2.Write(strconv.Itoa(i), i, time.Now().Unix()+int64(rands.Int(0, 10))) } for i := 0; i < 100; i++ { - t.Log(cache.Count(), "items") + t.Log(cache1.Count(), "items", cache2.Count(), "items") time.Sleep(1 * time.Second) } } diff --git a/internal/ttlcache/manager.go b/internal/ttlcache/manager.go new file mode 100644 index 0000000..50df6d6 --- /dev/null +++ b/internal/ttlcache/manager.go @@ -0,0 +1,53 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package ttlcache + +import ( + "github.com/TeaOSLab/EdgeNode/internal/goman" + "sync" + "time" +) + +var SharedManager = NewManager() + +type Manager struct { + ticker *time.Ticker + locker sync.Mutex + + cacheMap map[*Cache]bool +} + +func NewManager() *Manager { + var manager = &Manager{ + ticker: time.NewTicker(3 * time.Second), + cacheMap: map[*Cache]bool{}, + } + + goman.New(func() { + manager.init() + }) + + return manager +} + +func (this *Manager) init() { + for range this.ticker.C { + this.locker.Lock() + for cache := range this.cacheMap { + cache.GC() + } + this.locker.Unlock() + } +} + +func (this *Manager) Add(cache *Cache) { + this.locker.Lock() + this.cacheMap[cache] = true + this.locker.Unlock() +} + +func (this *Manager) Remove(cache *Cache) { + this.locker.Lock() + delete(this.cacheMap, cache) + this.locker.Unlock() +} diff --git a/internal/utils/expires/list.go b/internal/utils/expires/list.go index 98fb485..6f6a334 100644 --- a/internal/utils/expires/list.go +++ b/internal/utils/expires/list.go @@ -2,7 +2,6 @@ package expires import ( "sync" - "time" ) type ItemMap = map[int64]bool @@ -12,14 +11,19 @@ type List struct { itemsMap map[int64]int64 // itemId => timestamp locker sync.Mutex - ticker *time.Ticker + + gcCallback func(itemId int64) } func NewList() *List { - return &List{ + var list = &List{ expireMap: map[int64]ItemMap{}, itemsMap: map[int64]int64{}, } + + SharedManager.Add(list) + + return list } func (this *List) Add(itemId int64, expiresAt int64) { @@ -56,33 +60,15 @@ func (this *List) GC(timestamp int64, callback func(itemId int64)) { itemMap := this.gcItems(timestamp) this.locker.Unlock() - for itemId := range itemMap { - callback(itemId) + if callback != nil { + for itemId := range itemMap { + callback(itemId) + } } } -func (this *List) StartGC(callback func(itemId int64)) { - this.ticker = time.NewTicker(1 * time.Second) - lastTimestamp := int64(0) - for range this.ticker.C { - timestamp := time.Now().Unix() - if lastTimestamp == 0 { - lastTimestamp = timestamp - 3600 - } - - if timestamp >= lastTimestamp { - for i := lastTimestamp; i <= timestamp; i++ { - this.GC(i, callback) - } - } else { - for i := timestamp; i <= lastTimestamp; i++ { - this.GC(i, callback) - } - } - - // 这样做是为了防止系统时钟突变 - lastTimestamp = timestamp - } +func (this *List) OnGC(callback func(itemId int64)) { + this.gcCallback = callback } func (this *List) removeItem(itemId int64) { diff --git a/internal/utils/expires/list_test.go b/internal/utils/expires/list_test.go index bca42e9..fb96549 100644 --- a/internal/utils/expires/list_test.go +++ b/internal/utils/expires/list_test.go @@ -63,11 +63,13 @@ func TestList_Start_GC(t *testing.T) { list.Add(7, time.Now().Unix()+6) list.Add(8, time.Now().Unix()+6) + list.OnGC(func(itemId int64) { + t.Log("gc:", itemId, timeutil.Format("H:i:s")) + time.Sleep(2 * time.Second) + }) + go func() { - list.StartGC(func(itemId int64) { - t.Log("gc:", itemId, timeutil.Format("H:i:s")) - time.Sleep(2 * time.Second) - }) + SharedManager.Add(list) }() time.Sleep(20 * time.Second) diff --git a/internal/utils/expires/manager.go b/internal/utils/expires/manager.go new file mode 100644 index 0000000..d49fe23 --- /dev/null +++ b/internal/utils/expires/manager.go @@ -0,0 +1,71 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package expires + +import ( + "github.com/TeaOSLab/EdgeNode/internal/goman" + "sync" + "time" +) + +var SharedManager = NewManager() + +type Manager struct { + listMap map[*List]bool + locker sync.Mutex + ticker *time.Ticker +} + +func NewManager() *Manager { + var manager = &Manager{ + listMap: map[*List]bool{}, + ticker: time.NewTicker(1 * time.Second), + } + goman.New(func() { + manager.init() + }) + return manager +} + +func (this *Manager) init() { + var lastTimestamp = int64(0) + for range this.ticker.C { + timestamp := time.Now().Unix() + if lastTimestamp == 0 { + lastTimestamp = timestamp - 3600 + } + + if timestamp >= lastTimestamp { + for i := lastTimestamp; i <= timestamp; i++ { + this.locker.Lock() + for list := range this.listMap { + list.GC(i, list.gcCallback) + } + this.locker.Unlock() + } + } else { + for i := timestamp; i <= lastTimestamp; i++ { + this.locker.Lock() + for list := range this.listMap { + list.GC(i, list.gcCallback) + } + this.locker.Unlock() + } + } + + // 这样做是为了防止系统时钟突变 + lastTimestamp = timestamp + } +} + +func (this *Manager) Add(list *List) { + this.locker.Lock() + this.listMap[list] = true + this.locker.Unlock() +} + +func (this *Manager) Remove(list *List) { + this.locker.Lock() + delete(this.listMap, list) + this.locker.Unlock() +} diff --git a/internal/utils/free_hours_manager.go b/internal/utils/free_hours_manager.go index fb22523..7017672 100644 --- a/internal/utils/free_hours_manager.go +++ b/internal/utils/free_hours_manager.go @@ -5,6 +5,7 @@ package utils import ( teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "sort" "sync" "sync/atomic" @@ -15,7 +16,9 @@ var SharedFreeHoursManager = NewFreeHoursManager() func init() { events.On(events.EventLoaded, func() { - go SharedFreeHoursManager.Start() + goman.New(func() { + SharedFreeHoursManager.Start() + }) }) } diff --git a/internal/utils/ticker_utils.go b/internal/utils/ticker_utils.go index 1e62e81..e141d99 100644 --- a/internal/utils/ticker_utils.go +++ b/internal/utils/ticker_utils.go @@ -1,15 +1,18 @@ package utils -import "time" +import ( + "github.com/TeaOSLab/EdgeNode/internal/goman" + "time" +) -// 定时运行某个函数 +// Every 定时运行某个函数 func Every(duration time.Duration, f func(ticker *Ticker)) *Ticker { ticker := NewTicker(duration) - go func() { + goman.New(func() { for ticker.Next() { f(ticker) } - }() + }) return ticker } diff --git a/internal/utils/time.go b/internal/utils/time.go index 57cf7a0..2094b5e 100644 --- a/internal/utils/time.go +++ b/internal/utils/time.go @@ -1,6 +1,7 @@ package utils import ( + "github.com/TeaOSLab/EdgeNode/internal/goman" "time" ) @@ -9,12 +10,12 @@ var unixTimeMilli = time.Now().UnixMilli() func init() { ticker := time.NewTicker(200 * time.Millisecond) - go func() { + goman.New(func() { for range ticker.C { unixTime = time.Now().Unix() unixTimeMilli = time.Now().UnixMilli() } - }() + }) } // UnixTime 最快获取时间戳的方式,通常用在不需要特别精确时间戳的场景 diff --git a/internal/waf/action_notify.go b/internal/waf/action_notify.go index 1df7d4e..766f6c8 100644 --- a/internal/waf/action_notify.go +++ b/internal/waf/action_notify.go @@ -5,6 +5,7 @@ package waf import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/waf/requests" @@ -25,7 +26,7 @@ var notifyChan = make(chan *notifyTask, 128) func init() { events.On(events.EventLoaded, func() { - go func() { + goman.New(func() { rpcClient, err := rpc.SharedRPC() if err != nil { remotelogs.Error("WAF_NOTIFY_ACTION", "create rpc client failed: "+err.Error()) @@ -44,7 +45,7 @@ func init() { remotelogs.Error("WAF_NOTIFY_ACTION", "notify failed: "+err.Error()) } } - }() + }) }) } diff --git a/internal/waf/action_record_ip.go b/internal/waf/action_record_ip.go index fce8535..a04c983 100644 --- a/internal/waf/action_record_ip.go +++ b/internal/waf/action_record_ip.go @@ -5,6 +5,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/waf/requests" @@ -31,7 +32,7 @@ var recordIPTaskChan = make(chan *recordIPTask, 1024) func init() { events.On(events.EventLoaded, func() { - go func() { + goman.New(func() { rpcClient, err := rpc.SharedRPC() if err != nil { remotelogs.Error("WAF_RECORD_IP_ACTION", "create rpc client failed: "+err.Error()) @@ -62,7 +63,7 @@ func init() { remotelogs.Error("WAF_RECORD_IP_ACTION", "create ip item failed: "+err.Error()) } } - }() + }) }) } diff --git a/internal/waf/ip_list.go b/internal/waf/ip_list.go index 9c2120b..bb55575 100644 --- a/internal/waf/ip_list.go +++ b/internal/waf/ip_list.go @@ -44,11 +44,9 @@ func NewIPList(listType IPListType) *IPList { e := expires.NewList() list.expireList = e - go func() { - e.StartGC(func(itemId int64) { - list.remove(itemId) - }) - }() + e.OnGC(func(itemId int64) { + list.remove(itemId) + }) return list }