提供批量更新服务配置API(阶段性提交)

This commit is contained in:
GoEdgeLab
2023-04-06 20:50:34 +08:00
parent 021b005c4c
commit afd2f349c6
5 changed files with 384 additions and 239 deletions

View File

@@ -9,7 +9,6 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/configs"
@@ -77,7 +76,8 @@ type Node struct {
lastAPINodeVersion int64
lastAPINodeAddrs []string // 以前的API节点地址
lastTaskVersion int64
lastTaskVersion int64
lastUpdatingServerListId int64
}
func NewNode() *Node {
@@ -310,237 +310,6 @@ func (this *Node) InstallSystemService() error {
return nil
}
// 循环
func (this *Node) loop() error {
var tr = trackers.Begin("CHECK_NODE_CONFIG_CHANGES")
defer tr.End()
// 检查api.yaml是否存在
var apiConfigFile = Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
if err != nil {
return nil
}
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("create rpc client failed: " + err.Error())
}
tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{
Version: this.lastTaskVersion,
})
if err != nil {
if rpc.IsConnError(err) && !Tea.IsTesting() {
return nil
}
return errors.New("read node tasks failed: " + err.Error())
}
for _, task := range tasksResp.NodeTasks {
err := this.execTask(rpcClient, task)
if !this.finishTask(task.Id, task.Version, err) {
// 防止失败的任务无法重试
break
}
}
return nil
}
// 执行任务
func (this *Node) execTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error {
switch task.Type {
case "ipItemChanged":
// 防止阻塞
select {
case iplibrary.IPListUpdateNotify <- true:
default:
}
case "configChanged":
if task.ServerId > 0 {
return this.syncServerConfig(task.ServerId)
}
if !task.IsPrimary {
// 我们等等主节点配置准备完毕
time.Sleep(2 * time.Second)
}
return this.syncConfig(task.Version)
case "nodeVersionChanged":
if !sharedUpgradeManager.IsInstalling() {
goman.New(func() {
sharedUpgradeManager.Start()
})
}
case "scriptsChanged":
err := this.reloadCommonScripts()
if err != nil {
return errors.New("reload common scripts failed: " + err.Error())
}
case "nodeLevelChanged":
levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(rpcClient.Context(), &pb.FindNodeLevelInfoRequest{})
if err != nil {
return err
}
if sharedNodeConfig != nil {
sharedNodeConfig.Level = levelInfoResp.Level
}
var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{}
if len(levelInfoResp.ParentNodesMapJSON) > 0 {
err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes)
if err != nil {
return errors.New("decode level info failed: " + err.Error())
}
}
if sharedNodeConfig != nil {
sharedNodeConfig.ParentNodes = parentNodes
}
case "ddosProtectionChanged":
resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(rpcClient.Context(), &pb.FindNodeDDoSProtectionRequest{})
if err != nil {
return err
}
if len(resp.DdosProtectionJSON) == 0 {
if sharedNodeConfig != nil {
sharedNodeConfig.DDoSProtection = nil
}
return nil
}
var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{}
err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig)
if err != nil {
return errors.New("decode DDoS protection config failed: " + err.Error())
}
if ddosProtectionConfig != nil && sharedNodeConfig != nil {
sharedNodeConfig.DDoSProtection = ddosProtectionConfig
}
err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig)
if err != nil {
// 不阻塞
remotelogs.Warn("NODE", "apply DDoS protection failed: "+err.Error())
return nil
}
case "globalServerConfigChanged":
resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(rpcClient.Context(), &pb.FindNodeGlobalServerConfigRequest{})
if err != nil {
return err
}
if len(resp.GlobalServerConfigJSON) > 0 {
var globalServerConfig = serverconfigs.DefaultGlobalServerConfig()
err = json.Unmarshal(resp.GlobalServerConfigJSON, globalServerConfig)
if err != nil {
return errors.New("decode global server config failed: " + err.Error())
}
if globalServerConfig != nil {
err = globalServerConfig.Init()
if err != nil {
return errors.New("validate global server config failed: " + err.Error())
}
if sharedNodeConfig != nil {
sharedNodeConfig.GlobalServerConfig = globalServerConfig
}
}
}
case "userServersStateChanged":
if task.UserId > 0 {
resp, err := rpcClient.UserRPC.CheckUserServersState(rpcClient.Context(), &pb.CheckUserServersStateRequest{UserId: task.UserId})
if err != nil {
return err
}
SharedUserManager.UpdateUserServersIsEnabled(task.UserId, resp.IsEnabled)
if resp.IsEnabled {
err = this.syncUserServersConfig(task.UserId)
if err != nil {
return err
}
}
}
case "uamPolicyChanged":
remotelogs.Println("NODE", "updating uam policies ...")
resp, err := rpcClient.NodeRPC.FindNodeUAMPolicies(rpcClient.Context(), &pb.FindNodeUAMPoliciesRequest{})
if err != nil {
return err
}
var uamPolicyMap = map[int64]*nodeconfigs.UAMPolicy{}
for _, policy := range resp.UamPolicies {
if len(policy.UamPolicyJSON) > 0 {
var uamPolicy = &nodeconfigs.UAMPolicy{}
err = json.Unmarshal(policy.UamPolicyJSON, uamPolicy)
if err != nil {
remotelogs.Error("NODE", "decode uam policy failed: "+err.Error())
continue
}
err = uamPolicy.Init()
if err != nil {
remotelogs.Error("NODE", "initialize uam policy failed: "+err.Error())
continue
}
uamPolicyMap[policy.NodeClusterId] = uamPolicy
}
}
sharedNodeConfig.UpdateUAMPolicies(uamPolicyMap)
case "plusChanged":
err := this.notifyPlusChange()
if err != nil {
return err
}
default:
remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled")
}
return nil
}
// 标记任务完成
func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (success bool) {
if taskId <= 0 {
return true
}
rpcClient, err := rpc.SharedRPC()
if err != nil {
remotelogs.Debug("NODE", "create rpc client failed: "+err.Error())
return false
}
var isOk = taskErr == nil
if isOk && taskVersion > this.lastTaskVersion {
this.lastTaskVersion = taskVersion
}
var errMsg = ""
if taskErr != nil {
errMsg = taskErr.Error()
}
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{
NodeTaskId: taskId,
IsOk: isOk,
Error: errMsg,
})
success = err == nil
if err != nil {
// 连接错误不需要上报到服务中心
if rpc.IsConnError(err) {
remotelogs.Debug("NODE", "report task done failed: "+err.Error())
} else {
remotelogs.Error("NODE", "report task done failed: "+err.Error())
}
}
return success
}
// 读取API配置
func (this *Node) syncConfig(taskVersion int64) error {
this.locker.Lock()
@@ -730,7 +499,7 @@ func (this *Node) startSyncTimer() {
for {
select {
case <-taskTicker.C: // 定期执行
err := this.loop()
err := this.loopTasks()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
@@ -738,7 +507,7 @@ func (this *Node) startSyncTimer() {
case <-serverChangeTicker.C: // 服务变化
this.reloadServer()
case <-nodeTaskNotify: // 有新的更新任务
err := this.loop()
err := this.loopTasks()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
@@ -1236,7 +1005,9 @@ func (this *Node) reloadServer() {
this.locker.Lock()
defer this.locker.Unlock()
if len(this.updatingServerMap) > 0 {
var countUpdatingServers = len(this.updatingServerMap)
const maxPrintServers = 10
if countUpdatingServers > 0 {
var updatingServerMap = this.updatingServerMap
this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{}
newNodeConfig, err := nodeconfigs.CloneNodeConfig(sharedNodeConfig)
@@ -1246,14 +1017,22 @@ func (this *Node) reloadServer() {
}
for serverId, serverConfig := range updatingServerMap {
if serverConfig != nil {
remotelogs.Debug("NODE", "load server '"+types.String(serverId)+"'")
if countUpdatingServers < maxPrintServers {
remotelogs.Debug("NODE", "load server '"+types.String(serverId)+"'")
}
newNodeConfig.AddServer(serverConfig)
} else {
remotelogs.Debug("NODE", "remove server '"+types.String(serverId)+"'")
if countUpdatingServers < maxPrintServers {
remotelogs.Debug("NODE", "remove server '"+types.String(serverId)+"'")
}
newNodeConfig.RemoveServer(serverId)
}
}
if countUpdatingServers >= maxPrintServers {
remotelogs.Debug("NODE", "reload "+types.String(countUpdatingServers)+" servers")
}
err, serverErrors := newNodeConfig.Init(nil)
if err != nil {
remotelogs.Error("NODE", "apply server config error: "+err.Error())