Files
EdgeNode/internal/nodes/node.go

154 lines
3.4 KiB
Go
Raw Normal View History

2020-07-21 11:18:47 +08:00
package nodes
import (
2020-09-09 18:53:53 +08:00
"encoding/json"
"errors"
2020-09-26 08:07:07 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-09-13 20:37:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2020-10-05 16:55:14 +08:00
"github.com/TeaOSLab/EdgeNode/internal/caches"
2020-10-09 11:06:43 +08:00
"github.com/TeaOSLab/EdgeNode/internal/logs"
2020-09-09 18:53:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2020-07-22 22:18:47 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
"runtime"
2020-09-09 18:53:53 +08:00
"time"
2020-07-21 11:18:47 +08:00
)
2020-09-26 08:07:07 +08:00
var lastVersion = int64(-1)
var sharedNodeConfig *nodeconfigs.NodeConfig
var changeNotify = make(chan bool, 8)
2020-07-21 11:18:47 +08:00
2020-09-09 18:53:53 +08:00
// 节点
2020-07-21 11:18:47 +08:00
type Node struct {
}
func NewNode() *Node {
return &Node{}
}
func (this *Node) Start() {
2020-09-09 18:53:53 +08:00
// 读取API配置
err := this.syncConfig(false)
if err != nil {
2020-10-09 11:06:43 +08:00
logs.Error("NODE", err.Error())
2020-09-09 18:53:53 +08:00
}
// 启动同步计时器
this.startSyncTimer()
// 状态变更计时器
go NewNodeStatusExecutor().Listen()
2020-07-22 22:18:47 +08:00
// 读取配置
2020-09-26 08:07:07 +08:00
nodeConfig, err := nodeconfigs.SharedNodeConfig()
2020-07-21 11:18:47 +08:00
if err != nil {
2020-10-09 11:06:43 +08:00
logs.Error("NODE", "start failed: read node config failed: "+err.Error())
2020-07-21 11:18:47 +08:00
return
}
2020-09-26 08:07:07 +08:00
err = nodeConfig.Init()
if err != nil {
2020-10-09 11:06:43 +08:00
logs.Error("NODE", "init node config failed: "+err.Error())
2020-09-26 08:07:07 +08:00
return
}
sharedNodeConfig = nodeConfig
2020-07-21 11:18:47 +08:00
2020-07-22 22:18:47 +08:00
// 设置rlimit
_ = utils.SetRLimit(1024 * 1024)
2020-10-04 14:30:42 +08:00
// 连接API
go NewAPIStream().Start()
2020-07-22 22:18:47 +08:00
// 启动端口
err = sharedListenerManager.Start(nodeConfig)
if err != nil {
2020-10-09 11:06:43 +08:00
logs.Error("NODE", "start failed: "+err.Error())
2020-07-22 22:18:47 +08:00
}
// hold住进程
2020-10-04 14:30:42 +08:00
select {}
2020-07-21 11:18:47 +08:00
}
2020-09-09 18:53:53 +08:00
// 读取API配置
func (this *Node) syncConfig(isFirstTime bool) error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("[NODE]create rpc client failed: " + err.Error())
}
2020-09-26 08:07:07 +08:00
// TODO 这里考虑只同步版本号有变更的
2020-09-09 18:53:53 +08:00
configResp, err := rpcClient.NodeRPC().ComposeNodeConfig(rpcClient.Context(), &pb.ComposeNodeConfigRequest{})
if err != nil {
return errors.New("[NODE]read config from rpc failed: " + err.Error())
}
2020-09-26 08:07:07 +08:00
configJSON := configResp.NodeJSON
nodeConfig := &nodeconfigs.NodeConfig{}
err = json.Unmarshal(configJSON, nodeConfig)
2020-09-09 18:53:53 +08:00
if err != nil {
return errors.New("[NODE]decode config failed: " + err.Error())
}
// 写入到文件中
err = nodeConfig.Save()
if err != nil {
return err
}
// 如果版本相同,则只是保存
if lastVersion == nodeConfig.Version {
return nil
}
lastVersion = nodeConfig.Version
2020-09-26 08:07:07 +08:00
err = nodeConfig.Init()
2020-09-09 18:53:53 +08:00
if err != nil {
return err
}
// max cpu
if nodeConfig.MaxCPU > 0 && nodeConfig.MaxCPU < int32(runtime.NumCPU()) {
runtime.GOMAXPROCS(int(nodeConfig.MaxCPU))
} else {
runtime.GOMAXPROCS(runtime.NumCPU())
}
2020-09-26 08:07:07 +08:00
// 刷新配置
if isFirstTime {
logs.Println("NODE", "reloading config ...")
} else {
logs.Println("NODE", "loading config ...")
}
2020-09-26 08:07:07 +08:00
nodeconfigs.ResetNodeConfig(nodeConfig)
2020-10-05 16:55:14 +08:00
caches.SharedManager.UpdatePolicies(nodeConfig.AllCachePolicies())
2020-10-08 15:06:42 +08:00
sharedWAFManager.UpdatePolicies(nodeConfig.AllHTTPFirewallPolicies())
2020-09-26 08:07:07 +08:00
sharedNodeConfig = nodeConfig
2020-09-09 18:53:53 +08:00
if !isFirstTime {
return sharedListenerManager.Start(nodeConfig)
}
return nil
}
// 启动同步计时器
func (this *Node) startSyncTimer() {
2020-09-26 08:07:07 +08:00
// TODO 这个时间间隔可以自行设置
ticker := time.NewTicker(60 * time.Second)
2020-09-09 18:53:53 +08:00
go func() {
for {
select {
case <-ticker.C:
err := this.syncConfig(false)
if err != nil {
logs.Error("NODE", "sync config error: "+err.Error())
continue
}
case <-changeNotify:
err := this.syncConfig(false)
if err != nil {
logs.Error("NODE", "sync config error: "+err.Error())
continue
}
2020-09-09 18:53:53 +08:00
}
}
}()
}