优化代码

This commit is contained in:
GoEdgeLab
2022-01-12 20:31:04 +08:00
parent 83d041b8bf
commit 938278725e
23 changed files with 243 additions and 110 deletions

View File

@@ -482,11 +482,7 @@ func TestFileStorage_DecodeFile(t *testing.T) {
t.Fatal(err)
}
_, path := storage.keyPath("my-key")
item, err := storage.decodeFile(path)
if err != nil {
t.Fatal(err)
}
logs.PrintAsJSON(item, t)
t.Log(path)
}
func BenchmarkFileStorage_Read(b *testing.B) {

View File

@@ -1,27 +1,71 @@
package events
import "sync"
import (
"sync"
)
var eventsMap = map[string][]func(){} // event => []callbacks
type Callbacks = []func()
var eventsMap = map[Event]map[interface{}]Callbacks{} // event => map[event key][]callback
var locker = sync.Mutex{}
var eventKeyId = 0
func NewKey() interface{} {
locker.Lock()
defer locker.Unlock()
eventKeyId++
return eventKeyId
}
// On 增加事件回调
func On(event string, callback func()) {
func On(event Event, callback func()) {
OnKey(event, nil, callback)
}
// OnKey 使用Key增加事件回调
func OnKey(event Event, key interface{}, callback func()) {
if key == nil {
key = NewKey()
}
locker.Lock()
defer locker.Unlock()
callbacks, _ := eventsMap[event]
callbacks = append(callbacks, callback)
eventsMap[event] = callbacks
m, ok := eventsMap[event]
if !ok {
m = map[interface{}]Callbacks{}
eventsMap[event] = m
}
m[key] = append(m[key], callback)
}
// Remove 删除事件回调
func Remove(key interface{}) {
if key == nil {
return
}
locker.Lock()
for k, m := range eventsMap {
_, ok := m[key]
if ok {
delete(m, key)
eventsMap[k] = m
}
}
locker.Unlock()
}
// Notify 通知事件
func Notify(event string) {
func Notify(event Event) {
locker.Lock()
callbacks, _ := eventsMap[event]
m := eventsMap[event]
locker.Unlock()
for _, callback := range callbacks {
callback()
for _, callbacks := range m {
for _, callback := range callbacks {
callback()
}
}
}

View File

@@ -1,16 +1,33 @@
package events
package events_test
import "testing"
import (
"github.com/TeaOSLab/EdgeNode/internal/events"
"testing"
)
func TestOn(t *testing.T) {
On("hello", func() {
type User struct {
name string
}
var u = &User{}
var u2 = &User{}
events.On("hello", func() {
t.Log("world")
})
On("hello", func() {
events.On("hello", func() {
t.Log("world2")
})
On("hello2", func() {
events.OnKey("hello", u, func() {
t.Log("world3")
})
events.OnKey("hello", u, func() {
t.Log("world4")
})
events.Remove(u)
events.Remove(u2)
events.OnKey("hello2", nil, func() {
t.Log("world2")
})
Notify("hello")
events.Notify("hello")
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/types"
@@ -27,10 +26,15 @@ func init() {
SharedCityManager.Start()
})
})
events.On(events.EventQuit, func() {
SharedCityManager.Stop()
})
}
// CityManager 中国省份信息管理
type CityManager struct {
ticker *time.Ticker
cacheFile string
cityMap map[string]int64 // provinceName_cityName => cityName
@@ -62,11 +66,8 @@ func (this *CityManager) Start() {
}
// 定时更新
ticker := utils.NewTicker(4 * time.Hour)
events.On(events.EventQuit, func() {
ticker.Stop()
})
for ticker.Next() {
this.ticker = time.NewTicker(4 * time.Hour)
for range this.ticker.C {
err := this.loop()
if err != nil {
remotelogs.ErrorObject("CITY_MANAGER", err)
@@ -74,6 +75,12 @@ func (this *CityManager) Start() {
}
}
func (this *CityManager) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
func (this *CityManager) Lookup(provinceId int64, cityName string) (cityId int64) {
this.locker.RLock()
cityId, _ = this.cityMap[types.String(provinceId)+"_"+cityName]

View File

@@ -9,7 +9,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"io/ioutil"
@@ -26,10 +25,15 @@ func init() {
SharedCountryManager.Start()
})
})
events.On(events.EventQuit, func() {
SharedCountryManager.Stop()
})
}
// CountryManager 国家/地区信息管理
type CountryManager struct {
ticker *time.Ticker
cacheFile string
countryMap map[string]int64 // countryName => countryId
@@ -61,11 +65,8 @@ func (this *CountryManager) Start() {
}
// 定时更新
ticker := utils.NewTicker(4 * time.Hour)
events.On(events.EventQuit, func() {
ticker.Stop()
})
for ticker.Next() {
this.ticker = time.NewTicker(4 * time.Hour)
for range this.ticker.C {
err := this.loop()
if err != nil {
remotelogs.ErrorObject("COUNTRY_MANAGER", err)
@@ -73,6 +74,12 @@ func (this *CountryManager) Start() {
}
}
func (this *CountryManager) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
func (this *CountryManager) Lookup(countryName string) (countryId int64) {
this.locker.RLock()
countryId, _ = this.countryMap[countryName]

View File

@@ -23,10 +23,15 @@ func init() {
SharedIPListManager.Start()
})
})
events.On(events.EventQuit, func() {
SharedIPListManager.Stop()
})
}
// IPListManager IP名单管理
type IPListManager struct {
ticker *time.Ticker
db *IPListDB
version int64
@@ -52,17 +57,14 @@ func (this *IPListManager) Start() {
remotelogs.ErrorObject("IP_LIST_MANAGER", err)
}
ticker := time.NewTicker(60 * time.Second)
this.ticker = time.NewTicker(60 * time.Second)
if Tea.IsTesting() {
ticker = time.NewTicker(10 * time.Second)
this.ticker = time.NewTicker(10 * time.Second)
}
events.On(events.EventQuit, func() {
ticker.Stop()
})
countErrors := 0
for {
select {
case <-ticker.C:
case <-this.ticker.C:
case <-IPListUpdateNotify:
}
err := this.loop()
@@ -84,6 +86,12 @@ func (this *IPListManager) Start() {
}
}
func (this *IPListManager) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
func (this *IPListManager) init() {
// 从数据库中当中读取数据
db, err := NewIPListDB()

View File

@@ -9,7 +9,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"io/ioutil"
@@ -26,10 +25,15 @@ func init() {
SharedProviderManager.Start()
})
})
events.On(events.EventQuit, func() {
SharedProviderManager.Stop()
})
}
// ProviderManager 中国省份信息管理
type ProviderManager struct {
ticker *time.Ticker
cacheFile string
providerMap map[string]int64 // name => id
@@ -61,11 +65,8 @@ func (this *ProviderManager) Start() {
}
// 定时更新
ticker := utils.NewTicker(4 * time.Hour)
events.On(events.EventQuit, func() {
ticker.Stop()
})
for ticker.Next() {
this.ticker = time.NewTicker(4 * time.Hour)
for range this.ticker.C {
err := this.loop()
if err != nil {
remotelogs.ErrorObject("PROVIDER_MANAGER", err)
@@ -73,6 +74,12 @@ func (this *ProviderManager) Start() {
}
}
func (this *ProviderManager) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
func (this *ProviderManager) Lookup(providerName string) (providerId int64) {
this.locker.RLock()
providerId, _ = this.providerMap[providerName]

View File

@@ -9,7 +9,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
_ "github.com/iwind/TeaGo/bootstrap"
"io/ioutil"
@@ -30,10 +29,15 @@ func init() {
SharedProvinceManager.Start()
})
})
events.On(events.EventQuit, func() {
SharedProvinceManager.Stop()
})
}
// ProvinceManager 中国省份信息管理
type ProvinceManager struct {
ticker *time.Ticker
cacheFile string
provinceMap map[string]int64 // provinceName => provinceId
@@ -65,11 +69,8 @@ func (this *ProvinceManager) Start() {
}
// 定时更新
ticker := utils.NewTicker(4 * time.Hour)
events.On(events.EventQuit, func() {
ticker.Stop()
})
for ticker.Next() {
this.ticker = time.NewTicker(4 * time.Hour)
for range this.ticker.C {
err := this.loop()
if err != nil {
remotelogs.ErrorObject("PROVINCE_MANAGER", err)
@@ -77,6 +78,12 @@ func (this *ProvinceManager) Start() {
}
}
func (this *ProvinceManager) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
func (this *ProvinceManager) Lookup(provinceName string) (provinceId int64) {
this.locker.RLock()
provinceId, _ = this.provinceMap[provinceName]

View File

@@ -15,15 +15,22 @@ import (
"time"
)
var SharedUpdater = NewUpdater()
func init() {
events.On(events.EventStart, func() {
updater := NewUpdater()
updater.Start()
goman.New(func() {
SharedUpdater.Start()
})
})
events.On(events.EventQuit, func() {
SharedUpdater.Stop()
})
}
// Updater IP库更新程序
type Updater struct {
ticker *time.Ticker
}
// NewUpdater 获取新对象
@@ -34,15 +41,19 @@ func NewUpdater() *Updater {
// Start 开始更新
func (this *Updater) Start() {
// 这里不需要太频繁检查更新因为通常不需要更新IP库
ticker := time.NewTicker(1 * time.Hour)
goman.New(func() {
for range ticker.C {
err := this.loop()
if err != nil {
remotelogs.ErrorObject("IP_LIBRARY", err)
}
this.ticker = time.NewTicker(1 * time.Hour)
for range this.ticker.C {
err := this.loop()
if err != nil {
remotelogs.ErrorObject("IP_LIBRARY", err)
}
})
}
}
func (this *Updater) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
// 单次任务

View File

@@ -41,7 +41,7 @@ func NewAPIStream() *APIStream {
}
func (this *APIStream) Start() {
events.On(events.EventQuit, func() {
events.OnKey(events.EventQuit, this, func() {
this.isQuiting = true
if this.cancelFunc != nil {
this.cancelFunc()

View File

@@ -16,7 +16,7 @@ import (
// 发送监控流量
func init() {
events.On(events.EventStart, func() {
ticker := time.NewTicker(1 * time.Minute)
var ticker = time.NewTicker(1 * time.Minute)
goman.New(func() {
for range ticker.C {
// 加入到数据队列中

View File

@@ -61,7 +61,7 @@ func (this *Listener) listenTCP() error {
return err
}
var netListener = NewClientListener(tcpListener, protocol.IsHTTPFamily() || protocol.IsHTTPSFamily())
events.On(events.EventQuit, func() {
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("LISTENER", "quit "+this.group.FullAddr())
_ = netListener.Close()
})
@@ -122,7 +122,7 @@ func (this *Listener) listenUDP() error {
if err != nil {
return err
}
events.On(events.EventQuit, func() {
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("LISTENER", "quit "+this.group.FullAddr())
_ = listener.Close()
})
@@ -143,6 +143,8 @@ func (this *Listener) listenUDP() error {
}
func (this *Listener) Close() error {
events.Remove(this)
if this.listener == nil {
return nil
}

View File

@@ -25,8 +25,8 @@ func (this *BaseListener) Reset() {
}
// CountActiveListeners 获取当前活跃连接数
func (this *BaseListener) CountActiveListeners() int {
// CountActiveConnections 获取当前活跃连接数
func (this *BaseListener) CountActiveConnections() int {
return types.Int(this.countActiveConnections)
}

View File

@@ -16,6 +16,6 @@ type ListenerInterface interface {
// Reload 重载配置
Reload(serverGroup *serverconfigs.ServerAddressGroup)
// CountActiveListeners 获取当前活跃的连接数
CountActiveListeners() int
// CountActiveConnections 获取当前活跃的连接数
CountActiveConnections() int
}

View File

@@ -159,7 +159,7 @@ func (this *ListenerManager) TotalActiveConnections() int {
total := 0
for _, listener := range this.listenersMap {
total += listener.listener.CountActiveListeners()
total += listener.listener.CountActiveConnections()
}
return total
}

View File

@@ -451,7 +451,7 @@ func (this *Node) syncConfig(taskVersion int64) error {
func (this *Node) startSyncTimer() {
// TODO 这个时间间隔可以自行设置
ticker := time.NewTicker(60 * time.Second)
events.On(events.EventQuit, func() {
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("NODE", "quit sync timer")
ticker.Stop()
})
@@ -691,7 +691,7 @@ func (this *Node) listenSock() error {
}
})
events.On(events.EventQuit, func() {
events.OnKey(events.EventQuit, this, func() {
logs.Println("NODE", "quit unix sock")
_ = this.sock.Close()
})

View File

@@ -41,9 +41,9 @@ func (this *NodeStatusExecutor) Listen() {
this.update()
// TODO 这个时间间隔可以配置
ticker := time.NewTicker(30 * time.Second)
var ticker = time.NewTicker(30 * time.Second)
events.On(events.EventQuit, func() {
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("NODE_STATUS", "quit executor")
ticker.Stop()
})

View File

@@ -21,6 +21,9 @@ func init() {
SharedOriginStateManager.Start()
})
})
events.On(events.EventQuit, func() {
SharedOriginStateManager.Stop()
})
}
// OriginStateManager 源站状态管理
@@ -41,7 +44,7 @@ func NewOriginStateManager() *OriginStateManager {
// Start 启动
func (this *OriginStateManager) Start() {
events.On(events.EventReload, func() {
events.OnKey(events.EventReload, this, func() {
this.locker.Lock()
this.stateMap = map[int64]*OriginState{}
this.locker.Unlock()
@@ -58,6 +61,12 @@ func (this *OriginStateManager) Start() {
}
}
func (this *OriginStateManager) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
// Loop 单次循环检查
func (this *OriginStateManager) Loop() error {
if sharedNodeConfig == nil {

View File

@@ -28,7 +28,7 @@ func init() {
})
}
// 系统服务管理
// SystemServiceManager 系统服务管理
type SystemServiceManager struct {
}

View File

@@ -7,7 +7,6 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
"github.com/iwind/TeaGo/Tea"
@@ -21,17 +20,22 @@ import (
"time"
)
var sharedSyncAPINodesTask = NewSyncAPINodesTask()
func init() {
events.On(events.EventStart, func() {
task := NewSyncAPINodesTask()
goman.New(func() {
task.Start()
sharedSyncAPINodesTask.Start()
})
})
events.On(events.EventQuit, func() {
sharedSyncAPINodesTask.Stop()
})
}
// SyncAPINodesTask API节点同步任务
type SyncAPINodesTask struct {
ticker *time.Ticker
}
func NewSyncAPINodesTask() *SyncAPINodesTask {
@@ -39,16 +43,12 @@ func NewSyncAPINodesTask() *SyncAPINodesTask {
}
func (this *SyncAPINodesTask) Start() {
ticker := time.NewTicker(5 * time.Minute)
this.ticker = time.NewTicker(5 * time.Minute)
if Tea.IsTesting() {
// 快速测试
ticker = time.NewTicker(1 * time.Minute)
this.ticker = time.NewTicker(1 * time.Minute)
}
events.On(events.EventQuit, func() {
remotelogs.Println("SYNC_API_NODES_TASK", "quit task")
ticker.Stop()
})
for range ticker.C {
for range this.ticker.C {
err := this.Loop()
if err != nil {
logs.Println("[TASK][SYNC_API_NODES_TASK]" + err.Error())
@@ -56,6 +56,12 @@ func (this *SyncAPINodesTask) Start() {
}
}
func (this *SyncAPINodesTask) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
func (this *SyncAPINodesTask) Loop() error {
var tr = trackers.Begin("SYNC_API_NODES")
defer tr.End()

View File

@@ -64,25 +64,26 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager {
// Start 启动
func (this *HTTPRequestStatManager) Start() {
// 上传请求总数
var monitorTicker = time.NewTicker(1 * time.Minute)
events.OnKey(events.EventQuit, this, func() {
monitorTicker.Stop()
})
goman.New(func() {
ticker := time.NewTicker(1 * time.Minute)
goman.New(func() {
for range ticker.C {
if this.totalAttackRequests > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAttackRequests, maps.Map{"total": this.totalAttackRequests})
this.totalAttackRequests = 0
}
for range monitorTicker.C {
if this.totalAttackRequests > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAttackRequests, maps.Map{"total": this.totalAttackRequests})
this.totalAttackRequests = 0
}
})
}
})
loopTicker := time.NewTicker(1 * time.Second)
uploadTicker := time.NewTicker(30 * time.Minute)
var loopTicker = time.NewTicker(1 * time.Second)
var uploadTicker = time.NewTicker(30 * time.Minute)
if Tea.IsTesting() {
uploadTicker = time.NewTicker(10 * time.Second) // 在测试环境下缩短Ticker时间以方便我们调试
}
remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "start ...")
events.On(events.EventQuit, func() {
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "quit")
loopTicker.Stop()
uploadTicker.Stop()

View File

@@ -56,16 +56,17 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
this.configFunc = configFunc
// 上传请求总数
var monitorTicker = time.NewTicker(1 * time.Minute)
events.OnKey(events.EventQuit, this, func() {
monitorTicker.Stop()
})
goman.New(func() {
ticker := time.NewTicker(1 * time.Minute)
goman.New(func() {
for range ticker.C {
if this.totalRequests > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemRequests, maps.Map{"total": this.totalRequests})
this.totalRequests = 0
}
for range monitorTicker.C {
if this.totalRequests > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemRequests, maps.Map{"total": this.totalRequests})
this.totalRequests = 0
}
})
}
})
// 上传统计数据
@@ -74,8 +75,8 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
// 测试环境缩短上传时间,方便我们调试
duration = 30 * time.Second
}
ticker := time.NewTicker(duration)
events.On(events.EventQuit, func() {
var ticker = time.NewTicker(duration)
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("TRAFFIC_STAT_MANAGER", "quit")
ticker.Stop()
})

View File

@@ -20,10 +20,13 @@ func init() {
SharedFreeHoursManager.Start()
})
})
events.On(events.EventQuit, func() {
SharedFreeHoursManager.Stop()
})
}
// FreeHoursManager 计算节点空闲时间
// 以便于我们在空闲时间执行高强度的任务,如清理缓存等
// 以便于我们在空闲时间执行高强度的任务,如清理缓存等
type FreeHoursManager struct {
dayTrafficMap map[int][24]uint64 // day => [ traffic bytes ]
lastBytes uint64
@@ -32,6 +35,7 @@ type FreeHoursManager struct {
count int
locker sync.Mutex
ticker *time.Ticker
}
func NewFreeHoursManager() *FreeHoursManager {
@@ -39,8 +43,8 @@ func NewFreeHoursManager() *FreeHoursManager {
}
func (this *FreeHoursManager) Start() {
var ticker = time.NewTicker(30 * time.Minute)
for range ticker.C {
this.ticker = time.NewTicker(30 * time.Minute)
for range this.ticker.C {
this.Update(atomic.LoadUint64(&teaconst.InTrafficBytes))
}
}
@@ -113,6 +117,12 @@ func (this *FreeHoursManager) IsFreeHour() bool {
return false
}
func (this *FreeHoursManager) Stop() {
if this.ticker != nil {
this.ticker.Stop()
}
}
// 对数组进行排序,并返回权重
func (this *FreeHoursManager) sortUintArrayWeights(arr [24]uint64) [24]uint64 {
var l = []map[string]interface{}{}