节点离线时尝试自动通过API节点远程启动

This commit is contained in:
GoEdgeLab
2022-09-17 10:18:00 +08:00
parent 6e984590b8
commit fb8c8ef643
8 changed files with 89 additions and 620 deletions

View File

@@ -105,12 +105,14 @@ function build() {
done
# building edge dns installer
echo "building dns node installer ..."
architects=("amd64" "arm64")
for arch in "${architects[@]}"; do
# TODO support arm, mips ...
env GOOS=linux GOARCH="${arch}" go build -trimpath -tags $TAG --ldflags="-s -w" -o "$ROOT"/installers/edge-installer-dns-helper-linux-"${arch}" "$ROOT"/../cmd/installer-dns-helper/main.go
done
if [ $TAG = "plus" ]; then
echo "building dns node installer ..."
architects=("amd64" "arm64")
for arch in "${architects[@]}"; do
# TODO support arm, mips ...
env GOOS=linux GOARCH="${arch}" go build -trimpath -tags $TAG --ldflags="-s -w" -o "$ROOT"/installers/edge-installer-dns-helper-linux-"${arch}" "$ROOT"/../cmd/installer-dns-helper/main.go
done
fi
# building api node
env GOOS="$OS" GOARCH="$ARCH" go build -trimpath -tags $TAG --ldflags="-s -w" -o "$DIST"/bin/edge-api "$ROOT"/../cmd/edge-api/main.go

View File

@@ -0,0 +1,27 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package installers
// GrantError is an error describing a problem with a node's login or grant
// (SSH authorization) settings. It is a distinct type so that callers can
// tell such configuration problems apart from other failures via IsGrantError.
type GrantError struct {
	err string // human-readable message
}

// newGrantError creates a GrantError carrying the given message.
func newGrantError(err string) *GrantError {
	return &GrantError{err: err}
}

// Error implements the error interface and returns the message.
func (this *GrantError) Error() string {
	return this.err
}

// String returns the same message as Error.
func (this *GrantError) String() string {
	return this.err
}

// IsGrantError reports whether err is a *GrantError, either directly or
// anywhere along its Unwrap() chain. A nil error yields false.
//
// The chain is walked by hand (rather than through errors.As) so that this
// file needs no imports; only single-error Unwrap() is followed.
func IsGrantError(err error) bool {
	for err != nil {
		if _, ok := err.(*GrantError); ok {
			return true
		}
		wrapper, ok := err.(interface{ Unwrap() error })
		if !ok {
			return false
		}
		err = wrapper.Unwrap()
	}
	return false
}

View File

@@ -42,10 +42,10 @@ func (this *BaseInstaller) Login(credentials *Credentials) error {
}
// 认证
methods := []ssh.AuthMethod{}
var methods = []ssh.AuthMethod{}
if credentials.Method == "user" {
{
authMethod := ssh.Password(credentials.Password)
var authMethod = ssh.Password(credentials.Password)
methods = append(methods, authMethod)
}
@@ -79,7 +79,7 @@ func (this *BaseInstaller) Login(credentials *Credentials) error {
if len(credentials.Username) == 0 {
credentials.Username = "root"
}
config := &ssh.ClientConfig{
var config = &ssh.ClientConfig{
User: credentials.Username,
Auth: methods,
HostKeyCallback: hostKeyCallback,

View File

@@ -1,147 +0,0 @@
package installers
import (
"bytes"
"errors"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"os"
"path/filepath"
"regexp"
)
// NSNodeInstaller installs DNS nodes on a remote host.
// It embeds BaseInstaller, which is defined elsewhere; the login step and the
// remote client used by Install appear to come from it (confirm against BaseInstaller).
type NSNodeInstaller struct {
	BaseInstaller
}
// Install deploys the DNS node package into dir on the remote host and starts it.
//
// Steps, in order: validate params, make sure dir exists, install the helper
// binary, upload the newest "edge-dns-<os>-<arch>" package, run the helper's
// environment test (skipped when upgrading), stop and remove the previous
// executable when upgrading, unzip the package, write configs/api.yaml, then
// run "edge-dns test" and "edge-dns start".
//
// params must be a non-nil *NodeParams that passes Validate. For several
// failure categories installStatus.ErrorCode is set before the error is
// returned. Any output on stderr from the remote commands is treated as a
// failure.
func (this *NSNodeInstaller) Install(dir string, params interface{}, installStatus *models.NodeInstallStatus) error {
	if params == nil {
		return errors.New("'params' required for node installation")
	}
	nodeParams, ok := params.(*NodeParams)
	if !ok {
		return errors.New("'params' should be *NodeParams")
	}
	err := nodeParams.Validate()
	if err != nil {
		return errors.New("params validation: " + err.Error())
	}

	// Check whether the target directory exists; create it when Stat fails.
	_, err = this.client.Stat(dir)
	if err != nil {
		err = this.client.MkdirAll(dir)
		if err != nil {
			installStatus.ErrorCode = "CREATE_ROOT_DIRECTORY_FAILED"
			return errors.New("create directory '" + dir + "' failed: " + err.Error())
		}
	}

	// Install the helper binary; env describes the remote OS/arch and helper name.
	env, err := this.InstallHelper(dir, nodeconfigs.NodeRoleDNS)
	if err != nil {
		installStatus.ErrorCode = "INSTALL_HELPER_FAILED"
		return err
	}

	// Upload the installer package.
	filePrefix := "edge-dns-" + env.OS + "-" + env.Arch
	zipFile, err := this.LookupLatestInstaller(filePrefix)
	if err != nil {
		return err
	}
	if len(zipFile) == 0 {
		return errors.New("can not find installer file for " + env.OS + "/" + env.Arch)
	}
	targetZip := dir + "/" + filepath.Base(zipFile)
	err = this.client.Copy(zipFile, targetZip, 0777)
	if err != nil {
		return err
	}

	// Test the runtime environment.
	// Nodes being upgraded are not tested for now.
	if !nodeParams.IsUpgrading {
		_, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=test")
		if err != nil {
			return errors.New("test failed: " + err.Error())
		}
		if len(stderr) > 0 {
			return errors.New("test failed: " + stderr)
		}
	}

	// When upgrading, gracefully stop the previous process first.
	exePath := dir + "/edge-dns/bin/edge-dns"
	if nodeParams.IsUpgrading {
		_, err = this.client.Stat(exePath)
		if err == nil {
			// Best effort: the old process may not be running at all.
			_, _, _ = this.client.Exec(exePath + " stop")

			// Remove the executable to avoid conflicts while unzipping.
			// errors.Is also recognises wrapped "not exist" errors, which a
			// plain != comparison against os.ErrNotExist would miss.
			err = this.client.Remove(exePath)
			if err != nil && !errors.Is(err, os.ErrNotExist) {
				return errors.New("remove old file failed: " + err.Error())
			}
		}
	}

	// Unzip the package with the helper.
	_, stderr, err := this.client.Exec(dir + "/" + env.HelperName + " -cmd=unzip -zip=\"" + targetZip + "\" -target=\"" + dir + "\"")
	if err != nil {
		return err
	}
	if len(stderr) > 0 {
		return errors.New("unzip installer failed: " + stderr)
	}

	// Write the API configuration file.
	{
		configFile := dir + "/edge-dns/configs/api.yaml"

		// After sudo the config directory must be owned by the login user,
		// otherwise the file can not be written.
		if this.client.sudo {
			_, _, _ = this.client.Exec("chown " + this.client.User() + " " + filepath.Dir(configFile))
		}

		// NOTE(review): "endpoints" is indented so that it is nested under
		// "rpc"; at column 0 YAML would parse "rpc" as null — confirm against
		// the node's config loader.
		var data = []byte(`rpc:
  endpoints: [ ${endpoints} ]
nodeId: "${nodeId}"
secret: "${nodeSecret}"`)

		data = bytes.ReplaceAll(data, []byte("${endpoints}"), []byte(nodeParams.QuoteEndpoints()))
		data = bytes.ReplaceAll(data, []byte("${nodeId}"), []byte(nodeParams.NodeId))
		data = bytes.ReplaceAll(data, []byte("${nodeSecret}"), []byte(nodeParams.Secret))

		_, err = this.client.WriteFile(configFile, data)
		if err != nil {
			return errors.New("write '" + configFile + "': " + err.Error())
		}
	}

	// Run the node's self test.
	_, stderr, err = this.client.Exec(dir + "/edge-dns/bin/edge-dns test")
	if err != nil {
		installStatus.ErrorCode = "TEST_FAILED"
		return errors.New("test edge node failed: " + err.Error() + ", stderr: " + stderr)
	}
	if len(stderr) > 0 {
		// Output mentioning "rpc" (case-insensitive) is reported with its own error code.
		if regexp.MustCompile(`(?i)rpc`).MatchString(stderr) {
			installStatus.ErrorCode = "RPC_TEST_FAILED"
		}

		return errors.New("test edge dns node failed: " + stderr)
	}

	// Start the node.
	_, stderr, err = this.client.Exec(dir + "/edge-dns/bin/edge-dns start")
	if err != nil {
		return errors.New("start edge dns failed: " + err.Error())
	}

	if len(stderr) > 0 {
		return errors.New("start edge dns failed: " + stderr)
	}

	return nil
}

View File

@@ -1,39 +0,0 @@
package installers
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"testing"
)
// TestDNSNodeInstaller_Install logs in to a fixed test host with a password
// and runs a DNS node installation into /opt/edge. It needs that host to be
// reachable, so it is an integration test rather than a unit test.
func TestDNSNodeInstaller_Install(t *testing.T) {
	var installer InstallerInterface = &NSNodeInstaller{}

	var credentials = &Credentials{
		Host:       "192.168.2.30",
		Port:       22,
		Username:   "root",
		Password:   "123456",
		PrivateKey: "",
		Method:     "user",
	}
	if loginErr := installer.Login(credentials); loginErr != nil {
		t.Fatal(loginErr)
	}

	// Close the connection when the test finishes.
	defer func() {
		if closeErr := installer.Close(); closeErr != nil {
			t.Fatal(closeErr)
		}
	}()

	// Install.
	var nodeParams = &NodeParams{
		Endpoints: []string{"http://192.168.2.40:8003"},
		NodeId:    "b3f0690c793db5daaa666e89bd7b2301",
		Secret:    "H6nbSzjN3tLYi0ecdtUeDpQdZZPjKL7S",
	}
	if installErr := installer.Install("/opt/edge", nodeParams, &models.NodeInstallStatus{}); installErr != nil {
		t.Fatal(installErr)
	}
}

View File

@@ -218,19 +218,19 @@ func (this *NodeQueue) StartNode(nodeId int64) error {
return err
}
if login == nil {
return errors.New("can not find node login information")
return newGrantError("can not find node login information")
}
loginParams, err := login.DecodeSSHParams()
if err != nil {
return err
return newGrantError(err.Error())
}
if len(loginParams.Host) == 0 {
return errors.New("ssh host should not be empty")
return newGrantError("ssh host should not be empty")
}
if loginParams.Port <= 0 {
return errors.New("ssh port is invalid")
return newGrantError("ssh port is invalid")
}
if loginParams.GrantId == 0 {
@@ -240,7 +240,7 @@ func (this *NodeQueue) StartNode(nodeId int64) error {
return err
}
if grantId == 0 {
return errors.New("can not find node grant")
return newGrantError("can not find node grant")
}
loginParams.GrantId = grantId
}
@@ -249,10 +249,10 @@ func (this *NodeQueue) StartNode(nodeId int64) error {
return err
}
if grant == nil {
return errors.New("can not find user grant with id '" + numberutils.FormatInt64(loginParams.GrantId) + "'")
return newGrantError("can not find user grant with id '" + numberutils.FormatInt64(loginParams.GrantId) + "'")
}
installer := &NodeInstaller{}
var installer = &NodeInstaller{}
err = installer.Login(&Credentials{
Host: loginParams.Host,
Port: loginParams.Port,

View File

@@ -1,417 +0,0 @@
package installers
import (
"errors"
"fmt"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/logs"
"time"
)
// sharedNSNodeQueue is the package-level instance handed out by SharedNSNodeQueue.
var sharedNSNodeQueue = NewNSNodeQueue()

// NSNodeQueue groups the install/start/stop operations for DNS nodes.
// It has no fields.
type NSNodeQueue struct{}

// NewNSNodeQueue returns a new queue.
func NewNSNodeQueue() *NSNodeQueue {
	return new(NSNodeQueue)
}

// SharedNSNodeQueue returns the package-level queue instance.
func SharedNSNodeQueue() *NSNodeQueue {
	return sharedNSNodeQueue
}
// InstallNodeProcess drives one DNS node installation and keeps the node's
// stored install status up to date while it runs.
//
// The status is saved as running before the install starts, refreshed every
// three seconds in the background, and saved once more with the final result.
// An installation failure is recorded in the status (Error) and is not
// returned; only failures to persist the status or the installed flag are.
func (this *NSNodeQueue) InstallNodeProcess(nodeId int64, isUpgrading bool) error {
	var status = models.NewNodeInstallStatus()
	status.IsRunning = true
	status.UpdatedAt = time.Now().Unix()
	if saveErr := models.SharedNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, status); saveErr != nil {
		return saveErr
	}

	// Keep the update time fresh while the installation is in progress.
	var heartbeat = utils.NewTicker(3 * time.Second)
	defer heartbeat.Stop()
	goman.New(func() {
		for heartbeat.Wait() {
			status.UpdatedAt = time.Now().Unix()
			if saveErr := models.SharedNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, status); saveErr != nil {
				logs.Println("[INSTALL]" + saveErr.Error())
			}
		}
	})

	// Run the installation.
	var installErr = this.InstallNode(nodeId, status, isUpgrading)

	// Installation finished: record the outcome.
	status.IsRunning = false
	status.IsFinished = true
	if installErr == nil {
		status.IsOk = true
	} else {
		status.Error = installErr.Error()
	}
	if saveErr := models.SharedNSNodeDAO.UpdateNodeInstallStatus(nil, nodeId, status); saveErr != nil {
		return saveErr
	}

	if !status.IsOk {
		return nil
	}

	// Mark the node as installed.
	return models.SharedNSNodeDAO.UpdateNodeIsInstalled(nil, nodeId, true)
}
// InstallNode installs (or upgrades, when isUpgrading is true) the DNS node
// identified by nodeId on its remote host.
//
// It loads the node, its SSH login settings and grant (falling back to the
// cluster's grant when the login has none), collects the access addresses of
// all enabled and online API nodes, logs in over SSH and runs the installer.
// For several failure categories installStatus.ErrorCode is set before the
// error is returned.
func (this *NSNodeQueue) InstallNode(nodeId int64, installStatus *models.NodeInstallStatus, isUpgrading bool) error {
	node, err := models.SharedNSNodeDAO.FindEnabledNSNode(nil, nodeId)
	if err != nil {
		return err
	}
	if node == nil {
		return errors.New("can not find node, ID'" + numberutils.FormatInt64(nodeId) + "'")
	}

	// Login settings.
	login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(nil, nodeconfigs.NodeRoleDNS, nodeId)
	if err != nil {
		return err
	}
	if login == nil {
		installStatus.ErrorCode = "EMPTY_LOGIN"
		return errors.New("can not find node login information")
	}

	sshParams, err := login.DecodeSSHParams()
	if err != nil {
		return err
	}
	switch {
	case len(sshParams.Host) == 0:
		installStatus.ErrorCode = "EMPTY_SSH_HOST"
		return errors.New("ssh host should not be empty")
	case sshParams.Port <= 0:
		installStatus.ErrorCode = "EMPTY_SSH_PORT"
		return errors.New("ssh port is invalid")
	}

	if sshParams.GrantId == 0 {
		// No grant on the login itself: read it from the cluster.
		clusterGrantId, err := models.SharedNSClusterDAO.FindClusterGrantId(nil, int64(node.ClusterId))
		if err != nil {
			return err
		}
		if clusterGrantId == 0 {
			installStatus.ErrorCode = "EMPTY_GRANT"
			return errors.New("can not find node grant")
		}
		sshParams.GrantId = clusterGrantId
	}

	grant, err := models.SharedNodeGrantDAO.FindEnabledNodeGrant(nil, sshParams.GrantId)
	if err != nil {
		return err
	}
	if grant == nil {
		installStatus.ErrorCode = "EMPTY_GRANT"
		return errors.New("can not find user grant with id '" + numberutils.FormatInt64(sshParams.GrantId) + "'")
	}

	// API endpoints the node will connect to.
	apiNodes, err := models.SharedAPINodeDAO.FindAllEnabledAndOnAPINodes(nil)
	if err != nil {
		return err
	}
	if len(apiNodes) == 0 {
		return errors.New("no available api nodes")
	}

	var endpoints = []string{}
	for _, apiNode := range apiNodes {
		addrs, decodeErr := apiNode.DecodeAccessAddrs()
		if decodeErr != nil {
			return errors.New("decode api node access addresses failed: " + decodeErr.Error())
		}
		for _, addr := range addrs {
			endpoints = append(endpoints, addr.FullAddresses()...)
		}
	}

	var nodeParams = &NodeParams{
		Endpoints:   endpoints,
		NodeId:      node.UniqueId,
		Secret:      node.Secret,
		IsUpgrading: isUpgrading,
	}

	var installer = &NSNodeInstaller{}
	var credentials = &Credentials{
		Host:       sshParams.Host,
		Port:       sshParams.Port,
		Username:   grant.Username,
		Password:   grant.Password,
		PrivateKey: grant.PrivateKey,
		Passphrase: grant.Passphrase,
		Method:     grant.Method,
		Sudo:       grant.Su == 1,
	}
	if err = installer.Login(credentials); err != nil {
		installStatus.ErrorCode = "SSH_LOGIN_FAILED"
		return err
	}
	defer func() {
		_ = installer.Close()
	}()

	// Installation directory: node setting, then cluster setting, then default.
	var installDir = node.InstallDir
	if len(installDir) == 0 {
		cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(nil, int64(node.ClusterId))
		if err != nil {
			return err
		}
		if cluster == nil {
			return errors.New("can not find cluster, ID'" + fmt.Sprintf("%d", node.ClusterId) + "'")
		}
		installDir = cluster.InstallDir
		if len(installDir) == 0 {
			// Default is <login user's home>/edge-dns.
			installDir = installer.client.UserHome() + "/edge-dns"
		}
	}

	return installer.Install(installDir, nodeParams, installStatus)
}
// StartNode logs in to the host of the DNS node identified by nodeId over SSH
// and starts the node there.
//
// The grant falls back to the cluster's grant when the login has none, and
// the install directory falls back to the cluster's, then to
// <login user's home>/edge-dns. Any stderr output from the start command is
// reported as an error.
func (this *NSNodeQueue) StartNode(nodeId int64) error {
	node, err := models.SharedNSNodeDAO.FindEnabledNSNode(nil, nodeId)
	if err != nil {
		return err
	}
	if node == nil {
		return errors.New("can not find node, ID'" + numberutils.FormatInt64(nodeId) + "'")
	}

	// Login settings.
	login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(nil, nodeconfigs.NodeRoleDNS, nodeId)
	if err != nil {
		return err
	}
	if login == nil {
		return errors.New("can not find node login information")
	}

	sshParams, err := login.DecodeSSHParams()
	if err != nil {
		return err
	}
	switch {
	case len(sshParams.Host) == 0:
		return errors.New("ssh host should not be empty")
	case sshParams.Port <= 0:
		return errors.New("ssh port is invalid")
	}

	if sshParams.GrantId == 0 {
		// No grant on the login itself: read it from the cluster.
		clusterGrantId, err := models.SharedNSClusterDAO.FindClusterGrantId(nil, int64(node.ClusterId))
		if err != nil {
			return err
		}
		if clusterGrantId == 0 {
			return errors.New("can not find node grant")
		}
		sshParams.GrantId = clusterGrantId
	}

	grant, err := models.SharedNodeGrantDAO.FindEnabledNodeGrant(nil, sshParams.GrantId)
	if err != nil {
		return err
	}
	if grant == nil {
		return errors.New("can not find user grant with id '" + numberutils.FormatInt64(sshParams.GrantId) + "'")
	}

	var installer = &NSNodeInstaller{}
	var credentials = &Credentials{
		Host:       sshParams.Host,
		Port:       sshParams.Port,
		Username:   grant.Username,
		Password:   grant.Password,
		PrivateKey: grant.PrivateKey,
		Passphrase: grant.Passphrase,
		Method:     grant.Method,
		Sudo:       grant.Su == 1,
	}
	if err = installer.Login(credentials); err != nil {
		return err
	}
	defer func() {
		_ = installer.Close()
	}()

	// Installation directory: node setting, then cluster setting, then default.
	var installDir = node.InstallDir
	if len(installDir) == 0 {
		cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(nil, int64(node.ClusterId))
		if err != nil {
			return err
		}
		if cluster == nil {
			return errors.New("can not find cluster, ID'" + fmt.Sprintf("%d", node.ClusterId) + "'")
		}
		installDir = cluster.InstallDir
		if len(installDir) == 0 {
			// Default is <login user's home>/edge-dns.
			installDir = installer.client.UserHome() + "/edge-dns"
		}
	}

	// Make sure the executable exists.
	var exeFile = installDir + "/edge-dns/bin/edge-dns"
	if _, err = installer.client.Stat(exeFile); err != nil {
		return errors.New("edge node is not installed correctly, can not find executable file: " + exeFile)
	}

	// Try systemd first; its result is deliberately ignored.
	_, _, _ = installer.client.Exec("systemctl start edge-dns")

	_, stderr, err := installer.client.Exec(exeFile + " start")
	switch {
	case err != nil:
		return errors.New("start failed: " + err.Error())
	case len(stderr) > 0:
		return errors.New("start failed: " + stderr)
	}

	return nil
}
// StopNode logs in to the host of the DNS node identified by nodeId over SSH
// and stops the node there.
//
// The grant falls back to the cluster's grant when the login has none, and
// the install directory falls back to the cluster's, then to
// <login user's home>/edge-dns. Any stderr output from the stop command is
// reported as an error.
func (this *NSNodeQueue) StopNode(nodeId int64) error {
	node, err := models.SharedNSNodeDAO.FindEnabledNSNode(nil, nodeId)
	if err != nil {
		return err
	}
	if node == nil {
		return errors.New("can not find node, ID'" + numberutils.FormatInt64(nodeId) + "'")
	}

	// Login settings.
	login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(nil, nodeconfigs.NodeRoleDNS, nodeId)
	if err != nil {
		return err
	}
	if login == nil {
		return errors.New("can not find node login information")
	}

	sshParams, err := login.DecodeSSHParams()
	if err != nil {
		return err
	}
	switch {
	case len(sshParams.Host) == 0:
		return errors.New("ssh host should not be empty")
	case sshParams.Port <= 0:
		return errors.New("ssh port is invalid")
	}

	if sshParams.GrantId == 0 {
		// No grant on the login itself: read it from the cluster.
		clusterGrantId, err := models.SharedNSClusterDAO.FindClusterGrantId(nil, int64(node.ClusterId))
		if err != nil {
			return err
		}
		if clusterGrantId == 0 {
			return errors.New("can not find node grant")
		}
		sshParams.GrantId = clusterGrantId
	}

	grant, err := models.SharedNodeGrantDAO.FindEnabledNodeGrant(nil, sshParams.GrantId)
	if err != nil {
		return err
	}
	if grant == nil {
		return errors.New("can not find user grant with id '" + numberutils.FormatInt64(sshParams.GrantId) + "'")
	}

	var installer = &NSNodeInstaller{}
	var credentials = &Credentials{
		Host:       sshParams.Host,
		Port:       sshParams.Port,
		Username:   grant.Username,
		Password:   grant.Password,
		PrivateKey: grant.PrivateKey,
		Passphrase: grant.Passphrase,
		Method:     grant.Method,
		Sudo:       grant.Su == 1,
	}
	if err = installer.Login(credentials); err != nil {
		return err
	}
	defer func() {
		_ = installer.Close()
	}()

	// Installation directory: node setting, then cluster setting, then default.
	var installDir = node.InstallDir
	if len(installDir) == 0 {
		cluster, err := models.SharedNSClusterDAO.FindEnabledNSCluster(nil, int64(node.ClusterId))
		if err != nil {
			return err
		}
		if cluster == nil {
			return errors.New("can not find cluster, ID'" + fmt.Sprintf("%d", node.ClusterId) + "'")
		}
		installDir = cluster.InstallDir
		if len(installDir) == 0 {
			// Default is <login user's home>/edge-dns.
			installDir = installer.client.UserHome() + "/edge-dns"
		}
	}

	// Make sure the executable exists.
	var exeFile = installDir + "/edge-dns/bin/edge-dns"
	if _, err = installer.client.Stat(exeFile); err != nil {
		return errors.New("edge node is not installed correctly, can not find executable file: " + exeFile)
	}

	// Try systemd first; its result is deliberately ignored.
	_, _, _ = installer.client.Exec("systemctl stop edge-dns")

	_, stderr, err := installer.client.Exec(exeFile + " stop")
	switch {
	case err != nil:
		return errors.New("stop failed: " + err.Error())
	case len(stderr) > 0:
		return errors.New("stop failed: " + stderr)
	}

	return nil
}

View File

@@ -3,6 +3,7 @@ package tasks
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/installers"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/types"
@@ -18,6 +19,12 @@ func init() {
})
}
// nodeStartingTry records the remote start attempts made for a single node.
type nodeStartingTry struct {
	count     int   // number of start attempts counted since timestamp
	timestamp int64 // Unix time in seconds at which counting (re)started
}
// NodeMonitorTask 边缘节点监控任务
type NodeMonitorTask struct {
BaseTask
@@ -26,6 +33,8 @@ type NodeMonitorTask struct {
inactiveMap map[string]int // cluster@nodeId => count
notifiedMap map[int64]int64 // nodeId => timestamp
recoverMap map[int64]*nodeStartingTry // nodeId => *nodeStartingTry
}
func NewNodeMonitorTask(duration time.Duration) *NodeMonitorTask {
@@ -33,6 +42,7 @@ func NewNodeMonitorTask(duration time.Duration) *NodeMonitorTask {
ticker: time.NewTicker(duration),
inactiveMap: map[string]int{},
notifiedMap: map[int64]int64{},
recoverMap: map[int64]*nodeStartingTry{},
}
}
@@ -74,7 +84,40 @@ func (this *NodeMonitorTask) MonitorCluster(cluster *models.NodeCluster) error {
return err
}
var nodeMap = map[int64]*models.Node{}
// 尝试自动远程启动
var nodeQueue = installers.NewNodeQueue()
for _, node := range inactiveNodes {
var nodeId = int64(node.Id)
tryInfo, ok := this.recoverMap[nodeId]
if !ok {
tryInfo = &nodeStartingTry{
count: 1,
timestamp: time.Now().Unix(),
}
this.recoverMap[nodeId] = tryInfo
} else {
if tryInfo.count >= 3 /** 3次 **/ { // N 秒内超过 M 次就暂时不再重新尝试,防止阻塞当前任务
if tryInfo.timestamp+10*60 /** 10 分钟 **/ > time.Now().Unix() {
continue
}
tryInfo.timestamp = time.Now().Unix()
tryInfo.count = 0
}
tryInfo.count++
}
// TODO 如果用户手工安装的位置不在标准位置,需要节点自身记住最近启动的位置
err = nodeQueue.StartNode(nodeId)
if err != nil {
if !installers.IsGrantError(err) {
_ = models.SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleNode, nodeId, 0, 0, models.LevelError, "NODE", "start node from remote API failed: "+err.Error(), time.Now().Unix(), "", nil)
}
} else {
_ = models.SharedNodeLogDAO.CreateLog(nil, nodeconfigs.NodeRoleNode, nodeId, 0, 0, models.LevelSuccess, "NODE", "start node from remote API successfully", time.Now().Unix(), "", nil)
}
}
var nodeMap = map[int64]*models.Node{} // nodeId => Node
for _, node := range inactiveNodes {
var nodeId = int64(node.Id)
nodeMap[nodeId] = node