mirror of
https://github.com/TeaOSLab/EdgeNode.git
synced 2025-11-03 23:20:25 +08:00
1247 lines
32 KiB
Go
1247 lines
32 KiB
Go
package nodes
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
|
||
iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
|
||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||
"github.com/TeaOSLab/EdgeNode/internal/configs"
|
||
"github.com/TeaOSLab/EdgeNode/internal/conns"
|
||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||
"github.com/TeaOSLab/EdgeNode/internal/firewalls"
|
||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
|
||
"github.com/TeaOSLab/EdgeNode/internal/metrics"
|
||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||
"github.com/TeaOSLab/EdgeNode/internal/stats"
|
||
"github.com/TeaOSLab/EdgeNode/internal/trackers"
|
||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||
_ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器
|
||
_ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新
|
||
fsutils "github.com/TeaOSLab/EdgeNode/internal/utils/fs"
|
||
"github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils"
|
||
memutils "github.com/TeaOSLab/EdgeNode/internal/utils/mem"
|
||
"github.com/TeaOSLab/EdgeNode/internal/waf"
|
||
"github.com/andybalholm/brotli"
|
||
"github.com/iwind/TeaGo/Tea"
|
||
"github.com/iwind/TeaGo/lists"
|
||
"github.com/iwind/TeaGo/maps"
|
||
"github.com/iwind/TeaGo/types"
|
||
"github.com/iwind/gosock/pkg/gosock"
|
||
"log"
|
||
"os"
|
||
"os/exec"
|
||
"os/signal"
|
||
"path/filepath"
|
||
"runtime"
|
||
"runtime/debug"
|
||
"sort"
|
||
"strings"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
)
|
||
|
||
var sharedNodeConfig *nodeconfigs.NodeConfig
|
||
var nodeTaskNotify = make(chan bool, 8)
|
||
var nodeConfigChangedNotify = make(chan bool, 8)
|
||
var nodeConfigUpdatedAt int64
|
||
var DaemonIsOn = false
|
||
var DaemonPid = 0
|
||
var nodeInstance *Node
|
||
|
||
// Node 节点
|
||
type Node struct {
|
||
isLoaded bool
|
||
sock *gosock.Sock
|
||
locker sync.Mutex
|
||
|
||
oldMaxCPU int32
|
||
oldMaxThreads int
|
||
oldTimezone string
|
||
oldHTTPCachePolicies []*serverconfigs.HTTPCachePolicy
|
||
oldHTTPFirewallPolicies []*firewallconfigs.HTTPFirewallPolicy
|
||
oldFirewallActions []*firewallconfigs.FirewallActionConfig
|
||
oldMetricItems []*serverconfigs.MetricItemConfig
|
||
|
||
updatingServerMap map[int64]*serverconfigs.ServerConfig
|
||
|
||
lastAPINodeVersion int64
|
||
lastAPINodeAddrs []string // 以前的API节点地址
|
||
|
||
lastTaskVersion int64
|
||
lastUpdatingServerListId int64
|
||
}
|
||
|
||
func NewNode() *Node {
|
||
nodeInstance = &Node{
|
||
sock: gosock.NewTmpSock(teaconst.ProcessName),
|
||
oldMaxThreads: -1,
|
||
oldMaxCPU: -1,
|
||
updatingServerMap: map[int64]*serverconfigs.ServerConfig{},
|
||
}
|
||
return nodeInstance
|
||
}
|
||
|
||
// Test 检查配置
|
||
func (this *Node) Test() error {
|
||
// 检查是否能连接API
|
||
rpcClient, err := rpc.SharedRPC()
|
||
if err != nil {
|
||
return fmt.Errorf("test rpc failed: %w", err)
|
||
}
|
||
_, err = rpcClient.APINodeRPC.FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{})
|
||
if err != nil {
|
||
return fmt.Errorf("test rpc failed: %w", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// Start 启动
|
||
func (this *Node) Start() {
|
||
// 设置netdns
|
||
// 这个需要放在所有网络访问的最前面
|
||
_ = os.Setenv("GODEBUG", "netdns=go")
|
||
|
||
_, ok := os.LookupEnv("EdgeDaemon")
|
||
if ok {
|
||
remotelogs.Println("NODE", "start from daemon")
|
||
DaemonIsOn = true
|
||
DaemonPid = os.Getppid()
|
||
}
|
||
|
||
// 处理异常
|
||
this.handlePanic()
|
||
|
||
// 监听signal
|
||
this.listenSignals()
|
||
|
||
// 本地Sock
|
||
err := this.listenSock()
|
||
if err != nil {
|
||
remotelogs.Error("NODE", err.Error())
|
||
return
|
||
}
|
||
|
||
// 启动IP库
|
||
remotelogs.Println("NODE", "initializing ip library ...")
|
||
err = iplib.InitDefault()
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "initialize ip library failed: "+err.Error())
|
||
}
|
||
|
||
// 启动事件
|
||
events.Notify(events.EventStart)
|
||
|
||
// 读取API配置
|
||
remotelogs.Println("NODE", "init config ...")
|
||
err = this.syncConfig(0)
|
||
if err != nil {
|
||
_, err = nodeconfigs.SharedNodeConfig()
|
||
if err != nil {
|
||
// 无本地数据时,会尝试多次读取
|
||
tryTimes := 0
|
||
for {
|
||
err = this.syncConfig(0)
|
||
if err != nil {
|
||
tryTimes++
|
||
|
||
if tryTimes%10 == 0 {
|
||
remotelogs.Error("NODE", err.Error())
|
||
}
|
||
time.Sleep(1 * time.Second)
|
||
|
||
// 不做长时间的无意义的重试
|
||
if tryTimes > 1000 {
|
||
return
|
||
}
|
||
} else {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 启动同步计时器
|
||
this.startSyncTimer()
|
||
|
||
// 更新IP库
|
||
goman.New(func() {
|
||
iplib.NewUpdater(NewIPLibraryUpdater(), 10*time.Minute).Start()
|
||
})
|
||
|
||
// 监控节点运行状态
|
||
goman.New(func() {
|
||
NewNodeStatusExecutor().Listen()
|
||
})
|
||
|
||
// 读取配置
|
||
nodeConfig, err := nodeconfigs.SharedNodeConfig()
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "start failed: read node config failed: "+err.Error())
|
||
return
|
||
}
|
||
teaconst.NodeId = nodeConfig.Id
|
||
teaconst.NodeIdString = types.String(teaconst.NodeId)
|
||
err, serverErrors := nodeConfig.Init(context.Background())
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "init node config failed: "+err.Error())
|
||
return
|
||
}
|
||
if len(serverErrors) > 0 {
|
||
for _, serverErr := range serverErrors {
|
||
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
|
||
}
|
||
}
|
||
sharedNodeConfig = nodeConfig
|
||
this.onReload(nodeConfig, true)
|
||
|
||
// 调整系统参数
|
||
go this.tuneSystemParameters()
|
||
|
||
// 发送事件
|
||
events.Notify(events.EventLoaded)
|
||
|
||
// 设置rlimit
|
||
_ = utils.SetRLimit(1 << 20)
|
||
|
||
// 连接API
|
||
goman.New(func() {
|
||
NewAPIStream().Start()
|
||
})
|
||
|
||
// 统计
|
||
goman.New(func() {
|
||
stats.SharedTrafficStatManager.Start()
|
||
})
|
||
goman.New(func() {
|
||
stats.SharedHTTPRequestStatManager.Start()
|
||
})
|
||
|
||
// 硬盘TRIM任务
|
||
goman.New(func() {
|
||
NewTrimDisksTask().Start()
|
||
})
|
||
|
||
// 启动端口
|
||
err = sharedListenerManager.Start(nodeConfig)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "start failed: "+err.Error())
|
||
return
|
||
}
|
||
|
||
// hold住进程
|
||
select {}
|
||
}
|
||
|
||
// Daemon 实现守护进程
|
||
func (this *Node) Daemon() {
|
||
var isDebug = lists.ContainsString(os.Args, "debug")
|
||
for {
|
||
conn, err := this.sock.Dial()
|
||
if err != nil {
|
||
if isDebug {
|
||
log.Println("[DAEMON]starting ...")
|
||
}
|
||
|
||
// 尝试启动
|
||
err = func() error {
|
||
exe, err := os.Executable()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 可以标记当前是从守护进程启动的
|
||
_ = os.Setenv("EdgeDaemon", "on")
|
||
_ = os.Setenv("EdgeBackground", "on")
|
||
|
||
var cmd = exec.Command(exe)
|
||
var buf = &bytes.Buffer{}
|
||
cmd.Stderr = buf
|
||
err = cmd.Start()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
err = cmd.Wait()
|
||
if err != nil {
|
||
if isDebug {
|
||
log.Println("[DAEMON]" + buf.String())
|
||
}
|
||
return err
|
||
}
|
||
return nil
|
||
}()
|
||
|
||
if err != nil {
|
||
if isDebug {
|
||
log.Println("[DAEMON]", err)
|
||
}
|
||
time.Sleep(1 * time.Second)
|
||
} else {
|
||
time.Sleep(5 * time.Second)
|
||
}
|
||
} else {
|
||
_ = conn.Close()
|
||
time.Sleep(5 * time.Second)
|
||
}
|
||
}
|
||
}
|
||
|
||
// InstallSystemService 安装系统服务
|
||
func (this *Node) InstallSystemService() error {
|
||
shortName := teaconst.SystemdServiceName
|
||
|
||
exe, err := os.Executable()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
manager := utils.NewServiceManager(shortName, teaconst.ProductName)
|
||
err = manager.Install(exe, []string{})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// 读取API配置
|
||
func (this *Node) syncConfig(taskVersion int64) error {
|
||
this.locker.Lock()
|
||
defer this.locker.Unlock()
|
||
|
||
// 检查api_node.yaml是否存在
|
||
var apiConfigFile = Tea.ConfigFile(configs.ConfigFileName)
|
||
_, err := os.Stat(apiConfigFile)
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
clusterErr := this.checkClusterConfig()
|
||
if clusterErr != nil {
|
||
if os.IsNotExist(clusterErr) {
|
||
return fmt.Errorf("can not find config file 'configs/%s'", configs.ConfigFileName)
|
||
}
|
||
return fmt.Errorf("check cluster config failed: %w", clusterErr)
|
||
}
|
||
} else {
|
||
return err
|
||
}
|
||
}
|
||
|
||
rpcClient, err := rpc.SharedRPC()
|
||
if err != nil {
|
||
return fmt.Errorf("create rpc client failed: %w", err)
|
||
}
|
||
|
||
// 获取同步任务
|
||
// TODO 这里考虑只同步版本号有变更的
|
||
configResp, err := rpcClient.NodeRPC.FindCurrentNodeConfig(rpcClient.Context(), &pb.FindCurrentNodeConfigRequest{
|
||
Version: -1, // 更新所有版本
|
||
Compress: true,
|
||
NodeTaskVersion: taskVersion,
|
||
UseDataMap: true,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("read config from rpc failed: %w", err)
|
||
}
|
||
if !configResp.IsChanged {
|
||
return nil
|
||
}
|
||
|
||
var configJSON = configResp.NodeJSON
|
||
if configResp.IsCompressed {
|
||
var reader = brotli.NewReader(bytes.NewReader(configJSON))
|
||
var configBuf = &bytes.Buffer{}
|
||
var buf = make([]byte, 32*1024)
|
||
for {
|
||
n, err := reader.Read(buf)
|
||
if n > 0 {
|
||
configBuf.Write(buf[:n])
|
||
}
|
||
if err != nil {
|
||
break
|
||
}
|
||
}
|
||
configJSON = configBuf.Bytes()
|
||
}
|
||
|
||
nodeConfigUpdatedAt = time.Now().Unix()
|
||
|
||
var nodeConfig = &nodeconfigs.NodeConfig{}
|
||
err = json.Unmarshal(configJSON, nodeConfig)
|
||
if err != nil {
|
||
return fmt.Errorf("decode config failed: %w", err)
|
||
}
|
||
teaconst.NodeId = nodeConfig.Id
|
||
teaconst.NodeIdString = types.String(teaconst.NodeId)
|
||
|
||
// 检查时间是否一致
|
||
// 这个需要在 teaconst.NodeId 设置之后,因为上报到API节点的时候需要节点ID
|
||
if configResp.Timestamp > 0 {
|
||
var timestampDelta = configResp.Timestamp - time.Now().Unix()
|
||
if timestampDelta > 60 || timestampDelta < -60 {
|
||
remotelogs.Error("NODE", "node timestamp ('"+types.String(time.Now().Unix())+"') is not same as api node ('"+types.String(configResp.Timestamp)+"'), please sync the time")
|
||
}
|
||
}
|
||
|
||
// 写入到文件中
|
||
err = nodeConfig.Save()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
err, serverErrors := nodeConfig.Init(context.Background())
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(serverErrors) > 0 {
|
||
for _, serverErr := range serverErrors {
|
||
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
|
||
}
|
||
}
|
||
|
||
// 刷新配置
|
||
if this.isLoaded {
|
||
remotelogs.Println("NODE", "reloading node config ...")
|
||
} else {
|
||
remotelogs.Println("NODE", "loading node config ...")
|
||
}
|
||
|
||
this.onReload(nodeConfig, true)
|
||
|
||
// 发送事件
|
||
events.Notify(events.EventReload)
|
||
|
||
if this.isLoaded {
|
||
return sharedListenerManager.Start(nodeConfig)
|
||
}
|
||
|
||
this.isLoaded = true
|
||
|
||
// 整体更新不需要再更新单个服务
|
||
this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{}
|
||
|
||
return nil
|
||
}
|
||
|
||
// 读取单个服务配置
|
||
func (this *Node) syncServerConfig(serverId int64) error {
|
||
rpcClient, err := rpc.SharedRPC()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
resp, err := rpcClient.ServerRPC.ComposeServerConfig(rpcClient.Context(), &pb.ComposeServerConfigRequest{ServerId: serverId})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
this.locker.Lock()
|
||
defer this.locker.Unlock()
|
||
if len(resp.ServerConfigJSON) == 0 {
|
||
this.updatingServerMap[serverId] = nil
|
||
} else {
|
||
var config = &serverconfigs.ServerConfig{}
|
||
err = json.Unmarshal(resp.ServerConfigJSON, config)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
this.updatingServerMap[serverId] = config
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// 同步某个用户下的所有服务配置
|
||
func (this *Node) syncUserServersConfig(userId int64) error {
|
||
rpcClient, err := rpc.SharedRPC()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
serverConfigsResp, err := rpcClient.ServerRPC.ComposeAllUserServersConfig(rpcClient.Context(), &pb.ComposeAllUserServersConfigRequest{
|
||
UserId: userId,
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(serverConfigsResp.ServersConfigJSON) == 0 {
|
||
return nil
|
||
}
|
||
var serverConfigs = []*serverconfigs.ServerConfig{}
|
||
err = json.Unmarshal(serverConfigsResp.ServersConfigJSON, &serverConfigs)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
this.locker.Lock()
|
||
defer this.locker.Unlock()
|
||
|
||
for _, config := range serverConfigs {
|
||
this.updatingServerMap[config.Id] = config
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// 启动同步计时器
|
||
func (this *Node) startSyncTimer() {
|
||
// TODO 这个时间间隔可以自行设置
|
||
var taskTicker = time.NewTicker(60 * time.Second)
|
||
var serverChangeTicker = time.NewTicker(5 * time.Second)
|
||
|
||
events.OnKey(events.EventQuit, this, func() {
|
||
remotelogs.Println("NODE", "quit sync timer")
|
||
taskTicker.Stop()
|
||
serverChangeTicker.Stop()
|
||
})
|
||
goman.New(func() {
|
||
for {
|
||
select {
|
||
case <-taskTicker.C: // 定期执行
|
||
err := this.loopTasks()
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||
continue
|
||
}
|
||
case <-serverChangeTicker.C: // 服务变化
|
||
this.reloadServer()
|
||
case <-nodeTaskNotify: // 有新的更新任务
|
||
err := this.loopTasks()
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||
continue
|
||
}
|
||
case <-nodeConfigChangedNotify: // 节点变化通知
|
||
err := this.syncConfig(0)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "sync config error: "+err.Error())
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
})
|
||
}
|
||
|
||
// 检查集群设置
|
||
func (this *Node) checkClusterConfig() error {
|
||
config, err := configs.LoadClusterConfig()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
rpcClient, err := rpc.NewRPCClient(&configs.APIConfig{
|
||
RPCEndpoints: config.RPCEndpoints,
|
||
RPCDisableUpdate: config.RPCDisableUpdate,
|
||
NodeId: config.ClusterId,
|
||
Secret: config.Secret,
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
remotelogs.Debug("NODE", "registering node to cluster ...")
|
||
resp, err := rpcClient.NodeRPC.RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
remotelogs.Debug("NODE", "registered successfully")
|
||
|
||
// 写入到配置文件中
|
||
if len(resp.Endpoints) == 0 {
|
||
resp.Endpoints = []string{}
|
||
}
|
||
var apiConfig = &configs.APIConfig{
|
||
RPCEndpoints: resp.Endpoints,
|
||
RPCDisableUpdate: false,
|
||
NodeId: resp.UniqueId,
|
||
Secret: resp.Secret,
|
||
}
|
||
remotelogs.Debug("NODE", "writing 'configs/"+configs.ConfigFileName+"' ...")
|
||
err = apiConfig.WriteFile(Tea.ConfigFile(configs.ConfigFileName))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
remotelogs.Debug("NODE", "wrote 'configs/"+configs.ConfigFileName+"' successfully")
|
||
|
||
return nil
|
||
}
|
||
|
||
// 监听一些信号
|
||
func (this *Node) listenSignals() {
|
||
var queue = make(chan os.Signal, 8)
|
||
signal.Notify(queue, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT)
|
||
goman.New(func() {
|
||
for range queue {
|
||
time.Sleep(100 * time.Millisecond)
|
||
utils.Exit()
|
||
return
|
||
}
|
||
})
|
||
}
|
||
|
||
// 监听本地sock
|
||
func (this *Node) listenSock() error {
|
||
// 检查是否在运行
|
||
if this.sock.IsListening() {
|
||
reply, err := this.sock.Send(&gosock.Command{Code: "pid"})
|
||
if err == nil {
|
||
return errors.New("error: the process is already running, pid: " + types.String(maps.NewMap(reply.Params).GetInt("pid")))
|
||
} else {
|
||
return errors.New("error: the process is already running")
|
||
}
|
||
}
|
||
|
||
// 启动监听
|
||
goman.New(func() {
|
||
this.sock.OnCommand(func(cmd *gosock.Command) {
|
||
switch cmd.Code {
|
||
case "pid":
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Code: "pid",
|
||
Params: map[string]interface{}{
|
||
"pid": os.Getpid(),
|
||
},
|
||
})
|
||
case "info":
|
||
exePath, _ := os.Executable()
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Code: "info",
|
||
Params: map[string]interface{}{
|
||
"pid": os.Getpid(),
|
||
"version": teaconst.Version,
|
||
"path": exePath,
|
||
},
|
||
})
|
||
case "stop":
|
||
_ = cmd.ReplyOk()
|
||
|
||
// 退出主进程
|
||
events.Notify(events.EventQuit)
|
||
time.Sleep(100 * time.Millisecond)
|
||
utils.Exit()
|
||
case "quit":
|
||
_ = cmd.ReplyOk()
|
||
_ = this.sock.Close()
|
||
|
||
events.Notify(events.EventQuit)
|
||
events.Notify(events.EventTerminated)
|
||
|
||
// 监控连接数,如果连接数为0,则退出进程
|
||
goman.New(func() {
|
||
for {
|
||
countActiveConnections := sharedListenerManager.TotalActiveConnections()
|
||
if countActiveConnections <= 0 {
|
||
utils.Exit()
|
||
return
|
||
}
|
||
time.Sleep(1 * time.Second)
|
||
}
|
||
})
|
||
case "trackers":
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Params: map[string]interface{}{
|
||
"labels": trackers.SharedManager.Labels(),
|
||
},
|
||
})
|
||
case "goman":
|
||
var posMap = map[string]maps.Map{} // file#line => Map
|
||
for _, instance := range goman.List() {
|
||
var pos = instance.File + "#" + types.String(instance.Line)
|
||
m, ok := posMap[pos]
|
||
if ok {
|
||
m["count"] = m["count"].(int) + 1
|
||
} else {
|
||
m = maps.Map{
|
||
"pos": pos,
|
||
"count": 1,
|
||
}
|
||
posMap[pos] = m
|
||
}
|
||
}
|
||
|
||
var result = []maps.Map{}
|
||
for _, m := range posMap {
|
||
result = append(result, m)
|
||
}
|
||
|
||
sort.Slice(result, func(i, j int) bool {
|
||
return result[i]["count"].(int) > result[j]["count"].(int)
|
||
})
|
||
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Params: map[string]interface{}{
|
||
"total": runtime.NumGoroutine(),
|
||
"result": result,
|
||
},
|
||
})
|
||
case "conns":
|
||
var connMaps = []maps.Map{}
|
||
var connMap = conns.SharedMap.AllConns()
|
||
for _, conn := range connMap {
|
||
var createdAt int64
|
||
var lastReadAt int64
|
||
var lastWriteAt int64
|
||
var lastErrString = ""
|
||
var protocol = "tcp"
|
||
clientConn, ok := conn.(*ClientConn)
|
||
if ok {
|
||
createdAt = clientConn.CreatedAt()
|
||
lastReadAt = clientConn.LastReadAt()
|
||
lastWriteAt = clientConn.LastWriteAt()
|
||
|
||
var lastErr = clientConn.LastErr()
|
||
if lastErr != nil {
|
||
lastErrString = lastErr.Error()
|
||
}
|
||
} else {
|
||
protocol = "udp"
|
||
}
|
||
var age int64 = -1
|
||
var lastReadAge int64 = -1
|
||
var lastWriteAge int64 = -1
|
||
var currentTime = time.Now().Unix()
|
||
if createdAt > 0 {
|
||
age = currentTime - createdAt
|
||
}
|
||
if lastReadAt > 0 {
|
||
lastReadAge = currentTime - lastReadAt
|
||
}
|
||
if lastWriteAt > 0 {
|
||
lastWriteAge = currentTime - lastWriteAt
|
||
}
|
||
|
||
connMaps = append(connMaps, maps.Map{
|
||
"protocol": protocol,
|
||
"addr": conn.RemoteAddr().String(),
|
||
"age": age,
|
||
"readAge": lastReadAge,
|
||
"writeAge": lastWriteAge,
|
||
"lastErr": lastErrString,
|
||
})
|
||
}
|
||
sort.Slice(connMaps, func(i, j int) bool {
|
||
var m1 = connMaps[i]
|
||
var m2 = connMaps[j]
|
||
return m1.GetInt64("age") < m2.GetInt64("age")
|
||
})
|
||
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Params: map[string]interface{}{
|
||
"conns": connMaps,
|
||
"total": len(connMaps),
|
||
},
|
||
})
|
||
case "dropIP":
|
||
var m = maps.NewMap(cmd.Params)
|
||
var ip = m.GetString("ip")
|
||
var timeSeconds = m.GetInt("timeoutSeconds")
|
||
var async = m.GetBool("async")
|
||
err := firewalls.Firewall().DropSourceIP(ip, timeSeconds, async)
|
||
if err != nil {
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Params: map[string]interface{}{
|
||
"error": err.Error(),
|
||
},
|
||
})
|
||
} else {
|
||
_ = cmd.ReplyOk()
|
||
}
|
||
case "rejectIP":
|
||
var m = maps.NewMap(cmd.Params)
|
||
var ip = m.GetString("ip")
|
||
var timeSeconds = m.GetInt("timeoutSeconds")
|
||
err := firewalls.Firewall().RejectSourceIP(ip, timeSeconds)
|
||
if err != nil {
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Params: map[string]interface{}{
|
||
"error": err.Error(),
|
||
},
|
||
})
|
||
} else {
|
||
_ = cmd.ReplyOk()
|
||
}
|
||
case "closeIP":
|
||
var m = maps.NewMap(cmd.Params)
|
||
var ip = m.GetString("ip")
|
||
conns.SharedMap.CloseIPConns(ip)
|
||
_ = cmd.ReplyOk()
|
||
case "removeIP":
|
||
var m = maps.NewMap(cmd.Params)
|
||
var ip = m.GetString("ip")
|
||
err := firewalls.Firewall().RemoveSourceIP(ip)
|
||
if err != nil {
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Params: map[string]interface{}{
|
||
"error": err.Error(),
|
||
},
|
||
})
|
||
} else {
|
||
_ = cmd.ReplyOk()
|
||
}
|
||
case "gc":
|
||
var before = time.Now()
|
||
runtime.GC()
|
||
debug.FreeOSMemory()
|
||
|
||
var costSeconds = time.Since(before).Seconds()
|
||
var gcStats = &debug.GCStats{}
|
||
debug.ReadGCStats(gcStats)
|
||
var pauseMS float64
|
||
if len(gcStats.Pause) > 0 {
|
||
pauseMS = gcStats.Pause[0].Seconds() * 1000
|
||
}
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Params: map[string]any{
|
||
"pauseMS": pauseMS,
|
||
"costMS": costSeconds * 1000,
|
||
},
|
||
})
|
||
case "reload":
|
||
err := this.syncConfig(0)
|
||
if err != nil {
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Params: map[string]interface{}{
|
||
"error": err.Error(),
|
||
},
|
||
})
|
||
} else {
|
||
_ = cmd.ReplyOk()
|
||
}
|
||
case "accesslog":
|
||
err := sharedHTTPAccessLogViewer.Start()
|
||
if err != nil {
|
||
_ = cmd.Reply(&gosock.Command{
|
||
Code: "error",
|
||
Params: map[string]interface{}{
|
||
"message": "start failed: " + err.Error(),
|
||
},
|
||
})
|
||
} else {
|
||
_ = cmd.ReplyOk()
|
||
}
|
||
case "bandwidth":
|
||
var m = stats.SharedBandwidthStatManager.Map()
|
||
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
|
||
"stats": m,
|
||
}})
|
||
case "cache.garbage":
|
||
var shouldDelete = maps.NewMap(cmd.Params).GetBool("delete")
|
||
|
||
var count = 0
|
||
var sampleFiles = []string{}
|
||
err := caches.SharedManager.ScanGarbageCaches(func(path string) error {
|
||
count++
|
||
if len(sampleFiles) < 10 {
|
||
sampleFiles = append(sampleFiles, path)
|
||
}
|
||
|
||
if shouldDelete {
|
||
_ = os.Remove(path) // .cache
|
||
_ = os.Remove(caches.PartialRangesFilePath(path)) // @range.cache
|
||
}
|
||
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
|
||
"isOk": false,
|
||
"error": err.Error(),
|
||
}})
|
||
} else {
|
||
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
|
||
"isOk": true,
|
||
"count": count,
|
||
"sampleFiles": sampleFiles,
|
||
}})
|
||
}
|
||
}
|
||
})
|
||
|
||
err := this.sock.Listen()
|
||
if err != nil {
|
||
remotelogs.Debug("NODE", err.Error())
|
||
}
|
||
})
|
||
|
||
events.OnKey(events.EventQuit, this, func() {
|
||
remotelogs.Debug("NODE", "quit unix sock")
|
||
_ = this.sock.Close()
|
||
})
|
||
|
||
return nil
|
||
}
|
||
|
||
// 重载配置调用
|
||
func (this *Node) onReload(config *nodeconfigs.NodeConfig, reloadAll bool) {
|
||
nodeconfigs.ResetNodeConfig(config)
|
||
sharedNodeConfig = config
|
||
|
||
// 并发读写数
|
||
fsutils.ReaderLimiter.SetThreads(config.MaxConcurrentReads)
|
||
fsutils.WriterLimiter.SetThreads(config.MaxConcurrentWrites)
|
||
|
||
if reloadAll {
|
||
// 缓存策略
|
||
var subDirs = config.CacheDiskSubDirs
|
||
for _, subDir := range subDirs {
|
||
subDir.Path = filepath.Clean(subDir.Path)
|
||
}
|
||
if len(subDirs) > 0 {
|
||
sort.Slice(subDirs, func(i, j int) bool {
|
||
return subDirs[i].Path < subDirs[j].Path
|
||
})
|
||
}
|
||
|
||
var cachePoliciesChanged = !jsonutils.Equal(caches.SharedManager.MaxDiskCapacity, config.MaxCacheDiskCapacity) ||
|
||
!jsonutils.Equal(caches.SharedManager.MaxMemoryCapacity, config.MaxCacheMemoryCapacity) ||
|
||
!jsonutils.Equal(caches.SharedManager.MainDiskDir, config.CacheDiskDir) ||
|
||
!jsonutils.Equal(caches.SharedManager.SubDiskDirs, subDirs) ||
|
||
!jsonutils.Equal(this.oldHTTPCachePolicies, config.HTTPCachePolicies)
|
||
|
||
caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity
|
||
caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity
|
||
caches.SharedManager.MainDiskDir = config.CacheDiskDir
|
||
caches.SharedManager.SubDiskDirs = subDirs
|
||
|
||
if cachePoliciesChanged {
|
||
// copy
|
||
this.oldHTTPCachePolicies = []*serverconfigs.HTTPCachePolicy{}
|
||
err := jsonutils.Copy(&this.oldHTTPCachePolicies, config.HTTPCachePolicies)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "onReload: copy HTTPCachePolicies failed: "+err.Error())
|
||
}
|
||
|
||
// update
|
||
if len(config.HTTPCachePolicies) > 0 {
|
||
caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies)
|
||
} else {
|
||
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
|
||
}
|
||
}
|
||
}
|
||
|
||
// WAF策略
|
||
// 包含了服务里的WAF策略,所以需要整体更新
|
||
var allFirewallPolicies = config.FindAllFirewallPolicies()
|
||
if !jsonutils.Equal(allFirewallPolicies, this.oldHTTPFirewallPolicies) {
|
||
// copy
|
||
this.oldHTTPFirewallPolicies = []*firewallconfigs.HTTPFirewallPolicy{}
|
||
err := jsonutils.Copy(&this.oldHTTPFirewallPolicies, allFirewallPolicies)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "onReload: copy HTTPFirewallPolicies failed: "+err.Error())
|
||
}
|
||
|
||
// update
|
||
waf.SharedWAFManager.UpdatePolicies(allFirewallPolicies)
|
||
}
|
||
|
||
if reloadAll {
|
||
if !jsonutils.Equal(config.FirewallActions, this.oldFirewallActions) {
|
||
// copy
|
||
this.oldFirewallActions = []*firewallconfigs.FirewallActionConfig{}
|
||
err := jsonutils.Copy(&this.oldFirewallActions, config.FirewallActions)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "onReload: copy FirewallActionConfigs failed: "+err.Error())
|
||
}
|
||
|
||
// update
|
||
iplibrary.SharedActionManager.UpdateActions(config.FirewallActions)
|
||
}
|
||
|
||
// 统计指标
|
||
if !jsonutils.Equal(this.oldMetricItems, config.MetricItems) {
|
||
// copy
|
||
this.oldMetricItems = []*serverconfigs.MetricItemConfig{}
|
||
err := jsonutils.Copy(&this.oldMetricItems, config.MetricItems)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "onReload: copy MetricItemConfigs failed: "+err.Error())
|
||
}
|
||
|
||
// update
|
||
metrics.SharedManager.Update(config.MetricItems)
|
||
}
|
||
|
||
// max cpu
|
||
if config.MaxCPU != this.oldMaxCPU {
|
||
if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) {
|
||
runtime.GOMAXPROCS(int(config.MaxCPU))
|
||
remotelogs.Println("NODE", "[CPU]set max cpu to '"+types.String(config.MaxCPU)+"'")
|
||
} else {
|
||
var threads = runtime.NumCPU()
|
||
runtime.GOMAXPROCS(threads)
|
||
remotelogs.Println("NODE", "[CPU]set max cpu to '"+types.String(threads)+"'")
|
||
}
|
||
|
||
this.oldMaxCPU = config.MaxCPU
|
||
}
|
||
|
||
// max threads
|
||
if config.MaxThreads != this.oldMaxThreads {
|
||
if config.MaxThreads > 0 {
|
||
debug.SetMaxThreads(config.MaxThreads)
|
||
remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(config.MaxThreads)+"'")
|
||
} else {
|
||
debug.SetMaxThreads(nodeconfigs.DefaultMaxThreads)
|
||
remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(nodeconfigs.DefaultMaxThreads)+"'")
|
||
}
|
||
this.oldMaxThreads = config.MaxThreads
|
||
}
|
||
|
||
// timezone
|
||
var timeZone = config.TimeZone
|
||
if len(timeZone) == 0 {
|
||
timeZone = "Asia/Shanghai"
|
||
}
|
||
|
||
if this.oldTimezone != timeZone {
|
||
location, err := time.LoadLocation(timeZone)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "[TIMEZONE]change time zone failed: "+err.Error())
|
||
return
|
||
}
|
||
|
||
remotelogs.Println("NODE", "[TIMEZONE]change time zone to '"+timeZone+"'")
|
||
time.Local = location
|
||
this.oldTimezone = timeZone
|
||
}
|
||
|
||
// product information
|
||
if config.ProductConfig != nil {
|
||
teaconst.GlobalProductName = config.ProductConfig.Name
|
||
}
|
||
|
||
// DNS resolver
|
||
if config.DNSResolver != nil {
|
||
var err error
|
||
switch config.DNSResolver.Type {
|
||
case nodeconfigs.DNSResolverTypeGoNative:
|
||
err = os.Setenv("GODEBUG", "netdns=go")
|
||
case nodeconfigs.DNSResolverTypeCGO:
|
||
err = os.Setenv("GODEBUG", "netdns=cgo")
|
||
default:
|
||
// 默认使用go原生
|
||
err = os.Setenv("GODEBUG", "netdns=go")
|
||
}
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
|
||
}
|
||
} else {
|
||
// 默认使用go原生
|
||
err := os.Setenv("GODEBUG", "netdns=go")
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
|
||
}
|
||
}
|
||
|
||
// API Node地址,这里不限制是否为空,因为在为空时仍然要有对应的处理
|
||
this.changeAPINodeAddrs(config.APINodeAddrs)
|
||
}
|
||
|
||
// 刷新IP库
|
||
this.reloadIPLibrary()
|
||
}
|
||
|
||
// reload server config
|
||
func (this *Node) reloadServer() {
|
||
this.locker.Lock()
|
||
defer this.locker.Unlock()
|
||
|
||
var countUpdatingServers = len(this.updatingServerMap)
|
||
const maxPrintServers = 10
|
||
if countUpdatingServers > 0 {
|
||
var updatingServerMap = this.updatingServerMap
|
||
this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{}
|
||
newNodeConfig, err := nodeconfigs.CloneNodeConfig(sharedNodeConfig)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||
return
|
||
}
|
||
for serverId, serverConfig := range updatingServerMap {
|
||
if serverConfig != nil {
|
||
if countUpdatingServers < maxPrintServers {
|
||
remotelogs.Debug("NODE", "reload server '"+types.String(serverId)+"'")
|
||
}
|
||
newNodeConfig.AddServer(serverConfig)
|
||
} else {
|
||
if countUpdatingServers < maxPrintServers {
|
||
remotelogs.Debug("NODE", "remove server '"+types.String(serverId)+"'")
|
||
}
|
||
newNodeConfig.RemoveServer(serverId)
|
||
}
|
||
}
|
||
|
||
if countUpdatingServers >= maxPrintServers {
|
||
remotelogs.Debug("NODE", "reload "+types.String(countUpdatingServers)+" servers")
|
||
}
|
||
|
||
err, serverErrors := newNodeConfig.Init(context.Background())
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||
return
|
||
}
|
||
if len(serverErrors) > 0 {
|
||
for _, serverErr := range serverErrors {
|
||
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
|
||
}
|
||
}
|
||
|
||
this.onReload(newNodeConfig, false)
|
||
|
||
err = sharedListenerManager.Start(newNodeConfig)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "apply server config error: "+err.Error())
|
||
}
|
||
|
||
// notify event
|
||
events.Notify(events.EventReloadSomeServers)
|
||
}
|
||
}
|
||
|
||
// 检查系统
|
||
func (this *Node) tuneSystemParameters() {
|
||
if runtime.GOOS != "linux" || os.Getgid() != 0 {
|
||
return
|
||
}
|
||
|
||
if sharedNodeConfig == nil || !sharedNodeConfig.AutoSystemTuning {
|
||
return
|
||
}
|
||
|
||
type variable struct {
|
||
name string
|
||
minValue int
|
||
maxValue int
|
||
}
|
||
|
||
const dir = "/proc/sys"
|
||
|
||
// net
|
||
var systemParameters = []variable{
|
||
{name: "net.core.somaxconn", minValue: 2048},
|
||
{name: "net.ipv4.tcp_max_syn_backlog", minValue: 2048},
|
||
{name: "net.core.netdev_max_backlog", minValue: 4096},
|
||
{name: "net.ipv4.tcp_fin_timeout", maxValue: 10},
|
||
{name: "net.ipv4.tcp_max_tw_buckets", minValue: 65535},
|
||
{name: "net.core.rmem_default", minValue: 4 << 20},
|
||
{name: "net.core.wmem_default", minValue: 4 << 20},
|
||
{name: "net.core.rmem_max", minValue: 32 << 20},
|
||
{name: "net.core.wmem_max", minValue: 32 << 20},
|
||
{name: "vm.max_map_count", minValue: 256 << 10},
|
||
}
|
||
|
||
// vm
|
||
var systemMemory = memutils.SystemMemoryGB()
|
||
if systemMemory >= 128 {
|
||
systemParameters = append(systemParameters, []variable{
|
||
{name: "vm.dirty_background_ratio", minValue: 40},
|
||
{name: "vm.dirty_ratio", minValue: 60},
|
||
}...)
|
||
} else if systemMemory >= 64 {
|
||
systemParameters = append(systemParameters, []variable{
|
||
{name: "vm.dirty_background_ratio", minValue: 30},
|
||
{name: "vm.dirty_ratio", minValue: 50},
|
||
}...)
|
||
} else if systemMemory >= 16 {
|
||
systemParameters = append(systemParameters, []variable{
|
||
{name: "vm.dirty_background_ratio", minValue: 15},
|
||
{name: "vm.dirty_ratio", minValue: 30},
|
||
}...)
|
||
}
|
||
|
||
for _, v := range systemParameters {
|
||
var path = dir + "/" + strings.Replace(v.name, ".", "/", -1)
|
||
data, err := os.ReadFile(path)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
data = bytes.TrimSpace(data)
|
||
if len(data) == 0 {
|
||
continue
|
||
}
|
||
|
||
var oldValue = types.Int(string(data))
|
||
if v.minValue > 0 && oldValue < v.minValue {
|
||
err = os.WriteFile(path, []byte(types.String(v.minValue)), 0666)
|
||
if err == nil {
|
||
remotelogs.Println("NODE", "change kernel parameter '"+v.name+"' from '"+types.String(oldValue)+"' to '"+types.String(v.minValue)+"'")
|
||
}
|
||
} else if v.maxValue > 0 && oldValue > v.maxValue {
|
||
err = os.WriteFile(path, []byte(types.String(v.maxValue)), 0666)
|
||
if err == nil {
|
||
remotelogs.Println("NODE", "change kernel parameter '"+v.name+"' from '"+types.String(oldValue)+"' to '"+types.String(v.maxValue)+"'")
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 检查API节点地址
|
||
func (this *Node) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) {
|
||
var addrs = []string{}
|
||
for _, addr := range apiNodeAddrs {
|
||
err := addr.Init()
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "changeAPINodeAddrs: validate api node address '"+configutils.QuoteIP(addr.Host)+":"+addr.PortRange+"' failed: "+err.Error())
|
||
} else {
|
||
addrs = append(addrs, addr.FullAddresses()...)
|
||
}
|
||
}
|
||
sort.Strings(addrs)
|
||
|
||
if utils.EqualStrings(this.lastAPINodeAddrs, addrs) {
|
||
return
|
||
}
|
||
|
||
this.lastAPINodeAddrs = addrs
|
||
|
||
config, err := configs.LoadAPIConfig()
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "changeAPINodeAddrs: "+err.Error())
|
||
return
|
||
}
|
||
if config == nil {
|
||
return
|
||
}
|
||
var oldEndpoints = config.RPCEndpoints
|
||
|
||
rpcClient, err := rpc.SharedRPC()
|
||
if err != nil {
|
||
return
|
||
}
|
||
if len(addrs) > 0 {
|
||
this.lastAPINodeVersion++
|
||
var v = this.lastAPINodeVersion
|
||
|
||
// 异步检测,防止阻塞
|
||
go func(v int64) {
|
||
// 测试新的API节点地址
|
||
if rpcClient.TestEndpoints(addrs) {
|
||
config.RPCEndpoints = addrs
|
||
} else {
|
||
config.RPCEndpoints = oldEndpoints
|
||
this.lastAPINodeAddrs = nil // 恢复为空,以便于下次更新重试
|
||
}
|
||
|
||
// 检查测试中间有无新的变更
|
||
if v != this.lastAPINodeVersion {
|
||
return
|
||
}
|
||
|
||
err = rpcClient.UpdateConfig(config)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error())
|
||
}
|
||
}(v)
|
||
return
|
||
}
|
||
|
||
err = rpcClient.UpdateConfig(config)
|
||
if err != nil {
|
||
remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+err.Error())
|
||
}
|
||
}
|