diff --git a/internal/caches/manager.go b/internal/caches/manager.go index acafbcf..a6a7b88 100644 --- a/internal/caches/manager.go +++ b/internal/caches/manager.go @@ -2,7 +2,7 @@ package caches import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/logs" + "github.com/TeaOSLab/EdgeNode/internal/logs" "strconv" "sync" ) @@ -35,7 +35,7 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy // 停止旧有的 for _, oldPolicy := range this.policyMap { if !this.containsInt64(newPolicyIds, oldPolicy.Id) { - logs.Println("[CACHE]remove policy", strconv.FormatInt(oldPolicy.Id, 10)) + logs.Error("CACHE", "remove policy "+strconv.FormatInt(oldPolicy.Id, 10)) delete(this.policyMap, oldPolicy.Id) storage, ok := this.storageMap[oldPolicy.Id] if ok { @@ -49,13 +49,13 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy for _, newPolicy := range newPolicies { _, ok := this.policyMap[newPolicy.Id] if !ok { - logs.Println("[CACHE]add policy", strconv.FormatInt(newPolicy.Id, 10)) + logs.Println("CACHE", "add policy "+strconv.FormatInt(newPolicy.Id, 10)) } // 初始化 err := newPolicy.Init() if err != nil { - logs.Println("[CACHE]UpdatePolicies: init policy error: " + err.Error()) + logs.Error("CACHE", "UpdatePolicies: init policy error: "+err.Error()) continue } this.policyMap[newPolicy.Id] = newPolicy @@ -67,19 +67,19 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy if !ok { storage := this.NewStorageWithPolicy(policy) if storage == nil { - logs.Println("[CACHE]can not find storage type '" + policy.Type + "'") + logs.Error("CACHE", "can not find storage type '"+policy.Type+"'") continue } err := storage.Init() if err != nil { - logs.Println("[CACHE]UpdatePolicies: init storage failed: " + err.Error()) + logs.Error("CACHE", "UpdatePolicies: init storage failed: "+err.Error()) continue } this.storageMap[policy.Id] = storage } else { // 检查policy是否有变化 if !storage.Policy().IsSame(policy) { - logs.Println("[CACHE]policy " + strconv.FormatInt(policy.Id, 10) + " changed") + logs.Println("CACHE", "policy "+strconv.FormatInt(policy.Id, 10)+" changed") // 停止老的 storage.Stop() @@ -88,12 +88,12 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy // 启动新的 storage := this.NewStorageWithPolicy(policy) if storage == nil { - logs.Println("[CACHE]can not find storage type '" + policy.Type + "'") + logs.Error("CACHE", "can not find storage type '"+policy.Type+"'") continue } err := storage.Init() if err != nil { - logs.Println("[CACHE]UpdatePolicies: init storage failed: " + err.Error()) + logs.Error("CACHE", "UpdatePolicies: init storage failed: "+err.Error()) continue } this.storageMap[policy.Id] = storage diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 0947769..094a6da 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -6,9 +6,9 @@ import ( "errors" "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" - "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" "io" @@ -81,7 +81,7 @@ func (this *FileStorage) Init() error { } cost := time.Since(before).Seconds() * 1000 - logs.Println("[CACHE]init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: ", fmt.Sprintf("%.3f", float64(size)/1024/1024)+" M") + logs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+", cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+fmt.Sprintf("%.3f", float64(size)/1024/1024)+" M") }() // 配置 @@ -545,7 +545,7 @@ func (this *FileStorage) initList() error { item, err := this.decodeFile(path) if err != nil { if err != ErrNotFound { - logs.Println("[CACHE]decode path '" + path + "': " + err.Error()) + logs.Error("CACHE", "decode path '"+path+"': "+err.Error()) } continue } @@ -634,7 +634,7 @@ func (this *FileStorage) purgeLoop() { path := this.hashPath(hash) err := os.Remove(path) if err != nil && !os.IsNotExist(err) { - logs.Println("[CACHE]purge '" + path + "' error: " + err.Error()) + logs.Error("CACHE", "purge '"+path+"' error: "+err.Error()) } }) } diff --git a/internal/logs/utils.go b/internal/logs/utils.go new file mode 100644 index 0000000..d9c8cec --- /dev/null +++ b/internal/logs/utils.go @@ -0,0 +1,117 @@ +package logs + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/rpc" + "github.com/iwind/TeaGo/logs" + "time" +) + +var logChan = make(chan *pb.NodeLog, 1024) + +func init() { + // 定期上传日志 + ticker := time.NewTicker(60 * time.Second) + go func() { + for range ticker.C { + err := uploadLogs() + if err != nil { + logs.Println("[LOG]" + err.Error()) + } + } + }() +} + +// 打印普通信息 +func Println(tag string, description string) { + logs.Println("[" + tag + "]" + description) + + nodeConfig, _ := nodeconfigs.SharedNodeConfig() + if nodeConfig == nil { + return + } + + select { + case logChan <- &pb.NodeLog{ + Role: teaconst.Role, + Tag: tag, + Description: description, + Level: "info", + NodeId: nodeConfig.Id, + CreatedAt: time.Now().Unix(), + }: + default: + + } +} + +// 打印警告信息 +func Warn(tag string, description string) { + logs.Println("[" + tag + "]" + description) + + nodeConfig, _ := nodeconfigs.SharedNodeConfig() + if nodeConfig == nil { + return + } + + select { + case logChan <- &pb.NodeLog{ + Role: teaconst.Role, + Tag: tag, + Description: description, + Level: "warning", + NodeId: nodeConfig.Id, + CreatedAt: time.Now().Unix(), + }: + default: + + } +} + +// 打印错误信息 +func Error(tag string, description string) { + logs.Println("[" + tag + "]" + description) + + nodeConfig, _ := nodeconfigs.SharedNodeConfig() + if nodeConfig == nil { + return + } + + select { + case logChan <- &pb.NodeLog{ + Role: teaconst.Role, + Tag: tag, + Description: description, + Level: "error", + NodeId: nodeConfig.Id, + CreatedAt: time.Now().Unix(), + }: + default: + + } +} + +// 上传日志 +func uploadLogs() error { + logList := []*pb.NodeLog{} +Loop: + for { + select { + case log := <-logChan: + logList = append(logList, log) + default: + break Loop + } + } + if len(logList) == 0 { + return nil + } + rpcClient, err := rpc.SharedRPC() + if err != nil { + return err + } + _, err = rpcClient.NodeLogRPC().CreateNodeLogs(rpcClient.Context(), &pb.CreateNodeLogsRequest{NodeLogs: logList}) + return err +} diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 6b9806d..4738a06 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -8,8 +8,8 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/errors" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/rpc" - "github.com/iwind/TeaGo/logs" "io" "net/http" "strconv" @@ -30,7 +30,7 @@ func (this *APIStream) Start() { for { err := this.loop() if err != nil { - logs.Println("[API STREAM]" + err.Error()) + logs.Error("API_STREAM", err.Error()) time.Sleep(10 * time.Second) continue } @@ -74,7 +74,7 @@ func (this *APIStream) loop() error { err = this.handleUnknownMessage(message) } if err != nil { - logs.Println("[API STREAM]handle message failed: " + err.Error()) + logs.Error("API_STREAM", "handle message failed: "+err.Error()) } } } @@ -100,7 +100,7 @@ func (this *APIStream) handleConnectedAPINode(message *pb.NodeStreamMessage) err if err != nil { return errors.Wrap(err) } - logs.Println("[API STREAM]connected to api node '" + strconv.FormatInt(msg.APINodeId, 10) + "'") + logs.Println("API_STREAM", "connected to api node '"+strconv.FormatInt(msg.APINodeId, 10)+"'") return nil } diff --git a/internal/nodes/http_request.go b/internal/nodes/http_request.go index ef007d8..3cd685d 100644 --- a/internal/nodes/http_request.go +++ b/internal/nodes/http_request.go @@ -544,7 +544,7 @@ func (this *HTTPRequest) Format(source string) string { if prefix == "node" { switch suffix { case "id": - return sharedNodeConfig.Id + return strconv.FormatInt(sharedNodeConfig.Id, 10) case "name": return sharedNodeConfig.Name case "role": diff --git a/internal/nodes/http_request_cache.go b/internal/nodes/http_request_cache.go index a507ed9..04acd68 100644 --- a/internal/nodes/http_request_cache.go +++ b/internal/nodes/http_request_cache.go @@ -3,7 +3,7 @@ package nodes import ( "bytes" "github.com/TeaOSLab/EdgeNode/internal/caches" - "github.com/iwind/TeaGo/logs" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/iwind/TeaGo/types" "net/http" "strconv" @@ -139,7 +139,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) { return } - logs.Println("read from cache failed: " + err.Error()) + logs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error()) return } diff --git a/internal/nodes/http_request_reverse_proxy.go b/internal/nodes/http_request_reverse_proxy.go index 0e83f58..998d6f5 100644 --- a/internal/nodes/http_request_reverse_proxy.go +++ b/internal/nodes/http_request_reverse_proxy.go @@ -4,8 +4,8 @@ import ( "context" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/utils" - "github.com/iwind/TeaGo/logs" "io" "net/url" "strconv" @@ -30,7 +30,7 @@ func (this *HTTPRequest) doReverseProxy() { origin := this.reverseProxy.NextOrigin(requestCall) if origin == nil { err := errors.New(this.requestPath() + ": no available backends for reverse proxy") - logs.Error(err) + logs.Error("REQUEST_REVERSE_PROXY", err.Error()) this.write500(err) return } @@ -50,7 +50,7 @@ func (this *HTTPRequest) doReverseProxy() { // 处理Scheme if origin.Addr == nil { err := errors.New(this.requestPath() + ": origin '" + strconv.FormatInt(origin.Id, 10) + "' does not has a address") - logs.Error(err) + logs.Error("REQUEST_REVERSE_PROXY", err.Error()) this.write500(err) return } @@ -127,7 +127,7 @@ func (this *HTTPRequest) doReverseProxy() { // 获取请求客户端 client, addr, err := SharedHTTPClientPool.Client(this, origin) if err != nil { - logs.Error(err) + logs.Error("REQUEST_REVERSE_PROXY", err.Error()) this.write500(err) return } @@ -143,7 +143,7 @@ func (this *HTTPRequest) doReverseProxy() { // TODO 如果超过最大失败次数,则下线 this.write500(err) - logs.Println("[proxy]'" + this.RawReq.URL.String() + "': " + err.Error()) + logs.Println("REQUEST_REVERSE_PROXY", this.RawReq.URL.String()+"': "+err.Error()) } else { // 是否为客户端方面的错误 isClientError := false @@ -170,7 +170,7 @@ func (this *HTTPRequest) doReverseProxy() { if this.doWAFResponse(resp) { err = resp.Body.Close() if err != nil { - logs.Error(err) + logs.Error("REQUEST_REVERSE_PROXY", err.Error()) } return } @@ -182,7 +182,7 @@ func (this *HTTPRequest) doReverseProxy() { if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) { err = resp.Body.Close() if err != nil { - logs.Error(err) + logs.Error("REQUEST_REVERSE_PROXY", err.Error()) } return } @@ -237,11 +237,11 @@ func (this *HTTPRequest) doReverseProxy() { err1 := resp.Body.Close() if err1 != nil { - logs.Error(err1) + logs.Error("REQUEST_REVERSE_PROXY", err1.Error()) } if err != nil && err != io.EOF { - logs.Error(err) + logs.Error("REQUEST_REVERSE_PROXY", err.Error()) this.addError(err) } } diff --git a/internal/nodes/http_writer.go b/internal/nodes/http_writer.go index e558f3b..c494ce8 100644 --- a/internal/nodes/http_writer.go +++ b/internal/nodes/http_writer.go @@ -6,9 +6,9 @@ import ( "compress/gzip" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/lists" - "github.com/iwind/TeaGo/logs" "net" "net/http" "strings" @@ -115,7 +115,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { if err != nil { _ = this.cacheWriter.Discard() this.cacheWriter = nil - logs.Println("write cache failed: " + err.Error()) + logs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) } } } else { @@ -127,7 +127,7 @@ func (this *HTTPWriter) Write(data []byte) (n int, err error) { if this.gzipBodyWriter != nil { _, err := this.gzipBodyWriter.Write(data) if err != nil { - logs.Error(err) + logs.Error("REQUEST_WRITER", err.Error()) } } else { this.body = append(this.body, data...) @@ -281,7 +281,7 @@ func (this *HTTPWriter) prepareGzip(size int64) { var err error = nil this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level)) if err != nil { - logs.Error(err) + logs.Error("REQUEST_WRITER", err.Error()) return } @@ -290,7 +290,7 @@ func (this *HTTPWriter) prepareGzip(size int64) { this.gzipBodyBuffer = bytes.NewBuffer([]byte{}) this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level)) if err != nil { - logs.Error(err) + logs.Error("REQUEST_WRITER", err.Error()) } } @@ -357,7 +357,7 @@ func (this *HTTPWriter) prepareCache(size int64) { expiredAt := utils.UnixTime() + life cacheWriter, err := storage.Open(this.req.cacheKey, expiredAt) if err != nil { - logs.Println("write cache failed: " + err.Error()) + logs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) return } this.cacheWriter = cacheWriter @@ -369,7 +369,7 @@ func (this *HTTPWriter) prepareCache(size int64) { headerData := this.HeaderData() _, err = cacheWriter.Write(headerData) if err != nil { - logs.Println("write cache failed: " + err.Error()) + logs.Error("REQUEST_WRITER", "write cache failed: "+err.Error()) _ = this.cacheWriter.Discard() this.cacheWriter = nil return diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index 147b6ce..df1557e 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -4,7 +4,7 @@ import ( "context" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/logs" + "github.com/TeaOSLab/EdgeNode/internal/logs" "net" "sync" ) @@ -88,7 +88,7 @@ func (this *Listener) Listen() error { go func() { err := this.listener.Serve() if err != nil { - logs.Println("[LISTENER]" + err.Error()) + logs.Error("LISTENER", err.Error()) } }() diff --git a/internal/nodes/listener_http.go b/internal/nodes/listener_http.go index 247fd4e..cd62cb9 100644 --- a/internal/nodes/listener_http.go +++ b/internal/nodes/listener_http.go @@ -2,7 +2,7 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/logs" + "github.com/TeaOSLab/EdgeNode/internal/logs" "golang.org/x/net/http2" "net" "net/http" @@ -53,7 +53,7 @@ func (this *HTTPListener) Serve() error { // support http/2 err := http2.ConfigureServer(this.httpServer, nil) if err != nil { - logs.Println("[HTTP_LISTENER]configure http2 error: " + err.Error()) + logs.Error("HTTP_LISTENER", "configure http2 error: "+err.Error()) } err = this.httpServer.ServeTLS(this.Listener, "", "") diff --git a/internal/nodes/listener_manager.go b/internal/nodes/listener_manager.go index daf7d6d..c066ef8 100644 --- a/internal/nodes/listener_manager.go +++ b/internal/nodes/listener_manager.go @@ -2,8 +2,8 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/iwind/TeaGo/lists" - "github.com/iwind/TeaGo/logs" "net/url" "regexp" "sync" @@ -47,7 +47,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { availableServerGroups := node.AvailableGroups() if len(availableServerGroups) == 0 { - logs.Println("[LISTENER_MANAGER]no available servers to startup") + logs.Println("LISTENER_MANAGER", "no available servers to startup") } for _, group := range availableServerGroups { @@ -59,7 +59,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { for listenerKey, listener := range this.listenersMap { addr := listener.FullAddr() if !lists.ContainsString(groupAddrs, addr) { - logs.Println("[LISTENER_MANAGER]close '" + addr + "'") + logs.Println("LISTENER_MANAGER", "close '"+addr+"'") _ = listener.Close() delete(this.listenersMap, listenerKey) @@ -71,15 +71,15 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { addr := group.FullAddr() listener, ok := this.listenersMap[addr] if ok { - logs.Println("[LISTENER_MANAGER]reload '" + this.prettyAddress(addr) + "'") + logs.Println("LISTENER_MANAGER", "reload '"+this.prettyAddress(addr)+"'") listener.Reload(group) } else { - logs.Println("[LISTENER_MANAGER]listen '" + this.prettyAddress(addr) + "'") + logs.Println("LISTENER_MANAGER", "listen '"+this.prettyAddress(addr)+"'") listener = NewListener() listener.Reload(group) err := listener.Listen() if err != nil { - logs.Println("[LISTENER_MANAGER]" + err.Error()) + logs.Error("LISTENER_MANAGER", err.Error()) continue } this.listenersMap[addr] = listener diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index 429c5d2..fa49514 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -4,7 +4,7 @@ import ( "crypto/tls" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" - "github.com/iwind/TeaGo/logs" + "github.com/TeaOSLab/EdgeNode/internal/logs" "net" ) @@ -27,7 +27,7 @@ func (this *TCPListener) Serve() error { } err = this.handleConn(conn) if err != nil { - logs.Println("[TCP_LISTENER]" + err.Error()) + logs.Error("TCP_LISTENER", err.Error()) } } @@ -112,7 +112,7 @@ func (this *TCPListener) connectOrigin(reverseProxy *serverconfigs.ReverseProxyC } conn, err = OriginConnect(origin) if err != nil { - logs.Println("[TCP_LISTENER]unable to connect origin: " + origin.Addr.Host + ":" + origin.Addr.PortRange + ": " + err.Error()) + logs.Error("TCP_LISTENER", "unable to connect origin: "+origin.Addr.Host+":"+origin.Addr.PortRange+": "+err.Error()) continue } else { return diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 8bfb0bd..d612cfc 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -6,9 +6,9 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeNode/internal/caches" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" - "github.com/iwind/TeaGo/logs" "time" ) @@ -27,7 +27,7 @@ func (this *Node) Start() { // 读取API配置 err := this.syncConfig(false) if err != nil { - logs.Println(err.Error()) + logs.Error("NODE", err.Error()) } // 启动同步计时器 @@ -39,12 +39,12 @@ func (this *Node) Start() { // 读取配置 nodeConfig, err := nodeconfigs.SharedNodeConfig() if err != nil { - logs.Println("[NODE]start failed: read node config failed: " + err.Error()) + logs.Error("NODE", "start failed: read node config failed: "+err.Error()) return } err = nodeConfig.Init() if err != nil { - logs.Println("[NODE]init node config failed: " + err.Error()) + logs.Error("NODE", "init node config failed: "+err.Error()) return } sharedNodeConfig = nodeConfig @@ -58,7 +58,7 @@ func (this *Node) Start() { // 启动端口 err = sharedListenerManager.Start(nodeConfig) if err != nil { - logs.Println("[NODE]start failed: " + err.Error()) + logs.Error("NODE", "start failed: "+err.Error()) } // hold住进程 @@ -101,7 +101,7 @@ func (this *Node) syncConfig(isFirstTime bool) error { } // 刷新配置 - logs.Println("[NODE]reload config ...") + logs.Println("NODE", "reload config ...") nodeconfigs.ResetNodeConfig(nodeConfig) caches.SharedManager.UpdatePolicies(nodeConfig.AllCachePolicies()) sharedWAFManager.UpdatePolicies(nodeConfig.AllHTTPFirewallPolicies()) @@ -122,7 +122,7 @@ func (this *Node) startSyncTimer() { for range ticker.C { err := this.syncConfig(false) if err != nil { - logs.Println("[NODE]sync config error: " + err.Error()) + logs.Error("NODE", "sync config error: "+err.Error()) continue } } diff --git a/internal/nodes/node_status_executor.go b/internal/nodes/node_status_executor.go index 52b5039..1f8154a 100644 --- a/internal/nodes/node_status_executor.go +++ b/internal/nodes/node_status_executor.go @@ -4,9 +4,9 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/iwind/TeaGo/lists" - "github.com/iwind/TeaGo/logs" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "os" @@ -62,19 +62,19 @@ func (this *NodeStatusExecutor) update() { // 发送数据 jsonData, err := json.Marshal(status) if err != nil { - logs.Println("[NODE]serial NodeStatus fail: " + err.Error()) + logs.Error("NODE_STATUS", "serial NodeStatus fail: "+err.Error()) return } rpcClient, err := rpc.SharedRPC() if err != nil { - logs.Println("[NODE]failed to open rpc: " + err.Error()) + logs.Error("NODE_STATUS", "failed to open rpc: "+err.Error()) return } _, err = rpcClient.NodeRPC().UpdateNodeStatus(rpcClient.Context(), &pb.UpdateNodeStatusRequest{ StatusJSON: jsonData, }) if err != nil { - logs.Println("[NODE]rpc UpdateNodeStatus() failed: " + err.Error()) + logs.Error("NODE_STATUS", "rpc UpdateNodeStatus() failed: "+err.Error()) return } } @@ -120,7 +120,7 @@ func (this *NodeStatusExecutor) updateCPU(status *NodeStatus) { func (this *NodeStatusExecutor) updateDisk(status *NodeStatus) { partitions, err := disk.Partitions(false) if err != nil { - logs.Error(err) + logs.Error("NODE_STATUS", err.Error()) return } lists.Sort(partitions, func(i int, j int) bool { diff --git a/internal/nodes/waf_manager.go b/internal/nodes/waf_manager.go index 5e1782a..57ab502 100644 --- a/internal/nodes/waf_manager.go +++ b/internal/nodes/waf_manager.go @@ -3,8 +3,8 @@ package nodes import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs" "github.com/TeaOSLab/EdgeNode/internal/errors" + "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/waf" - "github.com/iwind/TeaGo/logs" "strconv" "sync" ) @@ -33,7 +33,7 @@ func (this *WAFManager) UpdatePolicies(policies []*firewallconfigs.HTTPFirewallP for _, p := range policies { w, err := this.convertWAF(p) if err != nil { - logs.Println("[WAF]initialize policy '" + strconv.FormatInt(p.Id, 10) + "' failed: " + err.Error()) + logs.Error("WAF", "initialize policy '"+strconv.FormatInt(p.Id, 10)+"' failed: "+err.Error()) continue } if w == nil { diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index df7a838..e639152 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -48,6 +48,10 @@ func (this *RPCClient) NodeRPC() pb.NodeServiceClient { return pb.NewNodeServiceClient(this.pickConn()) } +func (this *RPCClient) NodeLogRPC() pb.NodeLogServiceClient { + return pb.NewNodeLogServiceClient(this.pickConn()) +} + func (this *RPCClient) Context() context.Context { ctx := context.Background() m := maps.Map{