Files
EdgeNode/internal/nodes/node.go

1279 lines
32 KiB
Go
Raw Normal View History

2020-07-21 11:18:47 +08:00
package nodes
import (
2021-11-11 14:16:57 +08:00
"bytes"
2020-09-09 18:53:53 +08:00
"encoding/json"
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
2022-08-21 23:09:47 +08:00
iplib "github.com/TeaOSLab/EdgeCommon/pkg/iplibrary"
2020-09-26 08:07:07 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-09-13 20:37:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2022-05-18 21:03:51 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
2022-11-25 10:50:57 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
2020-10-05 16:55:14 +08:00
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/configs"
"github.com/TeaOSLab/EdgeNode/internal/conns"
2021-01-12 11:48:38 +08:00
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/firewalls"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeNode/internal/metrics"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2020-09-09 18:53:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2021-01-25 16:40:31 +08:00
"github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/trackers"
2020-07-22 22:18:47 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
2022-09-15 15:59:29 +08:00
_ "github.com/TeaOSLab/EdgeNode/internal/utils/clock" // 触发时钟更新
2022-11-25 10:50:57 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils/jsonutils"
"github.com/TeaOSLab/EdgeNode/internal/waf"
2021-11-11 14:16:57 +08:00
"github.com/andybalholm/brotli"
"github.com/iwind/TeaGo/Tea"
2021-01-12 11:48:38 +08:00
"github.com/iwind/TeaGo/lists"
2021-07-25 17:14:44 +08:00
"github.com/iwind/TeaGo/maps"
2021-12-02 11:30:47 +08:00
"github.com/iwind/TeaGo/types"
2021-07-25 17:14:44 +08:00
"github.com/iwind/gosock/pkg/gosock"
2022-03-04 12:30:06 +08:00
"gopkg.in/yaml.v3"
2021-01-12 11:48:38 +08:00
"log"
"os"
2021-01-12 11:48:38 +08:00
"os/exec"
"os/signal"
"path/filepath"
"runtime"
2021-12-09 12:07:59 +08:00
"runtime/debug"
"sort"
"sync"
"syscall"
2020-09-09 18:53:53 +08:00
"time"
2020-07-21 11:18:47 +08:00
)
2020-09-26 08:07:07 +08:00
// Package-level state shared across the node runtime.
var (
	// sharedNodeConfig is the node configuration currently in effect.
	sharedNodeConfig *nodeconfigs.NodeConfig

	// nodeTaskNotify wakes the sync loop so that it fetches and runs pending node tasks.
	nodeTaskNotify = make(chan bool, 8)

	// nodeConfigChangedNotify wakes the sync loop so that it re-syncs the whole node configuration.
	nodeConfigChangedNotify = make(chan bool, 8)

	// nodeConfigUpdatedAt is the Unix time at which a changed configuration was last received.
	nodeConfigUpdatedAt int64

	// DaemonIsOn reports whether this process was started by the daemon.
	DaemonIsOn = false

	// DaemonPid is the parent process id recorded when started by the daemon.
	DaemonPid = 0
)
2020-07-21 11:18:47 +08:00
// Node 节点
2020-07-21 11:18:47 +08:00
type Node struct {
isLoaded bool
2021-07-25 17:14:44 +08:00
sock *gosock.Sock
locker sync.Mutex
2021-12-09 12:07:59 +08:00
2022-11-25 10:50:57 +08:00
oldMaxCPU int32
oldMaxThreads int
oldTimezone string
oldHTTPCachePolicies []*serverconfigs.HTTPCachePolicy
oldHTTPFirewallPolicies []*firewallconfigs.HTTPFirewallPolicy
oldFirewallActions []*firewallconfigs.FirewallActionConfig
oldMetricItems []*serverconfigs.MetricItemConfig
2022-01-19 22:16:46 +08:00
updatingServerMap map[int64]*serverconfigs.ServerConfig
lastAPINodeVersion int64
lastAPINodeAddrs []string // 以前的API节点地址
lastTaskVersion int64
2020-07-21 11:18:47 +08:00
}
func NewNode() *Node {
2021-07-25 17:39:09 +08:00
return &Node{
2022-01-19 22:16:46 +08:00
sock: gosock.NewTmpSock(teaconst.ProcessName),
2022-11-25 10:50:57 +08:00
oldMaxThreads: -1,
oldMaxCPU: -1,
2022-01-19 22:16:46 +08:00
updatingServerMap: map[int64]*serverconfigs.ServerConfig{},
2021-07-25 17:39:09 +08:00
}
2020-07-21 11:18:47 +08:00
}
// Test 检查配置
func (this *Node) Test() error {
// 检查是否能连接API
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("test rpc failed: " + err.Error())
}
2022-08-24 20:04:46 +08:00
_, err = rpcClient.APINodeRPC.FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{})
if err != nil {
return errors.New("test rpc failed: " + err.Error())
}
return nil
}
// Start 启动
2020-07-21 11:18:47 +08:00
func (this *Node) Start() {
2022-09-12 17:43:48 +08:00
// 设置netdns
// 这个需要放在所有网络访问的最前面
_ = os.Setenv("GODEBUG", "netdns=go")
_, ok := os.LookupEnv("EdgeDaemon")
if ok {
remotelogs.Println("NODE", "start from daemon")
DaemonIsOn = true
DaemonPid = os.Getppid()
}
// 处理异常
this.handlePanic()
// 监听signal
this.listenSignals()
// 本地Sock
err := this.listenSock()
if err != nil {
remotelogs.Error("NODE", err.Error())
return
}
2022-08-21 23:09:47 +08:00
// 启动IP库
remotelogs.Println("NODE", "initializing ip library ...")
2022-08-23 14:32:39 +08:00
err = iplib.InitDefault()
2022-08-21 23:09:47 +08:00
if err != nil {
remotelogs.Error("NODE", "initialize ip library failed: "+err.Error())
}
2022-04-14 10:25:34 +08:00
// 检查硬盘类型
this.checkDisk()
2022-08-21 23:09:47 +08:00
// 启动事件
events.Notify(events.EventStart)
2020-09-09 18:53:53 +08:00
// 读取API配置
remotelogs.Println("NODE", "init config ...")
2021-11-11 14:16:57 +08:00
err = this.syncConfig(0)
if err != nil {
_, err := nodeconfigs.SharedNodeConfig()
2021-01-11 18:16:15 +08:00
if err != nil {
// 无本地数据时,会尝试多次读取
tryTimes := 0
for {
2021-11-11 14:16:57 +08:00
err := this.syncConfig(0)
if err != nil {
tryTimes++
if tryTimes%10 == 0 {
remotelogs.Error("NODE", err.Error())
}
time.Sleep(1 * time.Second)
// 不做长时间的无意义的重试
if tryTimes > 1000 {
return
}
} else {
break
}
2021-01-11 18:16:15 +08:00
}
}
2020-09-09 18:53:53 +08:00
}
// 启动同步计时器
this.startSyncTimer()
2022-08-23 14:53:39 +08:00
// 更新IP库
goman.New(func() {
iplib.NewUpdater(NewIPLibraryUpdater(), 10*time.Minute).Start()
})
// 监控节点运行状态
goman.New(func() {
NewNodeStatusExecutor().Listen()
})
2020-09-09 18:53:53 +08:00
2020-07-22 22:18:47 +08:00
// 读取配置
2020-09-26 08:07:07 +08:00
nodeConfig, err := nodeconfigs.SharedNodeConfig()
2020-07-21 11:18:47 +08:00
if err != nil {
remotelogs.Error("NODE", "start failed: read node config failed: "+err.Error())
2020-07-21 11:18:47 +08:00
return
}
teaconst.NodeId = nodeConfig.Id
2021-12-02 11:30:47 +08:00
teaconst.NodeIdString = types.String(teaconst.NodeId)
2021-12-01 15:52:38 +08:00
err, serverErrors := nodeConfig.Init()
2020-09-26 08:07:07 +08:00
if err != nil {
remotelogs.Error("NODE", "init node config failed: "+err.Error())
2020-09-26 08:07:07 +08:00
return
}
2021-12-01 15:52:38 +08:00
if len(serverErrors) > 0 {
for _, serverErr := range serverErrors {
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
}
}
2020-09-26 08:07:07 +08:00
sharedNodeConfig = nodeConfig
2022-11-25 10:50:57 +08:00
this.onReload(nodeConfig, true)
2020-07-21 11:18:47 +08:00
2020-12-03 10:17:28 +08:00
// 发送事件
events.Notify(events.EventLoaded)
2020-07-22 22:18:47 +08:00
// 设置rlimit
_ = utils.SetRLimit(1024 * 1024)
2020-10-04 14:30:42 +08:00
// 连接API
goman.New(func() {
NewAPIStream().Start()
})
2020-10-04 14:30:42 +08:00
2021-01-25 16:40:31 +08:00
// 统计
2021-12-14 10:01:21 +08:00
goman.New(func() {
stats.SharedTrafficStatManager.Start()
2021-01-25 16:40:31 +08:00
})
goman.New(func() {
stats.SharedHTTPRequestStatManager.Start()
})
2021-01-25 16:40:31 +08:00
2020-07-22 22:18:47 +08:00
// 启动端口
err = sharedListenerManager.Start(nodeConfig)
if err != nil {
remotelogs.Error("NODE", "start failed: "+err.Error())
2020-10-28 11:19:06 +08:00
return
}
2020-07-22 22:18:47 +08:00
// hold住进程
2020-10-04 14:30:42 +08:00
select {}
2020-07-21 11:18:47 +08:00
}
2020-09-09 18:53:53 +08:00
// Daemon 实现守护进程
2021-01-12 11:48:38 +08:00
func (this *Node) Daemon() {
2022-07-26 09:41:43 +08:00
teaconst.IsDaemon = true
var isDebug = lists.ContainsString(os.Args, "debug")
2021-01-12 11:48:38 +08:00
for {
2021-07-25 17:39:09 +08:00
conn, err := this.sock.Dial()
2021-01-12 11:48:38 +08:00
if err != nil {
if isDebug {
log.Println("[DAEMON]starting ...")
}
// 尝试启动
err = func() error {
exe, err := os.Executable()
if err != nil {
return err
}
// 可以标记当前是从守护进程启动的
_ = os.Setenv("EdgeDaemon", "on")
_ = os.Setenv("EdgeBackground", "on")
2022-07-26 09:41:43 +08:00
var cmd = exec.Command(exe)
2022-08-09 22:59:37 +08:00
var buf = &bytes.Buffer{}
cmd.Stderr = buf
2021-01-12 11:48:38 +08:00
err = cmd.Start()
if err != nil {
return err
}
err = cmd.Wait()
if err != nil {
2022-08-09 22:59:37 +08:00
if isDebug {
log.Println("[DAEMON]" + buf.String())
}
2021-01-12 11:48:38 +08:00
return err
}
return nil
}()
if err != nil {
if isDebug {
log.Println("[DAEMON]", err)
}
time.Sleep(1 * time.Second)
} else {
time.Sleep(5 * time.Second)
}
} else {
_ = conn.Close()
time.Sleep(5 * time.Second)
}
}
}
// InstallSystemService 安装系统服务
2021-01-12 11:48:38 +08:00
func (this *Node) InstallSystemService() error {
shortName := teaconst.SystemdServiceName
exe, err := os.Executable()
if err != nil {
return err
}
manager := utils.NewServiceManager(shortName, teaconst.ProductName)
err = manager.Install(exe, []string{})
if err != nil {
return err
}
return nil
}
// 循环
func (this *Node) loop() error {
var tr = trackers.Begin("CHECK_NODE_CONFIG_CHANGES")
defer tr.End()
// 检查api.yaml是否存在
2022-01-19 22:16:46 +08:00
var apiConfigFile = Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
if err != nil {
return nil
}
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("create rpc client failed: " + err.Error())
}
2022-11-23 20:13:34 +08:00
tasksResp, err := rpcClient.NodeTaskRPC.FindNodeTasks(rpcClient.Context(), &pb.FindNodeTasksRequest{
Version: this.lastTaskVersion,
})
if err != nil {
2022-08-07 19:02:06 +08:00
if rpc.IsConnError(err) && !Tea.IsTesting() {
return nil
}
return errors.New("read node tasks failed: " + err.Error())
}
for _, task := range tasksResp.NodeTasks {
2022-11-23 20:13:34 +08:00
err := this.execTask(rpcClient, task)
if !this.finishTask(task.Id, task.Version, err) {
// 防止失败的任务无法重试
break
}
}
return nil
}
// 执行任务
2022-11-23 20:13:34 +08:00
func (this *Node) execTask(rpcClient *rpc.RPCClient, task *pb.NodeTask) error {
switch task.Type {
case "ipItemChanged":
// 防止阻塞
select {
case iplibrary.IPListUpdateNotify <- true:
default:
2022-03-25 14:11:34 +08:00
}
case "configChanged":
if task.ServerId > 0 {
return this.syncServerConfig(task.ServerId)
}
if !task.IsPrimary {
// 我们等等主节点配置准备完毕
time.Sleep(2 * time.Second)
}
return this.syncConfig(task.Version)
case "nodeVersionChanged":
if !sharedUpgradeManager.IsInstalling() {
goman.New(func() {
sharedUpgradeManager.Start()
2022-04-04 12:06:53 +08:00
})
}
case "scriptsChanged":
err := this.reloadCommonScripts()
if err != nil {
return errors.New("reload common scripts failed: " + err.Error())
}
case "nodeLevelChanged":
2022-11-23 20:13:34 +08:00
levelInfoResp, err := rpcClient.NodeRPC.FindNodeLevelInfo(rpcClient.Context(), &pb.FindNodeLevelInfoRequest{})
if err != nil {
return err
}
2022-04-04 12:06:53 +08:00
if sharedNodeConfig != nil {
2022-04-04 12:06:53 +08:00
sharedNodeConfig.Level = levelInfoResp.Level
}
2022-04-04 12:06:53 +08:00
var parentNodes = map[int64][]*nodeconfigs.ParentNodeConfig{}
if len(levelInfoResp.ParentNodesMapJSON) > 0 {
err = json.Unmarshal(levelInfoResp.ParentNodesMapJSON, &parentNodes)
if err != nil {
return errors.New("decode level info failed: " + err.Error())
2022-04-04 12:06:53 +08:00
}
}
2022-04-04 12:06:53 +08:00
if sharedNodeConfig != nil {
sharedNodeConfig.ParentNodes = parentNodes
}
case "ddosProtectionChanged":
2022-11-23 20:13:34 +08:00
resp, err := rpcClient.NodeRPC.FindNodeDDoSProtection(rpcClient.Context(), &pb.FindNodeDDoSProtectionRequest{})
if err != nil {
return err
}
if len(resp.DdosProtectionJSON) == 0 {
if sharedNodeConfig != nil {
sharedNodeConfig.DDoSProtection = nil
2022-05-18 21:03:51 +08:00
}
return nil
}
var ddosProtectionConfig = &ddosconfigs.ProtectionConfig{}
err = json.Unmarshal(resp.DdosProtectionJSON, ddosProtectionConfig)
if err != nil {
return errors.New("decode DDoS protection config failed: " + err.Error())
}
if ddosProtectionConfig != nil && sharedNodeConfig != nil {
sharedNodeConfig.DDoSProtection = ddosProtectionConfig
}
err = firewalls.SharedDDoSProtectionManager.Apply(ddosProtectionConfig)
if err != nil {
// 不阻塞
remotelogs.Warn("NODE", "apply DDoS protection failed: "+err.Error())
return nil
}
case "globalServerConfigChanged":
2022-11-23 20:13:34 +08:00
resp, err := rpcClient.NodeRPC.FindNodeGlobalServerConfig(rpcClient.Context(), &pb.FindNodeGlobalServerConfigRequest{})
if err != nil {
return err
}
if len(resp.GlobalServerConfigJSON) > 0 {
var globalServerConfig = serverconfigs.DefaultGlobalServerConfig()
err = json.Unmarshal(resp.GlobalServerConfigJSON, globalServerConfig)
2022-05-18 21:03:51 +08:00
if err != nil {
return errors.New("decode global server config failed: " + err.Error())
2022-05-18 21:03:51 +08:00
}
if globalServerConfig != nil {
err = globalServerConfig.Init()
2022-05-18 21:03:51 +08:00
if err != nil {
return errors.New("validate global server config failed: " + err.Error())
2022-05-18 21:03:51 +08:00
}
if sharedNodeConfig != nil {
sharedNodeConfig.GlobalServerConfig = globalServerConfig
}
2022-03-25 14:11:34 +08:00
}
}
case "userServersStateChanged":
if task.UserId > 0 {
2022-11-23 20:13:34 +08:00
resp, err := rpcClient.UserRPC.CheckUserServersState(rpcClient.Context(), &pb.CheckUserServersStateRequest{UserId: task.UserId})
if err != nil {
return err
}
SharedUserManager.UpdateUserServersIsEnabled(task.UserId, resp.IsEnabled)
if resp.IsEnabled {
err = this.syncUserServersConfig(task.UserId)
if err != nil {
return err
}
}
}
default:
remotelogs.Error("NODE", "task '"+types.String(task.Id)+"', type '"+task.Type+"' has not been handled")
}
return nil
}
// 标记任务完成
func (this *Node) finishTask(taskId int64, taskVersion int64, taskErr error) (success bool) {
if taskId <= 0 {
return true
}
rpcClient, err := rpc.SharedRPC()
if err != nil {
remotelogs.Debug("NODE", "create rpc client failed: "+err.Error())
return false
}
var isOk = taskErr == nil
if isOk && taskVersion > this.lastTaskVersion {
this.lastTaskVersion = taskVersion
}
var errMsg = ""
if taskErr != nil {
errMsg = taskErr.Error()
}
2022-11-23 20:13:34 +08:00
_, err = rpcClient.NodeTaskRPC.ReportNodeTaskDone(rpcClient.Context(), &pb.ReportNodeTaskDoneRequest{
NodeTaskId: taskId,
IsOk: isOk,
Error: errMsg,
})
success = err == nil
if err != nil {
// 连接错误不需要上报到服务中心
if rpc.IsConnError(err) {
remotelogs.Debug("NODE", "report task done failed: "+err.Error())
} else {
remotelogs.Error("NODE", "report task done failed: "+err.Error())
}
}
return success
}
2020-09-09 18:53:53 +08:00
// 读取API配置
2021-11-11 14:16:57 +08:00
func (this *Node) syncConfig(taskVersion int64) error {
this.locker.Lock()
defer this.locker.Unlock()
// 检查api.yaml是否存在
apiConfigFile := Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
if err != nil {
if os.IsNotExist(err) {
clusterErr := this.checkClusterConfig()
if clusterErr != nil {
if os.IsNotExist(clusterErr) {
return errors.New("can not find config file 'configs/api.yaml'")
}
return errors.New("check cluster config failed: " + clusterErr.Error())
}
} else {
return err
}
}
2020-09-09 18:53:53 +08:00
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("create rpc client failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
// 获取同步任务
2020-09-26 08:07:07 +08:00
// TODO 这里考虑只同步版本号有变更的
2022-11-23 20:13:34 +08:00
configResp, err := rpcClient.NodeRPC.FindCurrentNodeConfig(rpcClient.Context(), &pb.FindCurrentNodeConfigRequest{
2021-11-11 14:16:57 +08:00
Version: -1, // 更新所有版本
Compress: true,
NodeTaskVersion: taskVersion,
2020-12-03 10:17:28 +08:00
})
2020-09-09 18:53:53 +08:00
if err != nil {
return errors.New("read config from rpc failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
2020-12-03 10:17:28 +08:00
if !configResp.IsChanged {
return nil
}
var configJSON = configResp.NodeJSON
2021-11-11 14:16:57 +08:00
if configResp.IsCompressed {
var reader = brotli.NewReader(bytes.NewReader(configJSON))
var configBuf = &bytes.Buffer{}
var buf = make([]byte, 32*1024)
for {
n, err := reader.Read(buf)
if n > 0 {
configBuf.Write(buf[:n])
}
if err != nil {
break
}
}
configJSON = configBuf.Bytes()
}
nodeConfigUpdatedAt = time.Now().Unix()
var nodeConfig = &nodeconfigs.NodeConfig{}
2020-09-26 08:07:07 +08:00
err = json.Unmarshal(configJSON, nodeConfig)
2020-09-09 18:53:53 +08:00
if err != nil {
return errors.New("decode config failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
teaconst.NodeId = nodeConfig.Id
2021-12-02 11:30:47 +08:00
teaconst.NodeIdString = types.String(teaconst.NodeId)
2020-09-09 18:53:53 +08:00
// 检查时间是否一致
// 这个需要在 teaconst.NodeId 设置之后因为上报到API节点的时候需要节点ID
if configResp.Timestamp > 0 {
var timestampDelta = configResp.Timestamp - time.Now().Unix()
if timestampDelta > 60 || timestampDelta < -60 {
remotelogs.Error("NODE", "node timestamp ('"+types.String(time.Now().Unix())+"') is not same as api node ('"+types.String(configResp.Timestamp)+"'), please sync the time")
}
}
2020-09-09 18:53:53 +08:00
// 写入到文件中
err = nodeConfig.Save()
if err != nil {
return err
}
2021-12-01 15:52:38 +08:00
err, serverErrors := nodeConfig.Init()
2020-09-09 18:53:53 +08:00
if err != nil {
return err
}
2021-12-01 15:52:38 +08:00
if len(serverErrors) > 0 {
for _, serverErr := range serverErrors {
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
}
}
2020-09-09 18:53:53 +08:00
2020-09-26 08:07:07 +08:00
// 刷新配置
if this.isLoaded {
remotelogs.Println("NODE", "reloading config ...")
} else {
remotelogs.Println("NODE", "loading config ...")
}
2022-11-25 10:50:57 +08:00
this.onReload(nodeConfig, true)
2020-09-26 08:07:07 +08:00
2020-12-03 10:17:28 +08:00
// 发送事件
events.Notify(events.EventReload)
if this.isLoaded {
2020-09-09 18:53:53 +08:00
return sharedListenerManager.Start(nodeConfig)
}
this.isLoaded = true
2020-09-09 18:53:53 +08:00
return nil
}
2022-01-19 22:16:46 +08:00
// 读取单个服务配置
func (this *Node) syncServerConfig(serverId int64) error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
2022-08-24 20:04:46 +08:00
resp, err := rpcClient.ServerRPC.ComposeServerConfig(rpcClient.Context(), &pb.ComposeServerConfigRequest{ServerId: serverId})
2022-01-19 22:16:46 +08:00
if err != nil {
return err
}
this.locker.Lock()
defer this.locker.Unlock()
if len(resp.ServerConfigJSON) == 0 {
this.updatingServerMap[serverId] = nil
} else {
var config = &serverconfigs.ServerConfig{}
err = json.Unmarshal(resp.ServerConfigJSON, config)
if err != nil {
return err
}
this.updatingServerMap[serverId] = config
}
return nil
}
// syncUserServersConfig fetches the configurations of all servers that
// belong to the given user and queues them in updatingServerMap.
func (this *Node) syncUserServersConfig(userId int64) error {
	rpcClient, err := rpc.SharedRPC()
	if err != nil {
		return err
	}

	var req = &pb.ComposeAllUserServersConfigRequest{
		UserId: userId,
	}
	resp, err := rpcClient.ServerRPC.ComposeAllUserServersConfig(rpcClient.Context(), req)
	if err != nil {
		return err
	}

	// Nothing to queue.
	if len(resp.ServersConfigJSON) == 0 {
		return nil
	}

	var userServers = []*serverconfigs.ServerConfig{}
	if err = json.Unmarshal(resp.ServersConfigJSON, &userServers); err != nil {
		return err
	}

	this.locker.Lock()
	defer this.locker.Unlock()

	for _, server := range userServers {
		this.updatingServerMap[server.Id] = server
	}

	return nil
}
2020-09-09 18:53:53 +08:00
// 启动同步计时器
func (this *Node) startSyncTimer() {
2020-09-26 08:07:07 +08:00
// TODO 这个时间间隔可以自行设置
2022-01-19 22:16:46 +08:00
var taskTicker = time.NewTicker(60 * time.Second)
var serverChangeTicker = time.NewTicker(5 * time.Second)
2022-01-12 20:31:04 +08:00
events.OnKey(events.EventQuit, this, func() {
remotelogs.Println("NODE", "quit sync timer")
2022-01-19 22:16:46 +08:00
taskTicker.Stop()
serverChangeTicker.Stop()
2020-10-28 11:19:06 +08:00
})
goman.New(func() {
for {
select {
2022-01-19 22:16:46 +08:00
case <-taskTicker.C: // 定期执行
err := this.loop()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
2022-01-19 22:16:46 +08:00
case <-serverChangeTicker.C: // 服务变化
this.reloadServer()
2022-01-19 22:16:46 +08:00
case <-nodeTaskNotify: // 有新的更新任务
err := this.loop()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
2022-01-19 22:16:46 +08:00
case <-nodeConfigChangedNotify: // 节点变化通知
2021-11-11 14:16:57 +08:00
err := this.syncConfig(0)
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
2020-09-09 18:53:53 +08:00
}
}
})
2020-09-09 18:53:53 +08:00
}
// 检查集群设置
func (this *Node) checkClusterConfig() error {
configFile := Tea.ConfigFile("cluster.yaml")
2022-08-04 11:34:06 +08:00
data, err := os.ReadFile(configFile)
if err != nil {
return err
}
config := &configs.ClusterConfig{}
err = yaml.Unmarshal(data, config)
if err != nil {
return err
}
rpcClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: config.RPC,
NodeId: config.ClusterId,
Secret: config.Secret,
})
if err != nil {
return err
}
remotelogs.Debug("NODE", "registering node to cluster ...")
2022-08-24 20:04:46 +08:00
resp, err := rpcClient.NodeRPC.RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME})
if err != nil {
return err
}
remotelogs.Debug("NODE", "registered successfully")
// 写入到配置文件中
if len(resp.Endpoints) == 0 {
resp.Endpoints = []string{}
}
var apiConfig = &configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints" json:"endpoints"`
DisableUpdate bool `yaml:"disableUpdate" json:"disableUpdate"`
}{
Endpoints: resp.Endpoints,
DisableUpdate: false,
},
NodeId: resp.UniqueId,
Secret: resp.Secret,
}
remotelogs.Debug("NODE", "writing 'configs/api.yaml' ...")
err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml"))
if err != nil {
return err
}
remotelogs.Debug("NODE", "wrote 'configs/api.yaml' successfully")
return nil
}
// 监听一些信号
func (this *Node) listenSignals() {
var queue = make(chan os.Signal, 8)
signal.Notify(queue, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGQUIT)
goman.New(func() {
for range queue {
time.Sleep(100 * time.Millisecond)
2022-03-15 18:32:39 +08:00
utils.Exit()
return
}
})
}
// 监听本地sock
func (this *Node) listenSock() error {
2021-07-25 17:14:44 +08:00
// 检查是否在运行
if this.sock.IsListening() {
reply, err := this.sock.Send(&gosock.Command{Code: "pid"})
if err == nil {
2022-08-03 23:31:08 +08:00
return errors.New("error: the process is already running, pid: " + types.String(maps.NewMap(reply.Params).GetInt("pid")))
} else {
2021-07-25 17:14:44 +08:00
return errors.New("error: the process is already running")
}
}
2021-07-25 17:14:44 +08:00
// 启动监听
goman.New(func() {
2021-07-25 17:14:44 +08:00
this.sock.OnCommand(func(cmd *gosock.Command) {
switch cmd.Code {
case "pid":
_ = cmd.Reply(&gosock.Command{
Code: "pid",
Params: map[string]interface{}{
"pid": os.Getpid(),
},
})
case "info":
exePath, _ := os.Executable()
_ = cmd.Reply(&gosock.Command{
Code: "info",
Params: map[string]interface{}{
"pid": os.Getpid(),
"version": teaconst.Version,
"path": exePath,
},
})
2021-07-25 17:14:44 +08:00
case "stop":
_ = cmd.ReplyOk()
// 退出主进程
events.Notify(events.EventQuit)
2022-09-26 16:27:51 +08:00
time.Sleep(100 * time.Millisecond)
2022-03-15 18:32:39 +08:00
utils.Exit()
2021-07-25 17:14:44 +08:00
case "quit":
_ = cmd.ReplyOk()
_ = this.sock.Close()
events.Notify(events.EventQuit)
events.Notify(events.EventTerminated)
2021-07-25 17:14:44 +08:00
// 监控连接数如果连接数为0则退出进程
goman.New(func() {
2021-07-25 17:14:44 +08:00
for {
countActiveConnections := sharedListenerManager.TotalActiveConnections()
if countActiveConnections <= 0 {
2022-03-15 18:32:39 +08:00
utils.Exit()
2021-07-25 17:14:44 +08:00
return
}
time.Sleep(1 * time.Second)
}
})
case "trackers":
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
"labels": trackers.SharedManager.Labels(),
},
})
case "goman":
var posMap = map[string]maps.Map{} // file#line => Map
for _, instance := range goman.List() {
var pos = instance.File + "#" + types.String(instance.Line)
m, ok := posMap[pos]
if ok {
m["count"] = m["count"].(int) + 1
} else {
m = maps.Map{
"pos": pos,
"count": 1,
}
posMap[pos] = m
}
}
var result = []maps.Map{}
for _, m := range posMap {
result = append(result, m)
}
sort.Slice(result, func(i, j int) bool {
return result[i]["count"].(int) > result[j]["count"].(int)
})
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
2021-12-14 10:01:21 +08:00
"total": runtime.NumGoroutine(),
"result": result,
},
2021-12-12 11:48:01 +08:00
})
case "conns":
var connMaps = []maps.Map{}
var connMap = conns.SharedMap.AllConns()
for _, connInfo := range connMap {
connMaps = append(connMaps, maps.Map{
"addr": connInfo.Conn.RemoteAddr().String(),
"age": time.Now().Unix() - connInfo.CreatedAt,
})
}
2021-12-12 11:48:01 +08:00
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
"conns": connMaps,
"total": len(connMaps),
2021-12-12 11:48:01 +08:00
},
})
case "dropIP":
var m = maps.NewMap(cmd.Params)
var ip = m.GetString("ip")
var timeSeconds = m.GetInt("timeoutSeconds")
2022-08-04 11:01:16 +08:00
var async = m.GetBool("async")
err := firewalls.Firewall().DropSourceIP(ip, timeSeconds, async)
if err != nil {
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
"error": err.Error(),
},
})
} else {
_ = cmd.ReplyOk()
}
case "rejectIP":
var m = maps.NewMap(cmd.Params)
var ip = m.GetString("ip")
var timeSeconds = m.GetInt("timeoutSeconds")
err := firewalls.Firewall().RejectSourceIP(ip, timeSeconds)
if err != nil {
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
"error": err.Error(),
},
})
} else {
_ = cmd.ReplyOk()
}
case "removeIP":
var m = maps.NewMap(cmd.Params)
var ip = m.GetString("ip")
err := firewalls.Firewall().RemoveSourceIP(ip)
if err != nil {
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
"error": err.Error(),
},
})
} else {
_ = cmd.ReplyOk()
}
case "gc":
runtime.GC()
debug.FreeOSMemory()
2022-01-01 17:18:34 +08:00
_ = cmd.ReplyOk()
case "reload":
err := this.syncConfig(0)
if err != nil {
_ = cmd.Reply(&gosock.Command{
Params: map[string]interface{}{
"error": err.Error(),
},
})
} else {
_ = cmd.ReplyOk()
}
case "accesslog":
err := sharedHTTPAccessLogViewer.Start()
if err != nil {
_ = cmd.Reply(&gosock.Command{
Code: "error",
Params: map[string]interface{}{
"message": "start failed: " + err.Error(),
},
})
} else {
_ = cmd.ReplyOk()
}
case "bandwidth":
var m = stats.SharedBandwidthStatManager.Map()
_ = cmd.Reply(&gosock.Command{Params: maps.Map{
"stats": m,
}})
}
2021-07-25 17:14:44 +08:00
})
err := this.sock.Listen()
if err != nil {
remotelogs.Debug("NODE", err.Error())
}
})
2022-01-12 20:31:04 +08:00
events.OnKey(events.EventQuit, this, func() {
remotelogs.Debug("NODE", "quit unix sock")
2021-07-25 17:14:44 +08:00
_ = this.sock.Close()
})
return nil
}
2021-12-09 12:07:59 +08:00
// 重载配置调用
2022-11-25 10:50:57 +08:00
func (this *Node) onReload(config *nodeconfigs.NodeConfig, reloadAll bool) {
2022-01-19 22:16:46 +08:00
nodeconfigs.ResetNodeConfig(config)
sharedNodeConfig = config
if reloadAll {
// 缓存策略
var subDirs = config.CacheDiskSubDirs
for _, subDir := range subDirs {
subDir.Path = filepath.Clean(subDir.Path)
}
if len(subDirs) > 0 {
sort.Slice(subDirs, func(i, j int) bool {
return subDirs[i].Path < subDirs[j].Path
})
2022-11-25 10:50:57 +08:00
}
var cachePoliciesChanged = !jsonutils.Equal(caches.SharedManager.MaxDiskCapacity, config.MaxCacheDiskCapacity) ||
!jsonutils.Equal(caches.SharedManager.MaxMemoryCapacity, config.MaxCacheMemoryCapacity) ||
!jsonutils.Equal(caches.SharedManager.MainDiskDir, config.CacheDiskDir) ||
!jsonutils.Equal(caches.SharedManager.SubDiskDirs, subDirs) ||
!jsonutils.Equal(this.oldHTTPCachePolicies, config.HTTPCachePolicies)
caches.SharedManager.MaxDiskCapacity = config.MaxCacheDiskCapacity
caches.SharedManager.MaxMemoryCapacity = config.MaxCacheMemoryCapacity
caches.SharedManager.MainDiskDir = config.CacheDiskDir
caches.SharedManager.SubDiskDirs = subDirs
if cachePoliciesChanged {
// copy
this.oldHTTPCachePolicies = []*serverconfigs.HTTPCachePolicy{}
err := jsonutils.Copy(&this.oldHTTPCachePolicies, config.HTTPCachePolicies)
if err != nil {
remotelogs.Error("NODE", "onReload: copy HTTPCachePolicies failed: "+err.Error())
}
// update
if len(config.HTTPCachePolicies) > 0 {
caches.SharedManager.UpdatePolicies(config.HTTPCachePolicies)
} else {
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
}
2022-11-25 10:50:57 +08:00
}
2022-01-19 22:16:46 +08:00
}
// WAF策略
// 包含了服务里的WAF策略所以需要整体更新
2022-11-25 10:50:57 +08:00
var allFirewallPolicies = config.FindAllFirewallPolicies()
if !jsonutils.Equal(allFirewallPolicies, this.oldHTTPFirewallPolicies) {
// copy
this.oldHTTPFirewallPolicies = []*firewallconfigs.HTTPFirewallPolicy{}
err := jsonutils.Copy(&this.oldHTTPFirewallPolicies, allFirewallPolicies)
if err != nil {
remotelogs.Error("NODE", "onReload: copy HTTPFirewallPolicies failed: "+err.Error())
}
// update
waf.SharedWAFManager.UpdatePolicies(allFirewallPolicies)
}
if reloadAll {
if !jsonutils.Equal(config.FirewallActions, this.oldFirewallActions) {
// copy
this.oldFirewallActions = []*firewallconfigs.FirewallActionConfig{}
err := jsonutils.Copy(&this.oldFirewallActions, config.FirewallActions)
if err != nil {
remotelogs.Error("NODE", "onReload: copy FirewallActionConfigs failed: "+err.Error())
}
// update
iplibrary.SharedActionManager.UpdateActions(config.FirewallActions)
2022-11-25 10:50:57 +08:00
}
// 统计指标
if !jsonutils.Equal(this.oldMetricItems, config.MetricItems) {
// copy
this.oldMetricItems = []*serverconfigs.MetricItemConfig{}
err := jsonutils.Copy(&this.oldMetricItems, config.MetricItems)
if err != nil {
remotelogs.Error("NODE", "onReload: copy MetricItemConfigs failed: "+err.Error())
}
2022-01-19 22:16:46 +08:00
// update
metrics.SharedManager.Update(config.MetricItems)
2022-11-25 10:50:57 +08:00
}
// max cpu
if config.MaxCPU != this.oldMaxCPU {
if config.MaxCPU > 0 && config.MaxCPU < int32(runtime.NumCPU()) {
runtime.GOMAXPROCS(int(config.MaxCPU))
remotelogs.Println("NODE", "[CPU]set max cpu to '"+types.String(config.MaxCPU)+"'")
} else {
var threads = runtime.NumCPU() * 4
runtime.GOMAXPROCS(threads)
remotelogs.Println("NODE", "[CPU]set max cpu to '"+types.String(threads)+"'")
}
2022-01-19 22:16:46 +08:00
this.oldMaxCPU = config.MaxCPU
2021-12-09 17:34:05 +08:00
}
// max threads
if config.MaxThreads != this.oldMaxThreads {
if config.MaxThreads > 0 {
debug.SetMaxThreads(config.MaxThreads)
remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(config.MaxThreads)+"'")
} else {
debug.SetMaxThreads(nodeconfigs.DefaultMaxThreads)
remotelogs.Println("NODE", "[THREADS]set max threads to '"+types.String(nodeconfigs.DefaultMaxThreads)+"'")
}
this.oldMaxThreads = config.MaxThreads
}
2021-12-09 12:07:59 +08:00
// timezone
var timeZone = config.TimeZone
if len(timeZone) == 0 {
timeZone = "Asia/Shanghai"
2021-12-09 17:34:05 +08:00
}
if this.oldTimezone != timeZone {
location, err := time.LoadLocation(timeZone)
if err != nil {
remotelogs.Error("NODE", "[TIMEZONE]change time zone failed: "+err.Error())
return
}
2021-12-09 12:07:59 +08:00
remotelogs.Println("NODE", "[TIMEZONE]change time zone to '"+timeZone+"'")
time.Local = location
this.oldTimezone = timeZone
2021-12-09 12:07:59 +08:00
}
// product information
if config.ProductConfig != nil {
teaconst.GlobalProductName = config.ProductConfig.Name
}
2022-05-04 16:40:25 +08:00
// DNS resolver
if config.DNSResolver != nil {
var err error
switch config.DNSResolver.Type {
case nodeconfigs.DNSResolverTypeGoNative:
err = os.Setenv("GODEBUG", "netdns=go")
case nodeconfigs.DNSResolverTypeCGO:
err = os.Setenv("GODEBUG", "netdns=cgo")
default:
// 默认使用go原生
err = os.Setenv("GODEBUG", "netdns=go")
}
if err != nil {
remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
}
} else {
2022-06-07 11:49:38 +08:00
// 默认使用go原生
err := os.Setenv("GODEBUG", "netdns=go")
if err != nil {
remotelogs.Error("NODE", "[DNS_RESOLVER]set env failed: "+err.Error())
}
2022-06-07 11:49:38 +08:00
}
// API Node地址这里不限制是否为空因为在为空时仍然要有对应的处理
this.changeAPINodeAddrs(config.APINodeAddrs)
}
2021-12-09 12:07:59 +08:00
}
2022-04-14 10:25:34 +08:00
// reload server config
func (this *Node) reloadServer() {
this.locker.Lock()
defer this.locker.Unlock()
if len(this.updatingServerMap) > 0 {
var updatingServerMap = this.updatingServerMap
this.updatingServerMap = map[int64]*serverconfigs.ServerConfig{}
newNodeConfig, err := nodeconfigs.CloneNodeConfig(sharedNodeConfig)
if err != nil {
remotelogs.Error("NODE", "apply server config error: "+err.Error())
return
}
for serverId, serverConfig := range updatingServerMap {
if serverConfig != nil {
newNodeConfig.AddServer(serverConfig)
} else {
newNodeConfig.RemoveServer(serverId)
}
}
err, serverErrors := newNodeConfig.Init()
if err != nil {
remotelogs.Error("NODE", "apply server config error: "+err.Error())
return
}
if len(serverErrors) > 0 {
for _, serverErr := range serverErrors {
remotelogs.ServerError(serverErr.Id, "NODE", serverErr.Message, nodeconfigs.NodeLogTypeServerConfigInitFailed, maps.Map{})
}
}
2022-11-25 10:50:57 +08:00
this.onReload(newNodeConfig, false)
err = sharedListenerManager.Start(newNodeConfig)
if err != nil {
remotelogs.Error("NODE", "apply server config error: "+err.Error())
}
}
}
2022-11-21 21:08:47 +08:00
// 检查硬盘
2022-04-14 10:25:34 +08:00
func (this *Node) checkDisk() {
if runtime.GOOS != "linux" {
return
}
for n := 'a'; n <= 'z'; n++ {
2022-04-14 10:25:34 +08:00
for _, path := range []string{
"/sys/block/vd" + string(n) + "/queue/rotational",
"/sys/block/sd" + string(n) + "/queue/rotational",
2022-04-14 10:25:34 +08:00
} {
2022-08-04 11:34:06 +08:00
data, err := os.ReadFile(path)
2022-04-14 10:25:34 +08:00
if err != nil {
continue
}
if string(data) == "0" {
teaconst.DiskIsFast = true
}
return
2022-04-14 10:25:34 +08:00
}
}
}
2022-11-21 21:08:47 +08:00
// changeAPINodeAddrs updates the RPC endpoints when the list of API node
// addresses differs from the one seen on the previous call.
//
// A non-empty list is tested in a separate goroutine first: on success the
// new endpoints are used; on failure the old ones are kept and the
// remembered list is cleared so that the next call retries. The result of a
// test is discarded when another change has been started in the meantime.
func (this *Node) changeAPINodeAddrs(apiNodeAddrs []*serverconfigs.NetworkAddressConfig) {
	var addrs = []string{}
	for _, addr := range apiNodeAddrs {
		if initErr := addr.Init(); initErr != nil {
			remotelogs.Error("NODE", "changeAPINodeAddrs: validate api node address '"+configutils.QuoteIP(addr.Host)+":"+addr.PortRange+"' failed: "+initErr.Error())
			continue
		}
		addrs = append(addrs, addr.FullAddresses()...)
	}
	sort.Strings(addrs)

	// Nothing to do when the addresses are unchanged.
	if utils.EqualStrings(this.lastAPINodeAddrs, addrs) {
		return
	}
	this.lastAPINodeAddrs = addrs

	config, err := configs.LoadAPIConfig()
	if err != nil {
		remotelogs.Error("NODE", "changeAPINodeAddrs: "+err.Error())
		return
	}
	if config == nil {
		return
	}
	var oldEndpoints = config.RPC.Endpoints

	rpcClient, err := rpc.SharedRPC()
	if err != nil {
		return
	}

	if len(addrs) == 0 {
		if updateErr := rpcClient.UpdateConfig(config); updateErr != nil {
			remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+updateErr.Error())
		}
		return
	}

	this.lastAPINodeVersion++

	// Test asynchronously so that the caller is not blocked.
	go func(v int64) {
		// Test the new API node addresses.
		if rpcClient.TestEndpoints(addrs) {
			config.RPC.Endpoints = addrs
		} else {
			config.RPC.Endpoints = oldEndpoints
			this.lastAPINodeAddrs = nil // reset so that the next update retries
		}

		// Give up when another change has happened during the test.
		if v != this.lastAPINodeVersion {
			return
		}

		if updateErr := rpcClient.UpdateConfig(config); updateErr != nil {
			remotelogs.Error("NODE", "changeAPINodeAddrs: update rpc config failed: "+updateErr.Error())
		}
	}(this.lastAPINodeVersion)
}