// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. package metrics import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "strconv" "sync" ) var SharedManager = NewManager() type Manager struct { tasks map[int64]*Task // itemId => *Task categoryTasks map[string][]*Task // category => []*Task locker sync.RWMutex hasHTTPMetrics bool hasTCPMetrics bool hasUDPMetrics bool } func NewManager() *Manager { return &Manager{ tasks: map[int64]*Task{}, categoryTasks: map[string][]*Task{}, } } func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) { this.locker.Lock() defer this.locker.Unlock() var newMap = map[int64]*serverconfigs.MetricItemConfig{} for _, item := range items { newMap[item.Id] = item } // 停用以前的 或 修改现在的 for itemId, task := range this.tasks { newItem, ok := newMap[itemId] if !ok || !newItem.IsOn { // 停用以前的 remotelogs.Println("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"'") err := task.Stop() if err != nil { remotelogs.Error("METRIC_MANAGER", "stop task '"+strconv.FormatInt(itemId, 10)+"' failed: "+err.Error()) } delete(this.tasks, itemId) } else { // 更新已存在的 if newItem.Version != task.item.Version { remotelogs.Println("METRIC_MANAGER", "update task '"+strconv.FormatInt(itemId, 10)+"'") task.item = newItem } } } // 启动新的 for _, newItem := range items { if !newItem.IsOn { continue } _, ok := this.tasks[newItem.Id] if !ok { remotelogs.Println("METRIC_MANAGER", "start task '"+strconv.FormatInt(newItem.Id, 10)+"'") task := NewTask(newItem) err := task.Init() if err != nil { remotelogs.Error("METRIC_MANAGER", "initialized task failed: "+err.Error()) continue } err = task.Start() if err != nil { remotelogs.Error("METRIC_MANAGER", "start task failed: "+err.Error()) continue } this.tasks[newItem.Id] = task } } // 按分类存放 this.hasHTTPMetrics = false this.hasTCPMetrics = false this.hasUDPMetrics = false this.categoryTasks = map[string][]*Task{} for _, task := range this.tasks { tasks := this.categoryTasks[task.item.Category] tasks = append(tasks, task) this.categoryTasks[task.item.Category] = tasks switch task.item.Category { case serverconfigs.MetricItemCategoryHTTP: this.hasHTTPMetrics = true case serverconfigs.MetricItemCategoryTCP: this.hasTCPMetrics = true case serverconfigs.MetricItemCategoryUDP: this.hasUDPMetrics = true } } } // Add 添加数据 func (this *Manager) Add(obj MetricInterface) { this.locker.RLock() for _, task := range this.categoryTasks[obj.MetricCategory()] { task.Add(obj) } this.locker.RUnlock() } func (this *Manager) HasHTTPMetrics() bool { return this.hasHTTPMetrics } func (this *Manager) HasTCPMetrics() bool { return this.hasTCPMetrics } func (this *Manager) HasUDPMetrics() bool { return this.hasUDPMetrics }