优化RPC连接

This commit is contained in:
GoEdgeLab
2021-11-10 22:22:27 +08:00
parent 55cefdb62c
commit 6116fa4f3e
7 changed files with 58 additions and 10 deletions

View File

@@ -31,7 +31,7 @@ type RPCClient struct {
} }
// NewRPCClient 构造新的RPC客户端 // NewRPCClient 构造新的RPC客户端
func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { func NewRPCClient(apiConfig *configs.APIConfig, isPrimary bool) (*RPCClient, error) {
if apiConfig == nil { if apiConfig == nil {
return nil, errors.New("api config should not be nil") return nil, errors.New("api config should not be nil")
} }
@@ -46,7 +46,9 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) {
} }
// 设置RPC // 设置RPC
if isPrimary {
dao.SetRPC(client) dao.SetRPC(client)
}
return client, nil return client, nil
} }
@@ -611,3 +613,20 @@ func (this *RPCClient) pickConn() *grpc.ClientConn {
return this.conns[rands.Int(0, len(this.conns)-1)] return this.conns[rands.Int(0, len(this.conns)-1)]
} }
// Close 关闭
func (this *RPCClient) Close() error {
this.locker.Lock()
defer this.locker.Unlock()
var lastErr error
for _, conn := range this.conns {
var err = conn.Close()
if err != nil {
lastErr = err
continue
}
}
return lastErr
}

View File

@@ -20,7 +20,7 @@ func SharedRPC() (*RPCClient, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
client, err := NewRPCClient(config) client, err := NewRPCClient(config, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -48,6 +48,7 @@ func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(len(nodes)) wg.Add(len(nodes))
for _, node := range nodes { for _, node := range nodes {
// TODO 检查是否在线 // TODO 检查是否在线
@@ -104,7 +105,7 @@ func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg
}, },
NodeId: apiNode.UniqueId, NodeId: apiNode.UniqueId,
Secret: apiNode.Secret, Secret: apiNode.Secret,
}) }, false)
if err != nil { if err != nil {
locker.Lock() locker.Lock()
results = append(results, &MessageResult{ results = append(results, &MessageResult{
@@ -162,6 +163,11 @@ func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg
}) })
} }
// 关闭RPC
for _, rpcClient := range rpcMap {
_ = rpcClient.Close()
}
return return
} }
@@ -279,7 +285,7 @@ func SendMessageToNodeIds(ctx context.Context, nodeIds []int64, code string, msg
}, },
NodeId: apiNode.UniqueId, NodeId: apiNode.UniqueId,
Secret: apiNode.Secret, Secret: apiNode.Secret,
}) }, false)
if err != nil { if err != nil {
locker.Lock() locker.Lock()
results = append(results, &MessageResult{ results = append(results, &MessageResult{
@@ -337,5 +343,10 @@ func SendMessageToNodeIds(ctx context.Context, nodeIds []int64, code string, msg
}) })
} }
// 关闭RPC
for _, rpcClient := range rpcMap {
_ = rpcClient.Close()
}
return return
} }

View File

