优化RPC获取服务实例方式

This commit is contained in:
刘祥超
2022-08-24 20:04:46 +08:00
parent 8afd00f00d
commit ddaec82415
22 changed files with 123 additions and 147 deletions

View File

@@ -144,7 +144,7 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
if err != nil { if err != nil {
return false, err return false, err
} }
itemsResp, err := rpcClient.IPItemRPC().ListIPItemsAfterVersion(rpcClient.Context(), &pb.ListIPItemsAfterVersionRequest{ itemsResp, err := rpcClient.IPItemRPC.ListIPItemsAfterVersion(rpcClient.Context(), &pb.ListIPItemsAfterVersionRequest{
Version: this.version, Version: this.version,
Size: this.pageSize, Size: this.pageSize,
}) })

View File

@@ -424,7 +424,7 @@ func (this *Task) Upload(pauseDuration time.Duration) error {
return nil, err return nil, err
} }
_, err = rpcClient.MetricStatRPC().UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{ _, err = rpcClient.MetricStatRPC.UploadMetricStats(rpcClient.Context(), &pb.UploadMetricStatsRequest{
MetricStats: pbStats, MetricStats: pbStats,
Time: currentTime, Time: currentTime,
ServerId: serverId, ServerId: serverId,

View File

@@ -69,7 +69,7 @@ func (this *ValueQueue) Loop() error {
} }
for value := range this.valuesChan { for value := range this.valuesChan {
_, err = rpcClient.NodeValueRPC().CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{ _, err = rpcClient.NodeValueRPC.CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{
Item: value.Item, Item: value.Item,
ValueJSON: value.ValueJSON, ValueJSON: value.ValueJSON,
CreatedAt: value.CreatedAt, CreatedAt: value.CreatedAt,

View File

@@ -16,7 +16,7 @@ func TestValueQueue_RPC(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = rpcClient.NodeValueRPC().CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{}) _, err = rpcClient.NodeValueRPC.CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{})
if err != nil { if err != nil {
statusErr, ok := status.FromError(err) statusErr, ok := status.FromError(err)
if ok { if ok {

View File

@@ -73,7 +73,7 @@ func (this *APIStream) loop() error {
cancelFunc() cancelFunc()
}() }()
nodeStream, err := rpcClient.NodeRPC().NodeStream(ctx) nodeStream, err := rpcClient.NodeRPC.NodeStream(ctx)
if err != nil { if err != nil {
if this.isQuiting { if this.isQuiting {
return nil return nil

View File

@@ -96,7 +96,7 @@ Loop:
this.rpcClient = client this.rpcClient = client
} }
_, err := this.rpcClient.HTTPAccessLogRPC().CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs}) _, err := this.rpcClient.HTTPAccessLogRPC.CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
if err != nil { if err != nil {
// 是否包含了invalid UTF-8 // 是否包含了invalid UTF-8
if strings.Contains(err.Error(), "string field contains invalid UTF-8") { if strings.Contains(err.Error(), "string field contains invalid UTF-8") {
@@ -105,7 +105,7 @@ Loop:
} }
// 重新提交 // 重新提交
_, err = this.rpcClient.HTTPAccessLogRPC().CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs}) _, err = this.rpcClient.HTTPAccessLogRPC.CreateHTTPAccessLogs(this.rpcClient.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: accessLogs})
return err return err
} }

View File

@@ -55,7 +55,7 @@ func TestHTTPAccessLogQueue_Push(t *testing.T) {
// logs.PrintAsJSON(accessLog) // logs.PrintAsJSON(accessLog)
//t.Log(strings.ToValidUTF8(string(utf8Bytes), "")) //t.Log(strings.ToValidUTF8(string(utf8Bytes), ""))
_, err = client.HTTPAccessLogRPC().CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: []*pb.HTTPAccessLog{ _, err = client.HTTPAccessLogRPC.CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: []*pb.HTTPAccessLog{
accessLog, accessLog,
}}) }})
if err != nil { if err != nil {
@@ -99,7 +99,7 @@ func TestHTTPAccessLogQueue_Push2(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = client.HTTPAccessLogRPC().CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: []*pb.HTTPAccessLog{ _, err = client.HTTPAccessLogRPC.CreateHTTPAccessLogs(client.Context(), &pb.CreateHTTPAccessLogsRequest{HttpAccessLogs: []*pb.HTTPAccessLog{
accessLog, accessLog,
}}) }})
if err != nil { if err != nil {

View File

@@ -81,7 +81,7 @@ func (this *HTTPCacheTaskManager) Start() {
if rpcClient != nil { if rpcClient != nil {
for taskReq := range this.taskQueue { for taskReq := range this.taskQueue {
_, err := rpcClient.ServerRPC().PurgeServerCache(rpcClient.Context(), taskReq) _, err := rpcClient.ServerRPC.PurgeServerCache(rpcClient.Context(), taskReq)
if err != nil { if err != nil {
remotelogs.Error("HTTP_CACHE_TASK_MANAGER", "create purge task failed: "+err.Error()) remotelogs.Error("HTTP_CACHE_TASK_MANAGER", "create purge task failed: "+err.Error())
} }
@@ -104,7 +104,7 @@ func (this *HTTPCacheTaskManager) Loop() error {
return err return err
} }
resp, err := rpcClient.HTTPCacheTaskKeyRPC().FindDoingHTTPCacheTaskKeys(rpcClient.Context(), &pb.FindDoingHTTPCacheTaskKeysRequest{}) resp, err := rpcClient.HTTPCacheTaskKeyRPC.FindDoingHTTPCacheTaskKeys(rpcClient.Context(), &pb.FindDoingHTTPCacheTaskKeysRequest{})
if err != nil { if err != nil {
// 忽略连接错误 // 忽略连接错误
if rpc.IsConnError(err) { if rpc.IsConnError(err) {
@@ -135,7 +135,7 @@ func (this *HTTPCacheTaskManager) Loop() error {
pbResults = append(pbResults, pbResult) pbResults = append(pbResults, pbResult)
} }
_, err = rpcClient.HTTPCacheTaskKeyRPC().UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults}) _, err = rpcClient.HTTPCacheTaskKeyRPC.UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults})
if err != nil { if err != nil {
return err return err
} }

View File

@@ -21,7 +21,7 @@ func (this *HTTPRequest) doACME() (shouldStop bool) {
return false return false
} }
keyResp, err := rpcClient.ACMEAuthenticationRPC().FindACMEAuthenticationKeyWithToken(rpcClient.Context(), &pb.FindACMEAuthenticationKeyWithTokenRequest{Token: token}) keyResp, err := rpcClient.ACMEAuthenticationRPC.FindACMEAuthenticationKeyWithToken(rpcClient.Context(), &pb.FindACMEAuthenticationKeyWithTokenRequest{Token: token})
if err != nil { if err != nil {
remotelogs.Error("RPC", "[ACME]read key for token failed: "+err.Error()) remotelogs.Error("RPC", "[ACME]read key for token failed: "+err.Error())
return false return false

View File

@@ -46,7 +46,7 @@ func (this *IPLibraryUpdater) FindLatestFile() (code string, fileId int64, err e
if err != nil { if err != nil {
return "", 0, err return "", 0, err
} }
resp, err := rpcClient.IPLibraryArtifactRPC().FindPublicIPLibraryArtifact(rpcClient.Context(), &pb.FindPublicIPLibraryArtifactRequest{}) resp, err := rpcClient.IPLibraryArtifactRPC.FindPublicIPLibraryArtifact(rpcClient.Context(), &pb.FindPublicIPLibraryArtifactRequest{})
if err != nil { if err != nil {
return "", 0, err return "", 0, err
} }
@@ -68,12 +68,12 @@ func (this *IPLibraryUpdater) DownloadFile(fileId int64, writer io.Writer) error
return err return err
} }
chunkIdsResp, err := rpcClient.FileChunkRPC().FindAllFileChunkIds(rpcClient.Context(), &pb.FindAllFileChunkIdsRequest{FileId: fileId}) chunkIdsResp, err := rpcClient.FileChunkRPC.FindAllFileChunkIds(rpcClient.Context(), &pb.FindAllFileChunkIdsRequest{FileId: fileId})
if err != nil { if err != nil {
return err return err
} }
for _, chunkId := range chunkIdsResp.FileChunkIds { for _, chunkId := range chunkIdsResp.FileChunkIds {
chunkResp, err := rpcClient.FileChunkRPC().DownloadFileChunk(rpcClient.Context(), &pb.DownloadFileChunkRequest{FileChunkId: chunkId}) chunkResp, err := rpcClient.FileChunkRPC.DownloadFileChunk(rpcClient.Context(), &pb.DownloadFileChunkRequest{FileChunkId: chunkId})
if err != nil { if err != nil {
return err return err
} }

View File

@@ -79,7 +79,7 @@ func (this *Node) Test() error {
if err != nil { if err != nil {
return errors.New("test rpc failed: " + err.Error()) return errors.New("test rpc failed: " + err.Error())
} }
_, err = rpcClient.APINodeRPC().FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{}) _, err = rpcClient.APINodeRPC.FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{})
if err != nil { if err != nil {
return errors.New("test rpc failed: " + err.Error()) return errors.New("test rpc failed: " + err.Error())
} }
@@ -307,7 +307,7 @@ func (this *Node) loop() error {
} }
var nodeCtx = rpcClient.Context() var nodeCtx = rpcClient.Context()
tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{}) tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{})
if err != nil { if err != nil {
if rpc.IsConnError(err) && !Tea.IsTesting() { if rpc.IsConnError(err) && !Tea.IsTesting() {
return nil return nil
@@ -325,7 +325,7 @@ func (this *Node) loop() error {
} }
// 修改为已同步 // 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id, NodeTaskId: task.Id,
IsOk: true, IsOk: true,
Error: "", Error: "",
@@ -344,13 +344,13 @@ func (this *Node) loop() error {
err = this.syncConfig(task.Version) err = this.syncConfig(task.Version)
} }
if err != nil { if err != nil {
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id, NodeTaskId: task.Id,
IsOk: false, IsOk: false,
Error: err.Error(), Error: err.Error(),
}) })
} else { } else {
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id, NodeTaskId: task.Id,
IsOk: true, IsOk: true,
Error: "", Error: "",
@@ -372,7 +372,7 @@ func (this *Node) loop() error {
} }
// 修改为已同步 // 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id, NodeTaskId: task.Id,
IsOk: true, IsOk: true,
Error: "", Error: "",
@@ -381,7 +381,7 @@ func (this *Node) loop() error {
return err return err
} }
case "nodeLevelChanged": case "nodeLevelChanged":
levelInfoResp, err := rpcClient.NodeRPC().FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{}) levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{})
if err != nil { if err != nil {
return err return err
} }
@@ -398,7 +398,7 @@ func (this *Node) loop() error {
sharedNodeConfig.ParentNodes = parentNodes sharedNodeConfig.ParentNodes = parentNodes
// 修改为已同步 // 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id, NodeTaskId: task.Id,
IsOk: true, IsOk: true,
Error: "", Error: "",
@@ -407,7 +407,7 @@ func (this *Node) loop() error {
return err return err
} }
case "ddosProtectionChanged": case "ddosProtectionChanged":
resp, err := rpcClient.NodeRPC().FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{}) resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{})
if err != nil { if err != nil {
return err return err
} }
@@ -434,7 +434,7 @@ func (this *Node) loop() error {
} }
// 修改为已同步 // 修改为已同步
_, err = rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
NodeTaskId: task.Id, NodeTaskId: task.Id,
IsOk: true, IsOk: true,
Error: "", Error: "",
@@ -479,7 +479,7 @@ func (this *Node) syncConfig(taskVersion int64) error {
nodeCtx := rpcClient.Context() nodeCtx := rpcClient.Context()
// TODO 这里考虑只同步版本号有变更的 // TODO 这里考虑只同步版本号有变更的
configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{ configResp, err := rpcClient.NodeRPC.FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{
Version: -1, // 更新所有版本 Version: -1, // 更新所有版本
Compress: true, Compress: true,
NodeTaskVersion: taskVersion, NodeTaskVersion: taskVersion,
@@ -570,7 +570,7 @@ func (this *Node) syncServerConfig(serverId int64) error {
if err != nil { if err != nil {
return err return err
} }
resp, err := rpcClient.ServerRPC().ComposeServerConfig(rpcClient.Context(), &pb.ComposeServerConfigRequest{ServerId: serverId}) resp, err := rpcClient.ServerRPC.ComposeServerConfig(rpcClient.Context(), &pb.ComposeServerConfigRequest{ServerId: serverId})
if err != nil { if err != nil {
return err return err
} }
@@ -652,7 +652,7 @@ func (this *Node) checkClusterConfig() error {
} }
logs.Println("[NODE]registering node to cluster ...") logs.Println("[NODE]registering node to cluster ...")
resp, err := rpcClient.NodeRPC().RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME}) resp, err := rpcClient.NodeRPC.RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME})
if err != nil { if err != nil {
return err return err
} }

