diff --git a/internal/firewalls/ddos_protection.go b/internal/firewalls/ddos_protection.go index 14cd495..aa9fa18 100644 --- a/internal/firewalls/ddos_protection.go +++ b/internal/firewalls/ddos_protection.go @@ -21,6 +21,7 @@ import ( stringutil "github.com/iwind/TeaGo/utils/string" "net" "strings" + "sync" "time" ) @@ -60,6 +61,8 @@ func init() { type DDoSProtectionManager struct { lastAllowIPList []string lastConfig []byte + + locker sync.Mutex } // NewDDoSProtectionManager 获取新对象 @@ -69,6 +72,12 @@ func NewDDoSProtectionManager() *DDoSProtectionManager { // Apply 应用配置 func (this *DDoSProtectionManager) Apply(config *ddosconfigs.ProtectionConfig) error { + // 加锁防止并发更改 + if !this.locker.TryLock() { + return nil + } + defer this.locker.Unlock() + // 同集群节点IP白名单 var allowIPListChanged = false nodeConfig, _ := nodeconfigs.SharedNodeConfig() diff --git a/internal/nodes/http_request_stat.go b/internal/nodes/http_request_stat.go index 0da0686..79661d1 100644 --- a/internal/nodes/http_request_stat.go +++ b/internal/nodes/http_request_stat.go @@ -6,7 +6,7 @@ import ( // 统计 func (this *HTTPRequest) doStat() { - if this.ReqServer == nil { + if this.ReqServer == nil || this.web == nil || this.web.StatRef == nil { return } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index abb4de6..ac4e3fe 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -9,7 +9,6 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs" "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/configs" @@ -77,7 +76,8 @@ type Node struct { lastAPINodeVersion int64 lastAPINodeAddrs []string // 以前的API节点地址 - lastTaskVersion int64 + lastTaskVersion int64 + lastUpdatingServerListId int64 } func NewNode() *Node { @@ -310,237 +310,6 @@ func (this *Node) InstallSystemService() error { return nil } -// 循环 -func (this *Node) loop() error { - var tr = trackers.Begin("CHECK_NODE_CONFIG_CHANGES") - defer tr.End() - - // 检查api.yaml是否存在 - var apiConfigFile = Tea.ConfigFile("api.yaml") - _, err := os.Stat(apiConfigFile) - if err != nil { - return nil - } - - rpcClient, err := rpc.SharedRPC() - if err != nil { - return errors.New("create rpc client failed: " + err.Error()) - } - - tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{ - Version: this.lastTaskVersion, - }) - if err != nil { - if rpc.IsConnError(err) && !Tea.IsTesting() { - return nil - } - return errors.New("read node tasks failed: " + err.Error()) - } - for _, task := range tasksResp.NodeTasks { - err := this.execTask(rpcClient, task) - if !this.finishTask(task.Id, task.Version, err) { - // 防止失败的任务无法重试 - break - } - } - - return nil -} - -// 执行任务 -func (this *Node) execTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error { - switch task.Type { - case "ipItemChanged": - // 防止阻塞 - select { - case iplibrary.IPListUpdateNotify <- true: - default: - - } - case "configChanged": - if task.ServerId > 0 { - return this.syncServerConfig(task.ServerId) - } - if !task.IsPrimary { - // 我们等等主节点配置准备完毕 - time.Sleep(2 * time.Second) - } - return this.syncConfig(task.Version) - case "nodeVersionChanged": - if !sharedUpgradeManager.IsInstalling() { - goman.New(func() { - sharedUpgradeManager.Start() - }) - } - case "scriptsChanged": - err := this.reloadCommonScripts() - if err != nil { - return errors.New("reload common scripts failed: " + err.Error()) - } - case "nodeLevelChanged": - levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(rpcClient.Context(), &pb.FindNodeLevelInfoRequest{}) - if err != nil { - return err - } - - if sharedNodeConfig != nil { - sharedNodeConfig.Level = levelInfoResp.Level - } - - var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{} - if len(levelInfoResp.ParentNodesMapJSON) > 0 { - err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes) - if err != nil { - return errors.New("decode level info failed: " + err.Error()) - } - } - - if sharedNodeConfig != nil { - sharedNodeConfig.ParentNodes = parentNodes - } - case "ddosProtectionChanged": - resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(rpcClient.Context(), &pb.FindNodeDDoSProtectionRequest{}) - if err != nil { - return err - } - if len(resp.DdosProtectionJSON) == 0 { - if sharedNodeConfig != nil { - sharedNodeConfig.DDoSProtection = nil - } - return nil - } - - var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{} - err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig) - if err != nil { - return errors.New("decode DDoS protection config failed: " + err.Error()) - } - - if ddosProtectionConfig != nil && sharedNodeConfig != nil { - sharedNodeConfig.DDoSProtection = ddosProtectionConfig - } - - err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig) - if err != nil { - // 不阻塞 - remotelogs.Warn("NODE", "apply DDoS protection failed: "+err.Error()) - return nil - } - case "globalServerConfigChanged": - resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(rpcClient.Context(), &pb.FindNodeGlobalServerConfigRequest{}) - if err != nil { - return err - } - if len(resp.GlobalServerConfigJSON) > 0 { - var globalServerConfig = serverconfigs.DefaultGlobalServerConfig() - err = json.Unmarshal(resp.GlobalServerConfigJSON, globalServerConfig) - if err != nil { - return errors.New("decode global server config failed: " + err.Error()) - } - - if globalServerConfig != nil { - err = globalServerConfig.Init() - if err != nil { - return errors.New("validate global server config failed: " + err.Error()) - } - if sharedNodeConfig != nil { - sharedNodeConfig.GlobalServerConfig = globalServerConfig - } - } - } - case "userServersStateChanged": - if task.UserId > 0 { - resp, err := rpcClient.UserRPC.CheckUserServersState(rpcClient.Context(), &pb.CheckUserServersStateRequest{UserId: task.UserId}) - if err != nil { - return err - } - - SharedUserManager.UpdateUserServersIsEnabled(task.UserId, resp.IsEnabled) - - if resp.IsEnabled { - err = this.syncUserServersConfig(task.UserId) - if err != nil { - return err - } - } - } - case "uamPolicyChanged": - remotelogs.Println("NODE", "updating uam policies ...") - resp, err := rpcClient.NodeRPC.FindNodeUAMPolicies(rpcClient.Context(), &pb.FindNodeUAMPoliciesRequest{}) - if err != nil { - return err - } - var uamPolicyMap = map[int64]*nodeconfigs.UAMPolicy{} - for _, policy := range resp.UamPolicies { - if len(policy.UamPolicyJSON) > 0 { - var uamPolicy = &nodeconfigs.UAMPolicy{} - err = json.Unmarshal(policy.UamPolicyJSON, uamPolicy) - if err != nil { - remotelogs.Error("NODE", "decode uam policy failed: "+err.Error()) - continue - } - err = uamPolicy.Init() - if err != nil { - remotelogs.Error("NODE", "initialize uam policy failed: "+err.Error()) - continue - } - uamPolicyMap[policy.NodeClusterId] = uamPolicy - } - } - sharedNodeConfig.UpdateUAMPolicies(uamPolicyMap) - case "plusChanged": - err := this.notifyPlusChange() - if err != nil { - return err - } - default: - remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled") - } - - return nil -} - -// 标记任务完成 -func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (success bool) { - if taskId <= 0 { - return true - } - - rpcClient, err := rpc.SharedRPC() - if err != nil { - remotelogs.Debug("NODE", "create rpc client failed: "+err.Error()) - return false - } - - var isOk = taskErr == nil - if isOk && taskVersion > this.lastTaskVersion { - this.lastTaskVersion = taskVersion - } - - var errMsg = "" - if taskErr != nil { - errMsg = taskErr.Error() - } - - _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{ - NodeTaskId: taskId, - IsOk: isOk, - Error: errMsg, - }) - success = err == nil - - if err != nil { - // 连接错误不需要上报到服务中心 - if rpc.IsConnError(err) { - remotelogs.Debug("NODE", "report task done failed: "+err.Error()) - } else { - remotelogs.Error("NODE", "report task done failed: "+err.Error()) - } - } - - return success -} - // 读取API配置 func (this *Node) syncConfig(taskVersion int64) error { this.locker.Lock() @@ -730,7 +499,7 @@ func (this *Node) startSyncTimer() { for { select { case <-taskTicker.C: // 定期执行 - err := this.loop() + err := this.loopTasks() if err != nil { remotelogs.Error("NODE", "sync config error: "+err.Error()) continue @@ -738,7 +507,7 @@ func (this *Node) startSyncTimer() { case <-serverChangeTicker.C: // 服务变化 this.reloadServer() case <-nodeTaskNotify: // 有新的更新任务 - err := this.loop() + err := this.loopTasks() if err != nil { remotelogs.Error("NODE", "sync config error: "+err.Error()) continue @@ -1236,7 +1005,9 @@ func (this *Node) reloadServer() { this.locker.Lock() defer this.locker.Unlock() - if len(this.updatingServerMap) > 0 { + var countUpdatingServers = len(this.updatingServerMap) + const maxPrintServers = 10 + if countUpdatingServers > 0 { var updatingServerMap = this.updatingServerMap this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{} newNodeConfig, err := nodeconfigs.CloneNodeConfig(sharedNodeConfig) @@ -1246,14 +1017,22 @@ func (this *Node) reloadServer() { } for serverId, serverConfig := range updatingServerMap { if serverConfig != nil { - remotelogs.Debug("NODE", "load server '"+types.String(serverId)+"'") + if countUpdatingServers < maxPrintServers { + remotelogs.Debug("NODE", "load server '"+types.String(serverId)+"'") + } newNodeConfig.AddServer(serverConfig) } else { - remotelogs.Debug("NODE", "remove server '"+types.String(serverId)+"'") + if countUpdatingServers < maxPrintServers { + remotelogs.Debug("NODE", "remove server '"+types.String(serverId)+"'") + } newNodeConfig.RemoveServer(serverId) } } + if countUpdatingServers >= maxPrintServers { + remotelogs.Debug("NODE", "reload "+types.String(countUpdatingServers)+" servers") + } + err, serverErrors := newNodeConfig.Init(nil) if err != nil { remotelogs.Error("NODE", "apply server config error: "+err.Error()) diff --git a/internal/nodes/node_tasks.go b/internal/nodes/node_tasks.go new file mode 100644 index 0000000..e608098 --- /dev/null +++ b/internal/nodes/node_tasks.go @@ -0,0 +1,355 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package nodes + +import ( + "encoding/json" + "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs" + "github.com/TeaOSLab/EdgeNode/internal/firewalls" + "github.com/TeaOSLab/EdgeNode/internal/goman" + "github.com/TeaOSLab/EdgeNode/internal/iplibrary" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/TeaOSLab/EdgeNode/internal/trackers" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/types" + "os" + "time" +) + +// 循环 +func (this *Node) loopTasks() error { + var tr = trackers.Begin("CHECK_NODE_CONFIG_CHANGES") + defer tr.End() + + // 检查api.yaml是否存在 + var apiConfigFile = Tea.ConfigFile("api.yaml") + _, err := os.Stat(apiConfigFile) + if err != nil { + return nil + } + + rpcClient, err := rpc.SharedRPC() + if err != nil { + return errors.New("create rpc client failed: " + err.Error()) + } + + tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{ + Version: this.lastTaskVersion, + }) + if err != nil { + if rpc.IsConnError(err) && !Tea.IsTesting() { + return nil + } + return errors.New("read node tasks failed: " + err.Error()) + } + for _, task := range tasksResp.NodeTasks { + err := this.execTask(rpcClient, task) + if !this.finishTask(task.Id, task.Version, err) { + // 防止失败的任务无法重试 + break + } + } + + return nil +} + +// 执行任务 +func (this *Node) execTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error { + var err error + switch task.Type { + case "ipItemChanged": + err = this.execIPItemChangedTask() + case "configChanged": + err = this.execConfigChangedTask(task) + case "nodeVersionChanged": + err = this.execNodeVersionChangedTask() + case "scriptsChanged": + err = this.execScriptsChangedTask() + case "nodeLevelChanged": + err = this.execNodeLevelChangedTask(rpcClient) + case "ddosProtectionChanged": + err = this.execDDoSProtectionChangedTask(rpcClient) + case "globalServerConfigChanged": + err = this.execGlobalServerConfigChangedTask(rpcClient) + case "userServersStateChanged": + err = this.execUserServersStateChangedTask(rpcClient, task) + case "uamPolicyChanged": + err = this.execUAMPolicyChangedTask(rpcClient) + case "updatingServers": + err = this.execUpdatingServersTask(rpcClient) + case "plusChanged": + err = this.notifyPlusChange() + default: + remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled") + } + + return err +} + +// 更新IP条目变更 +func (this *Node) execIPItemChangedTask() error { + // 防止阻塞 + select { + case iplibrary.IPListUpdateNotify <- true: + default: + + } + return nil +} + +// 更新节点配置变更 +func (this *Node) execConfigChangedTask(task *pb.NodeTask) error { + if task.ServerId > 0 { + return this.syncServerConfig(task.ServerId) + } + if !task.IsPrimary { + // 我们等等主节点配置准备完毕 + time.Sleep(2 * time.Second) + } + return this.syncConfig(task.Version) +} + +// 节点程序版本号变更 +func (this *Node) execNodeVersionChangedTask() error { + if !sharedUpgradeManager.IsInstalling() { + goman.New(func() { + sharedUpgradeManager.Start() + }) + } + return nil +} + +// 脚本库变更 +func (this *Node) execScriptsChangedTask() error { + err := this.reloadCommonScripts() + if err != nil { + return errors.New("reload common scripts failed: " + err.Error()) + } + return nil +} + +// 节点级别变更 +func (this *Node) execNodeLevelChangedTask(rpcClient *rpc.RPCClient) error { + levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(rpcClient.Context(), &pb.FindNodeLevelInfoRequest{}) + if err != nil { + return err + } + + if sharedNodeConfig != nil { + sharedNodeConfig.Level = levelInfoResp.Level + } + + var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{} + if len(levelInfoResp.ParentNodesMapJSON) > 0 { + err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes) + if err != nil { + return errors.New("decode level info failed: " + err.Error()) + } + } + + if sharedNodeConfig != nil { + sharedNodeConfig.ParentNodes = parentNodes + } + + return nil +} + +// UAM策略变更 +func (this *Node) execUAMPolicyChangedTask(rpcClient *rpc.RPCClient) error { + remotelogs.Println("NODE", "updating uam policies ...") + resp, err := rpcClient.NodeRPC.FindNodeUAMPolicies(rpcClient.Context(), &pb.FindNodeUAMPoliciesRequest{}) + if err != nil { + return err + } + var uamPolicyMap = map[int64]*nodeconfigs.UAMPolicy{} + for _, policy := range resp.UamPolicies { + if len(policy.UamPolicyJSON) > 0 { + var uamPolicy = &nodeconfigs.UAMPolicy{} + err = json.Unmarshal(policy.UamPolicyJSON, uamPolicy) + if err != nil { + remotelogs.Error("NODE", "decode uam policy failed: "+err.Error()) + continue + } + err = uamPolicy.Init() + if err != nil { + remotelogs.Error("NODE", "initialize uam policy failed: "+err.Error()) + continue + } + uamPolicyMap[policy.NodeClusterId] = uamPolicy + } + } + sharedNodeConfig.UpdateUAMPolicies(uamPolicyMap) + return nil +} + +// DDoS配置变更 +func (this *Node) execDDoSProtectionChangedTask(rpcClient *rpc.RPCClient) error { + resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(rpcClient.Context(), &pb.FindNodeDDoSProtectionRequest{}) + if err != nil { + return err + } + if len(resp.DdosProtectionJSON) == 0 { + if sharedNodeConfig != nil { + sharedNodeConfig.DDoSProtection = nil + } + return nil + } + + var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{} + err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig) + if err != nil { + return errors.New("decode DDoS protection config failed: " + err.Error()) + } + + if ddosProtectionConfig != nil && sharedNodeConfig != nil { + sharedNodeConfig.DDoSProtection = ddosProtectionConfig + } + + go func() { + err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig) + if err != nil { + // 不阻塞 + remotelogs.Warn("NODE", "apply DDoS protection failed: "+err.Error()) + } + }() + + return nil +} + +// 服务全局配置变更 +func (this *Node) execGlobalServerConfigChangedTask(rpcClient *rpc.RPCClient) error { + resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(rpcClient.Context(), &pb.FindNodeGlobalServerConfigRequest{}) + if err != nil { + return err + } + if len(resp.GlobalServerConfigJSON) > 0 { + var globalServerConfig = serverconfigs.DefaultGlobalServerConfig() + err = json.Unmarshal(resp.GlobalServerConfigJSON, globalServerConfig) + if err != nil { + return errors.New("decode global server config failed: " + err.Error()) + } + + if globalServerConfig != nil { + err = globalServerConfig.Init() + if err != nil { + return errors.New("validate global server config failed: " + err.Error()) + } + if sharedNodeConfig != nil { + sharedNodeConfig.GlobalServerConfig = globalServerConfig + } + } + } + return nil +} + +// 单个用户服务状态变更 +func (this *Node) execUserServersStateChangedTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error { + if task.UserId > 0 { + resp, err := rpcClient.UserRPC.CheckUserServersState(rpcClient.Context(), &pb.CheckUserServersStateRequest{UserId: task.UserId}) + if err != nil { + return err + } + + SharedUserManager.UpdateUserServersIsEnabled(task.UserId, resp.IsEnabled) + + if resp.IsEnabled { + err = this.syncUserServersConfig(task.UserId) + if err != nil { + return err + } + } + } + return nil +} + +// 更新一组服务列表 +func (this *Node) execUpdatingServersTask(rpcClient *rpc.RPCClient) error { + if this.lastUpdatingServerListId <= 0 { + this.lastUpdatingServerListId = sharedNodeConfig.UpdatingServerListId + } + + resp, err := rpcClient.UpdatingServerListRPC.FindUpdatingServerLists(rpcClient.Context(), &pb.FindUpdatingServerListsRequest{LastId: this.lastUpdatingServerListId}) + if err != nil { + return err + } + + if resp.MaxId <= 0 || len(resp.ServersJSON) == 0 { + return nil + } + + var serverConfigs = []*serverconfigs.ServerConfig{} + err = json.Unmarshal(resp.ServersJSON, &serverConfigs) + if err != nil { + return errors.New("decode server configs failed: " + err.Error()) + } + + if resp.MaxId > this.lastUpdatingServerListId { + this.lastUpdatingServerListId = resp.MaxId + } + + if len(serverConfigs) == 0 { + return nil + } + + this.locker.Lock() + defer this.locker.Unlock() + for _, serverConfig := range serverConfigs { + if serverConfig == nil { + continue + } + + if serverConfig.IsOn { + this.updatingServerMap[serverConfig.Id] = serverConfig + } else { + this.updatingServerMap[serverConfig.Id] = nil + } + } + + return nil +} + +// 标记任务完成 +func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (success bool) { + if taskId <= 0 { + return true + } + + rpcClient, err := rpc.SharedRPC() + if err != nil { + remotelogs.Debug("NODE", "create rpc client failed: "+err.Error()) + return false + } + + var isOk = taskErr == nil + if isOk && taskVersion > this.lastTaskVersion { + this.lastTaskVersion = taskVersion + } + + var errMsg = "" + if taskErr != nil { + errMsg = taskErr.Error() + } + + _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{ + NodeTaskId: taskId, + IsOk: isOk, + Error: errMsg, + }) + success = err == nil + + if err != nil { + // 连接错误不需要上报到服务中心 + if rpc.IsConnError(err) { + remotelogs.Debug("NODE", "report task done failed: "+err.Error()) + } else { + remotelogs.Error("NODE", "report task done failed: "+err.Error()) + } + } + + return success +} diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 063801f..7c9ab7c 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -52,6 +52,7 @@ type RPCClient struct { UserRPC pb.UserServiceClient ClientAgentIPRPC pb.ClientAgentIPServiceClient AuthorityKeyRPC pb.AuthorityKeyServiceClient + UpdatingServerListRPC pb.UpdatingServerListServiceClient } func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { @@ -87,6 +88,7 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { client.UserRPC = pb.NewUserServiceClient(client) client.ClientAgentIPRPC = pb.NewClientAgentIPServiceClient(client) client.AuthorityKeyRPC = pb.NewAuthorityKeyServiceClient(client) + client.UpdatingServerListRPC = pb.NewUpdatingServerListServiceClient(client) err := client.init() if err != nil {