Files
EdgeNode/internal/metrics/manager.go

181 lines
4.0 KiB
Go
Raw Permalink Normal View History

2024-05-17 18:30:33 +08:00
// Copyright 2021 GoEdge goedge.cdn@gmail.com. All rights reserved.
2021-06-30 20:01:00 +08:00
package metrics
import (
2024-07-27 15:42:50 +08:00
"strconv"
"sync"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2022-09-02 16:12:58 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
)
// SharedManager is the package-level Manager instance, created via NewManager
// when the package is initialized.
var SharedManager = NewManager()
2022-09-02 16:12:58 +08:00
func init() {
if !teaconst.IsMain {
return
}
events.OnClose(func() {
2022-09-02 16:12:58 +08:00
SharedManager.Quit()
})
}
2021-06-30 20:01:00 +08:00
type Manager struct {
2022-09-02 16:12:58 +08:00
isQuiting bool
2024-04-02 19:54:04 +08:00
taskMap map[int64]Task // itemId => Task
categoryTaskMap map[string][]Task // category => []Task
2023-08-15 20:12:09 +08:00
locker sync.RWMutex
2021-06-30 20:01:00 +08:00
hasHTTPMetrics bool
hasTCPMetrics bool
hasUDPMetrics bool
}
func NewManager() *Manager {
return &Manager{
2024-04-02 19:54:04 +08:00
taskMap: map[int64]Task{},
categoryTaskMap: map[string][]Task{},
2021-06-30 20:01:00 +08:00
}
}
func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
2022-09-02 16:12:58 +08:00
if this.isQuiting {
return
}
2021-06-30 20:01:00 +08:00
this.locker.Lock()
defer this.locker.Unlock()
var newMap = map[int64]*serverconfigs.MetricItemConfig{}
for _, item := range items {
newMap[item.Id] = item
}
// 停用以前的 或 修改现在的
2023-08-15 20:12:09 +08:00
for itemId, task := range this.taskMap {
2021-06-30 20:01:00 +08:00
newItem, ok := newMap[itemId]
if !ok || !newItem.IsOn { // 停用以前的
remotelogs.Println("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"'")
err := task.Stop()
if err != nil {
remotelogs.Error("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error())
}
2024-04-02 19:54:04 +08:00
// deleted
if newItem != nil && !newItem.IsOn {
deleteErr := task.Delete()
if deleteErr != nil {
remotelogs.Error("METRIC_MANAGER", "delete task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error())
}
}
2023-08-15 20:12:09 +08:00
delete(this.taskMap, itemId)
2021-06-30 20:01:00 +08:00
} else { // 更新已存在的
2024-04-02 19:54:04 +08:00
if newItem.Version != task.Item().Version {
2021-06-30 20:01:00 +08:00
remotelogs.Println("METRIC_MANAGER", "update task '"+strconv.FormatInt(itemId, 10)+"'")
2024-04-02 19:54:04 +08:00
task.SetItem(newItem)
2021-06-30 20:01:00 +08:00
}
}
}
// 启动新的
for _, newItem := range items {
if !newItem.IsOn {
continue
}
2023-08-15 20:12:09 +08:00
_, ok := this.taskMap[newItem.Id]
2021-06-30 20:01:00 +08:00
if !ok {
remotelogs.Println("METRIC_MANAGER", "start task '"+strconv.FormatInt(newItem.Id, 10)+"'")
2024-04-02 19:54:04 +08:00
var task Task
if CheckSQLiteDB(newItem.Id) || !teaconst.EnableKVCacheStore {
task = NewSQLiteTask(newItem)
} else {
task = NewKVTask(newItem)
}
2021-06-30 20:01:00 +08:00
err := task.Init()
if err != nil {
remotelogs.Error("METRIC_MANAGER", "initialized task failed: "+err.Error())
continue
}
err = task.Start()
if err != nil {
remotelogs.Error("METRIC_MANAGER", "start task failed: "+err.Error())
continue
}
2023-08-15 20:12:09 +08:00
this.taskMap[newItem.Id] = task
2021-06-30 20:01:00 +08:00
}
}
// 按分类存放
this.hasHTTPMetrics = false
this.hasTCPMetrics = false
this.hasUDPMetrics = false
2024-04-02 19:54:04 +08:00
this.categoryTaskMap = map[string][]Task{}
2023-08-15 20:12:09 +08:00
for _, task := range this.taskMap {
2024-04-02 19:54:04 +08:00
var tasks = this.categoryTaskMap[task.Item().Category]
2021-06-30 20:01:00 +08:00
tasks = append(tasks, task)
2024-04-02 19:54:04 +08:00
this.categoryTaskMap[task.Item().Category] = tasks
2021-06-30 20:01:00 +08:00
2024-04-02 19:54:04 +08:00
switch task.Item().Category {
2021-06-30 20:01:00 +08:00
case serverconfigs.MetricItemCategoryHTTP:
this.hasHTTPMetrics = true
case serverconfigs.MetricItemCategoryTCP:
this.hasTCPMetrics = true
case serverconfigs.MetricItemCategoryUDP:
this.hasUDPMetrics = true
}
}
}
// Add 添加数据
func (this *Manager) Add(obj MetricInterface) {
2022-09-02 16:12:58 +08:00
if this.isQuiting {
return
}
2021-06-30 20:01:00 +08:00
this.locker.RLock()
2023-08-15 20:12:09 +08:00
var tasks = this.categoryTaskMap[obj.MetricCategory()]
this.locker.RUnlock()
for _, task := range tasks {
2021-06-30 20:01:00 +08:00
task.Add(obj)
}
}
// HasHTTPMetrics returns the hasHTTPMetrics flag, which Update sets when at
// least one task belongs to the HTTP category.
func (this *Manager) HasHTTPMetrics() bool {
	return this.hasHTTPMetrics
}
// HasTCPMetrics returns the hasTCPMetrics flag, which Update sets when at
// least one task belongs to the TCP category.
func (this *Manager) HasTCPMetrics() bool {
	return this.hasTCPMetrics
}
// HasUDPMetrics returns the hasUDPMetrics flag, which Update sets when at
// least one task belongs to the UDP category.
func (this *Manager) HasUDPMetrics() bool {
	return this.hasUDPMetrics
}
2022-09-02 16:12:58 +08:00
2024-04-02 19:54:04 +08:00
// TaskMap returns the manager's itemId => Task map.
// The internal map itself is returned, not a copy, and the lock is not taken.
func (this *Manager) TaskMap() map[int64]Task {
	return this.taskMap
}
2022-09-02 16:12:58 +08:00
// Quit 退出管理器
func (this *Manager) Quit() {
this.isQuiting = true
remotelogs.Println("METRIC_MANAGER", "quit")
this.locker.Lock()
2023-08-15 20:12:09 +08:00
for _, task := range this.taskMap {
2022-09-02 16:12:58 +08:00
_ = task.Stop()
}
2024-04-02 19:54:04 +08:00
this.taskMap = map[int64]Task{}
2022-09-02 16:12:58 +08:00
this.locker.Unlock()
}