mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2026-02-14 07:05:37 +08:00
集群设置中增加服务设置/可以设置不记录服务错误日志
This commit is contained in:
@@ -2,6 +2,7 @@ package nodes
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
|
||||
@@ -321,139 +322,157 @@ func (this *Node) loop() error {
|
||||
return errors.New("read node tasks failed: " + err.Error())
|
||||
}
|
||||
for _, task := range tasksResp.NodeTasks {
|
||||
switch task.Type {
|
||||
case "ipItemChanged":
|
||||
// 防止阻塞
|
||||
select {
|
||||
case iplibrary.IPListUpdateNotify <- true:
|
||||
default:
|
||||
|
||||
}
|
||||
|
||||
// 修改为已同步
|
||||
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: true,
|
||||
Error: "",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "configChanged":
|
||||
if task.ServerId > 0 {
|
||||
err = this.syncServerConfig(task.ServerId)
|
||||
} else {
|
||||
if !task.IsPrimary {
|
||||
// 我们等等主节点配置准备完毕
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
err = this.syncConfig(task.Version)
|
||||
}
|
||||
if err != nil {
|
||||
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: false,
|
||||
Error: err.Error(),
|
||||
})
|
||||
} else {
|
||||
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: true,
|
||||
Error: "",
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "nodeVersionChanged":
|
||||
if !sharedUpgradeManager.IsInstalling() {
|
||||
goman.New(func() {
|
||||
sharedUpgradeManager.Start()
|
||||
})
|
||||
}
|
||||
case "scriptsChanged":
|
||||
err = this.reloadCommonScripts()
|
||||
if err != nil {
|
||||
return errors.New("reload common scripts failed: " + err.Error())
|
||||
}
|
||||
|
||||
// 修改为已同步
|
||||
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: true,
|
||||
Error: "",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "nodeLevelChanged":
|
||||
levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sharedNodeConfig.Level = levelInfoResp.Level
|
||||
|
||||
var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{}
|
||||
if len(levelInfoResp.ParentNodesMapJSON) > 0 {
|
||||
err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes)
|
||||
if err != nil {
|
||||
return errors.New("decode level info failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
sharedNodeConfig.ParentNodes = parentNodes
|
||||
|
||||
// 修改为已同步
|
||||
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: true,
|
||||
Error: "",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "ddosProtectionChanged":
|
||||
resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resp.DdosProtectionJSON) == 0 {
|
||||
if sharedNodeConfig != nil {
|
||||
sharedNodeConfig.DDoSProtection = nil
|
||||
}
|
||||
} else {
|
||||
var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{}
|
||||
err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig)
|
||||
if err != nil {
|
||||
return errors.New("decode DDoS protection config failed: " + err.Error())
|
||||
}
|
||||
|
||||
if sharedNodeConfig != nil {
|
||||
sharedNodeConfig.DDoSProtection = ddosProtectionConfig
|
||||
}
|
||||
|
||||
err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig)
|
||||
if err != nil {
|
||||
// 不阻塞
|
||||
remotelogs.Error("NODE", "apply DDoS protection failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// 修改为已同步
|
||||
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: true,
|
||||
Error: "",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err := this.execTask(rpcClient, nodeCtx, task)
|
||||
this.finishTask(task.Id, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 执行任务
|
||||
func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, task *pb.NodeTask) error {
|
||||
switch task.Type {
|
||||
case "ipItemChanged":
|
||||
// 防止阻塞
|
||||
select {
|
||||
case iplibrary.IPListUpdateNotify <- true:
|
||||
default:
|
||||
|
||||
}
|
||||
case "configChanged":
|
||||
if task.ServerId > 0 {
|
||||
return this.syncServerConfig(task.ServerId)
|
||||
}
|
||||
if !task.IsPrimary {
|
||||
// 我们等等主节点配置准备完毕
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
return this.syncConfig(task.Version)
|
||||
case "nodeVersionChanged":
|
||||
if !sharedUpgradeManager.IsInstalling() {
|
||||
goman.New(func() {
|
||||
sharedUpgradeManager.Start()
|
||||
})
|
||||
}
|
||||
case "scriptsChanged":
|
||||
err := this.reloadCommonScripts()
|
||||
if err != nil {
|
||||
return errors.New("reload common scripts failed: " + err.Error())
|
||||
}
|
||||
case "nodeLevelChanged":
|
||||
levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if sharedNodeConfig != nil {
|
||||
sharedNodeConfig.Level = levelInfoResp.Level
|
||||
}
|
||||
|
||||
var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{}
|
||||
if len(levelInfoResp.ParentNodesMapJSON) > 0 {
|
||||
err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes)
|
||||
if err != nil {
|
||||
return errors.New("decode level info failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if sharedNodeConfig != nil {
|
||||
sharedNodeConfig.ParentNodes = parentNodes
|
||||
}
|
||||
case "ddosProtectionChanged":
|
||||
resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resp.DdosProtectionJSON) == 0 {
|
||||
if sharedNodeConfig != nil {
|
||||
sharedNodeConfig.DDoSProtection = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{}
|
||||
err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig)
|
||||
if err != nil {
|
||||
return errors.New("decode DDoS protection config failed: " + err.Error())
|
||||
}
|
||||
|
||||
if ddosProtectionConfig != nil && sharedNodeConfig != nil {
|
||||
sharedNodeConfig.DDoSProtection = ddosProtectionConfig
|
||||
}
|
||||
|
||||
err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig)
|
||||
if err != nil {
|
||||
// 不阻塞
|
||||
remotelogs.Warn("NODE", "apply DDoS protection failed: "+err.Error())
|
||||
return nil
|
||||
}
|
||||
case "globalServerConfigChanged":
|
||||
resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(nodeCtx, &pb.FindNodeGlobalServerConfigRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resp.GlobalServerConfigJSON) > 0 {
|
||||
var globalServerConfig = serverconfigs.DefaultGlobalServerConfig()
|
||||
err = json.Unmarshal(resp.GlobalServerConfigJSON, globalServerConfig)
|
||||
if err != nil {
|
||||
return errors.New("decode global server config failed: " + err.Error())
|
||||
}
|
||||
|
||||
if globalServerConfig != nil {
|
||||
err = globalServerConfig.Init()
|
||||
if err != nil {
|
||||
return errors.New("validate global server config failed: " + err.Error())
|
||||
}
|
||||
if sharedNodeConfig != nil {
|
||||
sharedNodeConfig.GlobalServerConfig = globalServerConfig
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 标记任务完成
|
||||
func (this *Node) finishTask(taskId int64, err error) {
|
||||
if taskId <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
logs.Println("[NODE]", "create rpc client failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var nodeCtx = rpcClient.Context()
|
||||
|
||||
var isOk = err == nil
|
||||
var errMsg = ""
|
||||
if err != nil {
|
||||
errMsg = err.Error()
|
||||
}
|
||||
|
||||
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: taskId,
|
||||
IsOk: isOk,
|
||||
Error: errMsg,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// 不需要上报到服务中心
|
||||
if rpc.IsConnError(err) {
|
||||
logs.Println("[NODE]", "report task done failed: "+err.Error())
|
||||
} else {
|
||||
remotelogs.Error("NODE", "report task done failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 读取API配置
|
||||
func (this *Node) syncConfig(taskVersion int64) error {
|
||||
this.locker.Lock()
|
||||
|
||||
Reference in New Issue
Block a user