mirror of
				https://github.com/TeaOSLab/EdgeAPI.git
				synced 2025-11-04 16:00:24 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			97 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			97 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package tasks
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"encoding/json"
 | 
						|
	"github.com/TeaOSLab/EdgeAPI/internal/db/models"
 | 
						|
	"github.com/TeaOSLab/EdgeAPI/internal/utils"
 | 
						|
	"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
 | 
						|
	"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
 | 
						|
	"github.com/iwind/TeaGo/dbs"
 | 
						|
	"github.com/iwind/TeaGo/lists"
 | 
						|
	"github.com/iwind/TeaGo/logs"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
func init() {
 | 
						|
	dbs.OnReady(func() {
 | 
						|
		go NewHealthCheckTask().Run()
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// 节点健康检查任务
 | 
						|
type HealthCheckTask struct {
 | 
						|
	tasksMap map[int64]*HealthCheckClusterTask // taskId => task
 | 
						|
}
 | 
						|
 | 
						|
func NewHealthCheckTask() *HealthCheckTask {
 | 
						|
	return &HealthCheckTask{
 | 
						|
		tasksMap: map[int64]*HealthCheckClusterTask{},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *HealthCheckTask) Run() {
 | 
						|
	err := this.loop()
 | 
						|
	if err != nil {
 | 
						|
		logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
 | 
						|
	}
 | 
						|
 | 
						|
	ticker := utils.NewTicker(60 * time.Second)
 | 
						|
	for ticker.Wait() {
 | 
						|
		err := this.loop()
 | 
						|
		if err != nil {
 | 
						|
			logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (this *HealthCheckTask) loop() error {
 | 
						|
	clusters, err := models.NewNodeClusterDAO().FindAllEnableClusters(nil)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	clusterIds := []int64{}
 | 
						|
	for _, cluster := range clusters {
 | 
						|
		clusterIds = append(clusterIds, int64(cluster.Id))
 | 
						|
	}
 | 
						|
 | 
						|
	// 停掉删除的
 | 
						|
	for clusterId, task := range this.tasksMap {
 | 
						|
		if !lists.ContainsInt64(clusterIds, clusterId) {
 | 
						|
			task.Stop()
 | 
						|
			delete(this.tasksMap, clusterId)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// 启动新的或更新老的
 | 
						|
	for _, cluster := range clusters {
 | 
						|
		clusterId := int64(cluster.Id)
 | 
						|
 | 
						|
		config := &serverconfigs.HealthCheckConfig{}
 | 
						|
		if len(cluster.HealthCheck) > 0 && cluster.HealthCheck != "null" {
 | 
						|
			err = json.Unmarshal([]byte(cluster.HealthCheck), config)
 | 
						|
			if err != nil {
 | 
						|
				logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		task, ok := this.tasksMap[clusterId]
 | 
						|
		if ok {
 | 
						|
			// 检查是否有变化
 | 
						|
			newJSON, _ := json.Marshal(config)
 | 
						|
			oldJSON, _ := json.Marshal(task.Config())
 | 
						|
			if bytes.Compare(oldJSON, newJSON) != 0 {
 | 
						|
				logs.Println("[TASK][HEALTH_CHECK]update cluster '" + numberutils.FormatInt64(clusterId) + "'")
 | 
						|
				go task.Reset(config)
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			task := NewHealthCheckClusterTask(clusterId, config)
 | 
						|
			this.tasksMap[clusterId] = task
 | 
						|
			go task.Run()
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 |