优化代码

This commit is contained in:
刘祥超
2022-11-23 20:13:34 +08:00
parent 82329aa8b0
commit 61b5316a1f
2 changed files with 15 additions and 16 deletions

View File

@@ -2,7 +2,6 @@ package nodes
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils"
@@ -317,8 +316,7 @@ func (this *Node) loop() error {
return errors.New("create rpc client failed: " + err.Error()) return errors.New("create rpc client failed: " + err.Error())
} }
var nodeCtx = rpcClient.Context() tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{
tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{
Version: this.lastTaskVersion, Version: this.lastTaskVersion,
}) })
if err != nil { if err != nil {
@@ -328,7 +326,7 @@ func (this *Node) loop() error {
return errors.New("read node tasks failed: " + err.Error()) return errors.New("read node tasks failed: " + err.Error())
} }
for _, task := range tasksResp.NodeTasks { for _, task := range tasksResp.NodeTasks {
err := this.execTask(rpcClient, nodeCtx, task) err := this.execTask(rpcClient, task)
if !this.finishTask(task.Id, task.Version, err) { if !this.finishTask(task.Id, task.Version, err) {
// 防止失败的任务无法重试 // 防止失败的任务无法重试
break break
@@ -339,7 +337,7 @@ func (this *Node) loop() error {
} }
// 执行任务 // 执行任务
func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, task *pb.NodeTask) error { func (this *Node) execTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error {
switch task.Type { switch task.Type {
case "ipItemChanged": case "ipItemChanged":
// 防止阻塞 // 防止阻塞
@@ -369,7 +367,7 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
return errors.New("reload common scripts failed: " + err.Error()) return errors.New("reload common scripts failed: " + err.Error())
} }
case "nodeLevelChanged": case "nodeLevelChanged":
levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{}) levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(rpcClient.Context(), &pb.FindNodeLevelInfoRequest{})
if err != nil { if err != nil {
return err return err
} }
@@ -390,7 +388,7 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
sharedNodeConfig.ParentNodes = parentNodes sharedNodeConfig.ParentNodes = parentNodes
} }
case "ddosProtectionChanged": case "ddosProtectionChanged":
resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{}) resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(rpcClient.Context(), &pb.FindNodeDDoSProtectionRequest{})
if err != nil { if err != nil {
return err return err
} }
@@ -418,7 +416,7 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
return nil return nil
} }
case "globalServerConfigChanged": case "globalServerConfigChanged":
resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(nodeCtx, &pb.FindNodeGlobalServerConfigRequest{}) resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(rpcClient.Context(), &pb.FindNodeGlobalServerConfigRequest{})
if err != nil { if err != nil {
return err return err
} }
@@ -441,7 +439,7 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
} }
case "userServersStateChanged": case "userServersStateChanged":
if task.UserId > 0 { if task.UserId > 0 {
resp, err := rpcClient.UserRPC.CheckUserServersState(nodeCtx, &pb.CheckUserServersStateRequest{UserId: task.UserId}) resp, err := rpcClient.UserRPC.CheckUserServersState(rpcClient.Context(), &pb.CheckUserServersStateRequest{UserId: task.UserId})
if err != nil { if err != nil {
return err return err
} }
@@ -474,8 +472,6 @@ func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (su
return false return false
} }
var nodeCtx = rpcClient.Context()
var isOk = taskErr == nil var isOk = taskErr == nil
if isOk && taskVersion > this.lastTaskVersion { if isOk && taskVersion > this.lastTaskVersion {
this.lastTaskVersion = taskVersion this.lastTaskVersion = taskVersion
@@ -486,7 +482,7 @@ func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (su
errMsg = taskErr.Error() errMsg = taskErr.Error()
} }
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{
NodeTaskId: taskId, NodeTaskId: taskId,
IsOk: isOk, IsOk: isOk,
Error: errMsg, Error: errMsg,
@@ -533,10 +529,8 @@ func (this *Node) syncConfig(taskVersion int64) error {
} }
// 获取同步任务 // 获取同步任务
var nodeCtx = rpcClient.Context()
// TODO 这里考虑只同步版本号有变更的 // TODO 这里考虑只同步版本号有变更的
configResp, err := rpcClient.NodeRPC.FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{ configResp, err := rpcClient.NodeRPC.FindCurrentNodeConfig(rpcClient.Context(), &pb.FindCurrentNodeConfigRequest{
Version: -1, // 更新所有版本 Version: -1, // 更新所有版本
Compress: true, Compress: true,
NodeTaskVersion: taskVersion, NodeTaskVersion: taskVersion,

View File

@@ -262,7 +262,12 @@ func (this *RPCClient) pickConn() *grpc.ClientConn {
defer this.locker.Unlock() defer this.locker.Unlock()
// 检查连接状态 // 检查连接状态
if len(this.conns) > 0 { var countConns = len(this.conns)
if countConns > 0 {
if countConns == 1 {
return this.conns[0]
}
for _, stateArray := range [][2]connectivity.State{ for _, stateArray := range [][2]connectivity.State{
{connectivity.Ready, connectivity.Idle}, // 优先Ready和Idle {connectivity.Ready, connectivity.Idle}, // 优先Ready和Idle
{connectivity.Connecting, connectivity.Connecting}, {connectivity.Connecting, connectivity.Connecting},