增加节点同步状态提示和任务列表

This commit is contained in:
GoEdgeLab
2021-01-17 16:47:37 +08:00
parent 5564754ea8
commit 6d3b77fab9
4 changed files with 86 additions and 37 deletions

View File

@@ -10,7 +10,6 @@ import (
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -97,10 +96,8 @@ func (this *APIStream) loop() error {
err = this.handlePurgeCache(message) err = this.handlePurgeCache(message)
case messageconfigs.MessageCodePreheatCache: // 预热缓存 case messageconfigs.MessageCodePreheatCache: // 预热缓存
err = this.handlePreheatCache(message) err = this.handlePreheatCache(message)
case messageconfigs.MessageCodeConfigChanged: // 配置变化 case messageconfigs.MessageCodeNewNodeTask: // 有新的任务
err = this.handleConfigChanged(message) err = this.handleNewNodeTask(message)
case messageconfigs.MessageCodeIPListChanged: // IPList变化
err = this.handleIPListChanged(message)
case messageconfigs.MessageCodeCheckSystemdService: // 检查Systemd服务 case messageconfigs.MessageCodeCheckSystemdService: // 检查Systemd服务
err = this.handleCheckSystemdService(message) err = this.handleCheckSystemdService(message)
default: default:
@@ -480,20 +477,9 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
} }
// 处理配置变化 // 处理配置变化
func (this *APIStream) handleConfigChanged(message *pb.NodeStreamMessage) error { func (this *APIStream) handleNewNodeTask(message *pb.NodeStreamMessage) error {
select { select {
case changeNotify <- true: case nodeTaskNotify <- true:
default:
}
this.replyOk(message.RequestId, "ok")
return nil
}
// 处理IPList变化
func (this *APIStream) handleIPListChanged(message *pb.NodeStreamMessage) error {
select {
case iplibrary.IPListUpdateNotify <- true:
default: default:
} }

View File

@@ -32,9 +32,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
defer this.locker.Unlock() defer this.locker.Unlock()
// 检查是否有变化 // 检查是否有变化
if this.lastConfig != nil && this.lastConfig.Version == node.Version { /**if this.lastConfig != nil && this.lastConfig.Version == node.Version {
return nil return nil
} }**/
this.lastConfig = node this.lastConfig = node
// 初始化 // 初始化

View File

@@ -12,6 +12,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/configs" "github.com/TeaOSLab/EdgeNode/internal/configs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const" teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/TeaOSLab/EdgeNode/internal/utils"
@@ -30,12 +31,12 @@ import (
"time" "time"
) )
var lastVersion = int64(-1)
var sharedNodeConfig *nodeconfigs.NodeConfig var sharedNodeConfig *nodeconfigs.NodeConfig
var changeNotify = make(chan bool, 8) var nodeTaskNotify = make(chan bool, 8)
// 节点 // 节点
type Node struct { type Node struct {
isLoaded bool
} }
func NewNode() *Node { func NewNode() *Node {
@@ -75,7 +76,7 @@ func (this *Node) Start() {
// 读取API配置 // 读取API配置
tryTimes := 0 tryTimes := 0
for { for {
err = this.syncConfig(false) err = this.syncConfig()
if err != nil { if err != nil {
tryTimes++ tryTimes++
@@ -227,8 +228,66 @@ func (this *Node) listenSignals() {
}() }()
} }
// 循环
func (this *Node) loop() error {
// 检查api.yaml是否存在
apiConfigFile := Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
if err != nil {
return nil
}
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("create rpc client failed: " + err.Error())
}
nodeCtx := rpcClient.Context()
tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{})
if err != nil {
return errors.New("read node tasks failed: " + err.Error())
}
for _, task := range tasksResp.NodeTasks {
logs.Println("update task:", task.Type) // TODO
switch task.Type {
case "ipItemChanged":
iplibrary.IPListUpdateNotify <- true
// 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,
IsOk: true,
Error: "",
})
if err != nil {
return err
}
case "configChanged":
err := this.syncConfig()
if err != nil {
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,
IsOk: false,
Error: err.Error(),
})
} else {
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id,
IsOk: true,
Error: "",
})
}
if err != nil {
return err
}
}
}
return nil
}
// 读取API配置 // 读取API配置
func (this *Node) syncConfig(isFirstTime bool) error { func (this *Node) syncConfig() error {
// 检查api.yaml是否存在 // 检查api.yaml是否存在
apiConfigFile := Tea.ConfigFile("api.yaml") apiConfigFile := Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile) _, err := os.Stat(apiConfigFile)
@@ -250,9 +309,13 @@ func (this *Node) syncConfig(isFirstTime bool) error {
if err != nil { if err != nil {
return errors.New("create rpc client failed: " + err.Error()) return errors.New("create rpc client failed: " + err.Error())
} }
// 获取同步任务
nodeCtx := rpcClient.Context()
// TODO 这里考虑只同步版本号有变更的 // TODO 这里考虑只同步版本号有变更的
configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(rpcClient.Context(), &pb.FindCurrentNodeConfigRequest{ configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{
Version: lastVersion, Version: -1, // 更新所有版本
}) })
if err != nil { if err != nil {
return errors.New("read config from rpc failed: " + err.Error()) return errors.New("read config from rpc failed: " + err.Error())
@@ -274,12 +337,6 @@ func (this *Node) syncConfig(isFirstTime bool) error {
return err return err
} }
// 如果版本相同,则只是保存
if lastVersion == nodeConfig.Version {
return nil
}
lastVersion = nodeConfig.Version
err = nodeConfig.Init() err = nodeConfig.Init()
if err != nil { if err != nil {
return err return err
@@ -293,7 +350,7 @@ func (this *Node) syncConfig(isFirstTime bool) error {
} }
// 刷新配置 // 刷新配置
if isFirstTime { if this.isLoaded {
remotelogs.Println("NODE", "reloading config ...") remotelogs.Println("NODE", "reloading config ...")
} else { } else {
remotelogs.Println("NODE", "loading config ...") remotelogs.Println("NODE", "loading config ...")
@@ -315,10 +372,12 @@ func (this *Node) syncConfig(isFirstTime bool) error {
// 发送事件 // 发送事件
events.Notify(events.EventReload) events.Notify(events.EventReload)
if !isFirstTime { if this.isLoaded {
return sharedListenerManager.Start(nodeConfig) return sharedListenerManager.Start(nodeConfig)
} }
this.isLoaded = true
return nil return nil
} }
@@ -334,13 +393,13 @@ func (this *Node) startSyncTimer() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
err := this.syncConfig(false) err := this.loop()
if err != nil { if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error()) remotelogs.Error("NODE", "sync config error: "+err.Error())
continue continue
} }
case <-changeNotify: case <-nodeTaskNotify:
err := this.syncConfig(false) err := this.loop()
if err != nil { if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error()) remotelogs.Error("NODE", "sync config error: "+err.Error())
continue continue

View File

@@ -53,6 +53,10 @@ func (this *RPCClient) NodeLogRPC() pb.NodeLogServiceClient {
return pb.NewNodeLogServiceClient(this.pickConn()) return pb.NewNodeLogServiceClient(this.pickConn())
} }
func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient {
return pb.NewNodeTaskServiceClient(this.pickConn())
}
func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient { func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient {
return pb.NewHTTPAccessLogServiceClient(this.pickConn()) return pb.NewHTTPAccessLogServiceClient(this.pickConn())
} }