diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index bbab46d..db6253d 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -64,6 +64,8 @@ type HTTPRequest struct { firewallRuleGroupId int64 firewallRuleSetId int64 firewallRuleId int64 + + logAttrs map[string]string } // 初始化 @@ -79,6 +81,7 @@ func (this *HTTPRequest) init() { "cache.policy.id": "0", "cache.policy.type": "", } + this.logAttrs = map[string]string{} this.requestFromTime = time.Now() } diff --git a/internal/nodes/http_request_log.go b/internal/nodes/http_request_log.go index 927f872..105919d 100644 --- a/internal/nodes/http_request_log.go +++ b/internal/nodes/http_request_log.go @@ -120,6 +120,8 @@ func (this *HTTPRequest) log() { FirewallRuleGroupId: this.firewallRuleGroupId, FirewallRuleSetId: this.firewallRuleSetId, FirewallRuleId: this.firewallRuleId, + + Attrs: this.logAttrs, } if this.origin != nil { diff --git a/internal/nodes/http_request_waf.go b/internal/nodes/http_request_waf.go index af5a7d6..96dcfb4 100644 --- a/internal/nodes/http_request_waf.go +++ b/internal/nodes/http_request_waf.go @@ -3,6 +3,7 @@ package nodes import ( "github.com/TeaOSLab/EdgeNode/internal/waf" "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/types" "net/http" ) @@ -13,7 +14,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) { return } - goNext, _, ruleSet, err := w.MatchRequest(this.RawReq, this.writer) + goNext, ruleGroup, ruleSet, err := w.MatchRequest(this.RawReq, this.writer) if err != nil { logs.Error(err) return @@ -21,8 +22,12 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) { if ruleSet != nil { if ruleSet.Action != waf.ActionAllow { - // TODO 记录日志 + this.firewallPolicyId = this.web.FirewallPolicy.Id + this.firewallRuleGroupId = types.Int64(ruleGroup.Id) + this.firewallRuleSetId = types.Int64(ruleSet.Id) } + + this.logAttrs["waf.action"] = ruleSet.Action } return !goNext @@ -35,7 +40,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) { return } - goNext, _, ruleSet, err := w.MatchResponse(this.RawReq, resp, this.writer) + goNext, ruleGroup, ruleSet, err := w.MatchResponse(this.RawReq, resp, this.writer) if err != nil { logs.Error(err) return @@ -43,8 +48,12 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) { if ruleSet != nil { if ruleSet.Action != waf.ActionAllow { - // TODO 记录日志 + this.firewallPolicyId = this.web.FirewallPolicy.Id + this.firewallRuleGroupId = types.Int64(ruleGroup.Id) + this.firewallRuleSetId = types.Int64(ruleSet.Id) } + + this.logAttrs["waf.action"] = ruleSet.Action } return !goNext diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index a31b6c1..980f8f6 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -13,15 +13,19 @@ import ( "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/rands" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "net/url" + "sync" "time" ) type RPCClient struct { apiConfig *configs.APIConfig conns []*grpc.ClientConn + + locker sync.Mutex } func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { @@ -29,35 +33,16 @@ func NewRPCClient(apiConfig *configs.APIConfig) (*RPCClient, error) { return nil, errors.New("api config should not be nil") } - conns := []*grpc.ClientConn{} - for _, endpoint := range apiConfig.RPC.Endpoints { - u, err := url.Parse(endpoint) - if err != nil { - return nil, errors.New("parse endpoint failed: " + err.Error()) - } - var conn *grpc.ClientConn - if u.Scheme == "http" { - conn, err = grpc.Dial(u.Host, grpc.WithInsecure()) - } else if u.Scheme == "https" { - conn, err = grpc.Dial(u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ - InsecureSkipVerify: true, - }))) - } else { - return nil, errors.New("parse endpoint failed: invalid scheme '" + u.Scheme + "'") - } - if err != nil { - return nil, err - } - conns = append(conns, conn) - } - if len(conns) == 0 { - return nil, errors.New("[RPC]no available endpoints") + client := &RPCClient{ + apiConfig: apiConfig, } - return &RPCClient{ - apiConfig: apiConfig, - conns: conns, - }, nil + err := client.init() + if err != nil { + return nil, err + } + + return client, nil } func (this *RPCClient) NodeRPC() pb.NodeServiceClient { @@ -129,10 +114,66 @@ func (this *RPCClient) Close() { } } +// 初始化 +func (this *RPCClient) init() error { + // 重新连接 + conns := []*grpc.ClientConn{} + for _, endpoint := range this.apiConfig.RPC.Endpoints { + u, err := url.Parse(endpoint) + if err != nil { + return errors.New("parse endpoint failed: " + err.Error()) + } + var conn *grpc.ClientConn + if u.Scheme == "http" { + conn, err = grpc.Dial(u.Host, grpc.WithInsecure()) + } else if u.Scheme == "https" { + conn, err = grpc.Dial(u.Host, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ + InsecureSkipVerify: true, + }))) + } else { + return errors.New("parse endpoint failed: invalid scheme '" + u.Scheme + "'") + } + if err != nil { + return err + } + conns = append(conns, conn) + } + if len(conns) == 0 { + return errors.New("[RPC]no available endpoints") + } + this.conns = conns + return nil +} + // 随机选择一个连接 func (this *RPCClient) pickConn() *grpc.ClientConn { + this.locker.Lock() + defer this.locker.Unlock() + + // 检查连接状态 + if len(this.conns) > 0 { + availableConns := []*grpc.ClientConn{} + for _, conn := range this.conns { + if conn.GetState() == connectivity.Ready { + availableConns = append(availableConns, conn) + } + } + + if len(availableConns) > 0 { + return availableConns[rands.Int(0, len(availableConns)-1)] + } + } + + // 重新初始化 + err := this.init() + if err != nil { + // 错误提示已经在构造对象时打印过,所以这里不再重复打印 + return nil + } + if len(this.conns) == 0 { return nil } + return this.conns[rands.Int(0, len(this.conns)-1)] } diff --git a/internal/waf/template.go b/internal/waf/template.go index 244b523..82e4abe 100644 --- a/internal/waf/template.go +++ b/internal/waf/template.go @@ -1,7 +1,5 @@ package waf -// 感谢以下规则来源: -// - Janusec: https://www.janusec.com/ func Template() *WAF { waf := NewWAF() waf.Id = "template"