From a9d31a2e35788caced5f0bc62c281426d2d10679 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=A5=A5=E8=B6=85?= Date: Wed, 18 May 2022 23:14:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0edge-node=20accesslog?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=EF=BC=8C=E7=94=A8=E6=9D=A5=E5=9C=A8=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0=E6=9F=A5=E7=9C=8B=E8=AE=BF=E9=97=AE=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/edge-node/main.go | 49 +++++++++- internal/const/build.go | 4 +- internal/const/const.go | 2 + internal/nodes/http_access_log_queue.go | 7 ++ internal/nodes/http_access_log_viewer.go | 116 +++++++++++++++++++++++ internal/nodes/node.go | 12 +++ 6 files changed, 187 insertions(+), 3 deletions(-) create mode 100644 internal/nodes/http_access_log_viewer.go diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index cf3cb5c..977ad36 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -22,7 +22,7 @@ func main() { app := apps.NewAppCmd(). Version(teaconst.Version). Product(teaconst.ProductName). - Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|reload|service|daemon|pprof]"). + Usage(teaconst.ProcessName + " [-v|start|stop|restart|status|quit|test|reload|service|daemon|pprof|accesslog]"). Usage(teaconst.ProcessName + " [trackers|goman|conns|gc]"). Usage(teaconst.ProcessName + " [ip.drop|ip.reject|ip.remove] IP") @@ -258,6 +258,53 @@ func main() { } } }) + app.On("accesslog", func() { + // local sock + var tmpDir = os.TempDir() + var sockFile = tmpDir + "/" + teaconst.AccessLogSockName + _, err := os.Stat(sockFile) + if err != nil { + if !os.IsNotExist(err) { + fmt.Println("[ERROR]" + err.Error()) + return + } + } + + var processSock = gosock.NewTmpSock(teaconst.ProcessName) + reply, err := processSock.Send(&gosock.Command{ + Code: "accesslog", + }) + if err != nil { + fmt.Println("[ERROR]" + err.Error()) + return + } + if reply.Code == "error" { + var errString = maps.NewMap(reply.Params).GetString("error") + if len(errString) > 0 { + fmt.Println("[ERROR]" + errString) + return + } + } + + conn, err := net.Dial("unix", sockFile) + if err != nil { + fmt.Println("[ERROR]start reading access log failed: " + err.Error()) + return + } + defer func() { + _ = conn.Close() + }() + var buf = make([]byte, 1024) + for { + n, err := conn.Read(buf) + if n > 0 { + fmt.Print(string(buf[:n])) + } + if err != nil { + break + } + } + }) app.Run(func() { node := nodes.NewNode() node.Start() diff --git a/internal/const/build.go b/internal/const/build.go index 3766039..4afd984 100644 --- a/internal/const/build.go +++ b/internal/const/build.go @@ -1,6 +1,6 @@ // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. -//go:build community -// +build community +//go:build !plus +// +build !plus package teaconst diff --git a/internal/const/const.go b/internal/const/const.go index 767e3a0..6c3bbeb 100644 --- a/internal/const/const.go +++ b/internal/const/const.go @@ -13,4 +13,6 @@ const ( // SystemdServiceName systemd SystemdServiceName = "edge-node" + + AccessLogSockName = "edge-node.accesslog.sock" ) diff --git a/internal/nodes/http_access_log_queue.go b/internal/nodes/http_access_log_queue.go index 1f1fc18..a091035 100644 --- a/internal/nodes/http_access_log_queue.go +++ b/internal/nodes/http_access_log_queue.go @@ -80,6 +80,13 @@ Loop: return nil } + // 发送到本地 + if sharedHTTPAccessLogViewer.HasConns() { + for _, accessLog := range accessLogs { + sharedHTTPAccessLogViewer.Send(accessLog) + } + } + // 发送到API if this.rpcClient == nil { client, err := rpc.SharedRPC() diff --git a/internal/nodes/http_access_log_viewer.go b/internal/nodes/http_access_log_viewer.go new file mode 100644 index 0000000..a166640 --- /dev/null +++ b/internal/nodes/http_access_log_viewer.go @@ -0,0 +1,116 @@ +// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package nodes + +import ( + "fmt" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/remotelogs" + "github.com/iwind/TeaGo/types" + "net" + "os" + "sync" + "sync/atomic" +) + +var sharedHTTPAccessLogViewer = NewHTTPAccessLogViewer() + +// HTTPAccessLogViewer 本地访问日志浏览器 +type HTTPAccessLogViewer struct { + sockFile string + + listener net.Listener + connMap map[int64]net.Conn // connId => net.Conn + connId int64 + locker sync.Mutex +} + +// NewHTTPAccessLogViewer 获取新对象 +func NewHTTPAccessLogViewer() *HTTPAccessLogViewer { + return &HTTPAccessLogViewer{ + sockFile: os.TempDir() + "/" + teaconst.AccessLogSockName, + connMap: map[int64]net.Conn{}, + } +} + +// Start 启动 +func (this *HTTPAccessLogViewer) Start() error { + this.locker.Lock() + defer this.locker.Unlock() + + if this.listener == nil { + // remove if exists + _ = os.Remove(this.sockFile) + + // start listening + listener, err := net.Listen("unix", this.sockFile) + if err != nil { + return err + } + this.listener = listener + + go func() { + for { + conn, err := this.listener.Accept() + if err != nil { + remotelogs.Error("ACCESS_LOG", "start local reading failed: "+err.Error()) + break + } + + this.locker.Lock() + var connId = this.nextConnId() + this.connMap[connId] = conn + go func() { + this.startReading(conn, connId) + }() + this.locker.Unlock() + } + }() + } + + return nil +} + +// HasConns 检查是否有连接 +func (this *HTTPAccessLogViewer) HasConns() bool { + this.locker.Lock() + defer this.locker.Unlock() + return len(this.connMap) > 0 +} + +// Send 发送日志 +func (this *HTTPAccessLogViewer) Send(accessLog *pb.HTTPAccessLog) { + var conns = []net.Conn{} + this.locker.Lock() + for _, conn := range this.connMap { + conns = append(conns, conn) + } + this.locker.Unlock() + + if len(conns) == 0 { + return + } + + for _, conn := range conns { + // ignore error + _, _ = conn.Write([]byte(accessLog.RemoteAddr + " [" + accessLog.TimeLocal + "] \"" + accessLog.RequestMethod + " " + accessLog.Scheme + "://" + accessLog.Host + accessLog.RequestURI + " " + accessLog.Proto + "\" " + types.String(accessLog.Status) + " - " + fmt.Sprintf("%.2fms", accessLog.RequestTime*1000) + "\n")) + } +} + +func (this *HTTPAccessLogViewer) nextConnId() int64 { + return atomic.AddInt64(&this.connId, 1) +} + +func (this *HTTPAccessLogViewer) startReading(conn net.Conn, connId int64) { + var buf = make([]byte, 1024) + for { + _, err := conn.Read(buf) + if err != nil { + this.locker.Lock() + delete(this.connMap, connId) + this.locker.Unlock() + break + } + } +} diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 046a20a..753519a 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -820,6 +820,18 @@ func (this *Node) listenSock() error { } else { _ = cmd.ReplyOk() } + case "accesslog": + err := sharedHTTPAccessLogViewer.Start() + if err != nil { + _ = cmd.Reply(&gosock.Command{ + Code: "error", + Params: map[string]interface{}{ + "message": "start failed: " + err.Error(), + }, + }) + } else { + _ = cmd.ReplyOk() + } } })