2020-07-21 11:18:47 +08:00
package nodes
import (
2021-11-11 14:16:57 +08:00
"bytes"
2023-08-08 10:07:24 +08:00
"context"
2020-09-09 18:53:53 +08:00
"encoding/json"
"errors"
2023-08-11 14:38:00 +08:00
"fmt"
2022-11-21 19:55:28 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
2022-08-21 23:09:47 +08:00
iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
2020-09-26 08:07:07 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-09-13 20:37:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2020-12-17 17:36:10 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2022-11-25 10:50:57 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
2020-10-05 16:55:14 +08:00
"github.com/TeaOSLab/EdgeNode/internal/caches"
2020-10-17 11:14:40 +08:00
"github.com/TeaOSLab/EdgeNode/internal/configs"
2022-09-02 15:20:58 +08:00
"github.com/TeaOSLab/EdgeNode/internal/conns"
2021-01-12 11:48:38 +08:00
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
2022-01-09 17:07:37 +08:00
"github.com/TeaOSLab/EdgeNode/internal/firewalls"
2021-12-08 15:17:45 +08:00
"github.com/TeaOSLab/EdgeNode/internal/goman"
2021-01-17 16:47:37 +08:00
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeNode/internal/metrics"
2020-12-17 17:36:10 +08:00
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2020-09-09 18:53:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2021-01-25 16:40:31 +08:00
"github.com/TeaOSLab/EdgeNode/internal/stats"
2021-11-14 10:55:09 +08:00
"github.com/TeaOSLab/EdgeNode/internal/trackers"
2020-07-22 22:18:47 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
2022-12-22 11:38:59 +08:00
_ "github.com/TeaOSLab/EdgeNode/internal/utils/agents" // 引入Agent管理器
_ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新
2022-11-25 10:50:57 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils"
2022-05-21 11:17:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/waf"
2021-11-11 14:16:57 +08:00
"github.com/andybalholm/brotli"
2020-10-17 11:14:40 +08:00
"github.com/iwind/TeaGo/Tea"
2021-01-12 11:48:38 +08:00
"github.com/iwind/TeaGo/lists"
2021-07-25 17:14:44 +08:00
"github.com/iwind/TeaGo/maps"
2021-12-02 11:30:47 +08:00
"github.com/iwind/TeaGo/types"
2021-07-25 17:14:44 +08:00
"github.com/iwind/gosock/pkg/gosock"
2022-03-04 12:30:06 +08:00
"gopkg.in/yaml.v3"
2021-01-12 11:48:38 +08:00
"log"
2020-10-17 11:14:40 +08:00
"os"
2021-01-12 11:48:38 +08:00
"os/exec"
2022-03-14 11:47:34 +08:00
"os/signal"
2022-11-15 20:42:25 +08:00
"path/filepath"
2020-10-10 12:31:46 +08:00
"runtime"
2021-12-09 12:07:59 +08:00
"runtime/debug"
2021-12-08 15:17:45 +08:00
"sort"
2023-04-02 21:30:03 +08:00
"strings"
2021-10-01 11:13:36 +08:00
"sync"
2022-03-14 11:47:34 +08:00
"syscall"
2020-09-09 18:53:53 +08:00
"time"
2020-07-21 11:18:47 +08:00
)
2020-09-26 08:07:07 +08:00
var sharedNodeConfig * nodeconfigs . NodeConfig
2021-01-17 16:47:37 +08:00
var nodeTaskNotify = make ( chan bool , 8 )
2021-10-01 11:13:36 +08:00
var nodeConfigChangedNotify = make ( chan bool , 8 )
var nodeConfigUpdatedAt int64
2021-06-10 19:19:15 +08:00
var DaemonIsOn = false
var DaemonPid = 0
2023-03-31 14:06:01 +08:00
var nodeInstance * Node
2020-07-21 11:18:47 +08:00
2021-06-10 19:19:15 +08:00
// Node 节点
2020-07-21 11:18:47 +08:00
type Node struct {
2021-01-17 16:47:37 +08:00
isLoaded bool
2021-07-25 17:14:44 +08:00
sock * gosock . Sock
2021-10-01 11:13:36 +08:00
locker sync . Mutex
2021-12-09 12:07:59 +08:00
2022-11-25 10:50:57 +08:00
oldMaxCPU int32
oldMaxThreads int
oldTimezone string
oldHTTPCachePolicies [ ] * serverconfigs . HTTPCachePolicy
oldHTTPFirewallPolicies [ ] * firewallconfigs . HTTPFirewallPolicy
oldFirewallActions [ ] * firewallconfigs . FirewallActionConfig
oldMetricItems [ ] * serverconfigs . MetricItemConfig
2022-01-19 22:16:46 +08:00
updatingServerMap map [ int64 ] * serverconfigs . ServerConfig
2022-11-06 12:07:26 +08:00
2022-11-21 19:55:28 +08:00
lastAPINodeVersion int64
lastAPINodeAddrs [ ] string // 以前的API节点地址
2023-04-06 20:50:34 +08:00
lastTaskVersion int64
lastUpdatingServerListId int64
2020-07-21 11:18:47 +08:00
}
func NewNode ( ) * Node {
2023-03-31 14:06:01 +08:00
nodeInstance = & Node {
2022-01-19 22:16:46 +08:00
sock : gosock . NewTmpSock ( teaconst . ProcessName ) ,
2022-11-25 10:50:57 +08:00
oldMaxThreads : - 1 ,
oldMaxCPU : - 1 ,
2022-01-19 22:16:46 +08:00
updatingServerMap : map [ int64 ] * serverconfigs . ServerConfig { } ,
2021-07-25 17:39:09 +08:00
}
2023-03-31 14:06:01 +08:00
return nodeInstance
2020-07-21 11:18:47 +08:00
}
2021-06-10 19:19:15 +08:00
// Test 检查配置
2020-10-27 12:33:34 +08:00
func ( this * Node ) Test ( ) error {
// 检查是否能连接API
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
2023-08-11 14:38:00 +08:00
return fmt . Errorf ( "test rpc failed: %w" , err )
2020-10-27 12:33:34 +08:00
}
2022-08-24 20:04:46 +08:00
_ , err = rpcClient . APINodeRPC . FindCurrentAPINodeVersion ( rpcClient . Context ( ) , & pb . FindCurrentAPINodeVersionRequest { } )
2020-10-27 12:33:34 +08:00
if err != nil {
2023-08-11 14:38:00 +08:00
return fmt . Errorf ( "test rpc failed: %w" , err )
2020-10-27 12:33:34 +08:00
}
return nil
}
2021-06-10 19:19:15 +08:00
// Start 启动
2020-07-21 11:18:47 +08:00
func ( this * Node ) Start ( ) {
2022-09-12 17:43:48 +08:00
// 设置netdns
// 这个需要放在所有网络访问的最前面
_ = os . Setenv ( "GODEBUG" , "netdns=go" )
2021-06-10 19:19:15 +08:00
_ , ok := os . LookupEnv ( "EdgeDaemon" )
if ok {
remotelogs . Println ( "NODE" , "start from daemon" )
DaemonIsOn = true
DaemonPid = os . Getppid ( )
}
2021-10-14 10:28:32 +08:00
// 处理异常
this . handlePanic ( )
2022-03-14 11:47:34 +08:00
// 监听signal
this . listenSignals ( )
2020-10-27 12:33:34 +08:00
// 本地Sock
err := this . listenSock ( )
if err != nil {
2020-12-17 17:36:10 +08:00
remotelogs . Error ( "NODE" , err . Error ( ) )
2020-10-27 12:33:34 +08:00
return
}
2022-08-21 23:09:47 +08:00
// 启动IP库
remotelogs . Println ( "NODE" , "initializing ip library ..." )
2022-08-23 14:32:39 +08:00
err = iplib . InitDefault ( )
2022-08-21 23:09:47 +08:00
if err != nil {
remotelogs . Error ( "NODE" , "initialize ip library failed: " + err . Error ( ) )
}
2023-04-02 21:30:03 +08:00
// 调整系统参数
this . checkSystem ( )
2022-08-21 23:09:47 +08:00
// 启动事件
events . Notify ( events . EventStart )
2020-09-09 18:53:53 +08:00
// 读取API配置
2022-05-21 21:32:10 +08:00
remotelogs . Println ( "NODE" , "init config ..." )
2021-11-11 14:16:57 +08:00
err = this . syncConfig ( 0 )
2021-06-15 10:55:49 +08:00
if err != nil {
_ , err := nodeconfigs . SharedNodeConfig ( )
2021-01-11 18:16:15 +08:00
if err != nil {
2021-06-15 10:55:49 +08:00
// 无本地数据时,会尝试多次读取
tryTimes := 0
for {
2021-11-11 14:16:57 +08:00
err := this . syncConfig ( 0 )
2021-06-15 10:55:49 +08:00
if err != nil {
tryTimes ++
if tryTimes % 10 == 0 {
remotelogs . Error ( "NODE" , err . Error ( ) )
}
time . Sleep ( 1 * time . Second )
// 不做长时间的无意义的重试
if tryTimes > 1000 {
return
}
} else {
break
}
2021-01-11 18:16:15 +08:00
}
}
2020-09-09 18:53:53 +08:00
}
// 启动同步计时器
this . startSyncTimer ( )
2022-08-23 14:53:39 +08:00
// 更新IP库
goman . New ( func ( ) {
iplib . NewUpdater ( NewIPLibraryUpdater ( ) , 10 * time . Minute ) . Start ( )
} )
// 监控节点运行状态
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
NewNodeStatusExecutor ( ) . Listen ( )
} )
2020-09-09 18:53:53 +08:00
2020-07-22 22:18:47 +08:00
// 读取配置
2020-09-26 08:07:07 +08:00
nodeConfig , err := nodeconfigs . SharedNodeConfig ( )
2020-07-21 11:18:47 +08:00
if err != nil {
2020-12-17 17:36:10 +08:00
remotelogs . Error ( "NODE" , "start failed: read node config failed: " + err . Error ( ) )
2020-07-21 11:18:47 +08:00
return
}
2021-11-16 16:11:05 +08:00
teaconst . NodeId = nodeConfig . Id
2021-12-02 11:30:47 +08:00
teaconst . NodeIdString = types . String ( teaconst . NodeId )
2023-08-08 10:07:24 +08:00
err , serverErrors := nodeConfig . Init ( context . Background ( ) )
2020-09-26 08:07:07 +08:00
if err != nil {
2020-12-17 17:36:10 +08:00
remotelogs . Error ( "NODE" , "init node config failed: " + err . Error ( ) )
2020-09-26 08:07:07 +08:00
return
}
2021-12-01 15:52:38 +08:00
if len ( serverErrors ) > 0 {
for _ , serverErr := range serverErrors {
remotelogs . ServerError ( serverErr . Id , "NODE" , serverErr . Message , nodeconfigs . NodeLogTypeServerConfigInitFailed , maps . Map { } )
}
}
2020-09-26 08:07:07 +08:00
sharedNodeConfig = nodeConfig
2022-11-25 10:50:57 +08:00
this . onReload ( nodeConfig , true )
2020-07-21 11:18:47 +08:00
2020-12-03 10:17:28 +08:00
// 发送事件
events . Notify ( events . EventLoaded )
2020-07-22 22:18:47 +08:00
// 设置rlimit
_ = utils . SetRLimit ( 1024 * 1024 )
2020-10-04 14:30:42 +08:00
// 连接API
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
NewAPIStream ( ) . Start ( )
} )
2020-10-04 14:30:42 +08:00
2021-01-25 16:40:31 +08:00
// 统计
2021-12-14 10:01:21 +08:00
goman . New ( func ( ) {
2022-10-23 19:41:21 +08:00
stats . SharedTrafficStatManager . Start ( )
2021-01-25 16:40:31 +08:00
} )
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
stats . SharedHTTPRequestStatManager . Start ( )
} )
2021-01-25 16:40:31 +08:00
2020-07-22 22:18:47 +08:00
// 启动端口
err = sharedListenerManager . Start ( nodeConfig )
if err != nil {
2020-12-17 17:36:10 +08:00
remotelogs . Error ( "NODE" , "start failed: " + err . Error ( ) )
2020-10-28 11:19:06 +08:00
return
}
2020-07-22 22:18:47 +08:00
// hold住进程
2020-10-04 14:30:42 +08:00
select { }
2020-07-21 11:18:47 +08:00
}
2020-09-09 18:53:53 +08:00
2021-06-10 19:19:15 +08:00
// Daemon 实现守护进程
2021-01-12 11:48:38 +08:00
func ( this * Node ) Daemon ( ) {
2022-07-26 09:41:43 +08:00
var isDebug = lists . ContainsString ( os . Args , "debug" )
2021-01-12 11:48:38 +08:00
for {
2021-07-25 17:39:09 +08:00
conn , err := this . sock . Dial ( )
2021-01-12 11:48:38 +08:00
if err != nil {
if isDebug {
log . Println ( "[DAEMON]starting ..." )
}
// 尝试启动
err = func ( ) error {
exe , err := os . Executable ( )
if err != nil {
return err
}
2021-06-10 19:19:15 +08:00
// 可以标记当前是从守护进程启动的
_ = os . Setenv ( "EdgeDaemon" , "on" )
2021-10-14 10:28:32 +08:00
_ = os . Setenv ( "EdgeBackground" , "on" )
2021-06-10 19:19:15 +08:00
2022-07-26 09:41:43 +08:00
var cmd = exec . Command ( exe )
2022-08-09 22:59:37 +08:00
var buf = & bytes . Buffer { }
cmd . Stderr = buf
2021-01-12 11:48:38 +08:00
err = cmd . Start ( )
if err != nil {
return err
}
err = cmd . Wait ( )
if err != nil {
2022-08-09 22:59:37 +08:00
if isDebug {
log . Println ( "[DAEMON]" + buf . String ( ) )
}
2021-01-12 11:48:38 +08:00
return err
}
return nil
} ( )
if err != nil {
if isDebug {
log . Println ( "[DAEMON]" , err )
}
time . Sleep ( 1 * time . Second )
} else {
time . Sleep ( 5 * time . Second )
}
} else {
_ = conn . Close ( )
time . Sleep ( 5 * time . Second )
}
}
}
2021-06-10 19:19:15 +08:00
// InstallSystemService 安装系统服务
2021-01-12 11:48:38 +08:00
func ( this * Node ) InstallSystemService ( ) error {
shortName := teaconst . SystemdServiceName
exe , err := os . Executable ( )
if err != nil {
return err
}
manager := utils . NewServiceManager ( shortName , teaconst . ProductName )
err = manager . Install ( exe , [ ] string { } )
if err != nil {
return err
}
return nil
}
2020-09-09 18:53:53 +08:00
// 读取API配置
2021-11-11 14:16:57 +08:00
func ( this * Node ) syncConfig ( taskVersion int64 ) error {
2021-10-01 11:13:36 +08:00
this . locker . Lock ( )
defer this . locker . Unlock ( )
2020-10-17 11:14:40 +08:00
// 检查api.yaml是否存在
apiConfigFile := Tea . ConfigFile ( "api.yaml" )
_ , err := os . Stat ( apiConfigFile )
if err != nil {
if os . IsNotExist ( err ) {
clusterErr := this . checkClusterConfig ( )
if clusterErr != nil {
if os . IsNotExist ( clusterErr ) {
2022-05-21 21:32:10 +08:00
return errors . New ( "can not find config file 'configs/api.yaml'" )
2020-10-17 11:14:40 +08:00
}
2021-09-16 15:58:10 +08:00
return errors . New ( "check cluster config failed: " + clusterErr . Error ( ) )
2020-10-17 11:14:40 +08:00
}
} else {
return err
}
}
2020-09-09 18:53:53 +08:00
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
2023-08-11 14:38:00 +08:00
return fmt . Errorf ( "create rpc client failed: %w" , err )
2020-09-09 18:53:53 +08:00
}
2021-01-17 16:47:37 +08:00
// 获取同步任务
2020-09-26 08:07:07 +08:00
// TODO 这里考虑只同步版本号有变更的
2022-11-23 20:13:34 +08:00
configResp , err := rpcClient . NodeRPC . FindCurrentNodeConfig ( rpcClient . Context ( ) , & pb . FindCurrentNodeConfigRequest {
2021-11-11 14:16:57 +08:00
Version : - 1 , // 更新所有版本
Compress : true ,
NodeTaskVersion : taskVersion ,
2023-03-18 22:18:48 +08:00
UseDataMap : true ,
2020-12-03 10:17:28 +08:00
} )
2020-09-09 18:53:53 +08:00
if err != nil {
2023-08-11 14:38:00 +08:00
return fmt . Errorf ( "read config from rpc failed: %w" , err )
2020-09-09 18:53:53 +08:00
}
2020-12-03 10:17:28 +08:00
if ! configResp . IsChanged {
return nil
}
2022-05-12 21:07:45 +08:00
var configJSON = configResp . NodeJSON
2021-11-11 14:16:57 +08:00
if configResp . IsCompressed {
var reader = brotli . NewReader ( bytes . NewReader ( configJSON ) )
var configBuf = & bytes . Buffer { }
var buf = make ( [ ] byte , 32 * 1024 )
for {
n , err := reader . Read ( buf )
if n > 0 {
configBuf . Write ( buf [ : n ] )
}
if err != nil {
break
}
}
configJSON = configBuf . Bytes ( )
}
nodeConfigUpdatedAt = time . Now ( ) . Unix ( )
2022-05-12 21:07:45 +08:00
var nodeConfig = & nodeconfigs . NodeConfig { }
2020-09-26 08:07:07 +08:00
err = json . Unmarshal ( configJSON , nodeConfig )
2020-09-09 18:53:53 +08:00
if err != nil {
2023-08-11 14:38:00 +08:00
return fmt . Errorf ( "decode config failed: %w" , err )
2020-09-09 18:53:53 +08:00
}
2021-11-16 16:11:05 +08:00
teaconst . NodeId = nodeConfig . Id
2021-12-02 11:30:47 +08:00
teaconst . NodeIdString = types . String ( teaconst . NodeId )
2020-09-09 18:53:53 +08:00
2022-05-12 21:07:45 +08:00
// 检查时间是否一致
// 这个需要在 teaconst.NodeId 设置之后, 因为上报到API节点的时候需要节点ID
if configResp . Timestamp > 0 {
var timestampDelta = configResp . Timestamp - time . Now ( ) . Unix ( )
if timestampDelta > 60 || timestampDelta < - 60 {
remotelogs . Error ( "NODE" , "node timestamp ('" + types . String ( time . Now ( ) . Unix ( ) ) + "') is not same as api node ('" + types . String ( configResp . Timestamp ) + "'), please sync the time" )
}
}
2020-09-09 18:53:53 +08:00
// 写入到文件中
err = nodeConfig . Save ( )
if err != nil {
return err
}
2023-08-08 10:07:24 +08:00
err , serverErrors := nodeConfig . Init ( context . Background ( ) )
2020-09-09 18:53:53 +08:00
if err != nil {
return err
}
2021-12-01 15:52:38 +08:00
if len ( serverErrors ) > 0 {
for _ , serverErr := range serverErrors {
remotelogs . ServerError ( serverErr . Id , "NODE" , serverErr . Message , nodeconfigs . NodeLogTypeServerConfigInitFailed , maps . Map { } )
}
}
2020-09-09 18:53:53 +08:00
2020-09-26 08:07:07 +08:00
// 刷新配置
2021-01-17 16:47:37 +08:00
if this . isLoaded {
2023-03-19 11:00:27 +08:00
remotelogs . Println ( "NODE" , "reloading node config ..." )
2020-10-10 12:31:46 +08:00
} else {
2023-03-19 11:00:27 +08:00
remotelogs . Println ( "NODE" , "loading node config ..." )
2020-10-10 12:31:46 +08:00
}
2021-09-16 15:58:10 +08:00
2022-11-25 10:50:57 +08:00
this . onReload ( nodeConfig , true )
2020-09-26 08:07:07 +08:00
2020-12-03 10:17:28 +08:00
// 发送事件
events . Notify ( events . EventReload )
2021-01-17 16:47:37 +08:00
if this . isLoaded {
2020-09-09 18:53:53 +08:00
return sharedListenerManager . Start ( nodeConfig )
}
2021-01-17 16:47:37 +08:00
this . isLoaded = true
2023-04-03 16:12:14 +08:00
// 整体更新不需要再更新单个服务
this . updatingServerMap = map [ int64 ] * serverconfigs . ServerConfig { }
2020-09-09 18:53:53 +08:00
return nil
}
2022-01-19 22:16:46 +08:00
// 读取单个服务配置
func ( this * Node ) syncServerConfig ( serverId int64 ) error {
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
return err
}
2022-08-24 20:04:46 +08:00
resp , err := rpcClient . ServerRPC . ComposeServerConfig ( rpcClient . Context ( ) , & pb . ComposeServerConfigRequest { ServerId : serverId } )
2022-01-19 22:16:46 +08:00
if err != nil {
return err
}
this . locker . Lock ( )
defer this . locker . Unlock ( )
if len ( resp . ServerConfigJSON ) == 0 {
this . updatingServerMap [ serverId ] = nil
} else {
var config = & serverconfigs . ServerConfig { }
err = json . Unmarshal ( resp . ServerConfigJSON , config )
if err != nil {
return err
}
this . updatingServerMap [ serverId ] = config
}
return nil
}
2022-10-23 16:21:11 +08:00
// 同步某个用户下的所有服务配置
func ( this * Node ) syncUserServersConfig ( userId int64 ) error {
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
return err
}
serverConfigsResp , err := rpcClient . ServerRPC . ComposeAllUserServersConfig ( rpcClient . Context ( ) , & pb . ComposeAllUserServersConfigRequest {
UserId : userId ,
} )
if err != nil {
return err
}
if len ( serverConfigsResp . ServersConfigJSON ) == 0 {
return nil
}
var serverConfigs = [ ] * serverconfigs . ServerConfig { }
err = json . Unmarshal ( serverConfigsResp . ServersConfigJSON , & serverConfigs )
if err != nil {
return err
}
this . locker . Lock ( )
defer this . locker . Unlock ( )
for _ , config := range serverConfigs {
this . updatingServerMap [ config . Id ] = config
}
return nil
}
2020-09-09 18:53:53 +08:00
// 启动同步计时器
func ( this * Node ) startSyncTimer ( ) {
2020-09-26 08:07:07 +08:00
// TODO 这个时间间隔可以自行设置
2022-01-19 22:16:46 +08:00
var taskTicker = time . NewTicker ( 60 * time . Second )
var serverChangeTicker = time . NewTicker ( 5 * time . Second )
2022-01-12 20:31:04 +08:00
events . OnKey ( events . EventQuit , this , func ( ) {
2020-12-17 17:36:10 +08:00
remotelogs . Println ( "NODE" , "quit sync timer" )
2022-01-19 22:16:46 +08:00
taskTicker . Stop ( )
serverChangeTicker . Stop ( )
2020-10-28 11:19:06 +08:00
} )
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
2020-10-09 12:03:53 +08:00
for {
select {
2022-01-19 22:16:46 +08:00
case <- taskTicker . C : // 定期执行
2023-04-06 20:50:34 +08:00
err := this . loopTasks ( )
2020-10-09 12:03:53 +08:00
if err != nil {
2020-12-17 17:36:10 +08:00
remotelogs . Error ( "NODE" , "sync config error: " + err . Error ( ) )
2020-10-09 12:03:53 +08:00
continue
}
2022-01-19 22:16:46 +08:00
case <- serverChangeTicker . C : // 服务变化
2022-04-18 15:39:02 +08:00
this . reloadServer ( )
2022-01-19 22:16:46 +08:00
case <- nodeTaskNotify : // 有新的更新任务
2023-04-06 20:50:34 +08:00
err := this . loopTasks ( )
2020-10-09 12:03:53 +08:00
if err != nil {
2020-12-17 17:36:10 +08:00
remotelogs . Error ( "NODE" , "sync config error: " + err . Error ( ) )
2020-10-09 12:03:53 +08:00
continue
}
2022-01-19 22:16:46 +08:00
case <- nodeConfigChangedNotify : // 节点变化通知
2021-11-11 14:16:57 +08:00
err := this . syncConfig ( 0 )
2021-10-01 11:13:36 +08:00
if err != nil {
remotelogs . Error ( "NODE" , "sync config error: " + err . Error ( ) )
continue
}
2020-09-09 18:53:53 +08:00
}
}
2021-12-08 15:17:45 +08:00
} )
2020-09-09 18:53:53 +08:00
}
2020-10-17 11:14:40 +08:00
// 检查集群设置
func ( this * Node ) checkClusterConfig ( ) error {
configFile := Tea . ConfigFile ( "cluster.yaml" )
2022-08-04 11:34:06 +08:00
data , err := os . ReadFile ( configFile )
2020-10-17 11:14:40 +08:00
if err != nil {
return err
}
config := & configs . ClusterConfig { }
err = yaml . Unmarshal ( data , config )
if err != nil {
return err
}
rpcClient , err := rpc . NewRPCClient ( & configs . APIConfig {
RPC : config . RPC ,
NodeId : config . ClusterId ,
Secret : config . Secret ,
} )
if err != nil {
return err
}
2022-11-06 12:07:26 +08:00
remotelogs . Debug ( "NODE" , "registering node to cluster ..." )
2022-08-24 20:04:46 +08:00
resp , err := rpcClient . NodeRPC . RegisterClusterNode ( rpcClient . ClusterContext ( config . ClusterId , config . Secret ) , & pb . RegisterClusterNodeRequest { Name : HOSTNAME } )
2020-10-17 11:14:40 +08:00
if err != nil {
return err
}
2022-11-06 12:07:26 +08:00
remotelogs . Debug ( "NODE" , "registered successfully" )
2020-10-17 11:14:40 +08:00
// 写入到配置文件中
if len ( resp . Endpoints ) == 0 {
resp . Endpoints = [ ] string { }
}
2022-07-21 14:06:38 +08:00
var apiConfig = & configs . APIConfig {
2020-10-17 11:14:40 +08:00
RPC : struct {
2022-11-21 19:55:28 +08:00
Endpoints [ ] string ` yaml:"endpoints" json:"endpoints" `
DisableUpdate bool ` yaml:"disableUpdate" json:"disableUpdate" `
2020-10-17 11:14:40 +08:00
} {
2022-07-21 14:06:38 +08:00
Endpoints : resp . Endpoints ,
DisableUpdate : false ,
2020-10-17 11:14:40 +08:00
} ,
NodeId : resp . UniqueId ,
Secret : resp . Secret ,
}
2022-11-06 12:07:26 +08:00
remotelogs . Debug ( "NODE" , "writing 'configs/api.yaml' ..." )
2020-10-17 11:14:40 +08:00
err = apiConfig . WriteFile ( Tea . ConfigFile ( "api.yaml" ) )
if err != nil {
return err
}
2022-11-06 12:07:26 +08:00
remotelogs . Debug ( "NODE" , "wrote 'configs/api.yaml' successfully" )
2020-10-17 11:14:40 +08:00
return nil
}
2020-10-27 12:33:34 +08:00
2022-03-14 11:47:34 +08:00
// 监听一些信号
func ( this * Node ) listenSignals ( ) {
var queue = make ( chan os . Signal , 8 )
signal . Notify ( queue , syscall . SIGTERM , syscall . SIGINT , syscall . SIGKILL , syscall . SIGQUIT )
goman . New ( func ( ) {
for range queue {
time . Sleep ( 100 * time . Millisecond )
2022-03-15 18:32:39 +08:00
utils . Exit ( )
2022-03-14 11:47:34 +08:00
return
}
} )
}
2020-10-27 12:33:34 +08:00
// 监听本地sock
func ( this * Node ) listenSock ( ) error {
2021-07-25 17:14:44 +08:00
// 检查是否在运行
if this . sock . IsListening ( ) {
reply , err := this . sock . Send ( & gosock . Command { Code : "pid" } )
if err == nil {
2022-08-03 23:31:08 +08:00
return errors . New ( "error: the process is already running, pid: " + types . String ( maps . NewMap ( reply . Params ) . GetInt ( "pid" ) ) )
2020-10-27 12:33:34 +08:00
} else {
2021-07-25 17:14:44 +08:00
return errors . New ( "error: the process is already running" )
2020-10-27 12:33:34 +08:00
}
}
2021-07-25 17:14:44 +08:00
// 启动监听
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
2021-07-25 17:14:44 +08:00
this . sock . OnCommand ( func ( cmd * gosock . Command ) {
switch cmd . Code {
case "pid" :
_ = cmd . Reply ( & gosock . Command {
Code : "pid" ,
Params : map [ string ] interface { } {
"pid" : os . Getpid ( ) ,
} ,
} )
2021-11-04 11:14:02 +08:00
case "info" :
exePath , _ := os . Executable ( )
_ = cmd . Reply ( & gosock . Command {
Code : "info" ,
Params : map [ string ] interface { } {
"pid" : os . Getpid ( ) ,
"version" : teaconst . Version ,
"path" : exePath ,
} ,
} )
2021-07-25 17:14:44 +08:00
case "stop" :
_ = cmd . ReplyOk ( )
// 退出主进程
events . Notify ( events . EventQuit )
2022-09-26 16:27:51 +08:00
time . Sleep ( 100 * time . Millisecond )
2022-03-15 18:32:39 +08:00
utils . Exit ( )
2021-07-25 17:14:44 +08:00
case "quit" :
_ = cmd . ReplyOk ( )
_ = this . sock . Close ( )
events . Notify ( events . EventQuit )
2022-07-28 14:38:08 +08:00
events . Notify ( events . EventTerminated )
2021-07-25 17:14:44 +08:00
// 监控连接数, 如果连接数为0, 则退出进程
2021-12-08 15:17:45 +08:00
goman . New ( func ( ) {
2021-07-25 17:14:44 +08:00
for {
countActiveConnections := sharedListenerManager . TotalActiveConnections ( )
if countActiveConnections <= 0 {
2022-03-15 18:32:39 +08:00
utils . Exit ( )
2021-07-25 17:14:44 +08:00
return
}
time . Sleep ( 1 * time . Second )
}
2021-12-08 15:17:45 +08:00
} )
2021-11-14 10:55:09 +08:00
case "trackers" :
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
"labels" : trackers . SharedManager . Labels ( ) ,
} ,
} )
2021-12-08 15:17:45 +08:00
case "goman" :
var posMap = map [ string ] maps . Map { } // file#line => Map
for _ , instance := range goman . List ( ) {
var pos = instance . File + "#" + types . String ( instance . Line )
m , ok := posMap [ pos ]
if ok {
m [ "count" ] = m [ "count" ] . ( int ) + 1
} else {
m = maps . Map {
"pos" : pos ,
"count" : 1 ,
}
posMap [ pos ] = m
}
}
var result = [ ] maps . Map { }
for _ , m := range posMap {
result = append ( result , m )
}
sort . Slice ( result , func ( i , j int ) bool {
return result [ i ] [ "count" ] . ( int ) > result [ j ] [ "count" ] . ( int )
} )
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
2021-12-14 10:01:21 +08:00
"total" : runtime . NumGoroutine ( ) ,
2021-12-08 15:17:45 +08:00
"result" : result ,
} ,
2021-12-12 11:48:01 +08:00
} )
case "conns" :
2022-12-02 17:03:16 +08:00
var connMaps = [ ] maps . Map { }
2022-09-02 15:20:58 +08:00
var connMap = conns . SharedMap . AllConns ( )
2022-12-21 15:59:07 +08:00
for _ , conn := range connMap {
var createdAt int64
var lastReadAt int64
var lastWriteAt int64
var lastErrString = ""
2023-06-02 10:54:17 +08:00
var protocol = "tcp"
2022-12-21 15:59:07 +08:00
clientConn , ok := conn . ( * ClientConn )
if ok {
createdAt = clientConn . CreatedAt ( )
lastReadAt = clientConn . LastReadAt ( )
lastWriteAt = clientConn . LastWriteAt ( )
var lastErr = clientConn . LastErr ( )
if lastErr != nil {
lastErrString = lastErr . Error ( )
}
2023-06-02 10:54:17 +08:00
} else {
protocol = "udp"
2022-12-21 15:59:07 +08:00
}
2023-01-05 11:13:35 +08:00
var age int64 = - 1
var lastReadAge int64 = - 1
var lastWriteAge int64 = - 1
2022-12-21 15:59:07 +08:00
var currentTime = time . Now ( ) . Unix ( )
if createdAt > 0 {
age = currentTime - createdAt
}
if lastReadAt > 0 {
lastReadAge = currentTime - lastReadAt
}
if lastWriteAt > 0 {
lastWriteAge = currentTime - lastWriteAt
}
2022-12-02 17:03:16 +08:00
connMaps = append ( connMaps , maps . Map {
2023-06-02 10:54:17 +08:00
"protocol" : protocol ,
2022-12-21 15:59:07 +08:00
"addr" : conn . RemoteAddr ( ) . String ( ) ,
"age" : age ,
"readAge" : lastReadAge ,
"writeAge" : lastWriteAge ,
"lastErr" : lastErrString ,
2022-12-02 17:03:16 +08:00
} )
2022-09-02 15:20:58 +08:00
}
2022-12-21 15:59:07 +08:00
sort . Slice ( connMaps , func ( i , j int ) bool {
var m1 = connMaps [ i ]
var m2 = connMaps [ j ]
return m1 . GetInt64 ( "age" ) < m2 . GetInt64 ( "age" )
} )
2021-12-12 11:48:01 +08:00
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
2022-12-02 17:03:16 +08:00
"conns" : connMaps ,
"total" : len ( connMaps ) ,
2021-12-12 11:48:01 +08:00
} ,
2021-12-08 15:17:45 +08:00
} )
2022-01-09 17:07:37 +08:00
case "dropIP" :
var m = maps . NewMap ( cmd . Params )
var ip = m . GetString ( "ip" )
var timeSeconds = m . GetInt ( "timeoutSeconds" )
2022-08-04 11:01:16 +08:00
var async = m . GetBool ( "async" )
err := firewalls . Firewall ( ) . DropSourceIP ( ip , timeSeconds , async )
2022-01-09 17:07:37 +08:00
if err != nil {
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
"error" : err . Error ( ) ,
} ,
} )
} else {
_ = cmd . ReplyOk ( )
}
case "rejectIP" :
var m = maps . NewMap ( cmd . Params )
var ip = m . GetString ( "ip" )
var timeSeconds = m . GetInt ( "timeoutSeconds" )
err := firewalls . Firewall ( ) . RejectSourceIP ( ip , timeSeconds )
if err != nil {
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
"error" : err . Error ( ) ,
} ,
} )
} else {
_ = cmd . ReplyOk ( )
}
2022-12-12 19:23:58 +08:00
case "closeIP" :
var m = maps . NewMap ( cmd . Params )
var ip = m . GetString ( "ip" )
conns . SharedMap . CloseIPConns ( ip )
_ = cmd . ReplyOk ( )
2022-01-09 17:07:37 +08:00
case "removeIP" :
var m = maps . NewMap ( cmd . Params )
var ip = m . GetString ( "ip" )
err := firewalls . Firewall ( ) . RemoveSourceIP ( ip )
if err != nil {
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
"error" : err . Error ( ) ,
} ,
} )
} else {
_ = cmd . ReplyOk ( )
}
2021-12-31 19:45:54 +08:00
case "gc" :
runtime . GC ( )
debug . FreeOSMemory ( )
2022-01-01 17:18:34 +08:00
_ = cmd . ReplyOk ( )
2022-02-25 11:23:32 +08:00
case "reload" :
err := this . syncConfig ( 0 )
if err != nil {
_ = cmd . Reply ( & gosock . Command {
Params : map [ string ] interface { } {
"error" : err . Error ( ) ,
} ,
} )
} else {
_ = cmd . ReplyOk ( )
}
2022-05-18 23:14:57 +08:00
case "accesslog" :
err := sharedHTTPAccessLogViewer . Start ( )
if err != nil {
_ = cmd . Reply ( & gosock . Command {
Code : "error" ,
Params : map [ string ] interface { } {
"message" : "start failed: " + err . Error ( ) ,
} ,
} )
} else {
_ = cmd . ReplyOk ( )
}
2022-08-26 16:47:42 +08:00
case "bandwidth" :
var m = stats . SharedBandwidthStatManager . Map ( )
_ = cmd . Reply ( & gosock . Command { Params : maps . Map {
"stats" : m ,
} } )
2020-10-27 12:33:34 +08:00
}
2021-07-25 17:14:44 +08:00
} )
err := this . sock . Listen ( )
if err != nil {
2022-11-06 12:07:26 +08:00
remotelogs . Debug ( "NODE" , err . Error ( ) )
2020-10-27 12:33:34 +08:00
}
2021-12-08 15:17:45 +08:00
} )
2020-10-27 12:33:34 +08:00
2022-01-12 20:31:04 +08:00
events . OnKey ( events . EventQuit , this , func ( ) {
2022-11-06 12:07:26 +08:00
remotelogs . Debug ( "NODE" , "quit unix sock" )
2021-07-25 17:14:44 +08:00
_ = this . sock . Close ( )
} )
2020-10-27 12:33:34 +08:00
return nil
}
2021-12-09 12:07:59 +08:00
// 重载配置调用
2022-11-25 10:50:57 +08:00
func ( this * Node ) onReload ( config * nodeconfigs . NodeConfig , reloadAll bool ) {
2022-01-19 22:16:46 +08:00
nodeconfigs . ResetNodeConfig ( config )
sharedNodeConfig = config
2022-11-28 18:13:08 +08:00
if reloadAll {
// 缓存策略
var subDirs = config . CacheDiskSubDirs
for _ , subDir := range subDirs {
subDir . Path = filepath . Clean ( subDir . Path )
}
if len ( subDirs ) > 0 {
sort . Slice ( subDirs , func ( i , j int ) bool {
return subDirs [ i ] . Path < subDirs [ j ] . Path
} )
2022-11-25 10:50:57 +08:00
}
2022-11-28 18:13:08 +08:00
var cachePoliciesChanged = ! jsonutils . Equal ( caches . SharedManager . MaxDiskCapacity , config . MaxCacheDiskCapacity ) ||
! jsonutils . Equal ( caches . SharedManager . MaxMemoryCapacity , config . MaxCacheMemoryCapacity ) ||
! jsonutils . Equal ( caches . SharedManager . MainDiskDir , config . CacheDiskDir ) ||
! jsonutils . Equal ( caches . SharedManager . SubDiskDirs , subDirs ) ||
! jsonutils . Equal ( this . oldHTTPCachePolicies , config . HTTPCachePolicies )
caches . SharedManager . MaxDiskCapacity = config . MaxCacheDiskCapacity
caches . SharedManager . MaxMemoryCapacity = config . MaxCacheMemoryCapacity
caches . SharedManager . MainDiskDir = config . CacheDiskDir
caches . SharedManager . SubDiskDirs = subDirs
if cachePoliciesChanged {
// copy
this . oldHTTPCachePolicies = [ ] * serverconfigs . HTTPCachePolicy { }
err := jsonutils . Copy ( & this . oldHTTPCachePolicies , config . HTTPCachePolicies )
if err != nil {
remotelogs . Error ( "NODE" , "onReload: copy HTTPCachePolicies failed: " + err . Error ( ) )
}
// update
if len ( config . HTTPCachePolicies ) > 0 {
caches . SharedManager . UpdatePolicies ( config . HTTPCachePolicies )
} else {
caches . SharedManager . UpdatePolicies ( [ ] * serverconfigs . HTTPCachePolicy { } )
}
2022-11-25 10:50:57 +08:00
}
2022-01-19 22:16:46 +08:00
}
// WAF策略
2022-11-28 18:13:08 +08:00
// 包含了服务里的WAF策略, 所以需要整体更新
2022-11-25 10:50:57 +08:00
var allFirewallPolicies = config . FindAllFirewallPolicies ( )
if ! jsonutils . Equal ( allFirewallPolicies , this . oldHTTPFirewallPolicies ) {
// copy
this . oldHTTPFirewallPolicies = [ ] * firewallconfigs . HTTPFirewallPolicy { }
err := jsonutils . Copy ( & this . oldHTTPFirewallPolicies , allFirewallPolicies )
if err != nil {
remotelogs . Error ( "NODE" , "onReload: copy HTTPFirewallPolicies failed: " + err . Error ( ) )
}
// update
waf . SharedWAFManager . UpdatePolicies ( allFirewallPolicies )
}
2022-11-28 18:13:08 +08:00
if reloadAll {
if ! jsonutils . Equal ( config . FirewallActions , this . oldFirewallActions ) {
// copy
this . oldFirewallActions = [ ] * firewallconfigs . FirewallActionConfig { }
err := jsonutils . Copy ( & this . oldFirewallActions , config . FirewallActions )
if err != nil {
remotelogs . Error ( "NODE" , "onReload: copy FirewallActionConfigs failed: " + err . Error ( ) )
}
// update
iplibrary . SharedActionManager . UpdateActions ( config . FirewallActions )
2022-11-25 10:50:57 +08:00
}
2022-11-28 18:13:08 +08:00
// 统计指标
if ! jsonutils . Equal ( this . oldMetricItems , config . MetricItems ) {
// copy
this . oldMetricItems = [ ] * serverconfigs . MetricItemConfig { }
err := jsonutils . Copy ( & this . oldMetricItems , config . MetricItems )
if err != nil {
remotelogs . Error ( "NODE" , "onReload: copy MetricItemConfigs failed: " + err . Error ( ) )
}
2022-01-19 22:16:46 +08:00
2022-11-28 18:13:08 +08:00
// update
metrics . SharedManager . Update ( config . MetricItems )
2022-11-25 10:50:57 +08:00
}
2022-11-28 18:13:08 +08:00
// max cpu
if config . MaxCPU != this . oldMaxCPU {
if config . MaxCPU > 0 && config . MaxCPU < int32 ( runtime . NumCPU ( ) ) {
runtime . GOMAXPROCS ( int ( config . MaxCPU ) )
remotelogs . Println ( "NODE" , "[CPU]set max cpu to '" + types . String ( config . MaxCPU ) + "'" )
} else {
var threads = runtime . NumCPU ( ) * 4
runtime . GOMAXPROCS ( threads )
remotelogs . Println ( "NODE" , "[CPU]set max cpu to '" + types . String ( threads ) + "'" )
}
2022-01-19 22:16:46 +08:00
2022-11-28 18:13:08 +08:00
this . oldMaxCPU = config . MaxCPU
2021-12-09 17:34:05 +08:00
}
2022-11-28 18:13:08 +08:00
// max threads
if config . MaxThreads != this . oldMaxThreads {
if config . MaxThreads > 0 {
debug . SetMaxThreads ( config . MaxThreads )
remotelogs . Println ( "NODE" , "[THREADS]set max threads to '" + types . String ( config . MaxThreads ) + "'" )
} else {
debug . SetMaxThreads ( nodeconfigs . DefaultMaxThreads )
remotelogs . Println ( "NODE" , "[THREADS]set max threads to '" + types . String ( nodeconfigs . DefaultMaxThreads ) + "'" )
}
this . oldMaxThreads = config . MaxThreads
}
2021-12-09 12:07:59 +08:00
2022-11-28 18:13:08 +08:00
// timezone
var timeZone = config . TimeZone
if len ( timeZone ) == 0 {
timeZone = "Asia/Shanghai"
2021-12-09 17:34:05 +08:00
}
2022-11-28 18:13:08 +08:00
if this . oldTimezone != timeZone {
location , err := time . LoadLocation ( timeZone )
if err != nil {
remotelogs . Error ( "NODE" , "[TIMEZONE]change time zone failed: " + err . Error ( ) )
return
}
2021-12-09 12:07:59 +08:00
2022-11-28 18:13:08 +08:00
remotelogs . Println ( "NODE" , "[TIMEZONE]change time zone to '" + timeZone + "'" )
time . Local = location
this . oldTimezone = timeZone
2021-12-09 12:07:59 +08:00
}
2022-11-28 18:13:08 +08:00
// product information
if config . ProductConfig != nil {
teaconst . GlobalProductName = config . ProductConfig . Name
}
2022-05-04 16:40:25 +08:00
2022-11-28 18:13:08 +08:00
// DNS resolver
if config . DNSResolver != nil {
var err error
switch config . DNSResolver . Type {
case nodeconfigs . DNSResolverTypeGoNative :
err = os . Setenv ( "GODEBUG" , "netdns=go" )
case nodeconfigs . DNSResolverTypeCGO :
err = os . Setenv ( "GODEBUG" , "netdns=cgo" )
default :
// 默认使用go原生
err = os . Setenv ( "GODEBUG" , "netdns=go" )
}
if err != nil {
remotelogs . Error ( "NODE" , "[DNS_RESOLVER]set env failed: " + err . Error ( ) )
}
} else {
2022-06-07 11:49:38 +08:00
// 默认使用go原生
2022-11-28 18:13:08 +08:00
err := os . Setenv ( "GODEBUG" , "netdns=go" )
if err != nil {
remotelogs . Error ( "NODE" , "[DNS_RESOLVER]set env failed: " + err . Error ( ) )
}
2022-06-07 11:49:38 +08:00
}
2022-11-21 19:55:28 +08:00
2022-11-28 18:13:08 +08:00
// API Node地址, 这里不限制是否为空, 因为在为空时仍然要有对应的处理
this . changeAPINodeAddrs ( config . APINodeAddrs )
}
2023-03-31 14:06:01 +08:00
// 刷新IP库
this . reloadIPLibrary ( )
2021-12-09 12:07:59 +08:00
}
2022-04-14 10:25:34 +08:00
2022-04-18 15:39:02 +08:00
// reload server config
func ( this * Node ) reloadServer ( ) {
this . locker . Lock ( )
defer this . locker . Unlock ( )
2023-04-06 20:50:34 +08:00
var countUpdatingServers = len ( this . updatingServerMap )
const maxPrintServers = 10
if countUpdatingServers > 0 {
2022-04-18 15:39:02 +08:00
var updatingServerMap = this . updatingServerMap
this . updatingServerMap = map [ int64 ] * serverconfigs . ServerConfig { }
newNodeConfig , err := nodeconfigs . CloneNodeConfig ( sharedNodeConfig )
if err != nil {
remotelogs . Error ( "NODE" , "apply server config error: " + err . Error ( ) )
return
}
for serverId , serverConfig := range updatingServerMap {
if serverConfig != nil {
2023-04-06 20:50:34 +08:00
if countUpdatingServers < maxPrintServers {
remotelogs . Debug ( "NODE" , "load server '" + types . String ( serverId ) + "'" )
}
2022-04-18 15:39:02 +08:00
newNodeConfig . AddServer ( serverConfig )
} else {
2023-04-06 20:50:34 +08:00
if countUpdatingServers < maxPrintServers {
remotelogs . Debug ( "NODE" , "remove server '" + types . String ( serverId ) + "'" )
}
2022-04-18 15:39:02 +08:00
newNodeConfig . RemoveServer ( serverId )
}
}
2023-04-06 20:50:34 +08:00
if countUpdatingServers >= maxPrintServers {
remotelogs . Debug ( "NODE" , "reload " + types . String ( countUpdatingServers ) + " servers" )
}
2023-08-08 10:07:24 +08:00
err , serverErrors := newNodeConfig . Init ( context . Background ( ) )
2022-04-18 15:39:02 +08:00
if err != nil {
remotelogs . Error ( "NODE" , "apply server config error: " + err . Error ( ) )
return
}
if len ( serverErrors ) > 0 {
for _ , serverErr := range serverErrors {
remotelogs . ServerError ( serverErr . Id , "NODE" , serverErr . Message , nodeconfigs . NodeLogTypeServerConfigInitFailed , maps . Map { } )
}
}
2022-11-25 10:50:57 +08:00
this . onReload ( newNodeConfig , false )
2022-04-18 15:39:02 +08:00
err = sharedListenerManager . Start ( newNodeConfig )
if err != nil {
remotelogs . Error ( "NODE" , "apply server config error: " + err . Error ( ) )
}
2023-07-27 10:37:16 +08:00
// notify event
events . Notify ( events . EventReloadSomeServers )
2022-04-18 15:39:02 +08:00
}
}
2023-04-02 21:30:03 +08:00
// 检查系统
func ( this * Node ) checkSystem ( ) {
if runtime . GOOS != "linux" || os . Getgid ( ) != 0 {
return
}
type variable struct {
name string
minValue int
maxValue int
}
const dir = "/proc/sys"
for _ , v := range [ ] variable {
{ name : "net.core.somaxconn" , minValue : 2048 } ,
{ name : "net.ipv4.tcp_max_syn_backlog" , minValue : 2048 } ,
{ name : "net.core.netdev_max_backlog" , minValue : 4096 } ,
{ name : "net.ipv4.tcp_fin_timeout" , maxValue : 10 } ,
{ name : "net.ipv4.tcp_max_tw_buckets" , minValue : 65535 } ,
{ name : "net.core.rmem_default" , minValue : 4 << 20 } ,
{ name : "net.core.wmem_default" , minValue : 4 << 20 } ,
{ name : "net.core.rmem_max" , minValue : 32 << 20 } ,
{ name : "net.core.wmem_max" , minValue : 32 << 20 } ,
} {
var path = dir + "/" + strings . Replace ( v . name , "." , "/" , - 1 )
data , err := os . ReadFile ( path )
if err != nil {
continue
}
data = bytes . TrimSpace ( data )
if len ( data ) == 0 {
continue
}
var oldValue = types . Int ( string ( data ) )
if v . minValue > 0 && oldValue < v . minValue {
err = os . WriteFile ( path , [ ] byte ( types . String ( v . minValue ) ) , 0666 )
if err == nil {
remotelogs . Println ( "NODE" , "change kernel parameter '" + v . name + "' from '" + types . String ( oldValue ) + "' to '" + types . String ( v . minValue ) + "'" )
}
} else if v . maxValue > 0 && oldValue > v . maxValue {
err = os . WriteFile ( path , [ ] byte ( types . String ( v . maxValue ) ) , 0666 )
if err == nil {
remotelogs . Println ( "NODE" , "change kernel parameter '" + v . name + "' from '" + types . String ( oldValue ) + "' to '" + types . String ( v . maxValue ) + "'" )
}
}
}
}
2022-11-21 21:08:47 +08:00
// 检查API节点地址
2022-11-21 19:55:28 +08:00
func ( this * Node ) changeAPINodeAddrs ( apiNodeAddrs [ ] * serverconfigs . NetworkAddressConfig ) {
var addrs = [ ] string { }
for _ , addr := range apiNodeAddrs {
err := addr . Init ( )
if err != nil {
remotelogs . Error ( "NODE" , "changeAPINodeAddrs: validate api node address '" + configutils . QuoteIP ( addr . Host ) + ":" + addr . PortRange + "' failed: " + err . Error ( ) )
} else {
addrs = append ( addrs , addr . FullAddresses ( ) ... )
}
}
sort . Strings ( addrs )
if utils . EqualStrings ( this . lastAPINodeAddrs , addrs ) {
return
}
this . lastAPINodeAddrs = addrs
config , err := configs . LoadAPIConfig ( )
if err != nil {
remotelogs . Error ( "NODE" , "changeAPINodeAddrs: " + err . Error ( ) )
return
}
if config == nil {
return
}
var oldEndpoints = config . RPC . Endpoints
rpcClient , err := rpc . SharedRPC ( )
if err != nil {
return
}
if len ( addrs ) > 0 {
this . lastAPINodeVersion ++
var v = this . lastAPINodeVersion
// 异步检测,防止阻塞
go func ( v int64 ) {
// 测试新的API节点地址
if rpcClient . TestEndpoints ( addrs ) {
config . RPC . Endpoints = addrs
} else {
config . RPC . Endpoints = oldEndpoints
this . lastAPINodeAddrs = nil // 恢复为空,以便于下次更新重试
}
// 检查测试中间有无新的变更
if v != this . lastAPINodeVersion {
return
}
err = rpcClient . UpdateConfig ( config )
if err != nil {
remotelogs . Error ( "NODE" , "changeAPINodeAddrs: update rpc config failed: " + err . Error ( ) )
}
} ( v )
return
}
err = rpcClient . UpdateConfig ( config )
if err != nil {
remotelogs . Error ( "NODE" , "changeAPINodeAddrs: update rpc config failed: " + err . Error ( ) )
}
}