diff --git a/cmd/edge-node/main.go b/cmd/edge-node/main.go index 8f3dffc..d5f7290 100644 --- a/cmd/edge-node/main.go +++ b/cmd/edge-node/main.go @@ -5,15 +5,19 @@ import ( "github.com/TeaOSLab/EdgeNode/internal/apps" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" "github.com/TeaOSLab/EdgeNode/internal/nodes" + "github.com/iwind/TeaGo/Tea" _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/types" + "io/ioutil" "os" + "syscall" ) func main() { app := apps.NewAppCmd(). Version(teaconst.Version). Product(teaconst.ProductName). - Usage(teaconst.ProcessName + " [-v|start|stop|restart|sync|update|test]") + Usage(teaconst.ProcessName + " [-v|start|stop|restart|quit|sync|update|test]") app.On("test", func() { err := nodes.NewNode().Test() @@ -29,6 +33,27 @@ func main() { // TODO fmt.Println("not implemented yet") }) + app.On("quit", func() { + pidFile := Tea.Root + "/bin/pid" + data, err := ioutil.ReadFile(pidFile) + if err != nil { + fmt.Println("[ERROR]quit failed: " + err.Error()) + return + } + pid := types.Int(string(data)) + if pid == 0 { + fmt.Println("[ERROR]quit failed: pid=0") + return + } + + process, err := os.FindProcess(pid) + if err != nil { + return + } + if process != nil { + _ = process.Signal(syscall.SIGQUIT) + } + }) app.Run(func() { node := nodes.NewNode() node.Start() diff --git a/internal/apps/app_cmd.go b/internal/apps/app_cmd.go index 7e67151..4f69aae 100644 --- a/internal/apps/app_cmd.go +++ b/internal/apps/app_cmd.go @@ -150,9 +150,6 @@ func (this *AppCmd) Run(main func()) { return } - // 记录PID - _ = this.writePid() - // 日志 writer := new(LogWriter) writer.Init() @@ -227,8 +224,3 @@ func (this *AppCmd) runStatus() { func (this *AppCmd) checkPid() *os.Process { return CheckPid(Tea.Root + "/bin/pid") } - -// 写入PID -func (this *AppCmd) writePid() error { - return WritePid(Tea.Root + "/bin/pid") -} diff --git a/internal/apps/pid.go b/internal/apps/pid.go index d5076ee..6c57a34 100644 --- a/internal/apps/pid.go +++ b/internal/apps/pid.go @@ -2,6 +2,8 @@ package apps import ( "fmt" + "github.com/TeaOSLab/EdgeNode/internal/events" + "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/types" "io/ioutil" "os" @@ -50,11 +52,15 @@ func CheckPid(path string) *os.Process { } // 写入Pid -func WritePid(path string) error { +func WritePid() error { + path := Tea.Root + "/bin/pid" fp, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_RDONLY, 0666) if err != nil { return err } + events.On(events.EventQuit, func() { + _ = fp.Close() + }) if runtime.GOOS != "windows" { err = LockFile(fp) diff --git a/internal/caches/storage_file.go b/internal/caches/storage_file.go index 094a6da..e74f666 100644 --- a/internal/caches/storage_file.go +++ b/internal/caches/storage_file.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/utils" "github.com/iwind/TeaGo/Tea" @@ -557,6 +558,13 @@ func (this *FileStorage) initList() error { // 启动定时清理任务 this.ticker = utils.NewTicker(30 * time.Second) + events.On(events.EventQuit, func() { + logs.Println("CACHE", "quit clean timer") + var ticker = this.ticker + if ticker != nil { + ticker.Stop() + } + }) go func() { for this.ticker.Next() { this.purgeLoop() diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 0000000..7b57e2d --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,7 @@ +package events + +type Event = string + +const ( + EventQuit Event = "quit" // quit node gracefully +) diff --git a/internal/events/utils.go b/internal/events/utils.go new file mode 100644 index 0000000..8c45e95 --- /dev/null +++ b/internal/events/utils.go @@ -0,0 +1,27 @@ +package events + +import "sync" + +var eventsMap = map[string][]func(){} // event => []callbacks +var locker = sync.Mutex{} + +// 增加事件回调 +func On(event string, callback func()) { + locker.Lock() + defer locker.Unlock() + + callbacks, _ := eventsMap[event] + callbacks = append(callbacks, callback) + eventsMap[event] = callbacks +} + +// 通知事件 +func Notify(event string) { + locker.Lock() + callbacks, _ := eventsMap[event] + locker.Unlock() + + for _, callback := range callbacks { + callback() + } +} diff --git a/internal/events/utils_test.go b/internal/events/utils_test.go new file mode 100644 index 0000000..4cb41e3 --- /dev/null +++ b/internal/events/utils_test.go @@ -0,0 +1,16 @@ +package events + +import "testing" + +func TestOn(t *testing.T) { + On("hello", func() { + t.Log("world") + }) + On("hello", func() { + t.Log("world2") + }) + On("hello2", func() { + t.Log("world2") + }) + Notify("hello") +} diff --git a/internal/nodes/api_stream.go b/internal/nodes/api_stream.go index 898416d..6eaa19c 100644 --- a/internal/nodes/api_stream.go +++ b/internal/nodes/api_stream.go @@ -8,6 +8,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/errors" + "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "io" @@ -27,7 +28,14 @@ func NewAPIStream() *APIStream { } func (this *APIStream) Start() { + isQuiting := false + events.On(events.EventQuit, func() { + isQuiting = true + }) for { + if isQuiting { + return + } err := this.loop() if err != nil { logs.Error("API_STREAM", err.Error()) @@ -43,15 +51,29 @@ func (this *APIStream) loop() error { if err != nil { return errors.Wrap(err) } + isQuiting := false + events.On(events.EventQuit, func() { + isQuiting = true + }) nodeStream, err := rpcClient.NodeRPC().NodeStream(rpcClient.Context()) if err != nil { + if isQuiting { + return nil + } return errors.Wrap(err) } this.stream = nodeStream for { + if isQuiting { + break + } + message, err := nodeStream.Recv() if err != nil { + if isQuiting { + return nil + } return errors.Wrap(err) } @@ -80,6 +102,8 @@ func (this *APIStream) loop() error { logs.Error("API_STREAM", "handle message failed: "+err.Error()) } } + + return nil } // 连接API节点成功 diff --git a/internal/nodes/listener.go b/internal/nodes/listener.go index df1557e..e00d1d0 100644 --- a/internal/nodes/listener.go +++ b/internal/nodes/listener.go @@ -4,6 +4,7 @@ import ( "context" "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/logs" "net" "sync" @@ -47,6 +48,10 @@ func (this *Listener) Listen() error { if err != nil { return err } + events.On(events.EventQuit, func() { + logs.Println("LISTENER", "quit "+this.group.FullAddr()) + _ = netListener.Close() + }) switch protocol { case serverconfigs.ProtocolHTTP, serverconfigs.ProtocolHTTP4, serverconfigs.ProtocolHTTP6: @@ -88,6 +93,13 @@ func (this *Listener) Listen() error { go func() { err := this.listener.Serve() if err != nil { + // 在这里屏蔽accept错误,防止在优雅关闭的时候有多余的提示 + opErr, ok := err.(*net.OpError) + if ok && opErr.Op == "accept" { + return + } + + // 打印其他错误 logs.Error("LISTENER", err.Error()) } }() diff --git a/internal/nodes/listener_base.go b/internal/nodes/listener_base.go index 96b951b..e5f11ed 100644 --- a/internal/nodes/listener_base.go +++ b/internal/nodes/listener_base.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs" + "github.com/iwind/TeaGo/types" http2 "golang.org/x/net/http2" "sync" ) @@ -15,6 +16,8 @@ type BaseListener struct { namedServers map[string]*NamedServer // 域名 => server Group *serverconfigs.ServerGroup + + countActiveConnections int64 // 当前活跃的连接数 } // 初始化 @@ -29,6 +32,11 @@ func (this *BaseListener) Reset() { this.namedServersLocker.Unlock() } +// 获取当前活跃连接数 +func (this *BaseListener) CountActiveListeners() int { + return types.Int(this.countActiveConnections) +} + // 构造TLS配置 func (this *BaseListener) buildTLSConfig() *tls.Config { return &tls.Config{ diff --git a/internal/nodes/listener_http.go b/internal/nodes/listener_http.go index cd62cb9..19d1b07 100644 --- a/internal/nodes/listener_http.go +++ b/internal/nodes/listener_http.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "strings" + "sync/atomic" "time" ) @@ -34,7 +35,15 @@ func (this *HTTPListener) Serve() error { this.httpServer = &http.Server{ Addr: this.addr, Handler: handler, - IdleTimeout: 2 * time.Minute, + IdleTimeout: 2 * time.Minute, // TODO IdleTimeout可以设置 + ConnState: func(conn net.Conn, state http.ConnState) { + switch state { + case http.StateNew: + atomic.AddInt64(&this.countActiveConnections, 1) + case http.StateClosed: + atomic.AddInt64(&this.countActiveConnections, -1) + } + }, } this.httpServer.SetKeepAlivesEnabled(true) diff --git a/internal/nodes/listener_interface.go b/internal/nodes/listener_interface.go index 861ccfc..01ae031 100644 --- a/internal/nodes/listener_interface.go +++ b/internal/nodes/listener_interface.go @@ -15,4 +15,7 @@ type ListenerInterface interface { // 重载配置 Reload(serverGroup *serverconfigs.ServerGroup) + + // 获取当前活跃的连接数 + CountActiveListeners() int } diff --git a/internal/nodes/listener_manager.go b/internal/nodes/listener_manager.go index f523420..9ab450e 100644 --- a/internal/nodes/listener_manager.go +++ b/internal/nodes/listener_manager.go @@ -93,6 +93,18 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error { return nil } +// 获取总的活跃连接数 +func (this *ListenerManager) TotalActiveConnections() int { + this.locker.Lock() + defer this.locker.Unlock() + + total := 0 + for _, listener := range this.listenersMap { + total += listener.listener.CountActiveListeners() + } + return total +} + // 返回更加友好格式的地址 func (this *ListenerManager) prettyAddress(addr string) string { u, err := url.Parse(addr) diff --git a/internal/nodes/listener_tcp.go b/internal/nodes/listener_tcp.go index fa49514..feec68d 100644 --- a/internal/nodes/listener_tcp.go +++ b/internal/nodes/listener_tcp.go @@ -6,6 +6,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeNode/internal/logs" "net" + "sync/atomic" ) type TCPListener struct { @@ -25,10 +26,16 @@ func (this *TCPListener) Serve() error { if err != nil { break } - err = this.handleConn(conn) - if err != nil { - logs.Error("TCP_LISTENER", err.Error()) - } + + atomic.AddInt64(&this.countActiveConnections, 1) + + go func(conn net.Conn) { + err = this.handleConn(conn) + if err != nil { + logs.Error("TCP_LISTENER", err.Error()) + } + atomic.AddInt64(&this.countActiveConnections, -1) + }(conn) } return nil diff --git a/internal/nodes/listener_udp.go b/internal/nodes/listener_udp.go index be05e70..574dd17 100644 --- a/internal/nodes/listener_udp.go +++ b/internal/nodes/listener_udp.go @@ -13,6 +13,7 @@ type UDPListener struct { func (this *UDPListener) Serve() error { // TODO + // TODO 注意管理 CountActiveConnections return nil } diff --git a/internal/nodes/listener_unix.go b/internal/nodes/listener_unix.go index 787c593..a2475af 100644 --- a/internal/nodes/listener_unix.go +++ b/internal/nodes/listener_unix.go @@ -13,6 +13,7 @@ type UnixListener struct { func (this *UnixListener) Serve() error { // TODO + // TODO 注意管理 CountActiveConnections return nil } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 0fb01c5..abe8645 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -5,8 +5,10 @@ import ( "errors" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeNode/internal/apps" "github.com/TeaOSLab/EdgeNode/internal/caches" "github.com/TeaOSLab/EdgeNode/internal/configs" + "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/TeaOSLab/EdgeNode/internal/utils" @@ -16,7 +18,9 @@ import ( "io/ioutil" "net" "os" + "os/signal" "runtime" + "syscall" "time" ) @@ -49,6 +53,9 @@ func (this *Node) Test() error { // 启动 func (this *Node) Start() { + // 处理信号 + this.listenSignals() + // 本地Sock err := this.listenSock() if err != nil { @@ -92,12 +99,46 @@ func (this *Node) Start() { err = sharedListenerManager.Start(nodeConfig) if err != nil { logs.Error("NODE", "start failed: "+err.Error()) + return + } + + // 写入PID + err = apps.WritePid() + if err != nil { + logs.Error("NODE", "write pid failed: "+err.Error()) + return } // hold住进程 select {} } +// 处理信号 +func (this *Node) listenSignals() { + signals := make(chan os.Signal) + signal.Notify(signals, syscall.SIGQUIT) + go func() { + for s := range signals { + switch s { + case syscall.SIGQUIT: + events.Notify(events.EventQuit) + + // 监控连接数,如果连接数为0,则退出进程 + go func() { + for { + countActiveConnections := sharedListenerManager.TotalActiveConnections() + if countActiveConnections <= 0 { + os.Exit(0) + return + } + time.Sleep(1 * time.Second) + } + }() + } + } + }() +} + // 读取API配置 func (this *Node) syncConfig(isFirstTime bool) error { // 检查api.yaml是否存在 @@ -180,6 +221,10 @@ func (this *Node) syncConfig(isFirstTime bool) error { func (this *Node) startSyncTimer() { // TODO 这个时间间隔可以自行设置 ticker := time.NewTicker(60 * time.Second) + events.On(events.EventQuit, func() { + logs.Println("NODE", "quit sync timer") + ticker.Stop() + }) go func() { for { select { @@ -272,6 +317,10 @@ func (this *Node) listenSock() error { if err != nil { return err } + events.On(events.EventQuit, func() { + logs.Println("NODE", "quit unix sock") + _ = listener.Close() + }) go func() { for { diff --git a/internal/nodes/node_status_executor.go b/internal/nodes/node_status_executor.go index ae460be..2bf1e6b 100644 --- a/internal/nodes/node_status_executor.go +++ b/internal/nodes/node_status_executor.go @@ -5,6 +5,7 @@ import ( "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" teaconst "github.com/TeaOSLab/EdgeNode/internal/const" + "github.com/TeaOSLab/EdgeNode/internal/events" "github.com/TeaOSLab/EdgeNode/internal/logs" "github.com/TeaOSLab/EdgeNode/internal/rpc" "github.com/iwind/TeaGo/lists" @@ -35,6 +36,12 @@ func (this *NodeStatusExecutor) Listen() { // TODO 这个时间间隔可以配置 ticker := time.NewTicker(30 * time.Second) + + events.On(events.EventQuit, func() { + logs.Println("NODE_STATUS", "quit executor") + ticker.Stop() + }) + for range ticker.C { this.isFirstTime = false this.update() @@ -48,6 +55,8 @@ func (this *NodeStatusExecutor) update() { status := &nodeconfigs.NodeStatus{} status.BuildVersion = teaconst.Version + status.OS = runtime.GOOS + status.Arch = runtime.GOARCH status.ConfigVersion = sharedNodeConfig.Version status.IsActive = true diff --git a/internal/rpc/rpc_client.go b/internal/rpc/rpc_client.go index 2b77a77..a31b6c1 100644 --- a/internal/rpc/rpc_client.go +++ b/internal/rpc/rpc_client.go @@ -122,6 +122,13 @@ func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) co return ctx } +// 关闭连接 +func (this *RPCClient) Close() { + for _, conn := range this.conns { + _ = conn.Close() + } +} + // 随机选择一个连接 func (this *RPCClient) pickConn() *grpc.ClientConn { if len(this.conns) == 0 {