mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 15:00:26 +08:00
优化节点进程退出逻辑
This commit is contained in:
@@ -6,7 +6,6 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -16,7 +15,7 @@ var SharedManager = NewManager()
|
||||
|
||||
func init() {
|
||||
events.On(events.EventQuit, func() {
|
||||
logs.Println("CACHE", "quiting cache manager")
|
||||
remotelogs.Println("CACHE", "quiting cache manager")
|
||||
SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -18,7 +18,8 @@ import (
|
||||
type IPListDB struct {
|
||||
db *sql.DB
|
||||
|
||||
itemTableName string
|
||||
itemTableName string
|
||||
|
||||
deleteExpiredItemsStmt *sql.Stmt
|
||||
deleteItemStmt *sql.Stmt
|
||||
insertItemStmt *sql.Stmt
|
||||
@@ -53,7 +54,11 @@ func (this *IPListDB) init() error {
|
||||
remotelogs.Println("IP_LIST_DB", "create data dir '"+this.dir+"'")
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite3", "file:"+this.dir+"/ip_list.db?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
|
||||
var path = this.dir + "/ip_list.db"
|
||||
_ = os.Remove(path + "-shm")
|
||||
_ = os.Remove(path + "-wal")
|
||||
|
||||
db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ func TestIPListDB_AddItem(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = db.AddItem(&pb.IPItem{
|
||||
Id: 1,
|
||||
IpFrom: "192.168.1.101",
|
||||
@@ -45,6 +46,12 @@ func TestIPListDB_AddItem(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = db.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
package iplibrary
|
||||
|
||||
import (
|
||||
_ "github.com/iwind/TeaGo/bootstrap"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestManager_Load(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
manager := NewManager()
|
||||
lib, err := manager.Load()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(lib.Lookup("1.2.3.4"))
|
||||
t.Log(lib.Lookup("2.3.4.5"))
|
||||
t.Log(lib.Lookup("200.200.200.200"))
|
||||
t.Log(lib.Lookup("202.106.0.20"))
|
||||
}
|
||||
|
||||
func TestNewManager(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
t.Log(SharedLibrary)
|
||||
}
|
||||
@@ -4,6 +4,7 @@ package metrics
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -11,7 +12,15 @@ import (
|
||||
|
||||
var SharedManager = NewManager()
|
||||
|
||||
func init() {
|
||||
events.On(events.EventQuit, func() {
|
||||
SharedManager.Quit()
|
||||
})
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
isQuiting bool
|
||||
|
||||
tasks map[int64]*Task // itemId => *Task
|
||||
categoryTasks map[string][]*Task // category => []*Task
|
||||
locker sync.RWMutex
|
||||
@@ -29,6 +38,10 @@ func NewManager() *Manager {
|
||||
}
|
||||
|
||||
func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
|
||||
if this.isQuiting {
|
||||
return
|
||||
}
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
@@ -101,6 +114,10 @@ func (this *Manager) Update(items []*serverconfigs.MetricItemConfig) {
|
||||
|
||||
// Add 添加数据
|
||||
func (this *Manager) Add(obj MetricInterface) {
|
||||
if this.isQuiting {
|
||||
return
|
||||
}
|
||||
|
||||
this.locker.RLock()
|
||||
for _, task := range this.categoryTasks[obj.MetricCategory()] {
|
||||
task.Add(obj)
|
||||
@@ -119,3 +136,17 @@ func (this *Manager) HasTCPMetrics() bool {
|
||||
func (this *Manager) HasUDPMetrics() bool {
|
||||
return this.hasUDPMetrics
|
||||
}
|
||||
|
||||
// Quit 退出管理器
|
||||
func (this *Manager) Quit() {
|
||||
this.isQuiting = true
|
||||
|
||||
remotelogs.Println("METRIC_MANAGER", "quit")
|
||||
|
||||
this.locker.Lock()
|
||||
for _, task := range this.tasks {
|
||||
_ = task.Stop()
|
||||
}
|
||||
this.tasks = map[int64]*Task{}
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"os"
|
||||
"strconv"
|
||||
@@ -89,7 +90,11 @@ func (this *Task) Init() error {
|
||||
remotelogs.Println("METRIC", "create data dir '"+dir+"'")
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite3", "file:"+dir+"/metric."+strconv.FormatInt(this.item.Id, 10)+".db?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
|
||||
var path = dir + "/metric." + types.String(this.item.Id) + ".db"
|
||||
_ = os.Remove(path + "-shm")
|
||||
_ = os.Remove(path + "-wal")
|
||||
|
||||
db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -894,7 +894,7 @@ func (this *Node) listenSock() error {
|
||||
})
|
||||
|
||||
events.OnKey(events.EventQuit, this, func() {
|
||||
logs.Println("NODE", "quit unix sock")
|
||||
remotelogs.Println("NODE", "quit unix sock")
|
||||
_ = this.sock.Close()
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user