View File

@@ -125,7 +125,7 @@ func (this *NodeStatusExecutor) update() {
remotelogs.Error("NODE_STATUS", "failed to open rpc: "+err.Error()) remotelogs.Error("NODE_STATUS", "failed to open rpc: "+err.Error())
return return
} }
_, err = rpcClient.NodeRPC().UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{ _, err = rpcClient.NodeRPC.UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{
StatusJSON: jsonData, StatusJSON: jsonData,
}) })
if err != nil { if err != nil {

View File

@@ -60,7 +60,7 @@ func (this *OCSPUpdateTask) Loop() error {
return err return err
} }
resp, err := rpcClient.SSLCertRPC().ListUpdatedSSLCertOCSP(rpcClient.Context(), &pb.ListUpdatedSSLCertOCSPRequest{ resp, err := rpcClient.SSLCertRPC.ListUpdatedSSLCertOCSP(rpcClient.Context(), &pb.ListUpdatedSSLCertOCSPRequest{
Version: this.version, Version: this.version,
Size: 100, Size: 100,
}) })

View File

@@ -81,7 +81,7 @@ func (this *SyncAPINodesTask) Loop() error {
if err != nil { if err != nil {
return err return err
} }
resp, err := rpcClient.APINodeRPC().FindAllEnabledAPINodes(rpcClient.Context(), &pb.FindAllEnabledAPINodesRequest{}) resp, err := rpcClient.APINodeRPC.FindAllEnabledAPINodes(rpcClient.Context(), &pb.FindAllEnabledAPINodesRequest{})
if err != nil { if err != nil {
return err return err
} }

View File

@@ -126,7 +126,7 @@ func (this *UpgradeManager) install() error {
var sum = "" var sum = ""
var filename = "" var filename = ""
for { for {
resp, err := client.NodeRPC().DownloadNodeInstallationFile(client.Context(), &pb.DownloadNodeInstallationFileRequest{ resp, err := client.NodeRPC.DownloadNodeInstallationFile(client.Context(), &pb.DownloadNodeInstallationFileRequest{
Os: runtime.GOOS, Os: runtime.GOOS,
Arch: runtime.GOARCH, Arch: runtime.GOARCH,
ChunkOffset: offset, ChunkOffset: offset,

View File

@@ -173,7 +173,6 @@ func ServerSuccess(serverId int64, tag string, description string, logType nodec
} }
} }
// ServerLog 打印服务相关日志信息 // ServerLog 打印服务相关日志信息
func ServerLog(serverId int64, tag string, description string, logType nodeconfigs.NodeLogType, params maps.Map) { func ServerLog(serverId int64, tag string, description string, logType nodeconfigs.NodeLogType, params maps.Map) {
logs.Println("[" + tag + "]" + description) logs.Println("[" + tag + "]" + description)
@@ -253,6 +252,6 @@ Loop:
return nil return nil
} }
_, err = rpcClient.NodeLogRPC().CreateNodeLogs(rpcClient.Context(), &pb.CreateNodeLogsRequest{NodeLogs: logList}) _, err = rpcClient.NodeLogRPC.CreateNodeLogs(rpcClient.Context(), &pb.CreateNodeLogsRequest{NodeLogs: logList})
return err return err
} }

View File

@@ -28,6 +28,27 @@ type RPCClient struct {
conns []*grpc.ClientConn conns []*grpc.ClientConn
locker sync.RWMutex locker sync.RWMutex
NodeRPC pb.NodeServiceClient
NodeLogRPC pb.NodeLogServiceClient
NodeTaskRPC pb.NodeTaskServiceClient
NodeValueRPC pb.NodeValueServiceClient
HTTPAccessLogRPC pb.HTTPAccessLogServiceClient
HTTPCacheTaskKeyRPC pb.HTTPCacheTaskKeyServiceClient
APINodeRPC pb.APINodeServiceClient
IPLibraryArtifactRPC pb.IPLibraryArtifactServiceClient
IPListRPC pb.IPListServiceClient
IPItemRPC pb.IPItemServiceClient
FileRPC pb.FileServiceClient
FileChunkRPC pb.FileChunkServiceClient
ACMEAuthenticationRPC pb.ACMEAuthenticationServiceClient
ServerRPC pb.ServerServiceClient
ServerDailyStatRPC pb.ServerDailyStatServiceClient
ServerBandwidthStatRPC pb.ServerBandwidthStatServiceClient
MetricStatRPC pb.MetricStatServiceClient
FirewallRPC pb.FirewallServiceClient
SSLCertRPC pb.SSLCertServiceClient
ScriptRPC pb.ScriptServiceClient
} }
func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) {
@@ -35,10 +56,32 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) {
return nil, errors.New("api config should not be nil") return nil, errors.New("api config should not be nil")
} }
client := &RPCClient{ var client = &RPCClient{
apiConfig: apiConfig, apiConfig: apiConfig,
} }
// 初始化RPC实例
client.NodeRPC = pb.NewNodeServiceClient(client)
client.NodeLogRPC = pb.NewNodeLogServiceClient(client)
client.NodeTaskRPC = pb.NewNodeTaskServiceClient(client)
client.NodeValueRPC = pb.NewNodeValueServiceClient(client)
client.HTTPAccessLogRPC = pb.NewHTTPAccessLogServiceClient(client)
client.HTTPCacheTaskKeyRPC = pb.NewHTTPCacheTaskKeyServiceClient(client)
client.APINodeRPC = pb.NewAPINodeServiceClient(client)
client.IPLibraryArtifactRPC = pb.NewIPLibraryArtifactServiceClient(client)
client.IPListRPC = pb.NewIPListServiceClient(client)
client.IPItemRPC = pb.NewIPItemServiceClient(client)
client.FileRPC = pb.NewFileServiceClient(client)
client.FileChunkRPC = pb.NewFileChunkServiceClient(client)
client.ACMEAuthenticationRPC = pb.NewACMEAuthenticationServiceClient(client)
client.ServerRPC = pb.NewServerServiceClient(client)
client.ServerDailyStatRPC = pb.NewServerDailyStatServiceClient(client)
client.ServerBandwidthStatRPC = pb.NewServerBandwidthStatServiceClient(client)
client.MetricStatRPC = pb.NewMetricStatServiceClient(client)
client.FirewallRPC = pb.NewFirewallServiceClient(client)
client.SSLCertRPC = pb.NewSSLCertServiceClient(client)
client.ScriptRPC = pb.NewScriptServiceClient(client)
err := client.init() err := client.init()
if err != nil { if err != nil {
return nil, err return nil, err
@@ -47,106 +90,10 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) {
return client, nil return client, nil
} }
func (this *RPCClient) NodeRPC() pb.NodeServiceClient {
return pb.NewNodeServiceClient(this.pickConn())
}
func (this *RPCClient) NodeLogRPC() pb.NodeLogServiceClient {
return pb.NewNodeLogServiceClient(this.pickConn())
}
func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient {
return pb.NewNodeTaskServiceClient(this.pickConn())
}
func (this *RPCClient) NodeValueRPC() pb.NodeValueServiceClient {
return pb.NewNodeValueServiceClient(this.pickConn())
}
func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient {
return pb.NewHTTPAccessLogServiceClient(this.pickConn())
}
func (this *RPCClient) HTTPCacheTaskKeyRPC() pb.HTTPCacheTaskKeyServiceClient {
return pb.NewHTTPCacheTaskKeyServiceClient(this.pickConn())
}
func (this *RPCClient) APINodeRPC() pb.APINodeServiceClient {
return pb.NewAPINodeServiceClient(this.pickConn())
}
func (this *RPCClient) IPLibraryRPC() pb.IPLibraryServiceClient {
return pb.NewIPLibraryServiceClient(this.pickConn())
}
func (this *RPCClient) IPLibraryArtifactRPC() pb.IPLibraryArtifactServiceClient {
return pb.NewIPLibraryArtifactServiceClient(this.pickConn())
}
func (this *RPCClient) RegionCountryRPC() pb.RegionCountryServiceClient {
return pb.NewRegionCountryServiceClient(this.pickConn())
}
func (this *RPCClient) RegionProvinceRPC() pb.RegionProvinceServiceClient {
return pb.NewRegionProvinceServiceClient(this.pickConn())
}
func (this *RPCClient) RegionCityRPC() pb.RegionCityServiceClient {
return pb.NewRegionCityServiceClient(this.pickConn())
}
func (this *RPCClient) RegionProviderRPC() pb.RegionProviderServiceClient {
return pb.NewRegionProviderServiceClient(this.pickConn())
}
func (this *RPCClient) IPListRPC() pb.IPListServiceClient {
return pb.NewIPListServiceClient(this.pickConn())
}
func (this *RPCClient) IPItemRPC() pb.IPItemServiceClient {
return pb.NewIPItemServiceClient(this.pickConn())
}
func (this *RPCClient) FileRPC() pb.FileServiceClient {
return pb.NewFileServiceClient(this.pickConn())
}
func (this *RPCClient) FileChunkRPC() pb.FileChunkServiceClient {
return pb.NewFileChunkServiceClient(this.pickConn())
}
func (this *RPCClient) ACMEAuthenticationRPC() pb.ACMEAuthenticationServiceClient {
return pb.NewACMEAuthenticationServiceClient(this.pickConn())
}
func (this *RPCClient) ServerRPC() pb.ServerServiceClient {
return pb.NewServerServiceClient(this.pickConn())
}
func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient {
return pb.NewServerDailyStatServiceClient(this.pickConn())
}
func (this *RPCClient) ServerBandwidthStatRPC() pb.ServerBandwidthStatServiceClient {
return pb.NewServerBandwidthStatServiceClient(this.pickConn())
}
func (this *RPCClient) MetricStatRPC() pb.MetricStatServiceClient {
return pb.NewMetricStatServiceClient(this.pickConn())
}
func (this *RPCClient) FirewallRPC() pb.FirewallServiceClient {
return pb.NewFirewallServiceClient(this.pickConn())
}
func (this *RPCClient) SSLCertRPC() pb.SSLCertServiceClient {
return pb.NewSSLCertServiceClient(this.pickConn())
}
// Context 节点上下文信息 // Context 节点上下文信息
func (this *RPCClient) Context() context.Context { func (this *RPCClient) Context() context.Context {
var ctx = context.Background() var ctx = context.Background()
m := maps.Map{ var m = maps.Map{
"timestamp": time.Now().Unix(), "timestamp": time.Now().Unix(),
"type": "node", "type": "node",
"userId": 0, "userId": 0,
@@ -213,7 +160,7 @@ func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error {
// 初始化 // 初始化
func (this *RPCClient) init() error { func (this *RPCClient) init() error {
// 重新连接 // 重新连接
conns := []*grpc.ClientConn{} var conns = []*grpc.ClientConn{}
for _, endpoint := range this.apiConfig.RPC.Endpoints { for _, endpoint := range this.apiConfig.RPC.Endpoints {
u, err := url.Parse(endpoint) u, err := url.Parse(endpoint)
if err != nil { if err != nil {
@@ -252,20 +199,24 @@ func (this *RPCClient) pickConn() *grpc.ClientConn {
// 检查连接状态 // 检查连接状态
if len(this.conns) > 0 { if len(this.conns) > 0 {
availableConns := []*grpc.ClientConn{} var availableConns = []*grpc.ClientConn{}
for _, state := range []connectivity.State{connectivity.Ready, connectivity.Idle, connectivity.Connecting} { for _, stateArray := range [][2]connectivity.State{
{connectivity.Ready, connectivity.Idle}, // 优先Ready和Idle
{connectivity.Connecting, connectivity.Connecting},
} {
for _, conn := range this.conns { for _, conn := range this.conns {
if conn.GetState() == state { var state = conn.GetState()
if state == stateArray[0] || state == stateArray[1] {
availableConns = append(availableConns, conn) availableConns = append(availableConns, conn)
} }
} }
if len(availableConns) > 0 { if len(availableConns) > 0 {
break return this.randConn(availableConns)
} }
} }
if len(availableConns) > 0 { if len(availableConns) > 0 {
return availableConns[rands.Int(0, len(availableConns)-1)] return this.randConn(availableConns)
} }
// 关闭 // 关闭
@@ -285,5 +236,31 @@ func (this *RPCClient) pickConn() *grpc.ClientConn {
return nil return nil
} }
return this.conns[rands.Int(0, len(this.conns)-1)] return this.randConn(this.conns)
}
func (this *RPCClient) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
var conn = this.pickConn()
if conn == nil {
return errors.New("can not get available grpc connection")
}
return conn.Invoke(ctx, method, args, reply, opts...)
}
func (this *RPCClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
var conn = this.pickConn()
if conn == nil {
return nil, errors.New("can not get available grpc connection")
}
return conn.NewStream(ctx, desc, method, opts...)
}
func (this *RPCClient) randConn(conns []*grpc.ClientConn) *grpc.ClientConn {
var l = len(conns)
if l == 0 {
return nil
}
if l == 1 {
return conns[0]
}
return conns[rands.Int(0, l-1)]
} }

View File

@@ -96,7 +96,7 @@ func (this *BandwidthStatManager) Loop() error {
if err != nil { if err != nil {
return err return err
} }
_, err = rpcClient.ServerBandwidthStatRPC().UploadServerBandwidthStats(rpcClient.Context(), &pb.UploadServerBandwidthStatsRequest{ServerBandwidthStats: pbStats}) _, err = rpcClient.ServerBandwidthStatRPC.UploadServerBandwidthStats(rpcClient.Context(), &pb.UploadServerBandwidthStatsRequest{ServerBandwidthStats: pbStats})
if err != nil { if err != nil {
return err return err
} }

View File

@@ -324,7 +324,7 @@ func (this *HTTPRequestStatManager) Upload() error {
this.dailyFirewallRuleGroupMap = map[string]int64{} this.dailyFirewallRuleGroupMap = map[string]int64{}
// 上传数据 // 上传数据
_, err = rpcClient.ServerRPC().UploadServerHTTPRequestStat(rpcClient.Context(), &pb.UploadServerHTTPRequestStatRequest{ _, err = rpcClient.ServerRPC.UploadServerHTTPRequestStat(rpcClient.Context(), &pb.UploadServerHTTPRequestStatRequest{
Month: timeutil.Format("Ym"), Month: timeutil.Format("Ym"),
Day: timeutil.Format("Ymd"), Day: timeutil.Format("Ymd"),
RegionCities: pbCities, RegionCities: pbCities,
@@ -346,7 +346,7 @@ func (this *HTTPRequestStatManager) Upload() error {
} }
// 再次尝试 // 再次尝试
_, err = rpcClient.ServerRPC().UploadServerHTTPRequestStat(rpcClient.Context(), &pb.UploadServerHTTPRequestStatRequest{ _, err = rpcClient.ServerRPC.UploadServerHTTPRequestStat(rpcClient.Context(), &pb.UploadServerHTTPRequestStatRequest{
Month: timeutil.Format("Ym"), Month: timeutil.Format("Ym"),
Day: timeutil.Format("Ymd"), Day: timeutil.Format("Ymd"),
RegionCities: pbCities, RegionCities: pbCities,

View File

@@ -210,7 +210,7 @@ func (this *TrafficStatManager) Upload() error {
}) })
} }
_, err = client.ServerDailyStatRPC().UploadServerDailyStats(client.Context(), &pb.UploadServerDailyStatsRequest{ _, err = client.ServerDailyStatRPC.UploadServerDailyStats(client.Context(), &pb.UploadServerDailyStatsRequest{
Stats: pbServerStats, Stats: pbServerStats,
DomainStats: pbDomainStats, DomainStats: pbDomainStats,
}) })

View File

@@ -34,7 +34,7 @@ func init() {
} }
for task := range notifyChan { for task := range notifyChan {
_, err = rpcClient.FirewallRPC().NotifyHTTPFirewallEvent(rpcClient.Context(), &pb.NotifyHTTPFirewallEventRequest{ _, err = rpcClient.FirewallRPC.NotifyHTTPFirewallEvent(rpcClient.Context(), &pb.NotifyHTTPFirewallEventRequest{
ServerId: task.ServerId, ServerId: task.ServerId,
HttpFirewallPolicyId: task.HttpFirewallPolicyId, HttpFirewallPolicyId: task.HttpFirewallPolicyId,
HttpFirewallRuleGroupId: task.HttpFirewallRuleGroupId, HttpFirewallRuleGroupId: task.HttpFirewallRuleGroupId,

View File

@@ -50,7 +50,7 @@ func init() {
if len(reason) == 0 { if len(reason) == 0 {
reason = "触发WAF规则自动加入" reason = "触发WAF规则自动加入"
} }
_, err = rpcClient.IPItemRPC().CreateIPItem(rpcClient.Context(), &pb.CreateIPItemRequest{ _, err = rpcClient.IPItemRPC.CreateIPItem(rpcClient.Context(), &pb.CreateIPItemRequest{
IpListId: task.listId, IpListId: task.listId,
IpFrom: task.ip, IpFrom: task.ip,
IpTo: "", IpTo: "",