mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-05 00:34:01 +08:00
支持优雅退出
This commit is contained in:
@@ -5,15 +5,19 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/apps"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/nodes"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
_ "github.com/iwind/TeaGo/bootstrap"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func main() {
|
||||
app := apps.NewAppCmd().
|
||||
Version(teaconst.Version).
|
||||
Product(teaconst.ProductName).
|
||||
Usage(teaconst.ProcessName + " [-v|start|stop|restart|sync|update|test]")
|
||||
Usage(teaconst.ProcessName + " [-v|start|stop|restart|quit|sync|update|test]")
|
||||
|
||||
app.On("test", func() {
|
||||
err := nodes.NewNode().Test()
|
||||
@@ -29,6 +33,27 @@ func main() {
|
||||
// TODO
|
||||
fmt.Println("not implemented yet")
|
||||
})
|
||||
app.On("quit", func() {
|
||||
pidFile := Tea.Root + "/bin/pid"
|
||||
data, err := ioutil.ReadFile(pidFile)
|
||||
if err != nil {
|
||||
fmt.Println("[ERROR]quit failed: " + err.Error())
|
||||
return
|
||||
}
|
||||
pid := types.Int(string(data))
|
||||
if pid == 0 {
|
||||
fmt.Println("[ERROR]quit failed: pid=0")
|
||||
return
|
||||
}
|
||||
|
||||
process, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if process != nil {
|
||||
_ = process.Signal(syscall.SIGQUIT)
|
||||
}
|
||||
})
|
||||
app.Run(func() {
|
||||
node := nodes.NewNode()
|
||||
node.Start()
|
||||
|
||||
@@ -150,9 +150,6 @@ func (this *AppCmd) Run(main func()) {
|
||||
return
|
||||
}
|
||||
|
||||
// 记录PID
|
||||
_ = this.writePid()
|
||||
|
||||
// 日志
|
||||
writer := new(LogWriter)
|
||||
writer.Init()
|
||||
@@ -227,8 +224,3 @@ func (this *AppCmd) runStatus() {
|
||||
func (this *AppCmd) checkPid() *os.Process {
|
||||
return CheckPid(Tea.Root + "/bin/pid")
|
||||
}
|
||||
|
||||
// 写入PID
|
||||
func (this *AppCmd) writePid() error {
|
||||
return WritePid(Tea.Root + "/bin/pid")
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ package apps
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@@ -50,11 +52,15 @@ func CheckPid(path string) *os.Process {
|
||||
}
|
||||
|
||||
// 写入Pid
|
||||
func WritePid(path string) error {
|
||||
func WritePid() error {
|
||||
path := Tea.Root + "/bin/pid"
|
||||
fp, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_RDONLY, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events.On(events.EventQuit, func() {
|
||||
_ = fp.Close()
|
||||
})
|
||||
|
||||
if runtime.GOOS != "windows" {
|
||||
err = LockFile(fp)
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/logs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
@@ -557,6 +558,13 @@ func (this *FileStorage) initList() error {
|
||||
|
||||
// 启动定时清理任务
|
||||
this.ticker = utils.NewTicker(30 * time.Second)
|
||||
events.On(events.EventQuit, func() {
|
||||
logs.Println("CACHE", "quit clean timer")
|
||||
var ticker = this.ticker
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
}
|
||||
})
|
||||
go func() {
|
||||
for this.ticker.Next() {
|
||||
this.purgeLoop()
|
||||
|
||||
7
internal/events/events.go
Normal file
7
internal/events/events.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package events
|
||||
|
||||
type Event = string
|
||||
|
||||
const (
|
||||
EventQuit Event = "quit" // quit node gracefully
|
||||
)
|
||||
27
internal/events/utils.go
Normal file
27
internal/events/utils.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package events
|
||||
|
||||
import "sync"
|
||||
|
||||
var eventsMap = map[string][]func(){} // event => []callbacks
|
||||
var locker = sync.Mutex{}
|
||||
|
||||
// 增加事件回调
|
||||
func On(event string, callback func()) {
|
||||
locker.Lock()
|
||||
defer locker.Unlock()
|
||||
|
||||
callbacks, _ := eventsMap[event]
|
||||
callbacks = append(callbacks, callback)
|
||||
eventsMap[event] = callbacks
|
||||
}
|
||||
|
||||
// 通知事件
|
||||
func Notify(event string) {
|
||||
locker.Lock()
|
||||
callbacks, _ := eventsMap[event]
|
||||
locker.Unlock()
|
||||
|
||||
for _, callback := range callbacks {
|
||||
callback()
|
||||
}
|
||||
}
|
||||
16
internal/events/utils_test.go
Normal file
16
internal/events/utils_test.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package events
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestOn(t *testing.T) {
|
||||
On("hello", func() {
|
||||
t.Log("world")
|
||||
})
|
||||
On("hello", func() {
|
||||
t.Log("world2")
|
||||
})
|
||||
On("hello2", func() {
|
||||
t.Log("world2")
|
||||
})
|
||||
Notify("hello")
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/logs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"io"
|
||||
@@ -27,7 +28,14 @@ func NewAPIStream() *APIStream {
|
||||
}
|
||||
|
||||
func (this *APIStream) Start() {
|
||||
isQuiting := false
|
||||
events.On(events.EventQuit, func() {
|
||||
isQuiting = true
|
||||
})
|
||||
for {
|
||||
if isQuiting {
|
||||
return
|
||||
}
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
logs.Error("API_STREAM", err.Error())
|
||||
@@ -43,15 +51,29 @@ func (this *APIStream) loop() error {
|
||||
if err != nil {
|
||||
return errors.Wrap(err)
|
||||
}
|
||||
isQuiting := false
|
||||
events.On(events.EventQuit, func() {
|
||||
isQuiting = true
|
||||
})
|
||||
nodeStream, err := rpcClient.NodeRPC().NodeStream(rpcClient.Context())
|
||||
if err != nil {
|
||||
if isQuiting {
|
||||
return nil
|
||||
}
|
||||
return errors.Wrap(err)
|
||||
}
|
||||
this.stream = nodeStream
|
||||
|
||||
for {
|
||||
if isQuiting {
|
||||
break
|
||||
}
|
||||
|
||||
message, err := nodeStream.Recv()
|
||||
if err != nil {
|
||||
if isQuiting {
|
||||
return nil
|
||||
}
|
||||
return errors.Wrap(err)
|
||||
}
|
||||
|
||||
@@ -80,6 +102,8 @@ func (this *APIStream) loop() error {
|
||||
logs.Error("API_STREAM", "handle message failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 连接API节点成功
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/logs"
|
||||
"net"
|
||||
"sync"
|
||||
@@ -47,6 +48,10 @@ func (this *Listener) Listen() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events.On(events.EventQuit, func() {
|
||||
logs.Println("LISTENER", "quit "+this.group.FullAddr())
|
||||
_ = netListener.Close()
|
||||
})
|
||||
|
||||
switch protocol {
|
||||
case serverconfigs.ProtocolHTTP, serverconfigs.ProtocolHTTP4, serverconfigs.ProtocolHTTP6:
|
||||
@@ -88,6 +93,13 @@ func (this *Listener) Listen() error {
|
||||
go func() {
|
||||
err := this.listener.Serve()
|
||||
if err != nil {
|
||||
// 在这里屏蔽accept错误,防止在优雅关闭的时候有多余的提示
|
||||
opErr, ok := err.(*net.OpError)
|
||||
if ok && opErr.Op == "accept" {
|
||||
return
|
||||
}
|
||||
|
||||
// 打印其他错误
|
||||
logs.Error("LISTENER", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/sslconfigs"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
http2 "golang.org/x/net/http2"
|
||||
"sync"
|
||||
)
|
||||
@@ -15,6 +16,8 @@ type BaseListener struct {
|
||||
namedServers map[string]*NamedServer // 域名 => server
|
||||
|
||||
Group *serverconfigs.ServerGroup
|
||||
|
||||
countActiveConnections int64 // 当前活跃的连接数
|
||||
}
|
||||
|
||||
// 初始化
|
||||
@@ -29,6 +32,11 @@ func (this *BaseListener) Reset() {
|
||||
this.namedServersLocker.Unlock()
|
||||
}
|
||||
|
||||
// 获取当前活跃连接数
|
||||
func (this *BaseListener) CountActiveListeners() int {
|
||||
return types.Int(this.countActiveConnections)
|
||||
}
|
||||
|
||||
// 构造TLS配置
|
||||
func (this *BaseListener) buildTLSConfig() *tls.Config {
|
||||
return &tls.Config{
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -34,7 +35,15 @@ func (this *HTTPListener) Serve() error {
|
||||
this.httpServer = &http.Server{
|
||||
Addr: this.addr,
|
||||
Handler: handler,
|
||||
IdleTimeout: 2 * time.Minute,
|
||||
IdleTimeout: 2 * time.Minute, // TODO IdleTimeout可以设置
|
||||
ConnState: func(conn net.Conn, state http.ConnState) {
|
||||
switch state {
|
||||
case http.StateNew:
|
||||
atomic.AddInt64(&this.countActiveConnections, 1)
|
||||
case http.StateClosed:
|
||||
atomic.AddInt64(&this.countActiveConnections, -1)
|
||||
}
|
||||
},
|
||||
}
|
||||
this.httpServer.SetKeepAlivesEnabled(true)
|
||||
|
||||
|
||||
@@ -15,4 +15,7 @@ type ListenerInterface interface {
|
||||
|
||||
// 重载配置
|
||||
Reload(serverGroup *serverconfigs.ServerGroup)
|
||||
|
||||
// 获取当前活跃的连接数
|
||||
CountActiveListeners() int
|
||||
}
|
||||
|
||||
@@ -93,6 +93,18 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 获取总的活跃连接数
|
||||
func (this *ListenerManager) TotalActiveConnections() int {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
total := 0
|
||||
for _, listener := range this.listenersMap {
|
||||
total += listener.listener.CountActiveListeners()
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
// 返回更加友好格式的地址
|
||||
func (this *ListenerManager) prettyAddress(addr string) string {
|
||||
u, err := url.Parse(addr)
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/logs"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type TCPListener struct {
|
||||
@@ -25,10 +26,16 @@ func (this *TCPListener) Serve() error {
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
err = this.handleConn(conn)
|
||||
if err != nil {
|
||||
logs.Error("TCP_LISTENER", err.Error())
|
||||
}
|
||||
|
||||
atomic.AddInt64(&this.countActiveConnections, 1)
|
||||
|
||||
go func(conn net.Conn) {
|
||||
err = this.handleConn(conn)
|
||||
if err != nil {
|
||||
logs.Error("TCP_LISTENER", err.Error())
|
||||
}
|
||||
atomic.AddInt64(&this.countActiveConnections, -1)
|
||||
}(conn)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -13,6 +13,7 @@ type UDPListener struct {
|
||||
|
||||
func (this *UDPListener) Serve() error {
|
||||
// TODO
|
||||
// TODO 注意管理 CountActiveConnections
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ type UnixListener struct {
|
||||
|
||||
func (this *UnixListener) Serve() error {
|
||||
// TODO
|
||||
// TODO 注意管理 CountActiveConnections
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -5,8 +5,10 @@ import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/apps"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/configs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/logs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
@@ -16,7 +18,9 @@ import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -49,6 +53,9 @@ func (this *Node) Test() error {
|
||||
|
||||
// 启动
|
||||
func (this *Node) Start() {
|
||||
// 处理信号
|
||||
this.listenSignals()
|
||||
|
||||
// 本地Sock
|
||||
err := this.listenSock()
|
||||
if err != nil {
|
||||
@@ -92,12 +99,46 @@ func (this *Node) Start() {
|
||||
err = sharedListenerManager.Start(nodeConfig)
|
||||
if err != nil {
|
||||
logs.Error("NODE", "start failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 写入PID
|
||||
err = apps.WritePid()
|
||||
if err != nil {
|
||||
logs.Error("NODE", "write pid failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// hold住进程
|
||||
select {}
|
||||
}
|
||||
|
||||
// 处理信号
|
||||
func (this *Node) listenSignals() {
|
||||
signals := make(chan os.Signal)
|
||||
signal.Notify(signals, syscall.SIGQUIT)
|
||||
go func() {
|
||||
for s := range signals {
|
||||
switch s {
|
||||
case syscall.SIGQUIT:
|
||||
events.Notify(events.EventQuit)
|
||||
|
||||
// 监控连接数,如果连接数为0,则退出进程
|
||||
go func() {
|
||||
for {
|
||||
countActiveConnections := sharedListenerManager.TotalActiveConnections()
|
||||
if countActiveConnections <= 0 {
|
||||
os.Exit(0)
|
||||
return
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 读取API配置
|
||||
func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
// 检查api.yaml是否存在
|
||||
@@ -180,6 +221,10 @@ func (this *Node) syncConfig(isFirstTime bool) error {
|
||||
func (this *Node) startSyncTimer() {
|
||||
// TODO 这个时间间隔可以自行设置
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
events.On(events.EventQuit, func() {
|
||||
logs.Println("NODE", "quit sync timer")
|
||||
ticker.Stop()
|
||||
})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
@@ -272,6 +317,10 @@ func (this *Node) listenSock() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events.On(events.EventQuit, func() {
|
||||
logs.Println("NODE", "quit unix sock")
|
||||
_ = listener.Close()
|
||||
})
|
||||
|
||||
go func() {
|
||||
for {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/logs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
@@ -35,6 +36,12 @@ func (this *NodeStatusExecutor) Listen() {
|
||||
|
||||
// TODO 这个时间间隔可以配置
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
|
||||
events.On(events.EventQuit, func() {
|
||||
logs.Println("NODE_STATUS", "quit executor")
|
||||
ticker.Stop()
|
||||
})
|
||||
|
||||
for range ticker.C {
|
||||
this.isFirstTime = false
|
||||
this.update()
|
||||
@@ -48,6 +55,8 @@ func (this *NodeStatusExecutor) update() {
|
||||
|
||||
status := &nodeconfigs.NodeStatus{}
|
||||
status.BuildVersion = teaconst.Version
|
||||
status.OS = runtime.GOOS
|
||||
status.Arch = runtime.GOARCH
|
||||
status.ConfigVersion = sharedNodeConfig.Version
|
||||
status.IsActive = true
|
||||
|
||||
|
||||
@@ -122,6 +122,13 @@ func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) co
|
||||
return ctx
|
||||
}
|
||||
|
||||
// 关闭连接
|
||||
func (this *RPCClient) Close() {
|
||||
for _, conn := range this.conns {
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// 随机选择一个连接
|
||||
func (this *RPCClient) pickConn() *grpc.ClientConn {
|
||||
if len(this.conns) == 0 {
|
||||
|
||||
Reference in New Issue
Block a user