2020-10-25 18:26:46 +08:00
package tasks
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
2021-12-14 10:49:29 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/goman"
2022-09-17 10:18:00 +08:00
"github.com/TeaOSLab/EdgeAPI/internal/installers"
2021-08-08 10:29:48 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-10-25 18:26:46 +08:00
"github.com/iwind/TeaGo/dbs"
2022-01-18 10:35:30 +08:00
"github.com/iwind/TeaGo/types"
"strings"
2020-10-25 18:26:46 +08:00
"time"
)
func init ( ) {
2021-12-14 10:49:29 +08:00
dbs . OnReadyDone ( func ( ) {
goman . New ( func ( ) {
2022-04-23 12:32:30 +08:00
NewNodeMonitorTask ( 1 * time . Minute ) . Start ( )
2021-12-14 10:49:29 +08:00
} )
2020-10-25 18:26:46 +08:00
} )
}
2022-09-17 10:18:00 +08:00
// nodeStartingTry records the remote start attempts made for a single node.
type nodeStartingTry struct {
	// count is the number of start attempts made in the current window.
	count int

	// timestamp is the Unix time (seconds) at which the current window began.
	timestamp int64
}
2021-08-08 10:29:48 +08:00
// NodeMonitorTask 边缘节点监控任务
2020-10-25 18:26:46 +08:00
type NodeMonitorTask struct {
2022-04-23 12:32:30 +08:00
BaseTask
ticker * time . Ticker
2022-01-18 10:13:41 +08:00
2022-01-18 10:35:30 +08:00
inactiveMap map [ string ] int // cluster@nodeId => count
notifiedMap map [ int64 ] int64 // nodeId => timestamp
2022-09-17 10:18:00 +08:00
recoverMap map [ int64 ] * nodeStartingTry // nodeId => *nodeStartingTry
2020-10-25 18:26:46 +08:00
}
2022-04-23 12:32:30 +08:00
func NewNodeMonitorTask ( duration time . Duration ) * NodeMonitorTask {
2020-10-25 18:26:46 +08:00
return & NodeMonitorTask {
2022-04-23 12:32:30 +08:00
ticker : time . NewTicker ( duration ) ,
inactiveMap : map [ string ] int { } ,
notifiedMap : map [ int64 ] int64 { } ,
2022-09-17 10:18:00 +08:00
recoverMap : map [ int64 ] * nodeStartingTry { } ,
2020-10-25 18:26:46 +08:00
}
}
2022-04-23 12:32:30 +08:00
func ( this * NodeMonitorTask ) Start ( ) {
for range this . ticker . C {
err := this . Loop ( )
if err != nil {
this . logErr ( "NodeMonitorTask" , err . Error ( ) )
}
}
2020-10-25 18:26:46 +08:00
}
2022-04-23 12:32:30 +08:00
func ( this * NodeMonitorTask ) Loop ( ) error {
// 检查是否为主节点
if ! models . SharedAPINodeDAO . CheckAPINodeIsPrimaryWithoutErr ( ) {
2020-10-25 18:26:46 +08:00
return nil
}
2021-01-01 23:31:30 +08:00
clusters , err := models . SharedNodeClusterDAO . FindAllEnableClusters ( nil )
2020-10-25 18:26:46 +08:00
if err != nil {
return err
}
for _ , cluster := range clusters {
2022-04-23 12:32:30 +08:00
err := this . MonitorCluster ( cluster )
2020-10-25 18:26:46 +08:00
if err != nil {
return err
}
}
return nil
}
2022-04-23 12:32:30 +08:00
func ( this * NodeMonitorTask ) MonitorCluster ( cluster * models . NodeCluster ) error {
var clusterId = int64 ( cluster . Id )
2020-10-25 18:26:46 +08:00
// 检查离线节点
2021-01-01 23:31:30 +08:00
inactiveNodes , err := models . SharedNodeDAO . FindAllInactiveNodesWithClusterId ( nil , clusterId )
2020-10-25 18:26:46 +08:00
if err != nil {
return err
}
2022-01-18 10:13:41 +08:00
2022-09-17 10:18:00 +08:00
// 尝试自动远程启动
var nodeQueue = installers . NewNodeQueue ( )
for _ , node := range inactiveNodes {
var nodeId = int64 ( node . Id )
tryInfo , ok := this . recoverMap [ nodeId ]
if ! ok {
tryInfo = & nodeStartingTry {
count : 1 ,
timestamp : time . Now ( ) . Unix ( ) ,
}
this . recoverMap [ nodeId ] = tryInfo
} else {
if tryInfo . count >= 3 /** 3次 **/ { // N 秒内超过 M 次就暂时不再重新尝试,防止阻塞当前任务
if tryInfo . timestamp + 10 * 60 /** 10 分钟 **/ > time . Now ( ) . Unix ( ) {
continue
}
tryInfo . timestamp = time . Now ( ) . Unix ( )
tryInfo . count = 0
}
tryInfo . count ++
}
// TODO 如果用户手工安装的位置不在标准位置,需要节点自身记住最近启动的位置
err = nodeQueue . StartNode ( nodeId )
if err != nil {
if ! installers . IsGrantError ( err ) {
_ = models . SharedNodeLogDAO . CreateLog ( nil , nodeconfigs . NodeRoleNode , nodeId , 0 , 0 , models . LevelError , "NODE" , "start node from remote API failed: " + err . Error ( ) , time . Now ( ) . Unix ( ) , "" , nil )
}
} else {
_ = models . SharedNodeLogDAO . CreateLog ( nil , nodeconfigs . NodeRoleNode , nodeId , 0 , 0 , models . LevelSuccess , "NODE" , "start node from remote API successfully" , time . Now ( ) . Unix ( ) , "" , nil )
}
}
var nodeMap = map [ int64 ] * models . Node { } // nodeId => Node
2020-10-25 18:26:46 +08:00
for _ , node := range inactiveNodes {
2022-01-18 10:13:41 +08:00
var nodeId = int64 ( node . Id )
nodeMap [ nodeId ] = node
2022-01-18 10:35:30 +08:00
this . inactiveMap [ types . String ( clusterId ) + "@" + types . String ( nodeId ) ] ++
2022-01-18 10:13:41 +08:00
}
2020-11-16 09:20:24 +08:00
2022-01-18 10:13:41 +08:00
const maxInactiveTries = 5
// 处理现有的离线状态
2022-01-18 10:35:30 +08:00
for key , count := range this . inactiveMap {
var pieces = strings . Split ( key , "@" )
if pieces [ 0 ] != types . String ( clusterId ) {
continue
}
var nodeId = types . Int64 ( pieces [ 1 ] )
2022-01-18 10:13:41 +08:00
node , ok := nodeMap [ nodeId ]
if ok {
2022-01-18 10:35:30 +08:00
// 连续 N 次离线发送通知
// 同时也要确保两次发送通知的时间不会过近
if count >= maxInactiveTries && time . Now ( ) . Unix ( ) - this . notifiedMap [ nodeId ] > 3600 {
this . inactiveMap [ key ] = 0
this . notifiedMap [ nodeId ] = time . Now ( ) . Unix ( )
2022-01-18 10:13:41 +08:00
2022-08-07 17:28:54 +08:00
var subject = "节点\"" + node . Name + "\"已处于离线状态"
var msg = "集群'" + cluster . Name + "'节点\"" + node . Name + "\"已处于离线状态,请检查节点是否异常"
2022-01-18 10:13:41 +08:00
err = models . SharedMessageDAO . CreateNodeMessage ( nil , nodeconfigs . NodeRoleNode , clusterId , int64 ( node . Id ) , models . MessageTypeNodeInactive , models . LevelError , subject , msg , nil , false )
if err != nil {
return err
}
2022-08-07 17:28:54 +08:00
// 设置通知时间
err = models . SharedNodeDAO . UpdateNodeInactiveNotifiedAt ( nil , nodeId , time . Now ( ) . Unix ( ) )
if err != nil {
return err
}
2022-01-18 10:13:41 +08:00
}
} else {
2022-01-18 10:35:30 +08:00
delete ( this . inactiveMap , key )
2020-11-16 09:20:24 +08:00
}
2020-10-25 18:26:46 +08:00
}
2022-01-18 10:13:41 +08:00
// 检查CPU、内存、磁盘不足节点
2020-10-25 18:26:46 +08:00
// TODO 需要实现
return nil
}