Files
EdgeNode/internal/rpc/rpc_client.go

254 lines
7.7 KiB
Go
Raw Normal View History

2020-09-09 18:53:53 +08:00
package rpc
import (
"context"
2020-10-11 11:52:43 +08:00
"crypto/tls"
2020-09-09 18:53:53 +08:00
"encoding/base64"
"errors"
2020-09-13 20:37:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2020-09-09 18:53:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/configs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/encrypt"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
"google.golang.org/grpc"
2020-11-02 15:49:30 +08:00
"google.golang.org/grpc/connectivity"
2020-10-11 11:52:43 +08:00
"google.golang.org/grpc/credentials"
2022-03-20 10:48:06 +08:00
"google.golang.org/grpc/credentials/insecure"
2022-03-20 11:28:34 +08:00
"google.golang.org/grpc/encoding/gzip"
2020-09-09 18:53:53 +08:00
"google.golang.org/grpc/metadata"
2020-10-11 11:52:43 +08:00
"net/url"
2020-11-02 15:49:30 +08:00
"sync"
2020-09-09 18:53:53 +08:00
"time"
)
type RPCClient struct {
2020-09-30 17:46:49 +08:00
apiConfig *configs.APIConfig
conns []*grpc.ClientConn
2020-11-02 15:49:30 +08:00
locker sync.RWMutex
2022-08-24 20:04:46 +08:00
NodeRPC pb.NodeServiceClient
NodeLogRPC pb.NodeLogServiceClient
NodeTaskRPC pb.NodeTaskServiceClient
NodeValueRPC pb.NodeValueServiceClient
HTTPAccessLogRPC pb.HTTPAccessLogServiceClient
HTTPCacheTaskKeyRPC pb.HTTPCacheTaskKeyServiceClient
APINodeRPC pb.APINodeServiceClient
IPLibraryArtifactRPC pb.IPLibraryArtifactServiceClient
IPListRPC pb.IPListServiceClient
IPItemRPC pb.IPItemServiceClient
FileRPC pb.FileServiceClient
FileChunkRPC pb.FileChunkServiceClient
ACMEAuthenticationRPC pb.ACMEAuthenticationServiceClient
ServerRPC pb.ServerServiceClient
ServerDailyStatRPC pb.ServerDailyStatServiceClient
ServerBandwidthStatRPC pb.ServerBandwidthStatServiceClient
MetricStatRPC pb.MetricStatServiceClient
FirewallRPC pb.FirewallServiceClient
SSLCertRPC pb.SSLCertServiceClient
ScriptRPC pb.ScriptServiceClient
UserRPC pb.UserServiceClient
2020-09-09 18:53:53 +08:00
}
func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) {
if apiConfig == nil {
return nil, errors.New("api config should not be nil")
}
2022-08-24 20:04:46 +08:00
var client = &RPCClient{
2020-11-02 15:49:30 +08:00
apiConfig: apiConfig,
2020-09-09 18:53:53 +08:00
}
2020-11-02 15:49:30 +08:00
2022-08-24 20:04:46 +08:00
// 初始化RPC实例
client.NodeRPC = pb.NewNodeServiceClient(client)
client.NodeLogRPC = pb.NewNodeLogServiceClient(client)
client.NodeTaskRPC = pb.NewNodeTaskServiceClient(client)
client.NodeValueRPC = pb.NewNodeValueServiceClient(client)
client.HTTPAccessLogRPC = pb.NewHTTPAccessLogServiceClient(client)
client.HTTPCacheTaskKeyRPC = pb.NewHTTPCacheTaskKeyServiceClient(client)
client.APINodeRPC = pb.NewAPINodeServiceClient(client)
client.IPLibraryArtifactRPC = pb.NewIPLibraryArtifactServiceClient(client)
client.IPListRPC = pb.NewIPListServiceClient(client)
client.IPItemRPC = pb.NewIPItemServiceClient(client)
client.FileRPC = pb.NewFileServiceClient(client)
client.FileChunkRPC = pb.NewFileChunkServiceClient(client)
client.ACMEAuthenticationRPC = pb.NewACMEAuthenticationServiceClient(client)
client.ServerRPC = pb.NewServerServiceClient(client)
client.ServerDailyStatRPC = pb.NewServerDailyStatServiceClient(client)
client.ServerBandwidthStatRPC = pb.NewServerBandwidthStatServiceClient(client)
client.MetricStatRPC = pb.NewMetricStatServiceClient(client)
client.FirewallRPC = pb.NewFirewallServiceClient(client)
client.SSLCertRPC = pb.NewSSLCertServiceClient(client)
client.ScriptRPC = pb.NewScriptServiceClient(client)
client.UserRPC = pb.NewUserServiceClient(client)
2022-08-24 20:04:46 +08:00
2020-11-02 15:49:30 +08:00
err := client.init()
if err != nil {
return nil, err
2020-09-09 18:53:53 +08:00
}
2020-11-02 15:49:30 +08:00
return client, nil
2020-09-09 18:53:53 +08:00
}
2021-04-29 16:48:47 +08:00
// Context 节点上下文信息
2020-09-09 18:53:53 +08:00
func (this *RPCClient) Context() context.Context {
2022-08-24 20:04:46 +08:00
var m = maps.Map{
2020-09-09 18:53:53 +08:00
"timestamp": time.Now().Unix(),
"type": "node",
"userId": 0,
}
method, err := encrypt.NewMethodInstance(teaconst.EncryptMethod, this.apiConfig.Secret, this.apiConfig.NodeId)
if err != nil {
utils.PrintError(err)
return context.Background()
}
data, err := method.Encrypt(m.AsJSON())
if err != nil {
utils.PrintError(err)
return context.Background()
}
2022-08-23 14:32:39 +08:00
var token = base64.StdEncoding.EncodeToString(data)
2022-11-13 10:32:12 +08:00
var ctx = context.Background()
2020-09-09 18:53:53 +08:00
ctx = metadata.AppendToOutgoingContext(ctx, "nodeId", this.apiConfig.NodeId, "token", token)
return ctx
}
2020-09-30 17:46:49 +08:00
2021-04-29 16:48:47 +08:00
// ClusterContext 集群上下文
func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) context.Context {
ctx := context.Background()
m := maps.Map{
"timestamp": time.Now().Unix(),
"type": "cluster",
"userId": 0,
}
method, err := encrypt.NewMethodInstance(teaconst.EncryptMethod, clusterSecret, clusterId)
if err != nil {
utils.PrintError(err)
return context.Background()
}
data, err := method.Encrypt(m.AsJSON())
if err != nil {
utils.PrintError(err)
return context.Background()
}
token := base64.StdEncoding.EncodeToString(data)
ctx = metadata.AppendToOutgoingContext(ctx, "nodeId", clusterId, "token", token)
return ctx
}
2021-04-29 16:48:47 +08:00
// Close 关闭连接
2020-10-28 11:19:06 +08:00
func (this *RPCClient) Close() {
this.locker.Lock()
2020-10-28 11:19:06 +08:00
for _, conn := range this.conns {
_ = conn.Close()
}
this.locker.Unlock()
2020-10-28 11:19:06 +08:00
}
2021-04-29 16:48:47 +08:00
// UpdateConfig 修改配置
2021-02-24 11:01:06 +08:00
func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error {
this.apiConfig = config
this.locker.Lock()
err := this.init()
this.locker.Unlock()
return err
2021-02-24 11:01:06 +08:00
}
2020-11-02 15:49:30 +08:00
// 初始化
func (this *RPCClient) init() error {
// 重新连接
2022-08-24 20:04:46 +08:00
var conns = []*grpc.ClientConn{}
2020-11-02 15:49:30 +08:00
for _, endpoint := range this.apiConfig.RPC.Endpoints {
u, err := url.Parse(endpoint)
if err != nil {
return errors.New("parse endpoint failed: " + err.Error())
}
var conn *grpc.ClientConn
2022-09-12 21:59:52 +08:00
var callOptions = grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(128*1024*1024),
grpc.MaxCallSendMsgSize(128*1024*1024),
grpc.UseCompressor(gzip.Name),
)
2020-11-02 15:49:30 +08:00
if u.Scheme == "http" {
2022-09-12 21:59:52 +08:00
conn, err = grpc.Dial(u.Host, grpc.WithTransportCredentials(insecure.NewCredentials()), callOptions)
2020-11-02 15:49:30 +08:00
} else if u.Scheme == "https" {
conn, err = grpc.Dial(u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
InsecureSkipVerify: true,
2022-03-20 11:28:34 +08:00
})), callOptions)
2020-11-02 15:49:30 +08:00
} else {
return errors.New("parse endpoint failed: invalid scheme '" + u.Scheme + "'")
}
if err != nil {
return err
}
conns = append(conns, conn)
}
if len(conns) == 0 {
return errors.New("[RPC]no available endpoints")
}
2021-02-24 11:01:06 +08:00
// 这里不需要加锁防止和pickConn()冲突
2020-11-02 15:49:30 +08:00
this.conns = conns
return nil
}
2020-09-30 17:46:49 +08:00
// 随机选择一个连接
func (this *RPCClient) pickConn() *grpc.ClientConn {
2021-11-20 19:17:57 +08:00
this.locker.Lock()
defer this.locker.Unlock()
2020-11-02 15:49:30 +08:00
// 检查连接状态
if len(this.conns) > 0 {
2022-08-24 20:04:46 +08:00
for _, stateArray := range [][2]connectivity.State{
{connectivity.Ready, connectivity.Idle}, // 优先Ready和Idle
{connectivity.Connecting, connectivity.Connecting},
2022-11-17 10:32:26 +08:00
{connectivity.TransientFailure, connectivity.TransientFailure},
2022-08-24 20:04:46 +08:00
} {
2022-11-17 10:38:20 +08:00
var availableConns = []*grpc.ClientConn{}
2020-11-15 11:58:08 +08:00
for _, conn := range this.conns {
2022-08-24 20:04:46 +08:00
var state = conn.GetState()
if state == stateArray[0] || state == stateArray[1] {
2020-11-15 11:58:08 +08:00
availableConns = append(availableConns, conn)
}
}
if len(availableConns) > 0 {
2022-08-24 20:04:46 +08:00
return this.randConn(availableConns)
2020-11-02 15:49:30 +08:00
}
}
2020-09-30 17:46:49 +08:00
}
2020-11-02 15:49:30 +08:00
2022-08-24 20:04:46 +08:00
return this.randConn(this.conns)
}
func (this *RPCClient) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
var conn = this.pickConn()
if conn == nil {
return errors.New("can not get available grpc connection")
}
return conn.Invoke(ctx, method, args, reply, opts...)
}
func (this *RPCClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
var conn = this.pickConn()
if conn == nil {
return nil, errors.New("can not get available grpc connection")
}
return conn.NewStream(ctx, desc, method, opts...)
}
func (this *RPCClient) randConn(conns []*grpc.ClientConn) *grpc.ClientConn {
var l = len(conns)
if l == 0 {
return nil
}
if l == 1 {
return conns[0]
}
return conns[rands.Int(0, l-1)]
2020-09-30 17:46:49 +08:00
}