Files
EdgeNode/internal/nodes/node.go

336 lines
7.1 KiB
Go
Raw Normal View History

2020-07-21 11:18:47 +08:00
package nodes
import (
2020-09-09 18:53:53 +08:00
"encoding/json"
"errors"
2020-09-26 08:07:07 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
2020-09-13 20:37:40 +08:00
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/apps"
2020-10-05 16:55:14 +08:00
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/configs"
2020-10-28 11:19:06 +08:00
"github.com/TeaOSLab/EdgeNode/internal/events"
2020-10-09 11:06:43 +08:00
"github.com/TeaOSLab/EdgeNode/internal/logs"
2020-09-09 18:53:53 +08:00
"github.com/TeaOSLab/EdgeNode/internal/rpc"
2020-07-22 22:18:47 +08:00
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/go-yaml/yaml"
"github.com/iwind/TeaGo/Tea"
tealogs "github.com/iwind/TeaGo/logs"
"io/ioutil"
"net"
"os"
2020-10-28 11:19:06 +08:00
"os/signal"
"runtime"
2020-10-28 11:19:06 +08:00
"syscall"
2020-09-09 18:53:53 +08:00
"time"
2020-07-21 11:18:47 +08:00
)
2020-09-26 08:07:07 +08:00
// Package-level state used by the node runtime in this file.
var (
	// lastVersion is the config version most recently seen by syncConfig;
	// it starts at -1 so that the first fetched config never matches.
	lastVersion = int64(-1)

	// sharedNodeConfig is the node config currently in effect; it is
	// assigned by Start and by syncConfig.
	sharedNodeConfig *nodeconfigs.NodeConfig

	// changeNotify wakes the sync loop started by startSyncTimer so that
	// the config is fetched without waiting for the next tick.
	changeNotify = make(chan bool, 8)
)
2020-07-21 11:18:47 +08:00
2020-09-09 18:53:53 +08:00
// Node is the edge node process controller. It carries no state of its own;
// its methods operate on the package-level shared objects.
type Node struct{}
// NewNode returns a new, empty Node.
func NewNode() *Node {
	return new(Node)
}
// Test checks the configuration by verifying that the API can be reached:
// it obtains the shared RPC client and asks the API node for its current
// version. Any failure is returned prefixed with "test rpc failed: ".
func (this *Node) Test() error {
	client, err := rpc.SharedRPC()
	if err != nil {
		return errors.New("test rpc failed: " + err.Error())
	}

	// The response itself is not needed; only whether the call succeeds.
	if _, err = client.APINodeRPC().FindCurrentAPINodeVersion(client.Context(), &pb.FindCurrentAPINodeVersionRequest{}); err != nil {
		return errors.New("test rpc failed: " + err.Error())
	}

	return nil
}
// 启动
2020-07-21 11:18:47 +08:00
func (this *Node) Start() {
2020-10-28 11:19:06 +08:00
// 处理信号
this.listenSignals()
// 本地Sock
err := this.listenSock()
if err != nil {
logs.Error("NODE", err.Error())
return
}
2020-09-09 18:53:53 +08:00
// 读取API配置
err = this.syncConfig(false)
2020-09-09 18:53:53 +08:00
if err != nil {
2020-10-09 11:06:43 +08:00
logs.Error("NODE", err.Error())
return
2020-09-09 18:53:53 +08:00
}
// 启动同步计时器
this.startSyncTimer()
// 状态变更计时器
go NewNodeStatusExecutor().Listen()
2020-07-22 22:18:47 +08:00
// 读取配置
2020-09-26 08:07:07 +08:00
nodeConfig, err := nodeconfigs.SharedNodeConfig()
2020-07-21 11:18:47 +08:00
if err != nil {
2020-10-09 11:06:43 +08:00
logs.Error("NODE", "start failed: read node config failed: "+err.Error())
2020-07-21 11:18:47 +08:00
return
}
2020-09-26 08:07:07 +08:00
err = nodeConfig.Init()
if err != nil {
2020-10-09 11:06:43 +08:00
logs.Error("NODE", "init node config failed: "+err.Error())
2020-09-26 08:07:07 +08:00
return
}
sharedNodeConfig = nodeConfig
2020-07-21 11:18:47 +08:00
2020-07-22 22:18:47 +08:00
// 设置rlimit
_ = utils.SetRLimit(1024 * 1024)
2020-10-04 14:30:42 +08:00
// 连接API
go NewAPIStream().Start()
2020-07-22 22:18:47 +08:00
// 启动端口
err = sharedListenerManager.Start(nodeConfig)
if err != nil {
2020-10-09 11:06:43 +08:00
logs.Error("NODE", "start failed: "+err.Error())
2020-10-28 11:19:06 +08:00
return
}
// 写入PID
err = apps.WritePid()
if err != nil {
logs.Error("NODE", "write pid failed: "+err.Error())
return
2020-07-22 22:18:47 +08:00
}
// hold住进程
2020-10-04 14:30:42 +08:00
select {}
2020-07-21 11:18:47 +08:00
}
2020-09-09 18:53:53 +08:00
2020-10-28 11:19:06 +08:00
// listenSignals installs the process signal handler.
//
// On SIGQUIT it broadcasts events.EventQuit and then starts a goroutine that
// polls the listener manager once per second, exiting the process with
// status 0 as soon as the number of active connections drops to zero.
func (this *Node) listenSignals() {
	// signal.Notify does not block when delivering a signal, so the channel
	// must be buffered; with an unbuffered channel a signal that arrives
	// while the receiver is not ready would be dropped.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGQUIT)

	go func() {
		for s := range signals {
			switch s {
			case syscall.SIGQUIT:
				events.Notify(events.EventQuit)

				// Watch the connection count and exit once it reaches 0.
				go func() {
					for {
						if sharedListenerManager.TotalActiveConnections() <= 0 {
							os.Exit(0)
						}
						time.Sleep(1 * time.Second)
					}
				}()
			}
		}
	}()
}
2020-09-09 18:53:53 +08:00
// 读取API配置
func (this *Node) syncConfig(isFirstTime bool) error {
// 检查api.yaml是否存在
apiConfigFile := Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
if err != nil {
if os.IsNotExist(err) {
clusterErr := this.checkClusterConfig()
if clusterErr != nil {
if os.IsNotExist(clusterErr) {
return err
}
return clusterErr
}
} else {
return err
}
}
2020-09-09 18:53:53 +08:00
rpcClient, err := rpc.SharedRPC()
if err != nil {
return errors.New("create rpc client failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
2020-09-26 08:07:07 +08:00
// TODO 这里考虑只同步版本号有变更的
2020-11-02 22:14:45 +08:00
configResp, err := rpcClient.NodeRPC().FindCurrentNodeConfig(rpcClient.Context(), &pb.FindCurrentNodeConfigRequest{})
2020-09-09 18:53:53 +08:00
if err != nil {
return errors.New("read config from rpc failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
2020-09-26 08:07:07 +08:00
configJSON := configResp.NodeJSON
nodeConfig := &nodeconfigs.NodeConfig{}
err = json.Unmarshal(configJSON, nodeConfig)
2020-09-09 18:53:53 +08:00
if err != nil {
return errors.New("decode config failed: " + err.Error())
2020-09-09 18:53:53 +08:00
}
// 写入到文件中
err = nodeConfig.Save()
if err != nil {
return err
}
// 如果版本相同,则只是保存
if lastVersion == nodeConfig.Version {
return nil
}
lastVersion = nodeConfig.Version
2020-09-26 08:07:07 +08:00
err = nodeConfig.Init()
2020-09-09 18:53:53 +08:00
if err != nil {
return err
}
// max cpu
if nodeConfig.MaxCPU > 0 && nodeConfig.MaxCPU < int32(runtime.NumCPU()) {
runtime.GOMAXPROCS(int(nodeConfig.MaxCPU))
} else {
runtime.GOMAXPROCS(runtime.NumCPU())
}
2020-09-26 08:07:07 +08:00
// 刷新配置
if isFirstTime {
logs.Println("NODE", "reloading config ...")
} else {
logs.Println("NODE", "loading config ...")
}
2020-09-26 08:07:07 +08:00
nodeconfigs.ResetNodeConfig(nodeConfig)
2020-10-05 16:55:14 +08:00
caches.SharedManager.UpdatePolicies(nodeConfig.AllCachePolicies())
2020-10-08 15:06:42 +08:00
sharedWAFManager.UpdatePolicies(nodeConfig.AllHTTPFirewallPolicies())
2020-09-26 08:07:07 +08:00
sharedNodeConfig = nodeConfig
2020-09-09 18:53:53 +08:00
if !isFirstTime {
return sharedListenerManager.Start(nodeConfig)
}
return nil
}
// 启动同步计时器
func (this *Node) startSyncTimer() {
2020-09-26 08:07:07 +08:00
// TODO 这个时间间隔可以自行设置
ticker := time.NewTicker(60 * time.Second)
2020-10-28 11:19:06 +08:00
events.On(events.EventQuit, func() {
logs.Println("NODE", "quit sync timer")
ticker.Stop()
})
2020-09-09 18:53:53 +08:00
go func() {
for {
select {
case <-ticker.C:
err := this.syncConfig(false)
if err != nil {
logs.Error("NODE", "sync config error: "+err.Error())
continue
}
case <-changeNotify:
err := this.syncConfig(false)
if err != nil {
logs.Error("NODE", "sync config error: "+err.Error())
continue
}
2020-09-09 18:53:53 +08:00
}
}
}()
}
// checkClusterConfig registers this node with the cluster described in
// cluster.yaml and writes the credentials it receives to api.yaml.
//
// The error from reading cluster.yaml is returned unwrapped, so callers can
// test it with os.IsNotExist.
func (this *Node) checkClusterConfig() error {
	raw, err := ioutil.ReadFile(Tea.ConfigFile("cluster.yaml"))
	if err != nil {
		return err
	}

	config := &configs.ClusterConfig{}
	if err = yaml.Unmarshal(raw, config); err != nil {
		return err
	}

	// Build a client that authenticates with the cluster credentials.
	rpcClient, err := rpc.NewRPCClient(&configs.APIConfig{
		RPC:    config.RPC,
		NodeId: config.ClusterId,
		Secret: config.Secret,
	})
	if err != nil {
		return err
	}

	tealogs.Println("[NODE]registering node ...")
	nodeClient := rpcClient.NodeRPC()
	registerCtx := rpcClient.ClusterContext(config.ClusterId, config.Secret)
	resp, err := nodeClient.RegisterClusterNode(registerCtx, &pb.RegisterClusterNodeRequest{Name: HOSTNAME})
	if err != nil {
		return err
	}
	tealogs.Println("[NODE]registered successfully")

	// Write the result to the config file; an empty endpoint list is
	// normalised to a non-nil empty slice first.
	endpoints := resp.Endpoints
	if len(endpoints) == 0 {
		endpoints = []string{}
	}
	apiConfig := &configs.APIConfig{
		RPC: struct {
			Endpoints []string `yaml:"endpoints"`
		}{
			Endpoints: endpoints,
		},
		NodeId: resp.UniqueId,
		Secret: resp.Secret,
	}

	tealogs.Println("[NODE]writing 'configs/api.yaml' ...")
	if err = apiConfig.WriteFile(Tea.ConfigFile("api.yaml")); err != nil {
		return err
	}
	tealogs.Println("[NODE]wrote 'configs/api.yaml' successfully")

	return nil
}
// 监听本地sock
func (this *Node) listenSock() error {
path := os.TempDir() + "/edge-node.sock"
// 检查是否已经存在
_, err := os.Stat(path)
if err == nil {
conn, err := net.Dial("unix", path)
if err != nil {
_ = os.Remove(path)
} else {
_ = conn.Close()
}
}
// 新的监听任务
listener, err := net.Listen("unix", path)
if err != nil {
return err
}
2020-10-28 11:19:06 +08:00
events.On(events.EventQuit, func() {
logs.Println("NODE", "quit unix sock")
_ = listener.Close()
})
go func() {
for {
_, err := listener.Accept()
if err != nil {
return
}
}
}()
return nil
}