Files
EdgeAPI/internal/rpc/services/service_node.go
2024-05-07 08:39:58 +08:00

2337 lines
62 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"bytes"
"context"
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/dns"
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients/dnstypes"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/installers"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/andybalholm/brotli"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types"
stringutil "github.com/iwind/TeaGo/utils/string"
"io"
"net"
"path/filepath"
"strings"
"sync"
"time"
)
// NodeVersionCache is the node version cache: it maps a version number to the
// cache map stored for that version.
type NodeVersionCache struct {
CacheMap map[int64]*utils.CacheMap // version => map
}
var (
	// nodeVersionCacheMap stores the version caches per cluster:
	// [cluster_id] => { [version] => cache }
	nodeVersionCacheMap = map[int64]*NodeVersionCache{}

	// nodeVersionCacheLocker is the mutex declared alongside nodeVersionCacheMap.
	// NOTE(review): presumably it guards access to that map — confirm at the usage sites.
	nodeVersionCacheLocker = &sync.Mutex{}
)
// NodeService groups the RPC methods related to edge nodes. It embeds
// BaseService, whose helpers (validation, NullTx, Success, ...) the methods use.
type NodeService struct {
BaseService
}
// CreateNode creates a node and, when present in the request, its login
// information and its DNS routes.
//
// Each entry of req.DnsRoutes is parsed as "code@domainId"; entries that do
// not contain an "@" are skipped. The id of the created node is returned.
func (this *NodeService) CreateNode(ctx context.Context, req *pb.CreateNodeRequest) (*pb.CreateNodeResponse, error) {
	adminId, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	nodeId, err := models.SharedNodeDAO.CreateNode(tx, adminId, req.Name, req.NodeClusterId, req.NodeGroupId, req.NodeRegionId)
	if err != nil {
		return nil, err
	}

	// Login (authentication) information.
	if login := req.NodeLogin; login != nil {
		if _, err = models.SharedNodeLoginDAO.CreateNodeLogin(tx, nodeconfigs.NodeRoleNode, nodeId, login.Name, login.Type, login.Params); err != nil {
			return nil, err
		}
	}

	// DNS routes.
	if len(req.DnsRoutes) > 0 {
		var codesByDomain = map[int64][]string{} // domainId => route codes
		for _, route := range req.DnsRoutes {
			// Split on the first "@" only: "<code>@<domainId>".
			var at = strings.Index(route, "@")
			if at < 0 {
				continue
			}
			var domainId = types.Int64(route[at+1:])
			codesByDomain[domainId] = append(codesByDomain[domainId], route[:at])
		}

		if err = models.SharedNodeDAO.UpdateNodeDNS(tx, nodeId, codesByDomain); err != nil {
			return nil, err
		}
	}

	return &pb.CreateNodeResponse{NodeId: nodeId}, nil
}
// RegisterClusterNode registers a new node in the cluster identified by the
// request credentials.
//
// The node is created under the cluster's admin, marked as installed, and its
// unique id and secret are returned together with the API node addresses
// available to the cluster.
func (this *NodeService) RegisterClusterNode(ctx context.Context, req *pb.RegisterClusterNodeRequest) (*pb.RegisterClusterNodeResponse, error) {
	// Validate the request; the third result is the id of the calling cluster.
	_, _, clusterId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeCluster)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	adminId, err := models.SharedNodeClusterDAO.FindClusterAdminId(tx, clusterId)
	if err != nil {
		return nil, err
	}

	nodeId, err := models.SharedNodeDAO.CreateNode(tx, adminId, req.Name, clusterId, 0, 0)
	if err != nil {
		return nil, err
	}

	if err = models.SharedNodeDAO.UpdateNodeIsInstalled(tx, nodeId, true); err != nil {
		return nil, err
	}

	// Read the node back to obtain its unique id and secret.
	createdNode, err := models.SharedNodeDAO.FindEnabledNode(tx, nodeId)
	if err != nil {
		return nil, err
	}
	if createdNode == nil {
		return nil, errors.New("can not find node after creating")
	}

	// All API nodes that the cluster can use.
	endpoints, err := models.SharedNodeClusterDAO.FindAllAPINodeAddrsWithCluster(tx, clusterId)
	if err != nil {
		return nil, err
	}

	var resp = &pb.RegisterClusterNodeResponse{
		UniqueId:  createdNode.UniqueId,
		Secret:    createdNode.Secret,
		Endpoints: endpoints,
	}
	return resp, nil
}
// CountAllEnabledNodes returns the number of enabled nodes. Admin only.
func (this *NodeService) CountAllEnabledNodes(ctx context.Context, req *pb.CountAllEnabledNodesRequest) (*pb.RPCCountResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	total, err := models.SharedNodeDAO.CountAllEnabledNodes(this.NullTx())
	if err != nil {
		return nil, err
	}
	return this.SuccessCount(total)
}
// CountAllEnabledNodesMatch returns the number of enabled nodes matching the
// filters carried by the request (cluster, install/active state, keyword,
// group, region and level). Admin only.
func (this *NodeService) CountAllEnabledNodesMatch(ctx context.Context, req *pb.CountAllEnabledNodesMatchRequest) (*pb.RPCCountResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var (
		installState = configutils.ToBoolState(req.InstallState)
		activeState  = configutils.ToBoolState(req.ActiveState)
	)

	total, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(this.NullTx(), req.NodeClusterId, installState, activeState, req.Keyword, req.NodeGroupId, req.NodeRegionId, req.Level, true)
	if err != nil {
		return nil, err
	}
	return this.SuccessCount(total)
}
// ListEnabledNodesMatch lists one page of enabled nodes matching the request
// filters. Admin only.
//
// For every node the response includes its primary and secondary clusters,
// install status, group, region, DNS routes and composed status JSON. The
// first sort flag set on the request (CPU, memory, traffic in/out, load,
// connections — ascending or descending) selects the ordering.
func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.ListEnabledNodesMatchRequest) (*pb.ListEnabledNodesMatchResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	// When a cluster is specified, resolve its DNS domain and the domain's routes once.
	var dnsDomainId int64
	var domainRoutes = []*dnstypes.Route{}
	if req.NodeClusterId > 0 {
		clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, req.NodeClusterId, nil)
		if err != nil {
			return nil, err
		}
		if clusterDNS != nil {
			dnsDomainId = int64(clusterDNS.DnsDomainId)
			if clusterDNS.DnsDomainId > 0 {
				domainRoutes, err = dns.SharedDNSDomainDAO.FindDomainRoutes(tx, dnsDomainId)
				if err != nil {
					return nil, err
				}
			}
		}
	}

	// Ordering: the first flag that is set wins.
	var order string
	switch {
	case req.CpuAsc:
		order = "cpuAsc"
	case req.CpuDesc:
		order = "cpuDesc"
	case req.MemoryAsc:
		order = "memoryAsc"
	case req.MemoryDesc:
		order = "memoryDesc"
	case req.TrafficInAsc:
		order = "trafficInAsc"
	case req.TrafficInDesc:
		order = "trafficInDesc"
	case req.TrafficOutAsc:
		order = "trafficOutAsc"
	case req.TrafficOutDesc:
		order = "trafficOutDesc"
	case req.LoadAsc:
		order = "loadAsc"
	case req.LoadDesc:
		order = "loadDesc"
	case req.ConnectionsAsc:
		order = "connectionsAsc"
	case req.ConnectionsDesc:
		order = "connectionsDesc"
	}

	nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, req.Level, true, order, req.Offset, req.Size)
	if err != nil {
		return nil, err
	}

	var pbNodes = []*pb.Node{}
	var cacheMap = utils.NewCacheMap()
	for _, node := range nodes {
		// Primary cluster.
		clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId))
		if err != nil {
			return nil, err
		}

		// Secondary clusters.
		secondaryClusters, err := models.SharedNodeClusterDAO.FindEnabledNodeClustersWithIds(tx, node.DecodeSecondaryClusterIds())
		if err != nil {
			return nil, err
		}
		var pbSecondaryClusters = []*pb.NodeCluster{}
		for _, cluster := range secondaryClusters {
			pbSecondaryClusters = append(pbSecondaryClusters, &pb.NodeCluster{
				Id:   int64(cluster.Id),
				IsOn: cluster.IsOn,
				Name: cluster.Name,
			})
		}

		// Install status; an empty message is returned when nothing is stored.
		installStatus, err := node.DecodeInstallStatus()
		if err != nil {
			return nil, err
		}
		var pbInstallStatus = &pb.NodeInstallStatus{}
		if installStatus != nil {
			pbInstallStatus.IsRunning = installStatus.IsRunning
			pbInstallStatus.IsFinished = installStatus.IsFinished
			pbInstallStatus.IsOk = installStatus.IsOk
			pbInstallStatus.Error = installStatus.Error
			pbInstallStatus.ErrorCode = installStatus.ErrorCode
			pbInstallStatus.UpdatedAt = installStatus.UpdatedAt
		}

		// Group.
		var pbGroup *pb.NodeGroup
		if node.GroupId > 0 {
			group, err := models.SharedNodeGroupDAO.FindEnabledNodeGroup(tx, int64(node.GroupId))
			if err != nil {
				return nil, err
			}
			if group != nil {
				pbGroup = &pb.NodeGroup{
					Id:   int64(group.Id),
					Name: group.Name,
				}
			}
		}

		// DNS routes.
		var pbRoutes = []*pb.DNSRoute{}
		if dnsDomainId > 0 {
			// A cluster with a DNS domain was requested: match the node's route
			// codes against the routes already loaded for that domain.
			routeCodes, err := node.DNSRouteCodesForDomainId(dnsDomainId)
			if err != nil {
				return nil, err
			}
			for _, code := range routeCodes {
				for _, route := range domainRoutes {
					if route.Code != code {
						continue
					}
					pbRoutes = append(pbRoutes, &pb.DNSRoute{
						Name: route.Name,
						Code: route.Code,
					})
					break
				}
			}
		} else if req.NodeClusterId == 0 {
			// No cluster requested: only keep routes whose domain belongs to
			// one of the node's own clusters.
			var clusterDomainIds = []int64{}
			for _, clusterId := range node.AllClusterIds() {
				dnsInfo, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, cacheMap)
				if err != nil {
					return nil, err
				}
				if dnsInfo != nil && dnsInfo.DnsDomainId > 0 {
					clusterDomainIds = append(clusterDomainIds, int64(dnsInfo.DnsDomainId))
				}
			}

			for domainId, routeCodes := range node.DNSRouteCodes() {
				if domainId == 0 || !lists.ContainsInt64(clusterDomainIds, domainId) {
					continue
				}
				for _, code := range routeCodes {
					routeName, err := dns.SharedDNSDomainDAO.FindDomainRouteName(tx, domainId, code)
					if err != nil {
						return nil, err
					}
					if len(routeName) == 0 {
						continue
					}
					pbRoutes = append(pbRoutes, &pb.DNSRoute{
						Name: routeName,
						Code: code,
					})
				}
			}
		}

		// Region.
		var pbRegion *pb.NodeRegion
		if node.RegionId > 0 {
			region, err := models.SharedNodeRegionDAO.FindEnabledNodeRegion(tx, int64(node.RegionId))
			if err != nil {
				return nil, err
			}
			if region != nil {
				pbRegion = &pb.NodeRegion{
					Id:   int64(region.Id),
					IsOn: region.IsOn,
					Name: region.Name,
				}
			}
		}

		// Status.
		statusJSON, err := models.SharedNodeValueDAO.ComposeNodeStatusJSON(tx, nodeconfigs.NodeRoleNode, int64(node.Id), node.Status)
		if err != nil {
			return nil, err
		}

		var pbNode = &pb.Node{
			Id:          int64(node.Id),
			Name:        node.Name,
			Version:     int64(node.Version),
			IsInstalled: node.IsInstalled,
			StatusJSON:  statusJSON,
			NodeCluster: &pb.NodeCluster{
				Id:   int64(node.ClusterId),
				Name: clusterName,
			},
			SecondaryNodeClusters: pbSecondaryClusters,
			InstallStatus:         pbInstallStatus,
			MaxCPU:                types.Int32(node.MaxCPU),
			IsOn:                  node.IsOn,
			IsUp:                  node.IsUp,
			NodeGroup:             pbGroup,
			NodeRegion:            pbRegion,
			DnsRoutes:             pbRoutes,
			Level:                 int32(node.Level),
			OfflineDay:            node.OfflineDay,
			IsBackupForCluster:    node.IsBackupForCluster,
			IsBackupForGroup:      node.IsBackupForGroup,
		}
		pbNodes = append(pbNodes, pbNode)
	}

	return &pb.ListEnabledNodesMatchResponse{Nodes: pbNodes}, nil
}
// FindAllEnabledNodesWithNodeClusterId returns all enabled nodes of a cluster,
// optionally including nodes that have it as a secondary cluster
// (req.IncludeSecondary).
//
// NOTE(review): the per-user permission check below is still a TODO while the
// response includes each node's UniqueId and Secret — confirm whether user
// callers should be restricted.
func (this *NodeService) FindAllEnabledNodesWithNodeClusterId(ctx context.Context, req *pb.FindAllEnabledNodesWithNodeClusterIdRequest) (*pb.FindAllEnabledNodesWithNodeClusterIdResponse, error) {
	_, userId, err := this.ValidateAdminAndUser(ctx, false)
	if err != nil {
		return nil, err
	}

	if userId > 0 {
		// TODO check permissions
	}

	nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(this.NullTx(), req.NodeClusterId, req.IncludeSecondary)
	if err != nil {
		return nil, err
	}

	var pbNodes = []*pb.Node{}
	for _, node := range nodes {
		// Ids of the API nodes this node is connected to (stored as JSON).
		var connectedIds = []int64{}
		if models.IsNotNull(node.ConnectedAPINodes) {
			if err := json.Unmarshal(node.ConnectedAPINodes, &connectedIds); err != nil {
				return nil, err
			}
		}

		var pbNode = &pb.Node{
			Id:                  int64(node.Id),
			Name:                node.Name,
			UniqueId:            node.UniqueId,
			Secret:              node.Secret,
			ConnectedAPINodeIds: connectedIds,
			MaxCPU:              types.Int32(node.MaxCPU),
			IsOn:                node.IsOn,
		}
		pbNodes = append(pbNodes, pbNode)
	}

	return &pb.FindAllEnabledNodesWithNodeClusterIdResponse{Nodes: pbNodes}, nil
}
// DeleteNode disables the node identified by req.NodeId and deletes the tasks
// associated with it. Admin only.
func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	if err := models.SharedNodeDAO.DisableNode(tx, req.NodeId); err != nil {
		return nil, err
	}

	// Remove the tasks that belong to the node.
	if err := models.SharedNodeTaskDAO.DeleteNodeTasks(tx, nodeconfigs.NodeRoleNode, req.NodeId); err != nil {
		return nil, err
	}

	return this.Success()
}
// DeleteNodeFromNodeCluster removes the node req.NodeId from the cluster
// req.NodeClusterId. Admin only.
func (this *NodeService) DeleteNodeFromNodeCluster(ctx context.Context, req *pb.DeleteNodeFromNodeClusterRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := models.SharedNodeDAO.DeleteNodeFromCluster(this.NullTx(), req.NodeId, req.NodeClusterId); err != nil {
		return nil, err
	}

	return this.Success()
}
// UpdateNode updates a node's name, primary and secondary clusters, group,
// region, on/off state, level, Ln addresses and IP-list switch from the
// request. Admin only.
func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := models.SharedNodeDAO.UpdateNode(this.NullTx(), req.NodeId, req.Name, req.NodeClusterId, req.SecondaryNodeClusterIds, req.NodeGroupId, req.NodeRegionId, req.IsOn, int(req.Level), req.LnAddrs, req.EnableIPLists); err != nil {
		return nil, err
	}

	return this.Success()
}
// FindEnabledNode returns the details of a single enabled node. Admin only.
//
// The response carries a nil Node when no enabled node matches req.NodeId.
// Otherwise it includes the node's primary and secondary clusters, login,
// install status, group, region, cache capacities, DNS routes and composed
// status JSON.
func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnabledNodeRequest) (*pb.FindEnabledNodeResponse, error) {
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	node, err := models.SharedNodeDAO.FindEnabledNode(tx, req.NodeId)
	if err != nil {
		return nil, err
	}
	if node == nil {
		return &pb.FindEnabledNodeResponse{Node: nil}, nil
	}

	// Primary cluster.
	clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId))
	if err != nil {
		return nil, err
	}

	// Secondary clusters; ids that no longer resolve to an enabled cluster are skipped.
	var secondaryPBClusters []*pb.NodeCluster
	for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() {
		cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, secondaryClusterId)
		if err != nil {
			return nil, err
		}
		if cluster == nil {
			continue
		}
		secondaryPBClusters = append(secondaryPBClusters, &pb.NodeCluster{
			Id:   int64(cluster.Id),
			IsOn: cluster.IsOn,
			Name: cluster.Name,
		})
	}

	// Login (authentication) information.
	login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, nodeconfigs.NodeRoleNode, req.NodeId)
	if err != nil {
		return nil, err
	}
	var respLogin *pb.NodeLogin
	if login != nil {
		respLogin = &pb.NodeLogin{
			Id:     int64(login.Id),
			Name:   login.Name,
			Type:   login.Type,
			Params: login.Params,
		}
	}

	// Install status; an empty message is returned when nothing is stored.
	installStatus, err := node.DecodeInstallStatus()
	if err != nil {
		return nil, err
	}
	var installStatusResult = &pb.NodeInstallStatus{}
	if installStatus != nil {
		installStatusResult = &pb.NodeInstallStatus{
			IsRunning:  installStatus.IsRunning,
			IsFinished: installStatus.IsFinished,
			IsOk:       installStatus.IsOk,
			Error:      installStatus.Error,
			ErrorCode:  installStatus.ErrorCode,
			UpdatedAt:  installStatus.UpdatedAt,
		}
	}

	// Group.
	var pbGroup *pb.NodeGroup
	if node.GroupId > 0 {
		group, err := models.SharedNodeGroupDAO.FindEnabledNodeGroup(tx, int64(node.GroupId))
		if err != nil {
			return nil, err
		}
		if group != nil {
			pbGroup = &pb.NodeGroup{
				Id:   int64(group.Id),
				Name: group.Name,
			}
		}
	}

	// Region.
	var pbRegion *pb.NodeRegion
	if node.RegionId > 0 {
		region, err := models.SharedNodeRegionDAO.FindEnabledNodeRegion(tx, int64(node.RegionId))
		if err != nil {
			return nil, err
		}
		if region != nil {
			pbRegion = &pb.NodeRegion{
				Id:   int64(region.Id),
				IsOn: region.IsOn,
				Name: region.Name,
			}
		}
	}

	// Maximum disk cache capacity (stored as JSON).
	var pbMaxCacheDiskCapacity *pb.SizeCapacity
	if models.IsNotNull(node.MaxCacheDiskCapacity) {
		pbMaxCacheDiskCapacity = &pb.SizeCapacity{}
		err = json.Unmarshal(node.MaxCacheDiskCapacity, pbMaxCacheDiskCapacity)
		if err != nil {
			return nil, err
		}
	}

	// Maximum memory cache capacity (stored as JSON).
	var pbMaxCacheMemoryCapacity *pb.SizeCapacity
	if models.IsNotNull(node.MaxCacheMemoryCapacity) {
		pbMaxCacheMemoryCapacity = &pb.SizeCapacity{}
		err = json.Unmarshal(node.MaxCacheMemoryCapacity, pbMaxCacheMemoryCapacity)
		if err != nil {
			return nil, err
		}
	}

	// DNS routes: only keep routes whose domain belongs to one of the node's clusters.
	var pbRoutes = []*pb.DNSRoute{}
	var clusterDomainIds = []int64{}
	for _, clusterId := range node.AllClusterIds() {
		clusterDNSInfo, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, nil)
		if err != nil {
			return nil, err
		}
		if clusterDNSInfo != nil && clusterDNSInfo.DnsDomainId > 0 {
			clusterDomainIds = append(clusterDomainIds, int64(clusterDNSInfo.DnsDomainId))
		}
	}
	for domainId, routeCodes := range node.DNSRouteCodes() {
		if domainId == 0 {
			continue
		}
		if !lists.ContainsInt64(clusterDomainIds, domainId) {
			continue
		}
		for _, routeCode := range routeCodes {
			routeName, err := dns.SharedDNSDomainDAO.FindDomainRouteName(tx, domainId, routeCode)
			if err != nil {
				return nil, err
			}
			if len(routeName) > 0 {
				pbRoutes = append(pbRoutes, &pb.DNSRoute{
					Name: routeName,
					Code: routeCode,
				})
			}
		}
	}

	// Monitoring status.
	statusJSON, err := models.SharedNodeValueDAO.ComposeNodeStatusJSON(tx, nodeconfigs.NodeRoleNode, int64(node.Id), node.Status)
	if err != nil {
		return nil, err
	}

	return &pb.FindEnabledNodeResponse{Node: &pb.Node{
		Id:            int64(node.Id),
		Name:          node.Name,
		StatusJSON:    statusJSON,
		UniqueId:      node.UniqueId,
		Version:       int64(node.Version),
		LatestVersion: int64(node.LatestVersion),
		Secret:        node.Secret,
		InstallDir:    node.InstallDir,
		IsInstalled:   node.IsInstalled,
		NodeCluster: &pb.NodeCluster{
			Id:   int64(node.ClusterId),
			Name: clusterName,
		},
		SecondaryNodeClusters:  secondaryPBClusters,
		NodeLogin:              respLogin,
		InstallStatus:          installStatusResult,
		MaxCPU:                 types.Int32(node.MaxCPU),
		IsOn:                   node.IsOn,
		IsUp:                   node.IsUp,
		NodeGroup:              pbGroup,
		NodeRegion:             pbRegion,
		MaxCacheDiskCapacity:   pbMaxCacheDiskCapacity,
		MaxCacheMemoryCapacity: pbMaxCacheMemoryCapacity,
		CacheDiskDir:           node.CacheDiskDir,
		CacheDiskSubDirsJSON:   node.CacheDiskSubDirs,
		Level:                  int32(node.Level),
		LnAddrs:                node.DecodeLnAddrs(),
		DnsRoutes:              pbRoutes,
		EnableIPLists:          node.EnableIPLists,
		ApiNodeAddrsJSON:       node.ApiNodeAddrs,
		OfflineDay:             node.OfflineDay,
		IsBackupForCluster:     node.IsBackupForCluster,
		IsBackupForGroup:       node.IsBackupForGroup,
	}}, nil
}
// FindEnabledBasicNode returns the basic information of a single node: id,
// name, on/up flags, level and primary cluster. Admin only.
//
// The response carries a nil Node when no enabled node matches req.NodeId.
func (this *NodeService) FindEnabledBasicNode(ctx context.Context, req *pb.FindEnabledBasicNodeRequest) (*pb.FindEnabledBasicNodeResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	basicNode, err := models.SharedNodeDAO.FindEnabledBasicNode(tx, req.NodeId)
	if err != nil {
		return nil, err
	}
	if basicNode == nil {
		return &pb.FindEnabledBasicNodeResponse{Node: nil}, nil
	}

	clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(basicNode.ClusterId))
	if err != nil {
		return nil, err
	}

	var pbNode = &pb.BasicNode{
		Id:    int64(basicNode.Id),
		Name:  basicNode.Name,
		IsOn:  basicNode.IsOn,
		IsUp:  basicNode.IsUp,
		Level: int32(basicNode.Level),
		NodeCluster: &pb.NodeCluster{
			Id:   int64(basicNode.ClusterId),
			Name: clusterName,
		},
	}
	return &pb.FindEnabledBasicNodeResponse{Node: pbNode}, nil
}
// FindCurrentNodeConfig composes the configuration of the calling node and
// returns it JSON-encoded, optionally brotli-compressed (req.Compress).
//
// When the node's stored version equals req.Version the response only reports
// IsChanged=false. Otherwise NodeJSON holds the encoded config, DataSize its
// length in bytes, IsCompressed whether brotli was applied and Timestamp the
// current Unix time.
func (this *NodeService) FindCurrentNodeConfig(ctx context.Context, req *pb.FindCurrentNodeConfigRequest) (*pb.FindCurrentNodeConfigResponse, error) {
	// Validate the node.
	_, _, nodeId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	// Check the version number.
	currentVersion, err := models.SharedNodeDAO.FindNodeVersion(tx, nodeId)
	if err != nil {
		return nil, err
	}
	if currentVersion == req.Version {
		return &pb.FindCurrentNodeConfigResponse{IsChanged: false}, nil
	}

	clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId)
	if err != nil {
		return nil, err
	}

	var cacheMap = this.findClusterCacheMap(clusterId, req.NodeTaskVersion)
	var dataMap *shared.DataMap
	if req.UseDataMap {
		// Reuse a shared DataMap from the cache when there is one.
		if cacheMap != nil {
			cachedDataMap, ok := cacheMap.Get("DataMap")
			if ok {
				dataMap = cachedDataMap.(*shared.DataMap)
			}
		}
		if dataMap == nil {
			dataMap = shared.NewDataMap()
		}
	} else if cacheMap != nil {
		// The caller does not use a DataMap, but the cache was built with one:
		// drop the cache so the config is composed afresh. The nil guard mirrors
		// the branch above, which already treats cacheMap as possibly nil.
		if _, ok := cacheMap.Get("DataMap"); ok {
			cacheMap = nil
		}
	}

	nodeConfig, err := models.SharedNodeDAO.ComposeNodeConfig(tx, nodeId, dataMap, cacheMap)
	if err != nil {
		return nil, err
	}

	// Encode, compressing on the fly when requested.
	var data []byte
	var isCompressed = false
	var buffer = &bytes.Buffer{}
	var writer io.Writer = buffer
	var brotliWriter *brotli.Writer
	if req.Compress {
		brotliWriter = brotli.NewWriterLevel(writer, 5)
		writer = brotliWriter
	}

	var encoder = json.NewEncoder(writer)
	err = encoder.Encode(nodeConfig)
	if err != nil {
		return nil, err
	}

	if brotliWriter != nil {
		err = brotliWriter.Close()
		if err == nil {
			data = buffer.Bytes()
			isCompressed = true
		} else {
			// Compression failed: fall back to a plain, uncompressed encoding.
			data, err = json.Marshal(nodeConfig)
			if err != nil {
				return nil, err
			}
		}
	} else {
		data = buffer.Bytes()
	}

	// data may alias the buffer's backing array, so the buffer must not be
	// written to again after this point.
	return &pb.FindCurrentNodeConfigResponse{
		IsChanged:    true,
		NodeJSON:     data,
		DataSize:     int64(len(data)),
		IsCompressed: isCompressed,
		Timestamp:    time.Now().Unix(),
	}, nil
}
// UpdateNodeStatus stores the status reported in req.StatusJSON for a node,
// stamping it with the current Unix time.
//
// The target node is the one resolved by ValidateNode unless req.NodeId is
// positive, in which case req.NodeId is used instead.
func (this *NodeService) UpdateNodeStatus(ctx context.Context, req *pb.UpdateNodeStatusRequest) (*pb.RPCSuccess, error) {
	// Validate the node.
	nodeId, err := this.ValidateNode(ctx)
	if err != nil {
		return nil, err
	}

	// An explicit node id in the request takes precedence.
	if req.NodeId > 0 {
		nodeId = req.NodeId
	}
	if nodeId <= 0 {
		return nil, errors.New("'nodeId' should be greater than 0")
	}

	var tx = this.NullTx()

	// Decode the status and refresh its timestamp.
	var status = new(nodeconfigs.NodeStatus)
	if err = json.Unmarshal(req.StatusJSON, status); err != nil {
		return nil, errors.New("decode node status json failed: " + err.Error())
	}
	status.UpdatedAt = time.Now().Unix()

	// Save.
	if err = models.SharedNodeDAO.UpdateNodeStatus(tx, nodeId, status); err != nil {
		return nil, err
	}
	return this.Success()
}
// UpdateNodeIsInstalled sets the installed flag of node req.NodeId to
// req.IsInstalled. Admin only.
func (this *NodeService) UpdateNodeIsInstalled(ctx context.Context, req *pb.UpdateNodeIsInstalledRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := models.SharedNodeDAO.UpdateNodeIsInstalled(this.NullTx(), req.NodeId, req.IsInstalled); err != nil {
		return nil, err
	}

	return this.Success()
}
// InstallNode starts the installation of node req.NodeId. Admin only.
//
// The installation is handed to goman.New and its failure is only logged; the
// RPC itself returns an empty response.
// NOTE(review): goman.New presumably runs the function in a separate
// goroutine — confirm.
func (this *NodeService) InstallNode(ctx context.Context, req *pb.InstallNodeRequest) (*pb.InstallNodeResponse, error) {
	// Validate the request.
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	goman.New(func() {
		// Keep the error local to the closure instead of writing to the
		// enclosing function's err variable.
		installErr := installers.SharedNodeQueue().InstallNodeProcess(req.NodeId, false)
		if installErr != nil {
			remotelogs.Error("NODE_SERVICE", "install node failed:"+installErr.Error())
		}
	})

	return &pb.InstallNodeResponse{}, nil
}
// UpgradeNode starts an upgrade of node req.NodeId. Admin only.
//
// The node is first marked as not installed and its install status is reset
// (IsOk and IsFinished cleared). The install process is then handed to
// goman.New with the upgrade flag set; its failure is only logged.
// NOTE(review): goman.New presumably runs the function in a separate
// goroutine — confirm.
func (this *NodeService) UpgradeNode(ctx context.Context, req *pb.UpgradeNodeRequest) (*pb.UpgradeNodeResponse, error) {
	// Validate the request.
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	err = models.SharedNodeDAO.UpdateNodeIsInstalled(tx, req.NodeId, false)
	if err != nil {
		return nil, err
	}

	// Reset the install status, creating one when the node has none yet.
	installStatus, err := models.SharedNodeDAO.FindNodeInstallStatus(tx, req.NodeId)
	if err != nil {
		return nil, err
	}
	if installStatus == nil {
		installStatus = &models.NodeInstallStatus{}
	}
	installStatus.IsOk = false
	installStatus.IsFinished = false
	err = models.SharedNodeDAO.UpdateNodeInstallStatus(tx, req.NodeId, installStatus)
	if err != nil {
		return nil, err
	}

	goman.New(func() {
		// Keep the error local to the closure instead of writing to the
		// enclosing function's err variable.
		installErr := installers.SharedNodeQueue().InstallNodeProcess(req.NodeId, true)
		if installErr != nil {
			remotelogs.Error("NODE_SERVICE", "install node:"+installErr.Error())
		}
	})

	return &pb.UpgradeNodeResponse{}, nil
}
// StartNode starts node req.NodeId through the shared node queue. Admin only.
//
// A failure to start is reported inside the response (IsOk=false plus the
// error text) rather than as an RPC error.
func (this *NodeService) StartNode(ctx context.Context, req *pb.StartNodeRequest) (*pb.StartNodeResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := installers.SharedNodeQueue().StartNode(req.NodeId); err != nil {
		var failed = &pb.StartNodeResponse{
			IsOk:  false,
			Error: err.Error(),
		}
		return failed, nil
	}

	return &pb.StartNodeResponse{IsOk: true}, nil
}
// StopNode stops node req.NodeId through the shared node queue. Admin only.
//
// A failure to stop is reported inside the response (IsOk=false plus the
// error text) rather than as an RPC error.
func (this *NodeService) StopNode(ctx context.Context, req *pb.StopNodeRequest) (*pb.StopNodeResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := installers.SharedNodeQueue().StopNode(req.NodeId); err != nil {
		var failed = &pb.StopNodeResponse{
			IsOk:  false,
			Error: err.Error(),
		}
		return failed, nil
	}

	return &pb.StopNodeResponse{IsOk: true}, nil
}
// UninstallNode uninstalls node req.NodeId through the shared node queue and
// then marks the node as not installed. Admin only.
//
// A failure of the uninstall step is reported inside the response (IsOk=false
// plus the error text); a failure to update the installed flag is returned as
// an RPC error.
func (this *NodeService) UninstallNode(ctx context.Context, req *pb.UninstallNodeRequest) (*pb.UninstallNodeResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := installers.SharedNodeQueue().UninstallNode(req.NodeId); err != nil {
		var failed = &pb.UninstallNodeResponse{
			IsOk:  false,
			Error: err.Error(),
		}
		return failed, nil
	}

	// Mark the node as not installed.
	if err := models.SharedNodeDAO.UpdateNodeIsInstalled(this.NullTx(), req.NodeId, false); err != nil {
		return nil, err
	}

	return &pb.UninstallNodeResponse{IsOk: true}, nil
}
// UpdateNodeConnectedAPINodes records the ids of the API nodes (req.ApiNodeIds)
// that the calling node is connected to.
func (this *NodeService) UpdateNodeConnectedAPINodes(ctx context.Context, req *pb.UpdateNodeConnectedAPINodesRequest) (*pb.RPCSuccess, error) {
	// Validate the node; the third result is the id of the calling node.
	_, _, nodeId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode)
	if err != nil {
		return nil, err
	}

	if err = models.SharedNodeDAO.UpdateNodeConnectedAPINodes(this.NullTx(), nodeId, req.ApiNodeIds); err != nil {
		return nil, errors.Wrap(err)
	}

	return this.Success()
}
// CountAllEnabledNodesWithNodeGrantId returns the number of enabled nodes that
// use the grant req.NodeGrantId. Admin only.
func (this *NodeService) CountAllEnabledNodesWithNodeGrantId(ctx context.Context, req *pb.CountAllEnabledNodesWithNodeGrantIdRequest) (*pb.RPCCountResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	total, err := models.SharedNodeDAO.CountAllEnabledNodesWithGrantId(this.NullTx(), req.NodeGrantId)
	if err != nil {
		return nil, err
	}
	return this.SuccessCount(total)
}
// FindAllEnabledNodesWithNodeGrantId returns all enabled nodes that use the
// grant req.NodeGrantId, each with the name of its primary cluster. Admin only.
func (this *NodeService) FindAllEnabledNodesWithNodeGrantId(ctx context.Context, req *pb.FindAllEnabledNodesWithNodeGrantIdRequest) (*pb.FindAllEnabledNodesWithNodeGrantIdResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithGrantId(tx, req.NodeGrantId)
	if err != nil {
		return nil, err
	}

	var pbNodes = []*pb.Node{}
	for _, node := range nodes {
		// Cluster information.
		clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId))
		if err != nil {
			return nil, err
		}

		var pbNode = &pb.Node{
			Id:          int64(node.Id),
			Name:        node.Name,
			Version:     int64(node.Version),
			IsInstalled: node.IsInstalled,
			StatusJSON:  node.Status,
			NodeCluster: &pb.NodeCluster{
				Id:   int64(node.ClusterId),
				Name: clusterName,
			},
			IsOn: node.IsOn,
		}
		pbNodes = append(pbNodes, pbNode)
	}

	return &pb.FindAllEnabledNodesWithNodeGrantIdResponse{Nodes: pbNodes}, nil
}
// CountAllNotInstalledNodesWithNodeClusterId returns the number of nodes in
// cluster req.NodeClusterId that are not installed. Admin only.
func (this *NodeService) CountAllNotInstalledNodesWithNodeClusterId(ctx context.Context, req *pb.CountAllNotInstalledNodesWithNodeClusterIdRequest) (*pb.RPCCountResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	total, err := models.SharedNodeDAO.CountAllNotInstalledNodesWithClusterId(this.NullTx(), req.NodeClusterId)
	if err != nil {
		return nil, err
	}
	return this.SuccessCount(total)
}
// FindAllNotInstalledNodesWithNodeClusterId lists the nodes of cluster
// req.NodeClusterId that are not installed, each with its login, enabled IP
// addresses and install status. Admin only.
func (this *NodeService) FindAllNotInstalledNodesWithNodeClusterId(ctx context.Context, req *pb.FindAllNotInstalledNodesWithNodeClusterIdRequest) (*pb.FindAllNotInstalledNodesWithNodeClusterIdResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	nodes, err := models.SharedNodeDAO.FindAllNotInstalledNodesWithClusterId(tx, req.NodeClusterId)
	if err != nil {
		return nil, err
	}

	var pbNodes = []*pb.Node{}
	for _, node := range nodes {
		var nodeId = int64(node.Id)

		// Login (authentication) information.
		login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, nodeconfigs.NodeRoleNode, nodeId)
		if err != nil {
			return nil, err
		}
		var pbLogin *pb.NodeLogin
		if login != nil {
			pbLogin = &pb.NodeLogin{
				Id:     int64(login.Id),
				Name:   login.Name,
				Type:   login.Type,
				Params: login.Params,
			}
		}

		// IP addresses.
		addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, nodeId, nodeconfigs.NodeRoleNode)
		if err != nil {
			return nil, err
		}
		var pbAddresses = []*pb.NodeIPAddress{}
		for _, addr := range addresses {
			pbAddresses = append(pbAddresses, &pb.NodeIPAddress{
				Id:          int64(addr.Id),
				NodeId:      int64(addr.NodeId),
				Name:        addr.Name,
				Ip:          addr.Ip,
				Description: addr.Description,
				State:       int64(addr.State),
				Order:       int64(addr.Order),
				CanAccess:   addr.CanAccess,
			})
		}

		// Install status; an empty message is returned when nothing is stored.
		installStatus, err := node.DecodeInstallStatus()
		if err != nil {
			return nil, err
		}
		var pbInstallStatus = &pb.NodeInstallStatus{}
		if installStatus != nil {
			pbInstallStatus.IsRunning = installStatus.IsRunning
			pbInstallStatus.IsFinished = installStatus.IsFinished
			pbInstallStatus.IsOk = installStatus.IsOk
			pbInstallStatus.Error = installStatus.Error
			pbInstallStatus.ErrorCode = installStatus.ErrorCode
			pbInstallStatus.UpdatedAt = installStatus.UpdatedAt
		}

		var pbNode = &pb.Node{
			Id:            nodeId,
			Name:          node.Name,
			Version:       int64(node.Version),
			IsInstalled:   node.IsInstalled,
			StatusJSON:    node.Status,
			IsOn:          node.IsOn,
			NodeLogin:     pbLogin,
			IpAddresses:   pbAddresses,
			InstallStatus: pbInstallStatus,
		}
		pbNodes = append(pbNodes, pbNode)
	}

	return &pb.FindAllNotInstalledNodesWithNodeClusterIdResponse{Nodes: pbNodes}, nil
}
// CountAllUpgradeNodesWithNodeClusterId returns the number of nodes in cluster
// req.NodeClusterId that need an upgrade. Admin only.
//
// The total is the sum, over every deploy file loaded by the deploy manager,
// of the nodes whose version is lower than that file's version for the same
// OS and architecture.
func (this *NodeService) CountAllUpgradeNodesWithNodeClusterId(ctx context.Context, req *pb.CountAllUpgradeNodesWithNodeClusterIdRequest) (*pb.RPCCountResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	var total int64
	for _, file := range installers.SharedDeployManager.LoadNodeFiles() {
		count, err := models.SharedNodeDAO.CountAllLowerVersionNodesWithClusterId(tx, req.NodeClusterId, file.OS, file.Arch, file.Version)
		if err != nil {
			return nil, err
		}
		total += count
	}

	return this.SuccessCount(total)
}
// FindAllUpgradeNodesWithNodeClusterId lists the nodes of cluster
// req.NodeClusterId that need an upgrade. Admin only.
//
// For every deploy file loaded by the deploy manager, the nodes with a lower
// version for the same OS and architecture are returned together with their
// login, enabled IP addresses and install status. Nodes without a decoded
// status are skipped, because the old version is read from that status.
func (this *NodeService) FindAllUpgradeNodesWithNodeClusterId(ctx context.Context, req *pb.FindAllUpgradeNodesWithNodeClusterIdRequest) (*pb.FindAllUpgradeNodesWithNodeClusterIdResponse, error) {
	// Validate the request.
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	// The deploy files describe the latest versions nodes can be upgraded to.
	deployFiles := installers.SharedDeployManager.LoadNodeFiles()
	result := []*pb.FindAllUpgradeNodesWithNodeClusterIdResponse_NodeUpgrade{}
	for _, deployFile := range deployFiles {
		nodes, err := models.SharedNodeDAO.FindAllLowerVersionNodesWithClusterId(tx, req.NodeClusterId, deployFile.OS, deployFile.Arch, deployFile.Version)
		if err != nil {
			return nil, err
		}
		for _, node := range nodes {
			// Status: decoded first so that nodes which will be skipped do not
			// cost the login and address lookups below.
			status, err := node.DecodeStatus()
			if err != nil {
				return nil, err
			}
			if status == nil {
				continue
			}

			// Login (authentication) information.
			login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, nodeconfigs.NodeRoleNode, int64(node.Id))
			if err != nil {
				return nil, err
			}
			var pbLogin *pb.NodeLogin
			if login != nil {
				pbLogin = &pb.NodeLogin{
					Id:     int64(login.Id),
					Name:   login.Name,
					Type:   login.Type,
					Params: login.Params,
				}
			}

			// IP addresses.
			addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, int64(node.Id), nodeconfigs.NodeRoleNode)
			if err != nil {
				return nil, err
			}
			pbAddresses := []*pb.NodeIPAddress{}
			for _, address := range addresses {
				pbAddresses = append(pbAddresses, &pb.NodeIPAddress{
					Id:          int64(address.Id),
					NodeId:      int64(address.NodeId),
					Name:        address.Name,
					Ip:          address.Ip,
					Description: address.Description,
					State:       int64(address.State),
					Order:       int64(address.Order),
					CanAccess:   address.CanAccess,
				})
			}

			// Install status; an empty message is returned when nothing is stored.
			installStatus, err := node.DecodeInstallStatus()
			if err != nil {
				return nil, err
			}
			pbInstallStatus := &pb.NodeInstallStatus{}
			if installStatus != nil {
				pbInstallStatus = &pb.NodeInstallStatus{
					IsRunning:  installStatus.IsRunning,
					IsFinished: installStatus.IsFinished,
					IsOk:       installStatus.IsOk,
					Error:      installStatus.Error,
					ErrorCode:  installStatus.ErrorCode,
					UpdatedAt:  installStatus.UpdatedAt,
				}
			}

			pbNode := &pb.Node{
				Id:            int64(node.Id),
				Name:          node.Name,
				Version:       int64(node.Version),
				IsInstalled:   node.IsInstalled,
				StatusJSON:    node.Status,
				IsOn:          node.IsOn,
				IpAddresses:   pbAddresses,
				NodeLogin:     pbLogin,
				InstallStatus: pbInstallStatus,
			}

			result = append(result, &pb.FindAllUpgradeNodesWithNodeClusterIdResponse_NodeUpgrade{
				Os:         deployFile.OS,
				Arch:       deployFile.Arch,
				OldVersion: status.BuildVersion,
				NewVersion: deployFile.Version,
				Node:       pbNode,
			})
		}
	}

	return &pb.FindAllUpgradeNodesWithNodeClusterIdResponse{
		Nodes: result,
	}, nil
}
// FindNodeInstallStatus returns the installation status stored for req.NodeId.
//
// The caller must be an administrator. When the DAO has no install status for
// the node, the response is returned with a nil InstallStatus and no error.
func (this *NodeService) FindNodeInstallStatus(ctx context.Context, req *pb.FindNodeInstallStatusRequest) (*pb.FindNodeInstallStatusResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	status, err := models.SharedNodeDAO.FindNodeInstallStatus(this.NullTx(), req.NodeId)
	if err != nil {
		return nil, err
	}

	var resp = &pb.FindNodeInstallStatusResponse{}
	if status != nil {
		// Copy the stored status field by field into its protobuf counterpart.
		resp.InstallStatus = &pb.NodeInstallStatus{
			IsRunning:  status.IsRunning,
			IsFinished: status.IsFinished,
			IsOk:       status.IsOk,
			Error:      status.Error,
			ErrorCode:  status.ErrorCode,
			UpdatedAt:  status.UpdatedAt,
		}
	}
	return resp, nil
}
// UpdateNodeLogin creates or updates the login information of a node.
//
// The caller must be an administrator. When req.NodeLogin.Id is not positive,
// a new login record is created for req.NodeId; otherwise the existing record
// with that id is updated. An error from either DAO call is returned to the
// caller, as is a missing req.NodeLogin.
func (this *NodeService) UpdateNodeLogin(ctx context.Context, req *pb.UpdateNodeLoginRequest) (*pb.RPCSuccess, error) {
	// Validate the request.
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	// NodeLogin is a message field and may be absent; fail with an error
	// instead of dereferencing a nil pointer.
	if req.NodeLogin == nil {
		return nil, errors.New("'nodeLogin' should not be nil")
	}

	var tx = this.NullTx()

	if req.NodeLogin.Id <= 0 {
		// No existing record: create one with the supplied values. The record
		// was just written with these values, so there is nothing to update.
		_, err = models.SharedNodeLoginDAO.CreateNodeLogin(tx, nodeconfigs.NodeRoleNode, req.NodeId, req.NodeLogin.Name, req.NodeLogin.Type, req.NodeLogin.Params)
		if err != nil {
			return nil, err
		}
		return this.Success()
	}

	// Existing record: update it and report failures instead of dropping them.
	err = models.SharedNodeLoginDAO.UpdateNodeLogin(tx, req.NodeLogin.Id, req.NodeLogin.Name, req.NodeLogin.Type, req.NodeLogin.Params)
	if err != nil {
		return nil, err
	}

	return this.Success()
}
// CountAllEnabledNodesWithNodeGroupId returns the number of nodes the DAO
// counts for the node group req.NodeGroupId.
//
// The caller must be an administrator.
func (this *NodeService) CountAllEnabledNodesWithNodeGroupId(ctx context.Context, req *pb.CountAllEnabledNodesWithNodeGroupIdRequest) (*pb.RPCCountResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	total, err := models.SharedNodeDAO.CountAllEnabledNodesWithGroupId(this.NullTx(), req.NodeGroupId)
	if err != nil {
		return nil, err
	}
	return this.SuccessCount(total)
}
// FindAllEnabledNodesDNSWithNodeClusterId lists DNS information for the nodes
// of the cluster req.NodeClusterId.
//
// The caller must be an administrator. One NodeDNSInfo entry is produced per
// usable IP address of each node, so a node with several addresses appears
// several times. An error is returned when the cluster's DNS info is not found.
func (this *NodeService) FindAllEnabledNodesDNSWithNodeClusterId(ctx context.Context, req *pb.FindAllEnabledNodesDNSWithNodeClusterIdRequest) (*pb.FindAllEnabledNodesDNSWithNodeClusterIdResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, req.NodeClusterId, nil)
	if err != nil {
		return nil, err
	}
	if clusterDNS == nil {
		return nil, errors.New("not found clusterId '" + numberutils.FormatInt64(req.NodeClusterId) + "'")
	}

	// The decode error is discarded on purpose here: a nil config is simply
	// treated as IncludingLnNodes == false.
	dnsConfig, _ := clusterDNS.DecodeDNSConfig()
	var includeLnNodes = dnsConfig != nil && dnsConfig.IncludingLnNodes
	var domainId = int64(clusterDNS.DnsDomainId)

	domainRoutes, err := dns.SharedDNSDomainDAO.FindDomainRoutes(tx, domainId)
	if err != nil {
		return nil, err
	}

	clusterNodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, req.NodeClusterId, true, includeLnNodes, req.IsInstalled)
	if err != nil {
		return nil, err
	}

	var infoList = []*pb.NodeDNSInfo{}
	for _, clusterNode := range clusterNodes {
		addresses, err := models.SharedNodeIPAddressDAO.FindNodeAccessAndUpIPAddresses(tx, int64(clusterNode.Id), nodeconfigs.NodeRoleNode)
		if err != nil {
			return nil, err
		}

		routeCodes, err := clusterNode.DNSRouteCodesForDomainId(domainId)
		if err != nil {
			return nil, err
		}

		// Resolve each route code of the node to the first domain route with
		// the same code; codes without a matching route are dropped.
		var nodeRoutes = []*pb.DNSRoute{}
		for _, code := range routeCodes {
			for _, route := range domainRoutes {
				if route.Code != code {
					continue
				}
				nodeRoutes = append(nodeRoutes, &pb.DNSRoute{
					Name: route.Name,
					Code: route.Code,
				})
				break
			}
		}

		for _, address := range addresses {
			// Skip addresses that are not valid in this cluster.
			if !address.IsValidInCluster(req.NodeClusterId) {
				continue
			}

			// Skip empty or unparsable IPs.
			var dnsIP = address.DNSIP()
			if len(dnsIP) == 0 || net.ParseIP(dnsIP) == nil {
				continue
			}

			infoList = append(infoList, &pb.NodeDNSInfo{
				Id:                 int64(clusterNode.Id),
				Name:               clusterNode.Name,
				IpAddr:             dnsIP,
				NodeIPAddressId:    int64(address.Id),
				Routes:             nodeRoutes,
				NodeClusterId:      req.NodeClusterId,
				IsBackupForCluster: clusterNode.IsBackupForCluster,
				IsBackupForGroup:   clusterNode.IsBackupForGroup,
				IsOffline:          clusterNode.CheckIsOffline(),
			})
		}
	}

	return &pb.FindAllEnabledNodesDNSWithNodeClusterIdResponse{Nodes: infoList}, nil
}
// FindEnabledNodeDNS returns the DNS resolution information of a single node.
//
// The caller must be an administrator. The response carries a nil Node when
// either the node or the DNS info of the selected cluster cannot be found.
// The cluster is req.NodeClusterId when positive, otherwise the node's own
// cluster. The IP address is the one named by req.NodeIPAddrId when it can be
// found, otherwise the first access IP address the DAO reports for the node.
func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEnabledNodeDNSRequest) (*pb.FindEnabledNodeDNSResponse, error) {
	// Validate the request.
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	node, err := models.SharedNodeDAO.FindEnabledNodeDNS(tx, req.NodeId)
	if err != nil {
		return nil, err
	}
	if node == nil {
		return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil
	}

	// Pick the node IP address: the requested one first, then a fallback.
	var (
		ipAddr   string
		ipAddrId int64
	)
	if req.NodeIPAddrId > 0 {
		address, err := models.SharedNodeIPAddressDAO.FindEnabledAddress(tx, req.NodeIPAddrId)
		if err != nil {
			return nil, err
		}
		if address != nil {
			ipAddr, ipAddrId = address.Ip, int64(address.Id)
		}
	}
	if ipAddrId == 0 {
		ipAddr, ipAddrId, err = models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), true, nodeconfigs.NodeRoleNode)
		if err != nil {
			return nil, err
		}
	}

	// An explicit cluster in the request overrides the node's own cluster.
	var clusterId = req.NodeClusterId
	if clusterId <= 0 {
		clusterId = int64(node.ClusterId)
	}

	clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, nil)
	if err != nil {
		return nil, err
	}
	if clusterDNS == nil {
		return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil
	}

	var domainId = int64(clusterDNS.DnsDomainId)
	domainName, err := dns.SharedDNSDomainDAO.FindDNSDomainName(tx, domainId)
	if err != nil {
		return nil, err
	}

	// Routes are only resolved when the cluster has a DNS domain configured.
	var nodeRoutes = []*pb.DNSRoute{}
	if domainId > 0 {
		codes, err := node.DNSRouteCodesForDomainId(domainId)
		if err != nil {
			return nil, err
		}
		for _, code := range codes {
			name, err := dns.SharedDNSDomainDAO.FindDomainRouteName(tx, domainId, code)
			if err != nil {
				return nil, err
			}
			nodeRoutes = append(nodeRoutes, &pb.DNSRoute{
				Name: name,
				Code: code,
			})
		}
	}

	var info = &pb.NodeDNSInfo{
		Id:                 int64(node.Id),
		Name:               node.Name,
		IpAddr:             ipAddr,
		NodeIPAddressId:    ipAddrId,
		Routes:             nodeRoutes,
		NodeClusterId:      clusterId,
		NodeClusterDNSName: clusterDNS.DnsName,
		DnsDomainId:        domainId,
		DnsDomainName:      domainName,
		IsBackupForCluster: node.IsBackupForCluster,
		IsBackupForGroup:   node.IsBackupForGroup,
		IsOffline:          node.CheckIsOffline(),
	}
	return &pb.FindEnabledNodeDNSResponse{Node: info}, nil
}
// UpdateNodeDNS updates the DNS routes of a node and, optionally, its IP.
//
// The caller must be an administrator. Each entry of req.Routes has the form
// "code@domainId"; entries without an "@" are ignored.
//
//   - req.DnsDomainId > 0 with routes: the parsed routes are merged into the
//     node's existing routes, replacing the codes of every domain they mention.
//   - req.DnsDomainId > 0 without routes: the routes of that domain are removed.
//   - req.DnsDomainId <= 0: the node's routes are replaced by the parsed ones.
//
// When req.IpAddr is non-empty, the address req.NodeIPAddressId is updated if
// given; otherwise the node's first access IP address is updated, or a new
// address named "DNS IP" is created when the node has none.
func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDNSRequest) (*pb.RPCSuccess, error) {
	// Validate the request.
	adminId, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	node, err := models.SharedNodeDAO.FindEnabledNodeDNS(tx, req.NodeId)
	if err != nil {
		return nil, err
	}
	if node == nil {
		return nil, errors.New("node not found")
	}

	// Parse the requested routes once; both branches below use the same result.
	var requestedRoutes = map[int64][]string{} // domainId => codes
	for _, route := range req.Routes {
		var pieces = strings.SplitN(route, "@", 2)
		if len(pieces) != 2 {
			continue
		}
		var domainId = types.Int64(pieces[1])
		requestedRoutes[domainId] = append(requestedRoutes[domainId], pieces[0])
	}

	var routeCodeMap = node.DNSRouteCodes()
	if req.DnsDomainId > 0 {
		if len(req.Routes) > 0 {
			// Writing to a nil map panics; DNSRouteCodes is defined elsewhere,
			// so do not rely on it always returning an initialized map.
			if routeCodeMap == nil {
				routeCodeMap = map[int64][]string{}
			}
			for domainId, codes := range requestedRoutes {
				routeCodeMap[domainId] = codes
			}
		} else {
			delete(routeCodeMap, req.DnsDomainId)
		}
	} else {
		// No domain given: the requested routes replace everything.
		routeCodeMap = requestedRoutes
	}

	err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routeCodeMap)
	if err != nil {
		return nil, err
	}

	// Update the IP address only when one was supplied.
	if len(req.IpAddr) == 0 {
		return this.Success()
	}

	// An explicit IP address id was given.
	if req.NodeIPAddressId > 0 {
		err = models.SharedNodeIPAddressDAO.UpdateAddressIP(tx, req.NodeIPAddressId, req.IpAddr)
		if err != nil {
			return nil, err
		}
		return this.Success()
	}

	// No IP address id: reuse the node's first access address, or create one.
	ipAddrId, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddressId(tx, req.NodeId, true, nodeconfigs.NodeRoleNode)
	if err != nil {
		return nil, err
	}
	if ipAddrId > 0 {
		err = models.SharedNodeIPAddressDAO.UpdateAddressIP(tx, ipAddrId, req.IpAddr)
	} else {
		_, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true, true, 0, nil)
	}
	if err != nil {
		return nil, err
	}

	return this.Success()
}
// CountAllEnabledNodesWithNodeRegionId returns the number of nodes the DAO
// counts for the node region req.NodeRegionId.
//
// The caller must be an administrator.
func (this *NodeService) CountAllEnabledNodesWithNodeRegionId(ctx context.Context, req *pb.CountAllEnabledNodesWithNodeRegionIdRequest) (*pb.RPCCountResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	total, err := models.SharedNodeDAO.CountAllEnabledNodesWithRegionId(this.NullTx(), req.NodeRegionId)
	if err != nil {
		return nil, err
	}
	return this.SuccessCount(total)
}
// FindEnabledNodesWithIds returns brief information for the nodes whose ids
// are listed in req.NodeIds.
//
// The caller must be an administrator. Only Id, IsOn, IsActive and the decoded
// connected API node ids are filled in for each node.
func (this *NodeService) FindEnabledNodesWithIds(ctx context.Context, req *pb.FindEnabledNodesWithIdsRequest) (*pb.FindEnabledNodesWithIdsResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	foundNodes, err := models.SharedNodeDAO.FindEnabledNodesWithIds(this.NullTx(), req.NodeIds)
	if err != nil {
		return nil, err
	}

	var pbNodes = []*pb.Node{}
	for _, found := range foundNodes {
		apiNodeIds, err := found.DecodeConnectedAPINodeIds()
		if err != nil {
			return nil, err
		}

		var pbNode = &pb.Node{
			Id:                  int64(found.Id),
			IsOn:                found.IsOn,
			IsActive:            found.IsActive,
			ConnectedAPINodeIds: apiNodeIds,
		}
		pbNodes = append(pbNodes, pbNode)
	}

	return &pb.FindEnabledNodesWithIdsResponse{Nodes: pbNodes}, nil
}
// CheckNodeLatestVersion reports whether a newer node deploy file exists.
//
// The caller must be an administrator. The first loaded deploy file that
// matches req.Os and req.Arch and whose version compares greater than
// req.CurrentVersion is reported; otherwise HasNewVersion is false.
func (this *NodeService) CheckNodeLatestVersion(ctx context.Context, req *pb.CheckNodeLatestVersionRequest) (*pb.CheckNodeLatestVersionResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	for _, candidate := range installers.SharedDeployManager.LoadNodeFiles() {
		// Only files built for the requested platform are considered.
		if candidate.OS != req.Os || candidate.Arch != req.Arch {
			continue
		}
		if stringutil.VersionCompare(candidate.Version, req.CurrentVersion) <= 0 {
			continue
		}
		return &pb.CheckNodeLatestVersionResponse{
			HasNewVersion: true,
			NewVersion:    candidate.Version,
		}, nil
	}

	return &pb.CheckNodeLatestVersionResponse{HasNewVersion: false}, nil
}
// UpdateNodeUp stores req.IsUp as the up state of the node req.NodeId.
//
// The caller must be an administrator.
func (this *NodeService) UpdateNodeUp(ctx context.Context, req *pb.UpdateNodeUpRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := models.SharedNodeDAO.UpdateNodeUp(this.NullTx(), req.NodeId, req.IsUp); err != nil {
		return nil, err
	}

	return this.Success()
}
// DownloadNodeInstallationFile returns one chunk of the node installation file
// for the platform req.Os / req.Arch, read at req.ChunkOffset.
//
// The caller must be a node. An empty response is returned when no file exists
// for the platform. The number of bytes handed out is reported to the upgrade
// limiter for the calling node.
func (this *NodeService) DownloadNodeInstallationFile(ctx context.Context, req *pb.DownloadNodeInstallationFileRequest) (*pb.DownloadNodeInstallationFileResponse, error) {
	nodeId, err := this.ValidateNode(ctx)
	if err != nil {
		return nil, err
	}

	var deployFile = installers.SharedDeployManager.FindNodeFile(req.Os, req.Arch)
	if deployFile == nil {
		return &pb.DownloadNodeInstallationFileResponse{}, nil
	}

	checksum, err := deployFile.Sum()
	if err != nil {
		return nil, err
	}

	// io.EOF is not treated as a failure: whatever was read is still returned.
	chunk, readOffset, err := deployFile.Read(req.ChunkOffset)
	if err != nil && err != io.EOF {
		return nil, err
	}

	// Feed the download speed monitoring.
	installers.SharedUpgradeLimiter.UpdateNodeBytes(nodeconfigs.NodeRoleNode, nodeId, int64(len(chunk)))

	var resp = &pb.DownloadNodeInstallationFileResponse{
		Sum:       checksum,
		Offset:    readOffset,
		ChunkData: chunk,
		Version:   deployFile.Version,
		Filename:  filepath.Base(deployFile.Path),
	}
	return resp, nil
}
// UpdateNodeSystem stores the system settings of the node req.NodeId;
// currently that is req.MaxCPU.
//
// The caller must be an administrator.
func (this *NodeService) UpdateNodeSystem(ctx context.Context, req *pb.UpdateNodeSystemRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := models.SharedNodeDAO.UpdateNodeSystem(this.NullTx(), req.NodeId, req.MaxCPU); err != nil {
		return nil, err
	}

	return this.Success()
}
// UpdateNodeCache stores the cache settings of the node req.NodeId.
//
// The caller must be an administrator. Disk and memory capacities are encoded
// as JSON only when present in the request; an absent capacity is passed to
// the DAO as nil bytes. req.CacheDiskSubDirsJSON, when non-empty, must decode
// into a list of cache dirs, otherwise an error is returned.
func (this *NodeService) UpdateNodeCache(ctx context.Context, req *pb.UpdateNodeCacheRequest) (*pb.RPCSuccess, error) {
	_, err := this.ValidateAdmin(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	// Disk capacity.
	var diskCapacityJSON []byte
	if capacity := req.MaxCacheDiskCapacity; capacity != nil {
		diskCapacityJSON, err = json.Marshal(&shared.SizeCapacity{
			Count: capacity.Count,
			Unit:  capacity.Unit,
		})
		if err != nil {
			return nil, err
		}
	}

	// Memory capacity.
	var memoryCapacityJSON []byte
	if capacity := req.MaxCacheMemoryCapacity; capacity != nil {
		memoryCapacityJSON, err = json.Marshal(&shared.SizeCapacity{
			Count: capacity.Count,
			Unit:  capacity.Unit,
		})
		if err != nil {
			return nil, err
		}
	}

	// Cache sub directories.
	var subDirs = []*serverconfigs.CacheDir{}
	if len(req.CacheDiskSubDirsJSON) > 0 {
		if err = json.Unmarshal(req.CacheDiskSubDirsJSON, &subDirs); err != nil {
			return nil, errors.New("decode 'cacheDiskSubDirsJSON' failed: " + err.Error())
		}
	}

	if err = models.SharedNodeDAO.UpdateNodeCache(tx, req.NodeId, diskCapacityJSON, memoryCapacityJSON, req.CacheDiskDir, subDirs); err != nil {
		return nil, err
	}

	return this.Success()
}
// findClusterCacheMap returns the CacheMap shared by all callers asking for
// the same cluster and version.
//
// A version of 0 is never cached: a fresh CacheMap is returned each time.
// Otherwise the map is looked up in (or added to) the package-level
// nodeVersionCacheMap under nodeVersionCacheLocker. When a new version is
// added to an existing cluster entry, versions that are more than
// 60*time.Second.Nanoseconds() below the requested one are dropped first
// (presumably versions are nanosecond timestamps — confirm with the callers).
func (this *NodeService) findClusterCacheMap(clusterId int64, version int64) *utils.CacheMap {
	nodeVersionCacheLocker.Lock()
	defer nodeVersionCacheLocker.Unlock()

	if version == 0 {
		return utils.NewCacheMap()
	}

	clusterCache, found := nodeVersionCacheMap[clusterId]
	if !found {
		// First request for this cluster: start its cache with this version.
		var fresh = utils.NewCacheMap()
		nodeVersionCacheMap[clusterId] = &NodeVersionCache{
			CacheMap: map[int64]*utils.CacheMap{
				version: fresh,
			},
		}
		return fresh
	}

	if existing, ok := clusterCache.CacheMap[version]; ok {
		return existing
	}

	// Drop old versions before adding the new one.
	var maxAge = 60 * time.Second.Nanoseconds()
	for oldVersion := range clusterCache.CacheMap {
		if version-oldVersion > maxAge {
			delete(clusterCache.CacheMap, oldVersion)
		}
	}

	// Register the new version.
	var fresh = utils.NewCacheMap()
	clusterCache.CacheMap[version] = fresh
	return fresh
}
// FindNodeLevelInfo returns the level information of the calling node.
//
// The caller must be a node. An empty response is returned when the node is
// not found. The parent node configs are looked up and attached as JSON only
// when the node's level is 1.
func (this *NodeService) FindNodeLevelInfo(ctx context.Context, req *pb.FindNodeLevelInfoRequest) (*pb.FindNodeLevelInfoResponse, error) {
	nodeId, err := this.ValidateNode(ctx)
	if err != nil {
		return nil, err
	}

	var tx *dbs.Tx
	levelNode, err := models.SharedNodeDAO.FindNodeLevelInfo(tx, nodeId)
	if err != nil {
		return nil, err
	}
	if levelNode == nil {
		return &pb.FindNodeLevelInfoResponse{}, nil
	}

	var resp = &pb.FindNodeLevelInfoResponse{
		Level: types.Int32(levelNode.Level),
	}
	if levelNode.Level != 1 {
		return resp, nil
	}

	parents, err := models.SharedNodeDAO.FindParentNodeConfigs(tx, nodeId, int64(levelNode.GroupId), levelNode.AllClusterIds(), types.Int(levelNode.Level))
	if err != nil {
		return nil, err
	}

	parentsJSON, err := json.Marshal(parents)
	if err != nil {
		return nil, err
	}
	resp.ParentNodesMapJSON = parentsJSON

	return resp, nil
}
// FindNodeDNSResolver returns the DNS resolver config of the node req.NodeId,
// encoded as JSON.
//
// The caller must be an administrator.
func (this *NodeService) FindNodeDNSResolver(ctx context.Context, req *pb.FindNodeDNSResolverRequest) (*pb.FindNodeDNSResolverResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	resolver, err := models.SharedNodeDAO.FindNodeDNSResolver(this.NullTx(), req.NodeId)
	if err != nil {
		return nil, err
	}

	resolverJSON, err := json.Marshal(resolver)
	if err != nil {
		return nil, err
	}

	return &pb.FindNodeDNSResolverResponse{DnsResolverJSON: resolverJSON}, nil
}
// UpdateNodeDNSResolver stores the DNS resolver config of the node req.NodeId.
//
// The caller must be an administrator. req.DnsResolverJSON is decoded on top
// of the default resolver config, so fields missing from the JSON keep their
// default values.
func (this *NodeService) UpdateNodeDNSResolver(ctx context.Context, req *pb.UpdateNodeDNSResolverRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var resolverConfig = nodeconfigs.DefaultDNSResolverConfig()
	if err := json.Unmarshal(req.DnsResolverJSON, resolverConfig); err != nil {
		return nil, err
	}

	if err := models.SharedNodeDAO.UpdateNodeDNSResolver(this.NullTx(), req.NodeId, resolverConfig); err != nil {
		return nil, err
	}

	return this.Success()
}
// FindNodeDDoSProtection returns the DDoS protection settings of a node,
// encoded as JSON.
//
// The caller may be an administrator or a node. A node may only ask for its
// own settings: a positive req.NodeId that differs from the caller's id is
// rejected. When neither validation succeeds, the administrator validation
// error is returned. A missing node config falls back to the default config.
// Only for node callers, the node config is merged into the config of the
// node's cluster and the merged result is returned.
func (this *NodeService) FindNodeDDoSProtection(ctx context.Context, req *pb.FindNodeDDoSProtectionRequest) (*pb.FindNodeDDoSProtectionResponse, error) {
	var nodeId = req.NodeId
	var isFromNode = false

	if _, adminErr := this.ValidateAdmin(ctx); adminErr != nil {
		// Not an administrator: check whether the call comes from a node.
		callerNodeId, nodeErr := this.ValidateNode(ctx)
		if nodeErr != nil {
			return nil, adminErr
		}
		if nodeId > 0 && callerNodeId != nodeId {
			return nil, errors.New("invalid 'nodeId'")
		}
		nodeId = callerNodeId
		isFromNode = true
	}

	var tx *dbs.Tx
	protection, err := models.SharedNodeDAO.FindNodeDDoSProtection(tx, nodeId)
	if err != nil {
		return nil, err
	}
	if protection == nil {
		protection = ddosconfigs.DefaultProtectionConfig()
	}

	// Combine with the parent (cluster) config. This is only needed when the
	// config is read by the node itself.
	if isFromNode {
		clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId)
		if err != nil {
			return nil, err
		}
		if clusterId > 0 {
			clusterProtection, err := models.SharedNodeClusterDAO.FindClusterDDoSProtection(tx, clusterId)
			if err != nil {
				return nil, err
			}
			if clusterProtection == nil {
				clusterProtection = ddosconfigs.DefaultProtectionConfig()
			}

			clusterProtection.Merge(protection)
			protection = clusterProtection
		}
	}

	protectionJSON, err := json.Marshal(protection)
	if err != nil {
		return nil, err
	}

	return &pb.FindNodeDDoSProtectionResponse{
		DdosProtectionJSON: protectionJSON,
	}, nil
}
// UpdateNodeDDoSProtection stores the DDoS protection settings of the node
// req.NodeId, decoded from req.DdosProtectionJSON.
//
// The caller must be an administrator.
func (this *NodeService) UpdateNodeDDoSProtection(ctx context.Context, req *pb.UpdateNodeDDoSProtectionRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var protection = &ddosconfigs.ProtectionConfig{}
	if err := json.Unmarshal(req.DdosProtectionJSON, protection); err != nil {
		return nil, err
	}

	var tx *dbs.Tx
	if err := models.SharedNodeDAO.UpdateNodeDDoSProtection(tx, req.NodeId, protection); err != nil {
		return nil, err
	}

	return this.Success()
}
// FindNodeGlobalServerConfig returns the global server config that applies to
// a node, encoded as JSON.
//
// The caller may be an administrator or a node. A node may only ask for
// itself: a positive req.NodeId that differs from the caller's id is rejected.
// When neither validation succeeds, the administrator validation error is
// returned. The config is read from the node's cluster; when the node has no
// cluster or the cluster has no config, a new default config is used.
func (this *NodeService) FindNodeGlobalServerConfig(ctx context.Context, req *pb.FindNodeGlobalServerConfigRequest) (*pb.FindNodeGlobalServerConfigResponse, error) {
	var nodeId = req.NodeId

	if _, adminErr := this.ValidateAdmin(ctx); adminErr != nil {
		// Not an administrator: check whether the call comes from a node.
		callerNodeId, nodeErr := this.ValidateNode(ctx)
		if nodeErr != nil {
			return nil, adminErr
		}
		if nodeId > 0 && callerNodeId != nodeId {
			return nil, errors.New("invalid 'nodeId'")
		}
		nodeId = callerNodeId
	}

	var tx = this.NullTx()

	clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId)
	if err != nil {
		return nil, err
	}

	var globalConfig *serverconfigs.GlobalServerConfig
	if clusterId > 0 {
		globalConfig, err = models.SharedNodeClusterDAO.FindClusterGlobalServerConfig(tx, clusterId)
		if err != nil {
			return nil, err
		}
	}
	if globalConfig == nil {
		globalConfig = serverconfigs.NewGlobalServerConfig()
	}

	globalConfigJSON, err := json.Marshal(globalConfig)
	if err != nil {
		return nil, err
	}

	return &pb.FindNodeGlobalServerConfigResponse{
		GlobalServerConfigJSON: globalConfigJSON,
	}, nil
}
// FindEnabledNodeConfigInfo returns a summary of which configuration sections
// are set for the node req.NodeId.
//
// The caller must be an administrator. The response is never nil on success:
// when the node is not found, a response with all flags false is returned.
func (this *NodeService) FindEnabledNodeConfigInfo(ctx context.Context, req *pb.FindEnabledNodeConfigInfoRequest) (*pb.FindEnabledNodeConfigInfoResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()
	var info = &pb.FindEnabledNodeConfigInfoResponse{}

	node, err := models.SharedNodeDAO.FindEnabledNode(tx, req.NodeId)
	if err != nil {
		return nil, err
	}
	if node == nil {
		// Always return a non-nil result.
		return info, nil
	}

	// DNS: set when the node has any DNS route codes.
	info.HasDNSInfo = len(node.DNSRouteCodes()) > 0

	// Cache: set by a cache disk dir, or by a non-empty disk/memory capacity.
	info.HasCacheInfo = len(node.CacheDiskDir) > 0
	if !info.HasCacheInfo {
		var diskCapacity = node.DecodeMaxCacheDiskCapacity()
		var memoryCapacity = node.DecodeMaxCacheMemoryCapacity()
		var hasDisk = diskCapacity != nil && diskCapacity.IsNotEmpty()
		info.HasCacheInfo = hasDisk || (memoryCapacity != nil && memoryCapacity.IsNotEmpty())
	}

	// Thresholds.
	thresholdCount, err := models.SharedNodeThresholdDAO.CountAllEnabledThresholds(tx, nodeconfigs.NodeRoleNode, int64(node.ClusterId), req.NodeId)
	if err != nil {
		return nil, err
	}
	info.HasThresholds = thresholdCount > 0

	// SSH: set when the login has a host or a port.
	login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, nodeconfigs.NodeRoleNode, req.NodeId)
	if err != nil {
		return nil, err
	}
	if login != nil {
		sshParams, err := login.DecodeSSHParams()
		if err != nil {
			return nil, err
		}
		if sshParams != nil {
			info.HasSSH = len(sshParams.Host) > 0 || sshParams.Port > 0
		}
	}

	// System settings: a CPU limit, a non-default DNS resolver, or API node
	// addresses, checked in that order until one of them matches.
	info.HasSystemSettings = node.MaxCPU > 0
	if !info.HasSystemSettings {
		if resolver := node.DecodeDNSResolver(); resolver != nil {
			info.HasSystemSettings = resolver.Type != nodeconfigs.DNSResolverTypeDefault
		}
	}
	if !info.HasSystemSettings {
		info.HasSystemSettings = len(node.DecodeAPINodeAddrs()) > 0
	}

	// DDoS protection.
	info.HasDDoSProtection = node.HasDDoSProtection()

	// Schedule.
	info.HasScheduleSettings = node.HasScheduleSettings()

	return info, nil
}
// CountAllNodeRegionInfo returns the number of node region info records the
// DAO counts for req.NodeRegionId.
//
// The caller must be an administrator.
func (this *NodeService) CountAllNodeRegionInfo(ctx context.Context, req *pb.CountAllNodeRegionInfoRequest) (*pb.RPCCountResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	total, err := models.SharedNodeDAO.CountAllNodeRegionInfo(this.NullTx(), req.NodeRegionId)
	if err != nil {
		return nil, err
	}
	return this.SuccessCount(total)
}
// ListNodeRegionInfo lists one page of node region information.
//
// The caller must be an administrator. Nodes without a cluster id, or whose
// cluster cannot be found, are left out of the result, so a page may hold
// fewer than req.Size entries. NodeRegion is nil for nodes without a region
// or whose region cannot be found.
func (this *NodeService) ListNodeRegionInfo(ctx context.Context, req *pb.ListNodeRegionInfoRequest) (*pb.ListNodeRegionInfoResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	pageNodes, err := models.SharedNodeDAO.ListNodeRegionInfo(tx, req.NodeRegionId, req.Offset, req.Size)
	if err != nil {
		return nil, err
	}

	var infoList = []*pb.ListNodeRegionInfoResponse_Info{}
	// One cache map is passed to every cluster lookup of this request.
	var clusterCache = utils.NewCacheMap()
	for _, pageNode := range pageNodes {
		// Region (optional).
		var pbRegion *pb.NodeRegion
		if pageNode.RegionId > 0 {
			region, err := models.SharedNodeRegionDAO.FindEnabledNodeRegion(tx, int64(pageNode.RegionId))
			if err != nil {
				return nil, err
			}
			if region != nil {
				pbRegion = &pb.NodeRegion{
					Id:   int64(region.Id),
					Name: region.Name,
					IsOn: region.IsOn,
				}
			}
		}

		// Cluster (required).
		if pageNode.ClusterId <= 0 {
			continue
		}
		cluster, err := models.SharedNodeClusterDAO.FindClusterBasicInfo(tx, int64(pageNode.ClusterId), clusterCache)
		if err != nil {
			return nil, err
		}
		if cluster == nil {
			continue
		}

		infoList = append(infoList, &pb.ListNodeRegionInfoResponse_Info{
			Id:         int64(pageNode.Id),
			Name:       pageNode.Name,
			NodeRegion: pbRegion,
			NodeCluster: &pb.NodeCluster{
				Id:   int64(cluster.Id),
				Name: cluster.Name,
				IsOn: cluster.IsOn,
			},
		})
	}

	return &pb.ListNodeRegionInfoResponse{InfoList: infoList}, nil
}
// UpdateNodeRegionInfo sets the region of the node req.NodeId to
// req.NodeRegionId.
//
// The caller must be an administrator.
func (this *NodeService) UpdateNodeRegionInfo(ctx context.Context, req *pb.UpdateNodeRegionInfoRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := models.SharedNodeDAO.UpdateNodeRegionId(this.NullTx(), req.NodeId, req.NodeRegionId); err != nil {
		return nil, err
	}

	return this.Success()
}
// FindNodeAPIConfig returns the API-related config of the node req.NodeId.
//
// The caller must be an administrator. ApiNodeAddrsJSON is left nil when the
// node is not found.
func (this *NodeService) FindNodeAPIConfig(ctx context.Context, req *pb.FindNodeAPIConfigRequest) (*pb.FindNodeAPIConfigResponse, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	apiNode, err := models.SharedNodeDAO.FindNodeAPIConfig(this.NullTx(), req.NodeId)
	if err != nil {
		return nil, err
	}

	var resp = &pb.FindNodeAPIConfigResponse{}
	if apiNode != nil {
		resp.ApiNodeAddrsJSON = apiNode.ApiNodeAddrs
	}
	return resp, nil
}
// UpdateNodeAPIConfig stores the API-related config of the node req.NodeId.
//
// The caller must be an administrator. An empty req.ApiNodeAddrsJSON is stored
// as an empty address list.
func (this *NodeService) UpdateNodeAPIConfig(ctx context.Context, req *pb.UpdateNodeAPIConfigRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	var addrs = []*serverconfigs.NetworkAddressConfig{}
	if len(req.ApiNodeAddrsJSON) > 0 {
		if err := json.Unmarshal(req.ApiNodeAddrsJSON, &addrs); err != nil {
			return nil, err
		}
	}

	if err := models.SharedNodeDAO.UpdateNodeAPIConfig(this.NullTx(), req.NodeId, addrs); err != nil {
		return nil, err
	}

	return this.Success()
}
// FindNodeWebPPolicies returns the WebP policies of the clusters the calling
// node belongs to, one JSON-encoded policy per cluster.
//
// The caller must be a node. Clusters without a policy are left out.
func (this *NodeService) FindNodeWebPPolicies(ctx context.Context, req *pb.FindNodeWebPPoliciesRequest) (*pb.FindNodeWebPPoliciesResponse, error) {
	nodeId, err := this.ValidateNode(ctx)
	if err != nil {
		return nil, err
	}

	var tx = this.NullTx()

	nodeClusterIds, err := models.SharedNodeDAO.FindEnabledAndOnNodeClusterIds(tx, nodeId)
	if err != nil {
		return nil, err
	}

	var policies = []*pb.FindNodeWebPPoliciesResponse_WebPPolicy{}
	for _, nodeClusterId := range nodeClusterIds {
		webpPolicy, err := models.SharedNodeClusterDAO.FindClusterWebPPolicy(tx, nodeClusterId, nil)
		if err != nil {
			return nil, err
		}
		if webpPolicy == nil {
			continue
		}

		webpPolicyJSON, err := json.Marshal(webpPolicy)
		if err != nil {
			return nil, err
		}

		policies = append(policies, &pb.FindNodeWebPPoliciesResponse_WebPPolicy{
			NodeClusterId:  nodeClusterId,
			WebPPolicyJSON: webpPolicyJSON,
		})
	}

	return &pb.FindNodeWebPPoliciesResponse{WebPPolicies: policies}, nil
}
// UpdateNodeIsOn stores req.IsOn as the enabled state of the node req.NodeId.
//
// The caller must be an administrator.
func (this *NodeService) UpdateNodeIsOn(ctx context.Context, req *pb.UpdateNodeIsOnRequest) (*pb.RPCSuccess, error) {
	if _, err := this.ValidateAdmin(ctx); err != nil {
		return nil, err
	}

	if err := models.SharedNodeDAO.UpdateNodeIsOn(this.NullTx(), req.NodeId, req.IsOn); err != nil {
		return nil, err
	}

	return this.Success()
}