diff --git a/internal/nodes/http_request_mismatch.go b/internal/nodes/http_request_mismatch.go index 73aa217..b3546e7 100644 --- a/internal/nodes/http_request_mismatch.go +++ b/internal/nodes/http_request_mismatch.go @@ -32,7 +32,7 @@ func (this *HTTPRequest) doMismatch() { } // 根据配置进行相应的处理 - if sharedNodeConfig.GlobalConfig != nil && sharedNodeConfig.GlobalConfig.HTTPAll.MatchDomainStrictly { + if sharedNodeConfig.GlobalServerConfig != nil && sharedNodeConfig.GlobalServerConfig.HTTPAll.MatchDomainStrictly { // 检查cc // TODO 可以在管理端配置是否开启以及最多尝试次数 if len(remoteIP) > 0 { @@ -46,7 +46,7 @@ func (this *HTTPRequest) doMismatch() { } // 处理当前连接 - var httpAllConfig = sharedNodeConfig.GlobalConfig.HTTPAll + var httpAllConfig = sharedNodeConfig.GlobalServerConfig.HTTPAll var mismatchAction = httpAllConfig.DomainMismatchAction if mismatchAction != nil && mismatchAction.Code == "page" { if mismatchAction.Options != nil { diff --git a/internal/nodes/listener_base.go b/internal/nodes/listener_base.go index 312e229..8563d36 100644 --- a/internal/nodes/listener_base.go +++ b/internal/nodes/listener_base.go @@ -3,11 +3,12 @@ package nodes import ( "crypto/tls" "errors" + "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs" "github.com/TeaOSLab/EdgeNode/internal/remotelogs" - "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/types" + "net" ) type BaseListener struct { @@ -75,7 +76,7 @@ func (this *BaseListener) matchSSL(domain string) (*sslconfigs.SSLPolicy, *tls.C // 如果域名为空,则取第一个 // 通常域名为空是因为是直接通过IP访问的 if len(domain) == 0 { - if group.IsHTTPS() && sharedNodeConfig.GlobalConfig != nil && sharedNodeConfig.GlobalConfig.HTTPAll.MatchDomainStrictly { + if group.IsHTTPS() && sharedNodeConfig.GlobalServerConfig != nil && sharedNodeConfig.GlobalServerConfig.HTTPAll.MatchDomainStrictly { return nil, nil, errors.New("no tls server name matched") } @@ -131,19 +132,19 @@ func (this *BaseListener) findNamedServer(name string) (serverConfig *serverconf return } - var matchDomainStrictly = sharedNodeConfig.GlobalConfig != nil && sharedNodeConfig.GlobalConfig.HTTPAll.MatchDomainStrictly + var matchDomainStrictly = sharedNodeConfig.GlobalServerConfig != nil && sharedNodeConfig.GlobalServerConfig.HTTPAll.MatchDomainStrictly - if sharedNodeConfig.GlobalConfig != nil && - len(sharedNodeConfig.GlobalConfig.HTTPAll.DefaultDomain) > 0 && - (!matchDomainStrictly || lists.ContainsString(sharedNodeConfig.GlobalConfig.HTTPAll.AllowMismatchDomains, name)) { - defaultDomain := sharedNodeConfig.GlobalConfig.HTTPAll.DefaultDomain + if sharedNodeConfig.GlobalServerConfig != nil && + len(sharedNodeConfig.GlobalServerConfig.HTTPAll.DefaultDomain) > 0 && + (!matchDomainStrictly || configutils.MatchDomains(sharedNodeConfig.GlobalServerConfig.HTTPAll.AllowMismatchDomains, name) || (sharedNodeConfig.GlobalServerConfig.HTTPAll.AllowNodeIP && net.ParseIP(name) != nil)) { + var defaultDomain = sharedNodeConfig.GlobalServerConfig.HTTPAll.DefaultDomain serverConfig, serverName = this.findNamedServerMatched(defaultDomain) if serverConfig != nil { return } } - if matchDomainStrictly && !lists.ContainsString(sharedNodeConfig.GlobalConfig.HTTPAll.AllowMismatchDomains, name) { + if matchDomainStrictly && !configutils.MatchDomains(sharedNodeConfig.GlobalServerConfig.HTTPAll.AllowMismatchDomains, name) && (!sharedNodeConfig.GlobalServerConfig.HTTPAll.AllowNodeIP || net.ParseIP(name) == nil) { return } @@ -170,7 +171,7 @@ func (this *BaseListener) findNamedServerMatched(name string) (serverConfig *ser } // 是否严格匹配域名 - matchDomainStrictly := sharedNodeConfig.GlobalConfig != nil && sharedNodeConfig.GlobalConfig.HTTPAll.MatchDomainStrictly + var matchDomainStrictly = sharedNodeConfig.GlobalServerConfig != nil && sharedNodeConfig.GlobalServerConfig.HTTPAll.MatchDomainStrictly // 如果只有一个server,则默认为这个 var currentServers = group.Servers() diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 73c0b56..164919a 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -2,6 +2,7 @@ package nodes import ( "bytes" + "context" "encoding/json" "errors" iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary" @@ -321,139 +322,157 @@ func (this *Node) loop() error { return errors.New("read node tasks failed: " + err.Error()) } for _, task := range tasksResp.NodeTasks { - switch task.Type { - case "ipItemChanged": - // 防止阻塞 - select { - case iplibrary.IPListUpdateNotify <- true: - default: - - } - - // 修改为已同步 - _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ - NodeTaskId: task.Id, - IsOk: true, - Error: "", - }) - if err != nil { - return err - } - case "configChanged": - if task.ServerId > 0 { - err = this.syncServerConfig(task.ServerId) - } else { - if !task.IsPrimary { - // 我们等等主节点配置准备完毕 - time.Sleep(2 * time.Second) - } - err = this.syncConfig(task.Version) - } - if err != nil { - _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ - NodeTaskId: task.Id, - IsOk: false, - Error: err.Error(), - }) - } else { - _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ - NodeTaskId: task.Id, - IsOk: true, - Error: "", - }) - } - if err != nil { - return err - } - case "nodeVersionChanged": - if !sharedUpgradeManager.IsInstalling() { - goman.New(func() { - sharedUpgradeManager.Start() - }) - } - case "scriptsChanged": - err = this.reloadCommonScripts() - if err != nil { - return errors.New("reload common scripts failed: " + err.Error()) - } - - // 修改为已同步 - _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ - NodeTaskId: task.Id, - IsOk: true, - Error: "", - }) - if err != nil { - return err - } - case "nodeLevelChanged": - levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{}) - if err != nil { - return err - } - - sharedNodeConfig.Level = levelInfoResp.Level - - var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{} - if len(levelInfoResp.ParentNodesMapJSON) > 0 { - err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes) - if err != nil { - return errors.New("decode level info failed: " + err.Error()) - } - } - sharedNodeConfig.ParentNodes = parentNodes - - // 修改为已同步 - _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ - NodeTaskId: task.Id, - IsOk: true, - Error: "", - }) - if err != nil { - return err - } - case "ddosProtectionChanged": - resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{}) - if err != nil { - return err - } - if len(resp.DdosProtectionJSON) == 0 { - if sharedNodeConfig != nil { - sharedNodeConfig.DDoSProtection = nil - } - } else { - var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{} - err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig) - if err != nil { - return errors.New("decode DDoS protection config failed: " + err.Error()) - } - - if sharedNodeConfig != nil { - sharedNodeConfig.DDoSProtection = ddosProtectionConfig - } - - err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig) - if err != nil { - // 不阻塞 - remotelogs.Error("NODE", "apply DDoS protection failed: "+err.Error()) - } - } - - // 修改为已同步 - _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ - NodeTaskId: task.Id, - IsOk: true, - Error: "", - }) - if err != nil { - return err - } - } + err := this.execTask(rpcClient, nodeCtx, task) + this.finishTask(task.Id, err) } return nil } +// 执行任务 +func (this *Node) execTask(rpcClient *rpc.RPCClient, nodeCtx context.Context, task *pb.NodeTask) error { + switch task.Type { + case "ipItemChanged": + // 防止阻塞 + select { + case iplibrary.IPListUpdateNotify <- true: + default: + + } + case "configChanged": + if task.ServerId > 0 { + return this.syncServerConfig(task.ServerId) + } + if !task.IsPrimary { + // 我们等等主节点配置准备完毕 + time.Sleep(2 * time.Second) + } + return this.syncConfig(task.Version) + case "nodeVersionChanged": + if !sharedUpgradeManager.IsInstalling() { + goman.New(func() { + sharedUpgradeManager.Start() + }) + } + case "scriptsChanged": + err := this.reloadCommonScripts() + if err != nil { + return errors.New("reload common scripts failed: " + err.Error()) + } + case "nodeLevelChanged": + levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(nodeCtx, &pb.FindNodeLevelInfoRequest{}) + if err != nil { + return err + } + + if sharedNodeConfig != nil { + sharedNodeConfig.Level = levelInfoResp.Level + } + + var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{} + if len(levelInfoResp.ParentNodesMapJSON) > 0 { + err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes) + if err != nil { + return errors.New("decode level info failed: " + err.Error()) + } + } + + if sharedNodeConfig != nil { + sharedNodeConfig.ParentNodes = parentNodes + } + case "ddosProtectionChanged": + resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(nodeCtx, &pb.FindNodeDDoSProtectionRequest{}) + if err != nil { + return err + } + if len(resp.DdosProtectionJSON) == 0 { + if sharedNodeConfig != nil { + sharedNodeConfig.DDoSProtection = nil + } + return nil + } + + var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{} + err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig) + if err != nil { + return errors.New("decode DDoS protection config failed: " + err.Error()) + } + + if ddosProtectionConfig != nil && sharedNodeConfig != nil { + sharedNodeConfig.DDoSProtection = ddosProtectionConfig + } + + err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig) + if err != nil { + // 不阻塞 + remotelogs.Warn("NODE", "apply DDoS protection failed: "+err.Error()) + return nil + } + case "globalServerConfigChanged": + resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(nodeCtx, &pb.FindNodeGlobalServerConfigRequest{}) + if err != nil { + return err + } + if len(resp.GlobalServerConfigJSON) > 0 { + var globalServerConfig = serverconfigs.DefaultGlobalServerConfig() + err = json.Unmarshal(resp.GlobalServerConfigJSON, globalServerConfig) + if err != nil { + return errors.New("decode global server config failed: " + err.Error()) + } + + if globalServerConfig != nil { + err = globalServerConfig.Init() + if err != nil { + return errors.New("validate global server config failed: " + err.Error()) + } + if sharedNodeConfig != nil { + sharedNodeConfig.GlobalServerConfig = globalServerConfig + } + } + } + default: + remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled") + } + + return nil +} + +// 标记任务完成 +func (this *Node) finishTask(taskId int64, err error) { + if taskId <= 0 { + return + } + + rpcClient, err := rpc.SharedRPC() + if err != nil { + logs.Println("[NODE]", "create rpc client failed: "+err.Error()) + return + } + + var nodeCtx = rpcClient.Context() + + var isOk = err == nil + var errMsg = "" + if err != nil { + errMsg = err.Error() + } + + _, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{ + NodeTaskId: taskId, + IsOk: isOk, + Error: errMsg, + }) + + if err != nil { + // 不需要上报到服务中心 + if rpc.IsConnError(err) { + logs.Println("[NODE]", "report task done failed: "+err.Error()) + } else { + remotelogs.Error("NODE", "report task done failed: "+err.Error()) + } + } +} + // 读取API配置 func (this *Node) syncConfig(taskVersion int64) error { this.locker.Lock() diff --git a/internal/remotelogs/utils.go b/internal/remotelogs/utils.go index d2ef3d9..113b07d 100644 --- a/internal/remotelogs/utils.go +++ b/internal/remotelogs/utils.go @@ -21,7 +21,7 @@ var logChan = make(chan *pb.NodeLog, 1024) func init() { // 定期上传日志 - ticker := time.NewTicker(60 * time.Second) + var ticker = time.NewTicker(60 * time.Second) if Tea.IsTesting() { ticker = time.NewTicker(10 * time.Second) } @@ -111,6 +111,12 @@ func ErrorObject(tag string, err error) { // ServerError 打印服务相关错误信息 func ServerError(serverId int64, tag string, description string, logType nodeconfigs.NodeLogType, params maps.Map) { + // 是否记录服务相关错误 + nodeConfig, _ := nodeconfigs.SharedNodeConfig() + if nodeConfig != nil && nodeConfig.GlobalServerConfig != nil && !nodeConfig.GlobalServerConfig.Log.RecordServerError { + return + } + logs.Println("[" + tag + "]" + description) // 参数 @@ -207,7 +213,7 @@ func ServerLog(serverId int64, tag string, description string, logType nodeconfi // 上传日志 func uploadLogs() error { - logList := []*pb.NodeLog{} + var logList = []*pb.NodeLog{} const hashSize = 5 var hashList = []uint64{} @@ -242,6 +248,7 @@ Loop: if len(logList) == 0 { return nil } + rpcClient, err := rpc.SharedRPC() if err != nil { return err