From 938278725e3c4080e473ef491259fcd0c300a7f9 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Wed, 12 Jan 2022 20:31:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/caches/storage_file_test.go | 6 +- internal/events/utils.go | 64 +++++++++++++++++---- internal/events/utils_test.go | 29 ++++++++-- internal/iplibrary/manager_city.go | 19 ++++-- internal/iplibrary/manager_country.go | 19 ++++-- internal/iplibrary/manager_ip_list.go | 20 +++++-- internal/iplibrary/manager_provider.go | 19 ++++-- internal/iplibrary/manager_province.go | 19 ++++-- internal/iplibrary/updater.go | 31 ++++++---- internal/nodes/api_stream.go | 2 +- internal/nodes/client_conn_traffic.go | 2 +- internal/nodes/listener.go | 6 +- internal/nodes/listener_base.go | 4 +- internal/nodes/listener_interface.go | 4 +- internal/nodes/listener_manager.go | 2 +- internal/nodes/node.go | 4 +- internal/nodes/node_status_executor.go | 4 +- internal/nodes/origin_state_manager.go | 11 +++- internal/nodes/system_services.go | 2 +- internal/nodes/task_sync_api_nodes.go | 26 +++++---- internal/stats/http_request_stat_manager.go | 23 ++++---- internal/stats/traffic_stat_manager.go | 21 +++---- internal/utils/free_hours_manager.go | 16 +++++- 23 files changed, 243 insertions(+), 110 deletions(-) diff --git a/internal/caches/storage_file_test.go b/internal/caches/storage_file_test.go index 15505cb..d794df2 100644 --- a/internal/caches/storage_file_test.go +++ b/internal/caches/storage_file_test.go @@ -482,11 +482,7 @@ func TestFileStorage_DecodeFile(t *testing.T) { t.Fatal(err) } _, path := storage.keyPath("my-key") - item, err := storage.decodeFile(path) - if err != nil { - t.Fatal(err) - } - logs.PrintAsJSON(item, t) + t.Log(path) } func BenchmarkFileStorage_Read(b *testing.B) { diff --git a/internal/events/utils.go b/internal/events/utils.go index 46426ff..45e2bd2 100644 --- a/internal/events/utils.go +++ b/internal/events/utils.go @@ -1,27 +1,71 @@ package events -import "sync" +import ( + "sync" +) -var eventsMap = map[string][]func(){} // event => []callbacks +type Callbacks = []func() + +var eventsMap = map[Event]map[interface{}]Callbacks{} // event => map[event key][]callback var locker = sync.Mutex{} +var eventKeyId = 0 + +func NewKey() interface{} { + locker.Lock() + defer locker.Unlock() + eventKeyId++ + return eventKeyId +} + // On 增加事件回调 -func On(event string, callback func()) { +func On(event Event, callback func()) { + OnKey(event, nil, callback) +} + +// OnKey 使用Key增加事件回调 +func OnKey(event Event, key interface{}, callback func()) { + if key == nil { + key = NewKey() + } + locker.Lock() defer locker.Unlock() - callbacks, _ := eventsMap[event] - callbacks = append(callbacks, callback) - eventsMap[event] = callbacks + m, ok := eventsMap[event] + if !ok { + m = map[interface{}]Callbacks{} + eventsMap[event] = m + } + m[key] = append(m[key], callback) +} + +// Remove 删除事件回调 +func Remove(key interface{}) { + if key == nil { + return + } + + locker.Lock() + for k, m := range eventsMap { + _, ok := m[key] + if ok { + delete(m, key) + eventsMap[k] = m + } + } + locker.Unlock() } // Notify 通知事件 -func Notify(event string) { +func Notify(event Event) { locker.Lock() - callbacks, _ := eventsMap[event] + m := eventsMap[event] locker.Unlock() - for _, callback := range callbacks { - callback() + for _, callbacks := range m { + for _, callback := range callbacks { + callback() + } } } diff --git a/internal/events/utils_test.go b/internal/events/utils_test.go index 4cb41e3..da01f79 100644 --- a/internal/events/utils_test.go +++ b/internal/events/utils_test.go @@ -1,16 +1,33 @@ -package events +package events_test -import "testing" +import ( + "github.com/TeaOSLab/EdgeNode/internal/events" + "testing" +) func TestOn(t *testing.T) { - On("hello", func() { + type User struct { + name string + } + var u = &User{} + var u2 = &User{} + + events.On("hello", func() { t.Log("world") }) - On("hello", func() { + events.On("hello", func() { t.Log("world2") }) - On("hello2", func() { + events.OnKey("hello", u, func() { + t.Log("world3") + }) + events.OnKey("hello", u, func() { + t.Log("world4") + }) + events.Remove(u) + events.Remove(u2) + events.OnKey("hello2", nil, func() { t.Log("world2") }) - Notify("hello") + events.Notify("hello") } diff --git a/internal/iplibrary/manager_city.go b/internal/iplibrary/manager_city.go index a07e701..79a2b82 100644 --- a/internal/iplibrary/manager_city.go +++ b/internal/iplibrary/manager_city.go @@ -9,7 +9,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" - "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "github.com/iwind/TeaGo/types" @@ -27,10 +26,15 @@ func init() { SharedCityManager.Start() }) }) + events.On(events.EventQuit, func() { + SharedCityManager.Stop() + }) } // CityManager 中国省份信息管理 type CityManager struct { + ticker *time.Ticker + cacheFile string cityMap map[string]int64 // provinceName_cityName => cityName @@ -62,11 +66,8 @@ func (this *CityManager) Start() { } // 定时更新 - ticker := utils.NewTicker(4 * time.Hour) - events.On(events.EventQuit, func() { - ticker.Stop() - }) - for ticker.Next() { + this.ticker = time.NewTicker(4 * time.Hour) + for range this.ticker.C { err := this.loop() if err != nil { remotelogs.ErrorObject("CITY_MANAGER", err) @@ -74,6 +75,12 @@ func (this *CityManager) Start() { } } +func (this *CityManager) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } +} + func (this *CityManager) Lookup(provinceId int64, cityName string) (cityId int64) { this.locker.RLock() cityId, _ = this.cityMap[types.String(provinceId)+"_"+cityName] diff --git a/internal/iplibrary/manager_country.go b/internal/iplibrary/manager_country.go index 9c85b20..93c12b8 100644 --- a/internal/iplibrary/manager_country.go +++ b/internal/iplibrary/manager_country.go @@ -9,7 +9,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" - "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "io/ioutil" @@ -26,10 +25,15 @@ func init() { SharedCountryManager.Start() }) }) + events.On(events.EventQuit, func() { + SharedCountryManager.Stop() + }) } // CountryManager 国家/地区信息管理 type CountryManager struct { + ticker *time.Ticker + cacheFile string countryMap map[string]int64 // countryName => countryId @@ -61,11 +65,8 @@ func (this *CountryManager) Start() { } // 定时更新 - ticker := utils.NewTicker(4 * time.Hour) - events.On(events.EventQuit, func() { - ticker.Stop() - }) - for ticker.Next() { + this.ticker = time.NewTicker(4 * time.Hour) + for range this.ticker.C { err := this.loop() if err != nil { remotelogs.ErrorObject("COUNTRY_MANAGER", err) @@ -73,6 +74,12 @@ func (this *CountryManager) Start() { } } +func (this *CountryManager) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } +} + func (this *CountryManager) Lookup(countryName string) (countryId int64) { this.locker.RLock() countryId, _ = this.countryMap[countryName] diff --git a/internal/iplibrary/manager_ip_list.go b/internal/iplibrary/manager_ip_list.go index fc8b236..6ca01ee 100644 --- a/internal/iplibrary/manager_ip_list.go +++ b/internal/iplibrary/manager_ip_list.go @@ -23,10 +23,15 @@ func init() { SharedIPListManager.Start() }) }) + events.On(events.EventQuit, func() { + SharedIPListManager.Stop() + }) } // IPListManager IP名单管理 type IPListManager struct { + ticker *time.Ticker + db *IPListDB version int64 @@ -52,17 +57,14 @@ func (this *IPListManager) Start() { remotelogs.ErrorObject("IP_LIST_MANAGER", err) } - ticker := time.NewTicker(60 * time.Second) + this.ticker = time.NewTicker(60 * time.Second) if Tea.IsTesting() { - ticker = time.NewTicker(10 * time.Second) + this.ticker = time.NewTicker(10 * time.Second) } - events.On(events.EventQuit, func() { - ticker.Stop() - }) countErrors := 0 for { select { - case <-ticker.C: + case <-this.ticker.C: case <-IPListUpdateNotify: } err := this.loop() @@ -84,6 +86,12 @@ func (this *IPListManager) Start() { } } +func (this *IPListManager) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } +} + func (this *IPListManager) init() { // 从数据库中当中读取数据 db, err := NewIPListDB() diff --git a/internal/iplibrary/manager_provider.go b/internal/iplibrary/manager_provider.go index b6c9a7c..a08f5dc 100644 --- a/internal/iplibrary/manager_provider.go +++ b/internal/iplibrary/manager_provider.go @@ -9,7 +9,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" - "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "io/ioutil" @@ -26,10 +25,15 @@ func init() { SharedProviderManager.Start() }) }) + events.On(events.EventQuit, func() { + SharedProviderManager.Stop() + }) } // ProviderManager 中国省份信息管理 type ProviderManager struct { + ticker *time.Ticker + cacheFile string providerMap map[string]int64 // name => id @@ -61,11 +65,8 @@ func (this *ProviderManager) Start() { } // 定时更新 - ticker := utils.NewTicker(4 * time.Hour) - events.On(events.EventQuit, func() { - ticker.Stop() - }) - for ticker.Next() { + this.ticker = time.NewTicker(4 * time.Hour) + for range this.ticker.C { err := this.loop() if err != nil { remotelogs.ErrorObject("PROVIDER_MANAGER", err) @@ -73,6 +74,12 @@ func (this *ProviderManager) Start() { } } +func (this *ProviderManager) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } +} + func (this *ProviderManager) Lookup(providerName string) (providerId int64) { this.locker.RLock() providerId, _ = this.providerMap[providerName] diff --git a/internal/iplibrary/manager_province.go b/internal/iplibrary/manager_province.go index a8923e6..fb5fe28 100644 --- a/internal/iplibrary/manager_province.go +++ b/internal/iplibrary/manager_province.go @@ -9,7 +9,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" - "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" "io/ioutil" @@ -30,10 +29,15 @@ func init() { SharedProvinceManager.Start() }) }) + events.On(events.EventQuit, func() { + SharedProvinceManager.Stop() + }) } // ProvinceManager 中国省份信息管理 type ProvinceManager struct { + ticker *time.Ticker + cacheFile string provinceMap map[string]int64 // provinceName => provinceId @@ -65,11 +69,8 @@ func (this *ProvinceManager) Start() { } // 定时更新 - ticker := utils.NewTicker(4 * time.Hour) - events.On(events.EventQuit, func() { - ticker.Stop() - }) - for ticker.Next() { + this.ticker = time.NewTicker(4 * time.Hour) + for range this.ticker.C { err := this.loop() if err != nil { remotelogs.ErrorObject("PROVINCE_MANAGER", err) @@ -77,6 +78,12 @@ func (this *ProvinceManager) Start() { } } +func (this *ProvinceManager) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } +} + func (this *ProvinceManager) Lookup(provinceName string) (provinceId int64) { this.locker.RLock() provinceId, _ = this.provinceMap[provinceName] diff --git a/internal/iplibrary/updater.go b/internal/iplibrary/updater.go index 840e62f..0939a17 100644 --- a/internal/iplibrary/updater.go +++ b/internal/iplibrary/updater.go @@ -15,15 +15,22 @@ import ( "time" ) +var SharedUpdater = NewUpdater() + func init() { events.On(events.EventStart, func() { - updater := NewUpdater() - updater.Start() + goman.New(func() { + SharedUpdater.Start() + }) + }) + events.On(events.EventQuit, func() { + SharedUpdater.Stop() }) } // Updater IP库更新程序 type Updater struct { + ticker *time.Ticker } // NewUpdater 获取新对象 @@ -34,15 +41,19 @@ func NewUpdater() *Updater { // Start 开始更新 func (this *Updater) Start() { // 这里不需要太频繁检查更新,因为通常不需要更新IP库 - ticker := time.NewTicker(1 * time.Hour) - goman.New(func() { - for range ticker.C { - err := this.loop() - if err != nil { - remotelogs.ErrorObject("IP_LIBRARY", err) - } + this.ticker = time.NewTicker(1 * time.Hour) + for range this.ticker.C { + err := this.loop() + if err != nil { + remotelogs.ErrorObject("IP_LIBRARY", err) } - }) + } +} + +func (this *Updater) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } } // 单次任务 diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index f49b8aa..04c5b4a 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -41,7 +41,7 @@ func NewAPIStream() *APIStream { } func (this *APIStream) Start() { - events.On(events.EventQuit, func() { + events.OnKey(events.EventQuit, this, func() { this.isQuiting = true if this.cancelFunc != nil { this.cancelFunc() diff --git a/internal/nodes/client_conn_traffic.go b/internal/nodes/client_conn_traffic.go index fca2d73..22f3ce5 100644 --- a/internal/nodes/client_conn_traffic.go +++ b/internal/nodes/client_conn_traffic.go @@ -16,7 +16,7 @@ import ( // 发送监控流量 func init() { events.On(events.EventStart, func() { - ticker := time.NewTicker(1 * time.Minute) + var ticker = time.NewTicker(1 * time.Minute) goman.New(func() { for range ticker.C { // 加入到数据队列中 diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index d3904cd..5e99b31 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -61,7 +61,7 @@ func (this *Listener) listenTCP() error { return err } var netListener = NewClientListener(tcpListener, protocol.IsHTTPFamily() || protocol.IsHTTPSFamily()) - events.On(events.EventQuit, func() { + events.OnKey(events.EventQuit, this, func() { remotelogs.Println("LISTENER", "quit "+this.group.FullAddr()) _ = netListener.Close() }) @@ -122,7 +122,7 @@ func (this *Listener) listenUDP() error { if err != nil { return err } - events.On(events.EventQuit, func() { + events.OnKey(events.EventQuit, this, func() { remotelogs.Println("LISTENER", "quit "+this.group.FullAddr()) _ = listener.Close() }) @@ -143,6 +143,8 @@ func (this *Listener) listenUDP() error { } func (this *Listener) Close() error { + events.Remove(this) + if this.listener == nil { return nil } diff --git a/internal/nodes/listener_base.go b/internal/nodes/listener_base.go index e29e5eb..46b4d01 100644 --- a/internal/nodes/listener_base.go +++ b/internal/nodes/listener_base.go @@ -25,8 +25,8 @@ func (this *BaseListener) Reset() { } -// CountActiveListeners 获取当前活跃连接数 -func (this *BaseListener) CountActiveListeners() int { +// CountActiveConnections 获取当前活跃连接数 +func (this *BaseListener) CountActiveConnections() int { return types.Int(this.countActiveConnections) } diff --git a/internal/nodes/listener_interface.go b/internal/nodes/listener_interface.go index 1a3be7d..1b94e3c 100644 --- a/internal/nodes/listener_interface.go +++ b/internal/nodes/listener_interface.go @@ -16,6 +16,6 @@ type ListenerInterface interface { // Reload 重载配置 Reload(serverGroup *serverconfigs.ServerAddressGroup) - // CountActiveListeners 获取当前活跃的连接数 - CountActiveListeners() int + // CountActiveConnections 获取当前活跃的连接数 + CountActiveConnections() int } diff --git a/internal/nodes/listener_manager.go b/internal/nodes/listener_manager.go index 845e74f..d8771be 100644 --- a/internal/nodes/listener_manager.go +++ b/internal/nodes/listener_manager.go @@ -159,7 +159,7 @@ func (this *ListenerManager) TotalActiveConnections() int { total := 0 for _, listener := range this.listenersMap { - total += listener.listener.CountActiveListeners() + total += listener.listener.CountActiveConnections() } return total } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index e5b0166..955eefd 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -451,7 +451,7 @@ func (this *Node) syncConfig(taskVersion int64) error { func (this *Node) startSyncTimer() { // TODO 这个时间间隔可以自行设置 ticker := time.NewTicker(60 * time.Second) - events.On(events.EventQuit, func() { + events.OnKey(events.EventQuit, this, func() { remotelogs.Println("NODE", "quit sync timer") ticker.Stop() }) @@ -691,7 +691,7 @@ func (this *Node) listenSock() error { } }) - events.On(events.EventQuit, func() { + events.OnKey(events.EventQuit, this, func() { logs.Println("NODE", "quit unix sock") _ = this.sock.Close() }) diff --git a/internal/nodes/node_status_executor.go b/internal/nodes/node_status_executor.go index e724940..4650eaa 100644 --- a/internal/nodes/node_status_executor.go +++ b/internal/nodes/node_status_executor.go @@ -41,9 +41,9 @@ func (this *NodeStatusExecutor) Listen() { this.update() // TODO 这个时间间隔可以配置 - ticker := time.NewTicker(30 * time.Second) + var ticker = time.NewTicker(30 * time.Second) - events.On(events.EventQuit, func() { + events.OnKey(events.EventQuit, this, func() { remotelogs.Println("NODE_STATUS", "quit executor") ticker.Stop() }) diff --git a/internal/nodes/origin_state_manager.go b/internal/nodes/origin_state_manager.go index a15218b..4489096 100644 --- a/internal/nodes/origin_state_manager.go +++ b/internal/nodes/origin_state_manager.go @@ -21,6 +21,9 @@ func init() { SharedOriginStateManager.Start() }) }) + events.On(events.EventQuit, func() { + SharedOriginStateManager.Stop() + }) } // OriginStateManager 源站状态管理 @@ -41,7 +44,7 @@ func NewOriginStateManager() *OriginStateManager { // Start 启动 func (this *OriginStateManager) Start() { - events.On(events.EventReload, func() { + events.OnKey(events.EventReload, this, func() { this.locker.Lock() this.stateMap = map[int64]*OriginState{} this.locker.Unlock() @@ -58,6 +61,12 @@ func (this *OriginStateManager) Start() { } } +func (this *OriginStateManager) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } +} + // Loop 单次循环检查 func (this *OriginStateManager) Loop() error { if sharedNodeConfig == nil { diff --git a/internal/nodes/system_services.go b/internal/nodes/system_services.go index e4150e7..8a9cafd 100644 --- a/internal/nodes/system_services.go +++ b/internal/nodes/system_services.go @@ -28,7 +28,7 @@ func init() { }) } -// 系统服务管理 +// SystemServiceManager 系统服务管理 type SystemServiceManager struct { } diff --git a/internal/nodes/task_sync_api_nodes.go b/internal/nodes/task_sync_api_nodes.go index 448c3ba..a2c6140 100644 --- a/internal/nodes/task_sync_api_nodes.go +++ b/internal/nodes/task_sync_api_nodes.go @@ -7,7 +7,6 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/goman" - "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/iwind/TeaGo/Tea" @@ -21,17 +20,22 @@ import ( "time" ) +var sharedSyncAPINodesTask = NewSyncAPINodesTask() + func init() { events.On(events.EventStart, func() { - task := NewSyncAPINodesTask() goman.New(func() { - task.Start() + sharedSyncAPINodesTask.Start() }) }) + events.On(events.EventQuit, func() { + sharedSyncAPINodesTask.Stop() + }) } // SyncAPINodesTask API节点同步任务 type SyncAPINodesTask struct { + ticker *time.Ticker } func NewSyncAPINodesTask() *SyncAPINodesTask { @@ -39,16 +43,12 @@ func NewSyncAPINodesTask() *SyncAPINodesTask { } func (this *SyncAPINodesTask) Start() { - ticker := time.NewTicker(5 * time.Minute) + this.ticker = time.NewTicker(5 * time.Minute) if Tea.IsTesting() { // 快速测试 - ticker = time.NewTicker(1 * time.Minute) + this.ticker = time.NewTicker(1 * time.Minute) } - events.On(events.EventQuit, func() { - remotelogs.Println("SYNC_API_NODES_TASK", "quit task") - ticker.Stop() - }) - for range ticker.C { + for range this.ticker.C { err := this.Loop() if err != nil { logs.Println("[TASK][SYNC_API_NODES_TASK]" + err.Error()) @@ -56,6 +56,12 @@ func (this *SyncAPINodesTask) Start() { } } +func (this *SyncAPINodesTask) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } +} + func (this *SyncAPINodesTask) Loop() error { var tr = trackers.Begin("SYNC_API_NODES") defer tr.End() diff --git a/internal/stats/http_request_stat_manager.go b/internal/stats/http_request_stat_manager.go index 3a29d27..3549ab5 100644 --- a/internal/stats/http_request_stat_manager.go +++ b/internal/stats/http_request_stat_manager.go @@ -64,25 +64,26 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager { // Start 启动 func (this *HTTPRequestStatManager) Start() { // 上传请求总数 + var monitorTicker = time.NewTicker(1 * time.Minute) + events.OnKey(events.EventQuit, this, func() { + monitorTicker.Stop() + }) goman.New(func() { - ticker := time.NewTicker(1 * time.Minute) - goman.New(func() { - for range ticker.C { - if this.totalAttackRequests > 0 { - monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAttackRequests, maps.Map{"total": this.totalAttackRequests}) - this.totalAttackRequests = 0 - } + for range monitorTicker.C { + if this.totalAttackRequests > 0 { + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAttackRequests, maps.Map{"total": this.totalAttackRequests}) + this.totalAttackRequests = 0 } - }) + } }) - loopTicker := time.NewTicker(1 * time.Second) - uploadTicker := time.NewTicker(30 * time.Minute) + var loopTicker = time.NewTicker(1 * time.Second) + var uploadTicker = time.NewTicker(30 * time.Minute) if Tea.IsTesting() { uploadTicker = time.NewTicker(10 * time.Second) // 在测试环境下缩短Ticker时间,以方便我们调试 } remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "start ...") - events.On(events.EventQuit, func() { + events.OnKey(events.EventQuit, this, func() { remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "quit") loopTicker.Stop() uploadTicker.Stop() diff --git a/internal/stats/traffic_stat_manager.go b/internal/stats/traffic_stat_manager.go index bad410b..d691e83 100644 --- a/internal/stats/traffic_stat_manager.go +++ b/internal/stats/traffic_stat_manager.go @@ -56,16 +56,17 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) this.configFunc = configFunc // 上传请求总数 + var monitorTicker = time.NewTicker(1 * time.Minute) + events.OnKey(events.EventQuit, this, func() { + monitorTicker.Stop() + }) goman.New(func() { - ticker := time.NewTicker(1 * time.Minute) - goman.New(func() { - for range ticker.C { - if this.totalRequests > 0 { - monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemRequests, maps.Map{"total": this.totalRequests}) - this.totalRequests = 0 - } + for range monitorTicker.C { + if this.totalRequests > 0 { + monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemRequests, maps.Map{"total": this.totalRequests}) + this.totalRequests = 0 } - }) + } }) // 上传统计数据 @@ -74,8 +75,8 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) // 测试环境缩短上传时间,方便我们调试 duration = 30 * time.Second } - ticker := time.NewTicker(duration) - events.On(events.EventQuit, func() { + var ticker = time.NewTicker(duration) + events.OnKey(events.EventQuit, this, func() { remotelogs.Println("TRAFFIC_STAT_MANAGER", "quit") ticker.Stop() }) diff --git a/internal/utils/free_hours_manager.go b/internal/utils/free_hours_manager.go index 7017672..f99af19 100644 --- a/internal/utils/free_hours_manager.go +++ b/internal/utils/free_hours_manager.go @@ -20,10 +20,13 @@ func init() { SharedFreeHoursManager.Start() }) }) + events.On(events.EventQuit, func() { + SharedFreeHoursManager.Stop() + }) } // FreeHoursManager 计算节点空闲时间 -// 以便于我们在空闲时间执行高强度的任务,如果清理缓存等 +// 以便于我们在空闲时间执行高强度的任务,如清理缓存等 type FreeHoursManager struct { dayTrafficMap map[int][24]uint64 // day => [ traffic bytes ] lastBytes uint64 @@ -32,6 +35,7 @@ type FreeHoursManager struct { count int locker sync.Mutex + ticker *time.Ticker } func NewFreeHoursManager() *FreeHoursManager { @@ -39,8 +43,8 @@ func NewFreeHoursManager() *FreeHoursManager { } func (this *FreeHoursManager) Start() { - var ticker = time.NewTicker(30 * time.Minute) - for range ticker.C { + this.ticker = time.NewTicker(30 * time.Minute) + for range this.ticker.C { this.Update(atomic.LoadUint64(&teaconst.InTrafficBytes)) } } @@ -113,6 +117,12 @@ func (this *FreeHoursManager) IsFreeHour() bool { return false } +func (this *FreeHoursManager) Stop() { + if this.ticker != nil { + this.ticker.Stop() + } +} + // 对数组进行排序,并返回权重 func (this *FreeHoursManager) sortUintArrayWeights(arr [24]uint64) [24]uint64 { var l = []map[string]interface{}{}