mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-07 18:50:27 +08:00
优化代码
This commit is contained in:
@@ -2,7 +2,6 @@ package nodes
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
|
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
|
||||||
@@ -317,8 +316,7 @@ func (this *Node) loop() error {
|
|||||||
return errors.New("create rpc client failed: " + err.Error())
|
return errors.New("create rpc client failed: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
var nodeCtx = rpcClient.Context()
|
tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{
|
||||||
tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{
|
|
||||||
Version: this.lastTaskVersion,
|
Version: this.lastTaskVersion,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -328,7 +326,7 @@ func (this *Node) loop() error {
|
|||||||
return errors.New("read node tasks failed: " + err.Error())
|
return errors.New("read node tasks failed: " + err.Error())
|
||||||
}
|
}
|
||||||
for _, task := range tasksResp.NodeTasks {
|
for _, task := range tasksResp.NodeTasks {
|
||||||
err := this.execTask(rpcClient, nodeCtx, task)
|
err := this.execTask(rpcClient, task)
|
||||||
if !this.finishTask(task.Id, task.Version, err) {
|
if !this.finishTask(task.Id, task.Version, err) {
|
||||||
// 防止失败的任务无法重试
|
// 防止失败的任务无法重试
|
||||||
break
|
break
|
||||||
@@ -339,7 +337,7 @@ func (this *Node) loop() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 执行任务
|
// 执行任务
|
||||||
func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, task *pb.NodeTask) error {
|
func (this *Node) execTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error {
|
||||||
switch task.Type {
|
switch task.Type {
|
||||||
case "ipItemChanged":
|
case "ipItemChanged":
|
||||||
// 防止阻塞
|
// 防止阻塞
|
||||||
@@ -369,7 +367,7 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
|
|||||||
return errors.New("reload common scripts failed: " + err.Error())
|
return errors.New("reload common scripts failed: " + err.Error())
|
||||||
}
|
}
|
||||||
case "nodeLevelChanged":
|
case "nodeLevelChanged":
|
||||||
levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{})
|
levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(rpcClient.Context(), &pb.FindNodeLevelInfoRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -390,7 +388,7 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
|
|||||||
sharedNodeConfig.ParentNodes = parentNodes
|
sharedNodeConfig.ParentNodes = parentNodes
|
||||||
}
|
}
|
||||||
case "ddosProtectionChanged":
|
case "ddosProtectionChanged":
|
||||||
resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{})
|
resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(rpcClient.Context(), &pb.FindNodeDDoSProtectionRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -418,7 +416,7 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
case "globalServerConfigChanged":
|
case "globalServerConfigChanged":
|
||||||
resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(nodeCtx, &pb.FindNodeGlobalServerConfigRequest{})
|
resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(rpcClient.Context(), &pb.FindNodeGlobalServerConfigRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -441,7 +439,7 @@ func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, ta
|
|||||||
}
|
}
|
||||||
case "userServersStateChanged":
|
case "userServersStateChanged":
|
||||||
if task.UserId > 0 {
|
if task.UserId > 0 {
|
||||||
resp, err := rpcClient.UserRPC.CheckUserServersState(nodeCtx, &pb.CheckUserServersStateRequest{UserId: task.UserId})
|
resp, err := rpcClient.UserRPC.CheckUserServersState(rpcClient.Context(), &pb.CheckUserServersStateRequest{UserId: task.UserId})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -474,8 +472,6 @@ func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (su
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
var nodeCtx = rpcClient.Context()
|
|
||||||
|
|
||||||
var isOk = taskErr == nil
|
var isOk = taskErr == nil
|
||||||
if isOk && taskVersion > this.lastTaskVersion {
|
if isOk && taskVersion > this.lastTaskVersion {
|
||||||
this.lastTaskVersion = taskVersion
|
this.lastTaskVersion = taskVersion
|
||||||
@@ -486,7 +482,7 @@ func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (su
|
|||||||
errMsg = taskErr.Error()
|
errMsg = taskErr.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
|
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{
|
||||||
NodeTaskId: taskId,
|
NodeTaskId: taskId,
|
||||||
IsOk: isOk,
|
IsOk: isOk,
|
||||||
Error: errMsg,
|
Error: errMsg,
|
||||||
@@ -533,10 +529,8 @@ func (this *Node) syncConfig(taskVersion int64) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 获取同步任务
|
// 获取同步任务
|
||||||
var nodeCtx = rpcClient.Context()
|
|
||||||
|
|
||||||
// TODO 这里考虑只同步版本号有变更的
|
// TODO 这里考虑只同步版本号有变更的
|
||||||
configResp, err := rpcClient.NodeRPC.FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{
|
configResp, err := rpcClient.NodeRPC.FindCurrentNodeConfig(rpcClient.Context(), &pb.FindCurrentNodeConfigRequest{
|
||||||
Version: -1, // 更新所有版本
|
Version: -1, // 更新所有版本
|
||||||
Compress: true,
|
Compress: true,
|
||||||
NodeTaskVersion: taskVersion,
|
NodeTaskVersion: taskVersion,
|
||||||
|
|||||||
@@ -262,7 +262,12 @@ func (this *RPCClient) pickConn() *grpc.ClientConn {
|
|||||||
defer this.locker.Unlock()
|
defer this.locker.Unlock()
|
||||||
|
|
||||||
// 检查连接状态
|
// 检查连接状态
|
||||||
if len(this.conns) > 0 {
|
var countConns = len(this.conns)
|
||||||
|
if countConns > 0 {
|
||||||
|
if countConns == 1 {
|
||||||
|
return this.conns[0]
|
||||||
|
}
|
||||||
|
|
||||||
for _, stateArray := range [][2]connectivity.State{
|
for _, stateArray := range [][2]connectivity.State{
|
||||||
{connectivity.Ready, connectivity.Idle}, // 优先Ready和Idle
|
{connectivity.Ready, connectivity.Idle}, // 优先Ready和Idle
|
||||||
{connectivity.Connecting, connectivity.Connecting},
|
{connectivity.Connecting, connectivity.Connecting},
|
||||||
|
|||||||
Reference in New Issue
Block a user