diff --git a/internal/iplibrary/ip_list_kv.go b/internal/iplibrary/ip_list_kv.go index a56469e..b4da32f 100644 --- a/internal/iplibrary/ip_list_kv.go +++ b/internal/iplibrary/ip_list_kv.go @@ -10,7 +10,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" - fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" "testing" "time" @@ -78,14 +78,15 @@ func (this *KVIPList) init() error { this.cleanTicker.Stop() }) - for range this.cleanTicker.C { - fsutils.WaitLoad(15, 16, 1*time.Hour) - + idles.RunTicker(this.cleanTicker, func() { + if this.isClosed { + return + } deleteErr := this.DeleteExpiredItems() if deleteErr != nil { remotelogs.Error("IP_LIST_DB", "clean expired items failed: "+deleteErr.Error()) } - } + }) }) return nil diff --git a/internal/iplibrary/ip_list_sqlite.go b/internal/iplibrary/ip_list_sqlite.go index 8e12fc8..ed94821 100644 --- a/internal/iplibrary/ip_list_sqlite.go +++ b/internal/iplibrary/ip_list_sqlite.go @@ -8,7 +8,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" - fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" "github.com/iwind/TeaGo/Tea" "os" "path/filepath" @@ -167,14 +167,12 @@ ON "` + this.itemTableName + `" ( this.cleanTicker.Stop() }) - for range this.cleanTicker.C { - fsutils.WaitLoad(15, 16, 1*time.Hour) - + idles.RunTicker(this.cleanTicker, func() { deleteErr := this.DeleteExpiredItems() if deleteErr != nil { remotelogs.Error("IP_LIST_DB", "clean expired items failed: "+deleteErr.Error()) } - } + }) }) return nil diff --git a/internal/iplibrary/manager_ip_list.go b/internal/iplibrary/manager_ip_list.go index ec0d4b8..ac442b3 100644 --- a/internal/iplibrary/manager_ip_list.go +++ b/internal/iplibrary/manager_ip_list.go @@ -10,6 +10,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/trackers" + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" "github.com/TeaOSLab/EdgeNode/internal/waf" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/Tea" @@ -38,9 +39,9 @@ func init() { var ticker = time.NewTicker(24 * time.Hour) goman.New(func() { - for range ticker.C { + idles.RunTicker(ticker, func() { SharedIPListManager.DeleteExpiredItems() - } + }) }) } @@ -54,7 +55,7 @@ type IPListManager struct { fetchPageSize int64 listMap map[int64]*IPList - mu sync.RWMutex + mu sync.RWMutex isFirstTime bool } diff --git a/internal/metrics/task_kv.go b/internal/metrics/task_kv.go index febd108..fc7dc75 100644 --- a/internal/metrics/task_kv.go +++ b/internal/metrics/task_kv.go @@ -11,7 +11,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" byteutils "github.com/TeaOSLab/EdgeNode/internal/utils/byte" - fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/cockroachdb/pebble" @@ -37,7 +37,7 @@ type KVTask struct { serverIdMapLocker sync.Mutex statsTicker *utils.Ticker - cleanTicker *utils.Ticker + cleanTicker *time.Ticker uploadTicker *utils.Ticker valuesCacheMap map[string]int64 // hash => value @@ -285,18 +285,16 @@ func (this *KVTask) Start() error { }) // 清理 - this.cleanTicker = utils.NewTicker(24 * time.Hour) + this.cleanTicker = time.NewTicker(24 * time.Hour) goman.New(func() { - for this.cleanTicker.Next() { - fsutils.WaitLoad(15, 16, 1*time.Hour) - + idles.RunTicker(this.cleanTicker, func() { var tr = trackers.Begin("METRIC:CLEAN_EXPIRED") err := this.CleanExpired() tr.End() if err != nil { remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error()) } - } + }) }) // 上传 diff --git a/internal/metrics/task_sqlite.go b/internal/metrics/task_sqlite.go index 6cda3d1..cafddf1 100644 --- a/internal/metrics/task_sqlite.go +++ b/internal/metrics/task_sqlite.go @@ -13,7 +13,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils/dbs" - fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" "github.com/TeaOSLab/EdgeNode/internal/zero" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/types" @@ -42,7 +42,7 @@ type SQLiteTask struct { db *dbs.DB statTableName string - cleanTicker *utils.Ticker + cleanTicker *time.Ticker uploadTicker *utils.Ticker cleanVersion int32 @@ -207,18 +207,16 @@ func (this *SQLiteTask) Start() error { }) // 清理 - this.cleanTicker = utils.NewTicker(24 * time.Hour) + this.cleanTicker = time.NewTicker(24 * time.Hour) goman.New(func() { - for this.cleanTicker.Next() { - fsutils.WaitLoad(15, 16, 1*time.Hour) - + idles.RunTicker(this.cleanTicker, func() { var tr = trackers.Begin("METRIC:CLEAN_EXPIRED") err := this.CleanExpired() tr.End() if err != nil { remotelogs.Error("METRIC", "clean expired stats failed: "+err.Error()) } - } + }) }) // 上传 diff --git a/internal/nodes/task_trim_disks.go b/internal/nodes/task_trim_disks.go index f3ebfbb..e08e2cf 100644 --- a/internal/nodes/task_trim_disks.go +++ b/internal/nodes/task_trim_disks.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" executils "github.com/TeaOSLab/EdgeNode/internal/utils/exec" - fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" "runtime" "time" ) @@ -29,16 +29,13 @@ func (this *TrimDisksTask) Start() { } var ticker = time.NewTicker(2 * 24 * time.Hour) // every 2 days - for range ticker.C { - // prevent system overload - fsutils.WaitLoad(15, 24, 1*time.Hour) - + idles.RunTicker(ticker, func() { // run the task err = this.loop() if err != nil { remotelogs.Warn("TRIM_DISKS", "trim disks failed: "+err.Error()) } - } + }) } // run the task once diff --git a/internal/stats/dau_manager.go b/internal/stats/dau_manager.go index 557210e..0e78e8a 100644 --- a/internal/stats/dau_manager.go +++ b/internal/stats/dau_manager.go @@ -9,7 +9,7 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/trackers" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" - fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" "github.com/TeaOSLab/EdgeNode/internal/utils/kvstore" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/types" @@ -87,14 +87,12 @@ func (this *DAUManager) Init() error { // clean expires items goman.New(func() { - for range this.cleanTicker.C { - fsutils.WaitLoad(15, 16, 1*time.Hour) - + idles.RunTicker(this.cleanTicker, func() { err := this.CleanStats() if err != nil { remotelogs.Error("DAU_MANAGER", "clean stats failed: "+err.Error()) } - } + }) }) // dump ip to kvstore @@ -208,6 +206,8 @@ func (this *DAUManager) TestInspect(t *testing.T) { } func (this *DAUManager) Close() error { + this.cleanTicker.Stop() + this.statLocker.Lock() var statMap = this.statMap this.statMap = map[string]int64{} diff --git a/internal/utils/cachehits/stat.go b/internal/utils/cachehits/stat.go index 464ff8a..345dd0e 100644 --- a/internal/utils/cachehits/stat.go +++ b/internal/utils/cachehits/stat.go @@ -5,6 +5,7 @@ package cachehits import ( "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" "github.com/iwind/TeaGo/Tea" "sync" @@ -58,7 +59,7 @@ func NewStat(goodRatio uint64) *Stat { } func (this *Stat) init() { - for range this.ticker.C { + idles.RunTicker(this.ticker, func() { var currentTime = fasttime.Now().Unix() this.mu.RLock() @@ -73,7 +74,7 @@ func (this *Stat) init() { } } this.mu.RUnlock() - } + }) } func (this *Stat) IncreaseCached(category string) { diff --git a/internal/utils/counters/counter.go b/internal/utils/counters/counter.go index 47dcd41..7b020bc 100644 --- a/internal/utils/counters/counter.go +++ b/internal/utils/counters/counter.go @@ -3,6 +3,7 @@ package counters import ( + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/TeaOSLab/EdgeNode/internal/utils/fasttime" memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem" syncutils "github.com/TeaOSLab/EdgeNode/internal/utils/sync" @@ -56,11 +57,11 @@ func (this *Counter[T]) WithGC() *Counter[T] { return this } this.gcTicker = time.NewTicker(1 * time.Second) - go func() { + goman.New(func() { for range this.gcTicker.C { this.GC() } - }() + }) return this } diff --git a/internal/utils/fs/status.go b/internal/utils/fs/status.go index 1c6d7ad..3bdf426 100644 --- a/internal/utils/fs/status.go +++ b/internal/utils/fs/status.go @@ -5,6 +5,7 @@ package fsutils import ( "encoding/json" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" "github.com/iwind/TeaGo/Tea" "github.com/shirou/gopsutil/v3/load" "os" @@ -55,7 +56,7 @@ func init() { } // test disk - go func() { + goman.New(func() { // load last result from local disk cacheData, cacheErr := os.ReadFile(Tea.Root + "/data/" + diskSpeedDataFile) if cacheErr == nil { @@ -83,17 +84,17 @@ func init() { } } } - }() + }) // check high load - go func() { + goman.New(func() { var ticker = time.NewTicker(5 * time.Second) for range ticker.C { stat, _ := load.Avg() IsInExtremelyHighLoad = stat != nil && stat.Load1 > extremelyHighLoad1Threshold IsInHighLoad = stat != nil && stat.Load1 > highLoad1Threshold && !DiskIsFast() } - }() + }) } func DiskIsFast() bool { @@ -145,8 +146,12 @@ func calculateDiskMaxWrites() { func WaitLoad(maxLoad float64, maxLoops int, delay time.Duration) { for i := 0; i < maxLoops; i++ { stat, err := load.Avg() - if err == nil && stat.Load1 > maxLoad { - time.Sleep(delay) + if err == nil { + if stat.Load1 > maxLoad { + time.Sleep(delay) + } else { + return + } } } } diff --git a/internal/utils/fs/status_test.go b/internal/utils/fs/status_test.go index 82450c5..88db891 100644 --- a/internal/utils/fs/status_test.go +++ b/internal/utils/fs/status_test.go @@ -22,7 +22,7 @@ func TestWrites(t *testing.T) { } func TestWaitLoad(t *testing.T) { - fsutils.WaitLoad(100, 1, 1 * time.Minute) + fsutils.WaitLoad(100, 5, 1*time.Minute) } func BenchmarkWrites(b *testing.B) { diff --git a/internal/utils/idles/run.go b/internal/utils/idles/run.go new file mode 100644 index 0000000..8048e01 --- /dev/null +++ b/internal/utils/idles/run.go @@ -0,0 +1,128 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package idles + +import ( + "encoding/json" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/goman" + fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs" + "github.com/iwind/TeaGo/Tea" + "github.com/shirou/gopsutil/v3/load" + "os" + "sort" + "time" +) + +const maxSamples = 7 +const cacheFile = "idles.cache" + +var hourlyLoadMap = map[int]*HourlyLoad{} +var minLoadHour = -1 + +type HourlyLoad struct { + Hour int `json:"hour"` + Avg float64 `json:"avg"` + Values []float64 `json:"values"` +} + +func init() { + if !teaconst.IsMain { + return + } + + // recover from cache + { + data, err := os.ReadFile(Tea.Root + "/data/" + cacheFile) + if err == nil { + _ = json.Unmarshal(data, &hourlyLoadMap) + } + } + + goman.New(func() { + var ticker = time.NewTicker(1 * time.Hour) + for range ticker.C { + CheckHourlyLoad(time.Now().Hour()) + } + }) +} + +func CheckHourlyLoad(hour int) { + avgLoad, err := load.Avg() + if err != nil { + return + } + + hourlyLoad, ok := hourlyLoadMap[hour] + if !ok { + hourlyLoad = &HourlyLoad{ + Hour: hour, + } + hourlyLoadMap[hour] = hourlyLoad + } + + if len(hourlyLoad.Values) >= maxSamples { + hourlyLoad.Values = hourlyLoad.Values[:maxSamples-1] + } + hourlyLoad.Values = append(hourlyLoad.Values, avgLoad.Load15) + + var sum float64 + for _, v := range hourlyLoad.Values { + sum += v + } + hourlyLoad.Avg = sum / float64(len(hourlyLoad.Values)) + + // calculate min load hour + var allLoads = []*HourlyLoad{} + for _, v := range hourlyLoadMap { + allLoads = append(allLoads, v) + } + + sort.Slice(allLoads, func(i, j int) bool { + return allLoads[i].Avg < allLoads[j].Avg + }) + + minLoadHour = allLoads[0].Hour + + // write to cache + hourlyLoadMapJSON, err := json.Marshal(hourlyLoadMap) + if err == nil { + _ = os.WriteFile(Tea.Root+"/data/"+cacheFile, hourlyLoadMapJSON, 0666) + } +} + +func Run(f func()) { + defer f() + + if minLoadHour < 0 { + fsutils.WaitLoad(15, 8, time.Hour) + return + } + + var hour = time.Now().Hour() + if minLoadHour == hour { + fsutils.WaitLoad(15, 10, time.Minute) + return + } + + if minLoadHour < hour { + time.Sleep(time.Duration(24-hour+minLoadHour) * time.Hour) + } else { + time.Sleep(time.Duration(minLoadHour-hour) * time.Hour) + } + fsutils.WaitLoad(15, 10, time.Minute) +} + +func RunTicker(ticker *time.Ticker, f func()) { + for range ticker.C { + Run(f) + } +} + +func TestMinLoadHour() int { + return minLoadHour +} + +func TestHourlyLoadMap() map[int]*HourlyLoad { + return hourlyLoadMap +} diff --git a/internal/utils/idles/run_test.go b/internal/utils/idles/run_test.go new file mode 100644 index 0000000..c68a6c0 --- /dev/null +++ b/internal/utils/idles/run_test.go @@ -0,0 +1,41 @@ +// Copyright 2024 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package idles_test + +import ( + "github.com/TeaOSLab/EdgeNode/internal/utils/idles" + "github.com/TeaOSLab/EdgeNode/internal/utils/testutils" + "github.com/iwind/TeaGo/logs" + timeutil "github.com/iwind/TeaGo/utils/time" + "testing" + "time" +) + +func TestCheckHourlyLoad(t *testing.T) { + for i := 0; i < 10; i++ { + idles.CheckHourlyLoad(1) + idles.CheckHourlyLoad(2) + idles.CheckHourlyLoad(3) + } + + t.Log(idles.TestMinLoadHour()) + logs.PrintAsJSON(idles.TestHourlyLoadMap(), t) +} + +func TestRun(t *testing.T) { + //idles.CheckHourlyLoad(time.Now().Hour()) + idles.Run(func() { + t.Log("run once") + }) +} + +func TestRunTicker(t *testing.T) { + if !testutils.IsSingleTesting() { + return + } + + var ticker = time.NewTicker(10 * time.Second) + idles.RunTicker(ticker, func() { + t.Log(timeutil.Format("H:i:s"), "run once") + }) +}