diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 955eefd..12e14fe 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -56,13 +56,16 @@ type Node struct { maxCPU int32 maxThreads int timezone string + + updatingServerMap map[int64]*serverconfigs.ServerConfig } func NewNode() *Node { return &Node{ - sock: gosock.NewTmpSock(teaconst.ProcessName), - maxThreads: -1, - maxCPU: -1, + sock: gosock.NewTmpSock(teaconst.ProcessName), + maxThreads: -1, + maxCPU: -1, + updatingServerMap: map[int64]*serverconfigs.ServerConfig{}, } } @@ -264,7 +267,7 @@ func (this *Node) loop() error { defer tr.End() // 检查api.yaml是否存在 - apiConfigFile := Tea.ConfigFile("api.yaml") + var apiConfigFile = Tea.ConfigFile("api.yaml") _, err := os.Stat(apiConfigFile) if err != nil { return nil @@ -275,7 +278,7 @@ func (this *Node) loop() error { return errors.New("create rpc client failed: " + err.Error()) } - nodeCtx := rpcClient.Context() + var nodeCtx = rpcClient.Context() tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{}) if err != nil { return errors.New("read node tasks failed: " + err.Error()) @@ -295,11 +298,15 @@ func (this *Node) loop() error { return err } case "configChanged": - if !task.IsPrimary { - // 我们等等主节点配置准备完毕 - time.Sleep(2 * time.Second) + if task.ServerId > 0 { + err = this.syncServerConfig(task.ServerId) + } else { + if !task.IsPrimary { + // 我们等等主节点配置准备完毕 + time.Sleep(2 * time.Second) + } + err = this.syncConfig(task.Version) } - err := this.syncConfig(task.Version) if err != nil { _, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ NodeTaskId: task.Id, @@ -316,6 +323,7 @@ func (this *Node) loop() error { if err != nil { return err } + case "nodeVersionChanged": goman.New(func() { sharedUpgradeManager.Start() @@ -419,22 +427,8 @@ func (this *Node) syncConfig(taskVersion int64) error { remotelogs.Println("NODE", "loading config ...") } - nodeconfigs.ResetNodeConfig(nodeConfig) - caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity - caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity - if len(nodeConfig.HTTPCachePolicies) > 0 { - caches.SharedManager.UpdatePolicies(nodeConfig.HTTPCachePolicies) - } else { - caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{}) - } - - sharedWAFManager.UpdatePolicies(nodeConfig.FindAllFirewallPolicies()) - iplibrary.SharedActionManager.UpdateActions(nodeConfig.FirewallActions) - sharedNodeConfig = nodeConfig this.onReload(nodeConfig) - metrics.SharedManager.Update(nodeConfig.MetricItems) - // 发送事件 events.Notify(events.EventReload) @@ -447,30 +441,96 @@ func (this *Node) syncConfig(taskVersion int64) error { return nil } +// 读取单个服务配置 +func (this *Node) syncServerConfig(serverId int64) error { + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + resp, err := rpcClient.ServerRPC().ComposeServerConfig(rpcClient.Context(), &pb.ComposeServerConfigRequest{ServerId: serverId}) + if err != nil { + return err + } + + this.locker.Lock() + defer this.locker.Unlock() + if len(resp.ServerConfigJSON) == 0 { + this.updatingServerMap[serverId] = nil + } else { + var config = &serverconfigs.ServerConfig{} + err = json.Unmarshal(resp.ServerConfigJSON, config) + if err != nil { + return err + } + this.updatingServerMap[serverId] = config + } + return nil +} + // 启动同步计时器 func (this *Node) startSyncTimer() { // TODO 这个时间间隔可以自行设置 - ticker := time.NewTicker(60 * time.Second) + var taskTicker = time.NewTicker(60 * time.Second) + var serverChangeTicker = time.NewTicker(5 * time.Second) + events.OnKey(events.EventQuit, this, func() { remotelogs.Println("NODE", "quit sync timer") - ticker.Stop() + taskTicker.Stop() + serverChangeTicker.Stop() }) goman.New(func() { for { select { - case <-ticker.C: + case <-taskTicker.C: // 定期执行 err := this.loop() if err != nil { remotelogs.Error("NODE", "sync config error: "+err.Error()) continue } - case <-nodeTaskNotify: + case <-serverChangeTicker.C: // 服务变化 + this.locker.Lock() + if len(this.updatingServerMap) > 0 { + var updatingServerMap = this.updatingServerMap + this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{} + newNodeConfig, err := nodeconfigs.CloneNodeConfig(sharedNodeConfig) + if err != nil { + remotelogs.Error("NODE", "apply server config error: "+err.Error()) + continue + } + for serverId, serverConfig := range updatingServerMap { + if serverConfig != nil { + newNodeConfig.AddServer(serverConfig) + } else { + newNodeConfig.RemoveServer(serverId) + } + } + + err, serverErrors := newNodeConfig.Init() + if err != nil { + remotelogs.Error("NODE", "apply server config error: "+err.Error()) + continue + } + if len(serverErrors) > 0 { + for _, serverErr := range serverErrors { + remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{}) + } + } + + this.onReload(newNodeConfig) + + err = sharedListenerManager.Start(newNodeConfig) + if err != nil { + remotelogs.Error("NODE", "apply server config error: "+err.Error()) + } + } + this.locker.Unlock() + case <-nodeTaskNotify: // 有新的更新任务 err := this.loop() if err != nil { remotelogs.Error("NODE", "sync config error: "+err.Error()) continue } - case <-nodeConfigChangedNotify: + case <-nodeConfigChangedNotify: // 节点变化通知 err := this.syncConfig(0) if err != nil { remotelogs.Error("NODE", "sync config error: "+err.Error()) @@ -701,6 +761,25 @@ func (this *Node) listenSock() error { // 重载配置调用 func (this *Node) onReload(config *nodeconfigs.NodeConfig) { + nodeconfigs.ResetNodeConfig(config) + sharedNodeConfig = config + + // 缓存策略 + caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity + caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity + if len(config.HTTPCachePolicies) > 0 { + caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies) + } else { + caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{}) + } + + // WAF策略 + sharedWAFManager.UpdatePolicies(config.FindAllFirewallPolicies()) + iplibrary.SharedActionManager.UpdateActions(config.FirewallActions) + + // 统计指标 + metrics.SharedManager.Update(config.MetricItems) + // max cpu if config.MaxCPU != this.maxCPU { if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) {