diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index dcab7cb..cdccf10 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -9,15 +9,18 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/configs" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/errors" "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" + "github.com/iwind/TeaGo/Tea" "io" "net" "net/http" + "net/url" "os/exec" "strconv" "strings" @@ -111,6 +114,8 @@ func (this *APIStream) loop() error { err = this.handleNewNodeTask(message) case messageconfigs.MessageCodeCheckSystemdService: // 检查Systemd服务 err = this.handleCheckSystemdService(message) + case messageconfigs.MessageCodeChangeAPINode: // 修改API节点地址 + err = this.handleChangeAPINode(message) default: err = this.handleUnknownMessage(message) } @@ -570,6 +575,65 @@ func (this *APIStream) handleCheckSystemdService(message *pb.NodeStreamMessage) return nil } +// 修改API地址 +func (this *APIStream) handleChangeAPINode(message *pb.NodeStreamMessage) error { + config, err := configs.LoadAPIConfig() + if err != nil { + this.replyFail(message.RequestId, "read config error: "+err.Error()) + return nil + } + + var messageData = &messageconfigs.ChangeAPINodeMessage{} + err = json.Unmarshal(message.DataJSON, messageData) + if err != nil { + this.replyFail(message.RequestId, "unmarshal message failed: "+err.Error()) + return nil + } + + _, err = url.Parse(messageData.Addr) + if err != nil { + this.replyFail(message.RequestId, "invalid new api node address: '"+messageData.Addr+"'") + return nil + } + + config.RPC.Endpoints = []string{messageData.Addr} + + // 保存到文件 + err = config.WriteFile(Tea.ConfigFile("api.yaml")) + if err != nil { + this.replyFail(message.RequestId, "save config file failed: "+err.Error()) + return nil + } + + this.replyOk(message.RequestId, "") + + go func() { + // 延后生效,防止变更前的API无法读取到状态 + time.Sleep(1 * time.Second) + + rpcClient, err := rpc.SharedRPC() + if err != nil { + remotelogs.Error("API_STREAM", "change rpc endpoint to '"+ + messageData.Addr+"' failed: "+err.Error()) + return + } + + rpcClient.Close() + + err = rpcClient.UpdateConfig(config) + if err != nil { + remotelogs.Error("API_STREAM", "change rpc endpoint to '"+ + messageData.Addr+"' failed: "+err.Error()) + return + } + + remotelogs.Println("API_STREAM", "change rpc endpoint to '"+ + messageData.Addr+"' successfully") + }() + + return nil +} + // 处理未知消息 func (this *APIStream) handleUnknownMessage(message *pb.NodeStreamMessage) error { this.replyFail(message.RequestId, "unknown message code '"+message.Code+"'")