mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-09 03:50:27 +08:00
增加节点同步状态提示和任务列表
This commit is contained in:
@@ -10,7 +10,6 @@ import (
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
@@ -97,10 +96,8 @@ func (this *APIStream) loop() error {
|
||||
err = this.handlePurgeCache(message)
|
||||
case messageconfigs.MessageCodePreheatCache: // 预热缓存
|
||||
err = this.handlePreheatCache(message)
|
||||
case messageconfigs.MessageCodeConfigChanged: // 配置变化
|
||||
err = this.handleConfigChanged(message)
|
||||
case messageconfigs.MessageCodeIPListChanged: // IPList变化
|
||||
err = this.handleIPListChanged(message)
|
||||
case messageconfigs.MessageCodeNewNodeTask: // 有新的任务
|
||||
err = this.handleNewNodeTask(message)
|
||||
case messageconfigs.MessageCodeCheckSystemdService: // 检查Systemd服务
|
||||
err = this.handleCheckSystemdService(message)
|
||||
default:
|
||||
@@ -480,20 +477,9 @@ func (this *APIStream) handlePreheatCache(message *pb.NodeStreamMessage) error {
|
||||
}
|
||||
|
||||
// 处理配置变化
|
||||
func (this *APIStream) handleConfigChanged(message *pb.NodeStreamMessage) error {
|
||||
func (this *APIStream) handleNewNodeTask(message *pb.NodeStreamMessage) error {
|
||||
select {
|
||||
case changeNotify <- true:
|
||||
default:
|
||||
|
||||
}
|
||||
this.replyOk(message.RequestId, "ok")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 处理IPList变化
|
||||
func (this *APIStream) handleIPListChanged(message *pb.NodeStreamMessage) error {
|
||||
select {
|
||||
case iplibrary.IPListUpdateNotify <- true:
|
||||
case nodeTaskNotify <- true:
|
||||
default:
|
||||
|
||||
}
|
||||
|
||||
@@ -32,9 +32,9 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
|
||||
defer this.locker.Unlock()
|
||||
|
||||
// 检查是否有变化
|
||||
if this.lastConfig != nil && this.lastConfig.Version == node.Version {
|
||||
/**if this.lastConfig != nil && this.lastConfig.Version == node.Version {
|
||||
return nil
|
||||
}
|
||||
}**/
|
||||
this.lastConfig = node
|
||||
|
||||
// 初始化
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/configs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
@@ -30,12 +31,12 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var lastVersion = int64(-1)
|
||||
var sharedNodeConfig *nodeconfigs.NodeConfig
|
||||
var changeNotify = make(chan bool, 8)
|
||||
var nodeTaskNotify = make(chan bool, 8)
|
||||
|
||||
// 节点
|
||||
type Node struct {
|
||||
isLoaded bool
|
||||
}
|
||||
|
||||
func NewNode() *Node {
|
||||
@@ -75,7 +76,7 @@ func (this *Node) Start() {
|
||||
// 读取API配置
|
||||
tryTimes := 0
|
||||
for {
|
||||
err = this.syncConfig(false)
|
||||
err = this.syncConfig()
|
||||
if err != nil {
|
||||
tryTimes++
|
||||
|
||||
@@ -227,8 +228,66 @@ func (this *Node) listenSignals() {
|
||||
}()
|
||||
}
|
||||
|
||||
// 循环
|
||||
func (this *Node) loop() error {
|
||||
// 检查api.yaml是否存在
|
||||
apiConfigFile := Tea.ConfigFile("api.yaml")
|
||||
_, err := os.Stat(apiConfigFile)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
return errors.New("create rpc client failed: " + err.Error())
|
||||
}
|
||||
|
||||
nodeCtx := rpcClient.Context()
|
||||
tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{})
|
||||
if err != nil {
|
||||
return errors.New("read node tasks failed: " + err.Error())
|
||||
}
|
||||
for _, task := range tasksResp.NodeTasks {
|
||||
logs.Println("update task:", task.Type) // TODO
|
||||
switch task.Type {
|
||||
case "ipItemChanged":
|
||||
iplibrary.IPListUpdateNotify <- true
|
||||
|
||||
// 修改为已同步
|
||||
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: true,
|
||||
Error: "",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "configChanged":
|
||||
err := this.syncConfig()
|
||||
if err != nil {
|
||||
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: false,
|
||||
Error: err.Error(),
|
||||
})
|
||||
} else {
|
||||
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
||||
NodeTaskId: task.Id,
|
||||
IsOk: true,
|
||||
Error: "",
|
||||
})
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 读取API配置
|
||||
func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
func (this *Node) syncConfig() error {
|
||||
// 检查api.yaml是否存在
|
||||
apiConfigFile := Tea.ConfigFile("api.yaml")
|
||||
_, err := os.Stat(apiConfigFile)
|
||||
@@ -250,9 +309,13 @@ func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
if err != nil {
|
||||
return errors.New("create rpc client failed: " + err.Error())
|
||||
}
|
||||
|
||||
// 获取同步任务
|
||||
nodeCtx := rpcClient.Context()
|
||||
|
||||
// TODO 这里考虑只同步版本号有变更的
|
||||
configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(rpcClient.Context(), &pb.FindCurrentNodeConfigRequest{
|
||||
Version: lastVersion,
|
||||
configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{
|
||||
Version: -1, // 更新所有版本
|
||||
})
|
||||
if err != nil {
|
||||
return errors.New("read config from rpc failed: " + err.Error())
|
||||
@@ -274,12 +337,6 @@ func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 如果版本相同,则只是保存
|
||||
if lastVersion == nodeConfig.Version {
|
||||
return nil
|
||||
}
|
||||
lastVersion = nodeConfig.Version
|
||||
|
||||
err = nodeConfig.Init()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -293,7 +350,7 @@ func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
}
|
||||
|
||||
// 刷新配置
|
||||
if isFirstTime {
|
||||
if this.isLoaded {
|
||||
remotelogs.Println("NODE", "reloading config ...")
|
||||
} else {
|
||||
remotelogs.Println("NODE", "loading config ...")
|
||||
@@ -315,10 +372,12 @@ func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
// 发送事件
|
||||
events.Notify(events.EventReload)
|
||||
|
||||
if !isFirstTime {
|
||||
if this.isLoaded {
|
||||
return sharedListenerManager.Start(nodeConfig)
|
||||
}
|
||||
|
||||
this.isLoaded = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -334,13 +393,13 @@ func (this *Node) startSyncTimer() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
err := this.syncConfig(false)
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||||
continue
|
||||
}
|
||||
case <-changeNotify:
|
||||
err := this.syncConfig(false)
|
||||
case <-nodeTaskNotify:
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||||
continue
|
||||
|
||||
@@ -53,6 +53,10 @@ func (this *RPCClient) NodeLogRPC() pb.NodeLogServiceClient {
|
||||
return pb.NewNodeLogServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient {
|
||||
return pb.NewNodeTaskServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient {
|
||||
return pb.NewHTTPAccessLogServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user