diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 920b9df5..82d957d8 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -31,7 +31,7 @@ type RPCClient struct { } // NewRPCClient 构造新的RPC客户端 -func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { +func NewRPCClient(apiConfig *configs.APIConfig, isPrimary bool) (*RPCClient, error) { if apiConfig == nil { return nil, errors.New("api config should not be nil") } @@ -46,7 +46,9 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { } // 设置RPC - dao.SetRPC(client) + if isPrimary { + dao.SetRPC(client) + } return client, nil } @@ -611,3 +613,20 @@ func (this *RPCClient) pickConn() *grpc.ClientConn { return this.conns[rands.Int(0, len(this.conns)-1)] } + +// Close 关闭 +func (this *RPCClient) Close() error { + this.locker.Lock() + defer this.locker.Unlock() + + var lastErr error + for _, conn := range this.conns { + var err = conn.Close() + if err != nil { + lastErr = err + continue + } + } + + return lastErr +} diff --git a/internal/rpc/rpc_utils.go b/internal/rpc/rpc_utils.go index ad0e474a..35ea970e 100644 --- a/internal/rpc/rpc_utils.go +++ b/internal/rpc/rpc_utils.go @@ -20,7 +20,7 @@ func SharedRPC() (*RPCClient, error) { if err != nil { return nil, err } - client, err := NewRPCClient(config) + client, err := NewRPCClient(config, true) if err != nil { return nil, err } diff --git a/internal/web/actions/default/nodes/nodeutils/utils.go b/internal/web/actions/default/nodes/nodeutils/utils.go index 4c700ba7..dbea17b3 100644 --- a/internal/web/actions/default/nodes/nodeutils/utils.go +++ b/internal/web/actions/default/nodes/nodeutils/utils.go @@ -48,6 +48,7 @@ func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg wg := &sync.WaitGroup{} wg.Add(len(nodes)) + for _, node := range nodes { // TODO 检查是否在线 @@ -104,7 +105,7 @@ func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg }, NodeId: apiNode.UniqueId, Secret: apiNode.Secret, - }) + }, false) if err != nil { locker.Lock() results = append(results, &MessageResult{ @@ -162,6 +163,11 @@ func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg }) } + // 关闭RPC + for _, rpcClient := range rpcMap { + _ = rpcClient.Close() + } + return } @@ -279,7 +285,7 @@ func SendMessageToNodeIds(ctx context.Context, nodeIds []int64, code string, msg }, NodeId: apiNode.UniqueId, Secret: apiNode.Secret, - }) + }, false) if err != nil { locker.Lock() results = append(results, &MessageResult{ @@ -337,5 +343,10 @@ func SendMessageToNodeIds(ctx context.Context, nodeIds []int64, code string, msg }) } + // 关闭RPC + for _, rpcClient := range rpcMap { + _ = rpcClient.Close() + } + return } diff --git a/internal/web/actions/default/recover/updateHosts.go b/internal/web/actions/default/recover/updateHosts.go index 03d3b976..f34a28db 100644 --- a/internal/web/actions/default/recover/updateHosts.go +++ b/internal/web/actions/default/recover/updateHosts.go @@ -42,7 +42,7 @@ func (this *UpdateHostsAction) RunPost(params struct { }, NodeId: params.NodeId, Secret: params.NodeSecret, - }) + }, false) if err != nil { this.FailField("host", "测试API节点时出错,请检查配置,错误信息:"+err.Error()) } @@ -51,6 +51,10 @@ func (this *UpdateHostsAction) RunPost(params struct { this.FailField("host", "无法连接此API节点,错误信息:"+err.Error()) } + defer func() { + _ = client.Close() + }() + // 获取管理员节点信息 apiTokensResp, err := client.APITokenRPC().FindAllEnabledAPITokens(client.APIContext(0), &pb.FindAllEnabledAPITokensRequest{Role: "admin"}) if err != nil { diff --git a/internal/web/actions/default/recover/validateApi.go b/internal/web/actions/default/recover/validateApi.go index bfb926cc..acb37456 100644 --- a/internal/web/actions/default/recover/validateApi.go +++ b/internal/web/actions/default/recover/validateApi.go @@ -49,10 +49,15 @@ func (this *ValidateApiAction) RunPost(params struct { }, NodeId: params.NodeId, Secret: params.NodeSecret, - }) + }, false) if err != nil { this.FailField("host", "测试API节点时出错,请检查配置,错误信息:"+err.Error()) } + + defer func() { + _ = client.Close() + }() + _, err = client.APINodeRPC().FindCurrentAPINodeVersion(client.APIContext(0), &pb.FindCurrentAPINodeVersionRequest{}) if err != nil { this.FailField("host", "无法连接此API节点,错误信息:"+err.Error()) diff --git a/internal/web/actions/default/setup/install.go b/internal/web/actions/default/setup/install.go index e59201ec..95468c77 100644 --- a/internal/web/actions/default/setup/install.go +++ b/internal/web/actions/default/setup/install.go @@ -206,7 +206,7 @@ func (this *InstallAction) RunPost(params struct { // 设置管理员 currentStatusText = "正在设置管理员" - client, err := rpc.NewRPCClient(apiConfig) + client, err := rpc.NewRPCClient(apiConfig, false) if err != nil { this.FailField("oldHost", "测试API节点时出错,请检查配置,错误信息:"+err.Error()) } @@ -263,11 +263,15 @@ func (this *InstallAction) RunPost(params struct { NodeId: apiNodeMap.GetString("oldNodeId"), Secret: apiNodeMap.GetString("oldNodeSecret"), } - client, err := rpc.NewRPCClient(apiConfig) + client, err := rpc.NewRPCClient(apiConfig, false) if err != nil { this.FailField("oldHost", "测试API节点时出错,请检查配置,错误信息:"+err.Error()) } + defer func() { + _ = client.Close() + }() + // 设置管理员 ctx := client.APIContext(0) _, err = client.AdminRPC().CreateOrUpdateAdmin(ctx, &pb.CreateOrUpdateAdminRequest{ diff --git a/internal/web/actions/default/setup/validateApi.go b/internal/web/actions/default/setup/validateApi.go index 4005d229..ddcac942 100644 --- a/internal/web/actions/default/setup/validateApi.go +++ b/internal/web/actions/default/setup/validateApi.go @@ -91,10 +91,15 @@ func (this *ValidateApiAction) RunPost(params struct { }, NodeId: params.OldNodeId, Secret: params.OldNodeSecret, - }) + }, false) if err != nil { this.FailField("oldHost", "测试API节点时出错,请检查配置,错误信息:"+err.Error()) } + + defer func() { + _ = client.Close() + }() + _, err = client.APINodeRPC().FindCurrentAPINodeVersion(client.APIContext(0), &pb.FindCurrentAPINodeVersionRequest{}) if err != nil { this.FailField("oldHost", "无法连接此API节点,错误信息:"+err.Error())