mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-25 00:10:25 +08:00
支持单个服务更新配置
This commit is contained in:
@@ -56,13 +56,16 @@ type Node struct {
|
||||
maxCPU int32
|
||||
maxThreads int
|
||||
timezone string
|
||||
|
||||
updatingServerMap map[int64]*serverconfigs.ServerConfig
|
||||
}
|
||||
|
||||
func NewNode() *Node {
|
||||
return &Node{
|
||||
sock: gosock.NewTmpSock(teaconst.ProcessName),
|
||||
maxThreads: -1,
|
||||
maxCPU: -1,
|
||||
sock: gosock.NewTmpSock(teaconst.ProcessName),
|
||||
maxThreads: -1,
|
||||
maxCPU: -1,
|
||||
updatingServerMap: map[int64]*serverconfigs.ServerConfig{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,7 +267,7 @@ func (this *Node) loop() error {
|
||||
defer tr.End()
|
||||
|
||||
// 检查api.yaml是否存在
|
||||
apiConfigFile := Tea.ConfigFile("api.yaml")
|
||||
var apiConfigFile = Tea.ConfigFile("api.yaml")
|
||||
_, err := os.Stat(apiConfigFile)
|
||||
if err != nil {
|
||||
return nil
|
||||
@@ -275,7 +278,7 @@ func (this *Node) loop() error {
|
||||
return errors.New("create rpc client failed: " + err.Error())
|
||||
}
|
||||
|
||||
nodeCtx := rpcClient.Context()
|
||||
var nodeCtx = rpcClient.Context()
|
||||
tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{})
|
||||
if err != nil {
|
||||
return errors.New("read node tasks failed: " + err.Error())
|
||||
@@ -295,11 +298,15 @@ func (this *Node) loop() error {
|
||||
return err
|
||||
}
|
||||
case "configChanged":
|
||||
if !task.IsPrimary {
|
||||
// 我们等等主节点配置准备完毕
|
||||
time.Sleep(2 * time.Second)
|
||||
if task.ServerId > 0 {
|
||||
err = this.syncServerConfig(task.ServerId)
|
||||
} else {
|
||||
if !task.IsPrimary {
|
||||
// 我们等等主节点配置准备完毕
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
err = this.syncConfig(task.Version)
|
||||
}
|
||||
err := this.syncConfig(task.Version)
|
||||
if err != nil {
|
||||
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
@@ -316,6 +323,7 @@ func (this *Node) loop() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case "nodeVersionChanged":
|
||||
goman.New(func() {
|
||||
sharedUpgradeManager.Start()
|
||||
@@ -419,22 +427,8 @@ func (this *Node) syncConfig(taskVersion int64) error {
|
||||
remotelogs.Println("NODE", "loading config ...")
|
||||
}
|
||||
|
||||
nodeconfigs.ResetNodeConfig(nodeConfig)
|
||||
caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity
|
||||
caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity
|
||||
if len(nodeConfig.HTTPCachePolicies) > 0 {
|
||||
caches.SharedManager.UpdatePolicies(nodeConfig.HTTPCachePolicies)
|
||||
} else {
|
||||
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
|
||||
}
|
||||
|
||||
sharedWAFManager.UpdatePolicies(nodeConfig.FindAllFirewallPolicies())
|
||||
iplibrary.SharedActionManager.UpdateActions(nodeConfig.FirewallActions)
|
||||
sharedNodeConfig = nodeConfig
|
||||
this.onReload(nodeConfig)
|
||||
|
||||
metrics.SharedManager.Update(nodeConfig.MetricItems)
|
||||
|
||||
// 发送事件
|
||||
events.Notify(events.EventReload)
|
||||
|
||||
@@ -447,30 +441,96 @@ func (this *Node) syncConfig(taskVersion int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 读取单个服务配置
|
||||
func (this *Node) syncServerConfig(serverId int64) error {
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := rpcClient.ServerRPC().ComposeServerConfig(rpcClient.Context(), &pb.ComposeServerConfigRequest{ServerId: serverId})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
if len(resp.ServerConfigJSON) == 0 {
|
||||
this.updatingServerMap[serverId] = nil
|
||||
} else {
|
||||
var config = &serverconfigs.ServerConfig{}
|
||||
err = json.Unmarshal(resp.ServerConfigJSON, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.updatingServerMap[serverId] = config
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 启动同步计时器
|
||||
func (this *Node) startSyncTimer() {
|
||||
// TODO 这个时间间隔可以自行设置
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
var taskTicker = time.NewTicker(60 * time.Second)
|
||||
var serverChangeTicker = time.NewTicker(5 * time.Second)
|
||||
|
||||
events.OnKey(events.EventQuit, this, func() {
|
||||
remotelogs.Println("NODE", "quit sync timer")
|
||||
ticker.Stop()
|
||||
taskTicker.Stop()
|
||||
serverChangeTicker.Stop()
|
||||
})
|
||||
goman.New(func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-taskTicker.C: // 定期执行
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
case <-nodeTaskNotify:
|
||||
case <-serverChangeTicker.C: // 服务变化
|
||||
this.locker.Lock()
|
||||
if len(this.updatingServerMap) > 0 {
|
||||
var updatingServerMap = this.updatingServerMap
|
||||
this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{}
|
||||
newNodeConfig, err := nodeconfigs.CloneNodeConfig(sharedNodeConfig)
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
for serverId, serverConfig := range updatingServerMap {
|
||||
if serverConfig != nil {
|
||||
newNodeConfig.AddServer(serverConfig)
|
||||
} else {
|
||||
newNodeConfig.RemoveServer(serverId)
|
||||
}
|
||||
}
|
||||
|
||||
err, serverErrors := newNodeConfig.Init()
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
if len(serverErrors) > 0 {
|
||||
for _, serverErr := range serverErrors {
|
||||
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
|
||||
}
|
||||
}
|
||||
|
||||
this.onReload(newNodeConfig)
|
||||
|
||||
err = sharedListenerManager.Start(newNodeConfig)
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||||
}
|
||||
}
|
||||
this.locker.Unlock()
|
||||
case <-nodeTaskNotify: // 有新的更新任务
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
case <-nodeConfigChangedNotify:
|
||||
case <-nodeConfigChangedNotify: // 节点变化通知
|
||||
err := this.syncConfig(0)
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||||
@@ -701,6 +761,25 @@ func (this *Node) listenSock() error {
|
||||
|
||||
// 重载配置调用
|
||||
func (this *Node) onReload(config *nodeconfigs.NodeConfig) {
|
||||
nodeconfigs.ResetNodeConfig(config)
|
||||
sharedNodeConfig = config
|
||||
|
||||
// 缓存策略
|
||||
caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity
|
||||
caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity
|
||||
if len(config.HTTPCachePolicies) > 0 {
|
||||
caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies)
|
||||
} else {
|
||||
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
|
||||
}
|
||||
|
||||
// WAF策略
|
||||
sharedWAFManager.UpdatePolicies(config.FindAllFirewallPolicies())
|
||||
iplibrary.SharedActionManager.UpdateActions(config.FirewallActions)
|
||||
|
||||
// 统计指标
|
||||
metrics.SharedManager.Update(config.MetricItems)
|
||||
|
||||
// max cpu
|
||||
if config.MaxCPU != this.maxCPU {
|
||||
if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) {
|
||||
|
||||
Reference in New Issue
Block a user