@@ -42,7 +42,7 @@ func (this *UpdateHostsAction) RunPost(params struct {
}, },
NodeId: params.NodeId, NodeId: params.NodeId,
Secret: params.NodeSecret, Secret: params.NodeSecret,
}) }, false)
if err != nil { if err != nil {
this.FailField("host", "测试API节点时出错请检查配置错误信息"+err.Error()) this.FailField("host", "测试API节点时出错请检查配置错误信息"+err.Error())
} }
@@ -51,6 +51,10 @@ func (this *UpdateHostsAction) RunPost(params struct {
this.FailField("host", "无法连接此API节点错误信息"+err.Error()) this.FailField("host", "无法连接此API节点错误信息"+err.Error())
} }
defer func() {
_ = client.Close()
}()
// 获取管理员节点信息 // 获取管理员节点信息
apiTokensResp, err := client.APITokenRPC().FindAllEnabledAPITokens(client.APIContext(0), &pb.FindAllEnabledAPITokensRequest{Role: "admin"}) apiTokensResp, err := client.APITokenRPC().FindAllEnabledAPITokens(client.APIContext(0), &pb.FindAllEnabledAPITokensRequest{Role: "admin"})
if err != nil { if err != nil {

View File

@@ -49,10 +49,15 @@ func (this *ValidateApiAction) RunPost(params struct {
}, },
NodeId: params.NodeId, NodeId: params.NodeId,
Secret: params.NodeSecret, Secret: params.NodeSecret,
}) }, false)
if err != nil { if err != nil {
this.FailField("host", "测试API节点时出错请检查配置错误信息"+err.Error()) this.FailField("host", "测试API节点时出错请检查配置错误信息"+err.Error())
} }
defer func() {
_ = client.Close()
}()
_, err = client.APINodeRPC().FindCurrentAPINodeVersion(client.APIContext(0), &pb.FindCurrentAPINodeVersionRequest{}) _, err = client.APINodeRPC().FindCurrentAPINodeVersion(client.APIContext(0), &pb.FindCurrentAPINodeVersionRequest{})
if err != nil { if err != nil {
this.FailField("host", "无法连接此API节点错误信息"+err.Error()) this.FailField("host", "无法连接此API节点错误信息"+err.Error())

View File

@@ -206,7 +206,7 @@ func (this *InstallAction) RunPost(params struct {
// 设置管理员 // 设置管理员
currentStatusText = "正在设置管理员" currentStatusText = "正在设置管理员"
client, err := rpc.NewRPCClient(apiConfig) client, err := rpc.NewRPCClient(apiConfig, false)
if err != nil { if err != nil {
this.FailField("oldHost", "测试API节点时出错请检查配置错误信息"+err.Error()) this.FailField("oldHost", "测试API节点时出错请检查配置错误信息"+err.Error())
} }
@@ -263,11 +263,15 @@ func (this *InstallAction) RunPost(params struct {
NodeId: apiNodeMap.GetString("oldNodeId"), NodeId: apiNodeMap.GetString("oldNodeId"),
Secret: apiNodeMap.GetString("oldNodeSecret"), Secret: apiNodeMap.GetString("oldNodeSecret"),
} }
client, err := rpc.NewRPCClient(apiConfig) client, err := rpc.NewRPCClient(apiConfig, false)
if err != nil { if err != nil {
this.FailField("oldHost", "测试API节点时出错请检查配置错误信息"+err.Error()) this.FailField("oldHost", "测试API节点时出错请检查配置错误信息"+err.Error())
} }
defer func() {
_ = client.Close()
}()
// 设置管理员 // 设置管理员
ctx := client.APIContext(0) ctx := client.APIContext(0)
_, err = client.AdminRPC().CreateOrUpdateAdmin(ctx, &pb.CreateOrUpdateAdminRequest{ _, err = client.AdminRPC().CreateOrUpdateAdmin(ctx, &pb.CreateOrUpdateAdminRequest{

View File

@@ -91,10 +91,15 @@ func (this *ValidateApiAction) RunPost(params struct {
}, },
NodeId: params.OldNodeId, NodeId: params.OldNodeId,
Secret: params.OldNodeSecret, Secret: params.OldNodeSecret,
}) }, false)
if err != nil { if err != nil {
this.FailField("oldHost", "测试API节点时出错请检查配置错误信息"+err.Error()) this.FailField("oldHost", "测试API节点时出错请检查配置错误信息"+err.Error())
} }
defer func() {
_ = client.Close()
}()
_, err = client.APINodeRPC().FindCurrentAPINodeVersion(client.APIContext(0), &pb.FindCurrentAPINodeVersionRequest{}) _, err = client.APINodeRPC().FindCurrentAPINodeVersion(client.APIContext(0), &pb.FindCurrentAPINodeVersionRequest{})
if err != nil { if err != nil {
this.FailField("oldHost", "无法连接此API节点错误信息"+err.Error()) this.FailField("oldHost", "无法连接此API节点错误信息"+err.Error())