Files
EdgeNode/internal/nodes/node.go

522 lines
12 KiB
Go
Raw Normal View History

2020-07-21 11:18:47 +08:00
package nodes
import (
2020-09-09 18:53:53 +08:00
"encoding/json"
"errors"
2020-09-26 08:07:07 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-09-13 20:37:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
2020-10-05 16:55:14 +08:00
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/configs"
2021-01-12 11:48:38 +08:00
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/iplibrary"
2021-06-30 20:01:00 +08:00
"github.com/TeaOSLab/EdgeNode/internal/metrics"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
2020-09-09 18:53:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2021-01-25 16:40:31 +08:00
"github.com/TeaOSLab/EdgeNode/internal/stats"
2020-07-22 22:18:47 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/go-yaml/yaml"
"github.com/iwind/TeaGo/Tea"
2021-01-12 11:48:38 +08:00
"github.com/iwind/TeaGo/lists"
2021-01-11 18:16:15 +08:00
"github.com/iwind/TeaGo/logs"
2021-07-25 17:14:44 +08:00
"github.com/iwind/TeaGo/maps"
"github.com/iwind/gosock/pkg/gosock"
"io/ioutil"
2021-01-12 11:48:38 +08:00
"log"
"net"
"os"
2021-01-12 11:48:38 +08:00
"os/exec"
"runtime"
2020-09-09 18:53:53 +08:00
"time"
2020-07-21 11:18:47 +08:00
)
2020-09-26 08:07:07 +08:00
// sharedNodeConfig is the node config most recently applied by Start or
// syncConfig; Start hands the traffic stat manager a callback that reads it.
var sharedNodeConfig *nodeconfigs.NodeConfig
var (
	// nodeTaskNotify wakes the startSyncTimer goroutine so it runs loop()
	// right away instead of waiting for the next tick.
	nodeTaskNotify = make(chan bool, 8)

	// DaemonIsOn is set by Start when the EdgeDaemon environment variable is
	// present, i.e. the process was launched by Daemon.
	DaemonIsOn = false

	// DaemonPid is the parent process id recorded by Start when DaemonIsOn is set.
	DaemonPid = 0
)
2020-07-21 11:18:47 +08:00
// Node 节点
2020-07-21 11:18:47 +08:00
type Node struct {
isLoaded bool
2021-07-25 17:14:44 +08:00
sock *gosock.Sock
2020-07-21 11:18:47 +08:00
}
func NewNode() *Node {
return &Node{}
}
// Test checks that the API can be reached. It obtains the shared RPC client and
// asks the API node for its current version. Either failure is returned with
// the prefix "test rpc failed: ".
func (this *Node) Test() error {
	rpcClient, err := rpc.SharedRPC()
	if err == nil {
		_, err = rpcClient.APINodeRPC().FindCurrentAPINodeVersion(rpcClient.Context(), &pb.FindCurrentAPINodeVersionRequest{})
	}
	if err != nil {
		return errors.New("test rpc failed: " + err.Error())
	}
	return nil
}
// Start 启动
2020-07-21 11:18:47 +08:00
func (this *Node) Start() {
_, ok := os.LookupEnv("EdgeDaemon")
if ok {
remotelogs.Println("NODE", "start from daemon")
DaemonIsOn = true
DaemonPid = os.Getppid()
}
// 启动事件
events.Notify(events.EventStart)
// 本地Sock
err := this.listenSock()
if err != nil {
remotelogs.Error("NODE", err.Error())
return
}
2020-09-09 18:53:53 +08:00
// 读取API配置
err = this.syncConfig()
if err != nil {
_, err := nodeconfigs.SharedNodeConfig()
2021-01-11 18:16:15 +08:00
if err != nil {
// 无本地数据时,会尝试多次读取
tryTimes := 0
for {
err := this.syncConfig()
if err != nil {
tryTimes++
if tryTimes%10 == 0 {
remotelogs.Error("NODE", err.Error())
}
time.Sleep(1 * time.Second)
// 不做长时间的无意义的重试
if tryTimes > 1000 {
return
}
} else {
break
}
2021-01-11 18:16:15 +08:00
}
}
2020-09-09 18:53:53 +08:00
}
// 启动同步计时器
this.startSyncTimer()
// 状态变更计时器
go NewNodeStatusExecutor().Listen()
2020-07-22 22:18:47 +08:00
// 读取配置
2020-09-26 08:07:07 +08:00
nodeConfig, err := nodeconfigs.SharedNodeConfig()
2020-07-21 11:18:47 +08:00
if err != nil {
remotelogs.Error("NODE", "start failed: read node config failed: "+err.Error())
2020-07-21 11:18:47 +08:00
return
}
2020-09-26 08:07:07 +08:00
err = nodeConfig.Init()
if err != nil {
remotelogs.Error("NODE", "init node config failed: "+err.Error())
2020-09-26 08:07:07 +08:00
return
}
sharedNodeConfig = nodeConfig
2020-07-21 11:18:47 +08:00
2020-12-03 10:17:28 +08:00
// 发送事件
events.Notify(events.EventLoaded)
2020-07-22 22:18:47 +08:00
// 设置rlimit
_ = utils.SetRLimit(1024 * 1024)
2020-10-04 14:30:42 +08:00
// 连接API
go NewAPIStream().Start()
2021-01-25 16:40:31 +08:00
// 统计
go stats.SharedTrafficStatManager.Start(func() *nodeconfigs.NodeConfig {
return sharedNodeConfig
})
go stats.SharedHTTPRequestStatManager.Start()
2020-07-22 22:18:47 +08:00
// 启动端口
err = sharedListenerManager.Start(nodeConfig)
if err != nil {
remotelogs.Error("NODE", "start failed: "+err.Error())
2020-10-28 11:19:06 +08:00
return
}
2020-07-22 22:18:47 +08:00
// hold住进程
2020-10-04 14:30:42 +08:00
select {}
2020-07-21 11:18:47 +08:00
}
2020-09-09 18:53:53 +08:00
// Daemon 实现守护进程
2021-01-12 11:48:38 +08:00
func (this *Node) Daemon() {
path := os.TempDir() + "/edge-node.sock"
isDebug := lists.ContainsString(os.Args, "debug")
isDebug = true
for {
conn, err := net.DialTimeout("unix", path, 1*time.Second)
if err != nil {
if isDebug {
log.Println("[DAEMON]starting ...")
}
// 尝试启动
err = func() error {
exe, err := os.Executable()
if err != nil {
return err
}
// 可以标记当前是从守护进程启动的
_ = os.Setenv("EdgeDaemon", "on")
2021-01-12 11:48:38 +08:00
cmd := exec.Command(exe)
err = cmd.Start()
if err != nil {
return err
}
err = cmd.Wait()
if err != nil {
return err
}
return nil
}()
if err != nil {
if isDebug {
log.Println("[DAEMON]", err)
}
time.Sleep(1 * time.Second)
} else {
time.Sleep(5 * time.Second)
}
} else {
_ = conn.Close()
time.Sleep(5 * time.Second)
}
}
}
// InstallSystemService 安装系统服务
2021-01-12 11:48:38 +08:00
func (this *Node) InstallSystemService() error {
shortName := teaconst.SystemdServiceName
exe, err := os.Executable()
if err != nil {
return err
}
manager := utils.NewServiceManager(shortName, teaconst.ProductName)
err = manager.Install(exe, []string{})
if err != nil {
return err
}
return nil
}
// loop fetches pending node tasks from the API and handles each one:
//   - "ipItemChanged": signal iplibrary.IPListUpdateNotify, then report the task done;
//   - "configChanged": re-sync via syncConfig and report the outcome (IsOk=false
//     with the sync error message when it failed);
//   - "nodeVersionChanged": start the upgrade manager in the background (not reported).
//
// Other task types are ignored. loop does nothing when configs/api.yaml cannot
// be stat'ed, and it stops at the first RPC error, returning it.
func (this *Node) loop() error {
	// nothing to do until api.yaml exists
	if _, statErr := os.Stat(Tea.ConfigFile("api.yaml")); statErr != nil {
		return nil
	}

	rpcClient, err := rpc.SharedRPC()
	if err != nil {
		return errors.New("create rpc client failed: " + err.Error())
	}
	nodeCtx := rpcClient.Context()

	tasksResp, err := rpcClient.NodeTaskRPC().FindNodeTasks(nodeCtx, &pb.FindNodeTasksRequest{})
	if err != nil {
		return errors.New("read node tasks failed: " + err.Error())
	}

	for _, task := range tasksResp.NodeTasks {
		switch task.Type {
		case "ipItemChanged":
			iplibrary.IPListUpdateNotify <- true

			// mark the task as synced
			if _, reportErr := rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, &pb.ReportNodeTaskDoneRequest{
				NodeTaskId: task.Id,
				IsOk:       true,
			}); reportErr != nil {
				return reportErr
			}
		case "configChanged":
			doneReq := &pb.ReportNodeTaskDoneRequest{NodeTaskId: task.Id, IsOk: true}
			if syncErr := this.syncConfig(); syncErr != nil {
				doneReq.IsOk = false
				doneReq.Error = syncErr.Error()
			}
			if _, reportErr := rpcClient.NodeTaskRPC().ReportNodeTaskDone(nodeCtx, doneReq); reportErr != nil {
				return reportErr
			}
		case "nodeVersionChanged":
			go sharedUpgradeManager.Start()
		}
	}
	return nil
}
2020-09-09 18:53:53 +08:00
// 读取API配置
func (this *Node) syncConfig() error {
// 检查api.yaml是否存在
apiConfigFile := Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
if err != nil {
if os.IsNotExist(err) {
clusterErr := this.checkClusterConfig()
if clusterErr != nil {
if os.IsNotExist(clusterErr) {
return err
}
return clusterErr
}
} else {
return err
}
}
2020-09-09 18:53:53 +08:00
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("create rpc client failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
// 获取同步任务
nodeCtx := rpcClient.Context()
2020-09-26 08:07:07 +08:00
// TODO 这里考虑只同步版本号有变更的
configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(nodeCtx, &pb.FindCurrentNodeConfigRequest{
Version: -1, // 更新所有版本
2020-12-03 10:17:28 +08:00
})
2020-09-09 18:53:53 +08:00
if err != nil {
return errors.New("read config from rpc failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
2020-12-03 10:17:28 +08:00
if !configResp.IsChanged {
return nil
}
2020-09-26 08:07:07 +08:00
configJSON := configResp.NodeJSON
nodeConfig := &nodeconfigs.NodeConfig{}
err = json.Unmarshal(configJSON, nodeConfig)
2020-09-09 18:53:53 +08:00
if err != nil {
return errors.New("decode config failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
// 写入到文件中
err = nodeConfig.Save()
if err != nil {
return err
}
2020-09-26 08:07:07 +08:00
err = nodeConfig.Init()
2020-09-09 18:53:53 +08:00
if err != nil {
return err
}
// max cpu
if nodeConfig.MaxCPU > 0 && nodeConfig.MaxCPU < int32(runtime.NumCPU()) {
runtime.GOMAXPROCS(int(nodeConfig.MaxCPU))
} else {
runtime.GOMAXPROCS(runtime.NumCPU())
}
2020-09-26 08:07:07 +08:00
// 刷新配置
if this.isLoaded {
remotelogs.Println("NODE", "reloading config ...")
} else {
remotelogs.Println("NODE", "loading config ...")
}
2020-09-26 08:07:07 +08:00
nodeconfigs.ResetNodeConfig(nodeConfig)
caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity
caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity
if nodeConfig.HTTPCachePolicy != nil {
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{nodeConfig.HTTPCachePolicy})
} else {
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
}
2021-01-18 20:40:29 +08:00
sharedWAFManager.UpdatePolicies(nodeConfig.FindAllFirewallPolicies())
2021-02-06 17:34:33 +08:00
iplibrary.SharedActionManager.UpdateActions(nodeConfig.FirewallActions)
2020-09-26 08:07:07 +08:00
sharedNodeConfig = nodeConfig
2021-06-30 20:01:00 +08:00
metrics.SharedManager.Update(nodeConfig.MetricItems)
2020-12-03 10:17:28 +08:00
// 发送事件
events.Notify(events.EventReload)
if this.isLoaded {
2020-09-09 18:53:53 +08:00
return sharedListenerManager.Start(nodeConfig)
}
this.isLoaded = true
2020-09-09 18:53:53 +08:00
return nil
}
// 启动同步计时器
func (this *Node) startSyncTimer() {
2020-09-26 08:07:07 +08:00
// TODO 这个时间间隔可以自行设置
ticker := time.NewTicker(60 * time.Second)
2020-10-28 11:19:06 +08:00
events.On(events.EventQuit, func() {
remotelogs.Println("NODE", "quit sync timer")
2020-10-28 11:19:06 +08:00
ticker.Stop()
})
2020-09-09 18:53:53 +08:00
go func() {
for {
select {
case <-ticker.C:
err := this.loop()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
case <-nodeTaskNotify:
err := this.loop()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
2020-09-09 18:53:53 +08:00
}
}
}()
}
// 检查集群设置
func (this *Node) checkClusterConfig() error {
configFile := Tea.ConfigFile("cluster.yaml")
data, err := ioutil.ReadFile(configFile)
if err != nil {
return err
}
config := &configs.ClusterConfig{}
err = yaml.Unmarshal(data, config)
if err != nil {
return err
}
rpcClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: config.RPC,
NodeId: config.ClusterId,
Secret: config.Secret,
})
if err != nil {
return err
}
2021-01-11 18:16:15 +08:00
logs.Println("[NODE]registering node ...")
resp, err := rpcClient.NodeRPC().RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME})
if err != nil {
return err
}
2021-01-11 18:16:15 +08:00
logs.Println("[NODE]registered successfully")
// 写入到配置文件中
if len(resp.Endpoints) == 0 {
resp.Endpoints = []string{}
}
apiConfig := &configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: resp.Endpoints,
},
NodeId: resp.UniqueId,
Secret: resp.Secret,
}
2021-01-11 18:16:15 +08:00
logs.Println("[NODE]writing 'configs/api.yaml' ...")
err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml"))
if err != nil {
return err
}
2021-01-11 18:16:15 +08:00
logs.Println("[NODE]wrote 'configs/api.yaml' successfully")
return nil
}
// 监听本地sock
func (this *Node) listenSock() error {
2021-07-25 17:14:44 +08:00
this.sock = gosock.NewTmpSock(teaconst.ProcessName)
2021-07-25 17:14:44 +08:00
// 检查是否在运行
if this.sock.IsListening() {
reply, err := this.sock.Send(&gosock.Command{Code: "pid"})
if err == nil {
return errors.New("error: the process is already running, pid: " + maps.NewMap(reply.Params).GetString("pid"))
} else {
2021-07-25 17:14:44 +08:00
return errors.New("error: the process is already running")
}
}
2021-07-25 17:14:44 +08:00
// 启动监听
go func() {
2021-07-25 17:14:44 +08:00
this.sock.OnCommand(func(cmd *gosock.Command) {
switch cmd.Code {
case "pid":
_ = cmd.Reply(&gosock.Command{
Code: "pid",
Params: map[string]interface{}{
"pid": os.Getpid(),
},
})
case "stop":
_ = cmd.ReplyOk()
// 退出主进程
events.Notify(events.EventQuit)
os.Exit(0)
case "quit":
_ = cmd.ReplyOk()
_ = this.sock.Close()
events.Notify(events.EventQuit)
// 监控连接数如果连接数为0则退出进程
go func() {
for {
countActiveConnections := sharedListenerManager.TotalActiveConnections()
if countActiveConnections <= 0 {
os.Exit(0)
return
}
time.Sleep(1 * time.Second)
}
}()
}
2021-07-25 17:14:44 +08:00
})
err := this.sock.Listen()
if err != nil {
logs.Println("NODE", err.Error())
}
}()
2021-07-25 17:14:44 +08:00
events.On(events.EventQuit, func() {
logs.Println("NODE", "quit unix sock")
_ = this.sock.Close()
})
return nil
}