Files
EdgeAPI/internal/tasks/health_check_cluster_task.go
GoEdgeLab 5a17ae9d79 v1.4.1
2024-07-27 14:15:25 +08:00

172 lines
4.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package tasks
import (
"bytes"
"encoding/json"
"strings"
"time"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
)
// HealthCheckClusterTask is the health-check task for a single cluster.
type HealthCheckClusterTask struct {
BaseTask
// clusterId identifies the cluster whose nodes are checked.
clusterId int64
// config is the current health-check configuration; Run does nothing when
// it is nil, switched off, or has no positive interval.
config *serverconfigs.HealthCheckConfig
// ticker drives the periodic loop; it is nil while the task is stopped.
ticker *utils.Ticker
// notifiedTime is when Loop last recorded a failure notification; Loop uses
// it to avoid notifying more than once every 10 minutes.
notifiedTime time.Time
}
// NewHealthCheckClusterTask creates a task for the given cluster using the
// given health-check configuration.
//
// The returned task has no ticker yet; call Run to start it.
func NewHealthCheckClusterTask(clusterId int64, config *serverconfigs.HealthCheckConfig) *HealthCheckClusterTask {
	var task = new(HealthCheckClusterTask)
	task.clusterId = clusterId
	task.config = config
	return task
}
// Reset swaps in a new configuration and restarts the task, but only when the
// new configuration actually differs from the current one.
//
// The two configurations are compared by their JSON encodings. If either one
// cannot be encoded, the error is logged and the task is left untouched.
func (this *HealthCheckClusterTask) Reset(config *serverconfigs.HealthCheckConfig) {
	currentData, err := json.Marshal(this.config)
	if err != nil {
		this.logErr("HealthCheckClusterTask", err.Error())
		return
	}

	incomingData, err := json.Marshal(config)
	if err != nil {
		this.logErr("HealthCheckClusterTask", err.Error())
		return
	}

	// Nothing changed: keep the running ticker as it is.
	if bytes.Equal(currentData, incomingData) {
		return
	}

	this.config = config
	this.Run()
}
// Run (re)starts the periodic health check.
//
// Any ticker left over from a previous run is stopped first. Nothing is started
// when the configuration is missing, switched off, has no interval, or the
// interval is not positive. Otherwise a new ticker is created and the function
// handed to goman.New calls Loop for as long as ticker.Wait() returns true,
// logging every error Loop returns.
func (this *HealthCheckClusterTask) Run() {
	this.Stop()

	var config = this.config
	if config == nil || !config.IsOn || config.Interval == nil {
		return
	}

	var interval = config.Interval.Duration()
	if interval <= 0 {
		return
	}

	var ticker = utils.NewTicker(interval)
	goman.New(func() {
		for ticker.Wait() {
			if err := this.Loop(); err != nil {
				this.logErr("HealthCheckClusterTask", err.Error())
			}
		}
	})
	this.ticker = ticker
}
// Stop stops the task's ticker, if there is one, and clears it so that a
// second call is a no-op.
func (this *HealthCheckClusterTask) Stop() {
	if this.ticker != nil {
		this.ticker.Stop()
		this.ticker = nil
	}
}
// Loop runs one round of health checks for the cluster and, when at least one
// result failed, records a cluster message describing the failures.
//
// The round is skipped entirely when IsPrimaryNode reports false. Failure
// messages are throttled to at most one every 10 minutes; the throttle
// timestamp is only advanced after the message has been created successfully,
// so a failed attempt is retried on the next round instead of silencing
// notifications for the whole interval.
//
// It returns the error reported by the executor, by JSON-encoding the failed
// results, or by creating the message; otherwise nil.
func (this *HealthCheckClusterTask) Loop() error {
	// Only the primary node performs the check.
	if !this.IsPrimaryNode() {
		return nil
	}

	var executor = NewHealthCheckExecutor(this.clusterId)
	results, err := executor.Run()
	if err != nil {
		return err
	}

	const (
		// Minimum time between two failure notifications.
		notifyInterval = 10 * time.Minute
		// Maximum number of per-node descriptions listed in the message body.
		maxNodeDescriptions = 10
	)

	// Collect, in a single pass, both the structured failure list (stored as
	// JSON alongside the message) and the human-readable descriptions.
	var failedResults = []maps.Map{}
	var failedDescriptions = []string{}
	for _, result := range results {
		if result.IsOk {
			continue
		}

		// A failed result without node information must not crash the task:
		// it is still reported, with placeholder node data, but it gets no
		// textual description because there is no node name to show.
		var nodeMap = maps.Map{"id": 0, "name": ""}
		if result.Node != nil {
			nodeMap = maps.Map{
				"id":   result.Node.Id,
				"name": result.Node.Name,
			}
			failedDescriptions = append(failedDescriptions, "节点"+types.String(len(failedDescriptions)+1)+"："+result.Node.Name+"，IP："+result.NodeAddr)
		}

		failedResults = append(failedResults, maps.Map{
			"node":     nodeMap,
			"isOk":     false,
			"error":    result.Error,
			"nodeAddr": result.NodeAddr,
		})
	}

	if len(failedResults) == 0 {
		return nil
	}

	// Do not notify again within the throttle interval.
	if time.Since(this.notifiedTime) <= notifyInterval {
		return nil
	}

	failedResultsJSON, err := json.Marshal(failedResults)
	if err != nil {
		return err
	}

	// Keep the message short: list only the first few nodes and mark the
	// truncation with an ellipsis.
	var isOverMax = len(failedDescriptions) > maxNodeDescriptions
	if isOverMax {
		failedDescriptions = failedDescriptions[:maxNodeDescriptions]
	}

	var subject = "有" + numberutils.FormatInt(len(failedResults)) + "个节点IP在健康检查中出现问题"
	var message = subject
	if len(failedDescriptions) > 0 {
		message += "：" + strings.Join(failedDescriptions, "；")
	}
	if isOverMax {
		message += " ..."
	} else {
		message += "。"
	}

	err = models.NewMessageDAO().CreateClusterMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, models.MessageTypeHealthCheckFailed, models.MessageLevelError, subject, subject, message, failedResultsJSON)
	if err != nil {
		return err
	}

	// Start the throttle window only once the notification really exists.
	this.notifiedTime = time.Now()
	return nil
}
// Config returns the health-check configuration currently held by the task.
// The stored pointer is returned as-is, not a copy.
func (this *HealthCheckClusterTask) Config() *serverconfigs.HealthCheckConfig {
return this.config
}