package services import ( "bytes" "context" "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models/dns" "github.com/TeaOSLab/EdgeAPI/internal/dnsclients/dnstypes" "github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/installers" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared" "github.com/andybalholm/brotli" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" "io" "net" "path/filepath" "strings" "sync" "time" ) // NodeVersionCache 节点版本缓存 type NodeVersionCache struct { CacheMap map[int64]*utils.CacheMap // version => map } var nodeVersionCacheMap = map[int64]*NodeVersionCache{} // [cluster_id] => { [version] => cache } var nodeVersionCacheLocker = &sync.Mutex{} // NodeService 边缘节点相关服务 type NodeService struct { BaseService } // CreateNode 创建节点 func (this *NodeService) CreateNode(ctx context.Context, req *pb.CreateNodeRequest) (*pb.CreateNodeResponse, error) { adminId, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() nodeId, err := models.SharedNodeDAO.CreateNode(tx, adminId, req.Name, req.NodeClusterId, req.NodeGroupId, req.NodeRegionId) if err != nil { return nil, err } // 增加认证相关 if req.NodeLogin != nil { _, err = models.SharedNodeLoginDAO.CreateNodeLogin(tx, nodeconfigs.NodeRoleNode, nodeId, req.NodeLogin.Name, req.NodeLogin.Type, req.NodeLogin.Params) if err != nil { return nil, err } } // 保存DNS相关 if len(req.DnsRoutes) > 0 { var routesMap = map[int64][]string{} var m = map[int64][]string{} // domainId => codes for _, route := range req.DnsRoutes { var pieces = strings.SplitN(route, "@", 2) if len(pieces) != 2 { continue } var code = pieces[0] var domainId = types.Int64(pieces[1]) m[domainId] = append(m[domainId], code) } for domainId, codes := range m { routesMap[domainId] = codes } err = models.SharedNodeDAO.UpdateNodeDNS(tx, nodeId, routesMap) if err != nil { return nil, err } } return &pb.CreateNodeResponse{ NodeId: nodeId, }, nil } // RegisterClusterNode 注册集群节点 func (this *NodeService) RegisterClusterNode(ctx context.Context, req *pb.RegisterClusterNodeRequest) (*pb.RegisterClusterNodeResponse, error) { // 校验请求 _, _, clusterId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeCluster) if err != nil { return nil, err } var tx = this.NullTx() adminId, err := models.SharedNodeClusterDAO.FindClusterAdminId(tx, clusterId) if err != nil { return nil, err } nodeId, err := models.SharedNodeDAO.CreateNode(tx, adminId, req.Name, clusterId, 0, 0) if err != nil { return nil, err } err = models.SharedNodeDAO.UpdateNodeIsInstalled(tx, nodeId, true) if err != nil { return nil, err } node, err := models.SharedNodeDAO.FindEnabledNode(tx, nodeId) if err != nil { return nil, err } if node == nil { return nil, errors.New("can not find node after creating") } // 获取集群可以使用的所有API节点 apiAddrs, err := models.SharedNodeClusterDAO.FindAllAPINodeAddrsWithCluster(tx, clusterId) if err != nil { return nil, err } return &pb.RegisterClusterNodeResponse{ UniqueId: node.UniqueId, Secret: node.Secret, Endpoints: apiAddrs, }, nil } // CountAllEnabledNodes 计算节点数量 func (this *NodeService) CountAllEnabledNodes(ctx context.Context, req *pb.CountAllEnabledNodesRequest) (*pb.RPCCountResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() count, err := models.SharedNodeDAO.CountAllEnabledNodes(tx) if err != nil { return nil, err } return this.SuccessCount(count) } // CountAllEnabledNodesMatch 计算匹配的节点数量 func (this *NodeService) CountAllEnabledNodesMatch(ctx context.Context, req *pb.CountAllEnabledNodesMatchRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() count, err := models.SharedNodeDAO.CountAllEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, req.Level, true) if err != nil { return nil, err } return this.SuccessCount(count) } // ListEnabledNodesMatch 列出单页的节点 func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.ListEnabledNodesMatchRequest) (*pb.ListEnabledNodesMatchResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() var dnsDomainId = int64(0) var domainRoutes = []*dnstypes.Route{} if req.NodeClusterId > 0 { clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, req.NodeClusterId, nil) if err != nil { return nil, err } if clusterDNS != nil { dnsDomainId = int64(clusterDNS.DnsDomainId) if clusterDNS.DnsDomainId > 0 { domainRoutes, err = dns.SharedDNSDomainDAO.FindDomainRoutes(tx, dnsDomainId) if err != nil { return nil, err } } } } // 排序 var order = "" if req.CpuAsc { order = "cpuAsc" } else if req.CpuDesc { order = "cpuDesc" } else if req.MemoryAsc { order = "memoryAsc" } else if req.MemoryDesc { order = "memoryDesc" } else if req.TrafficInAsc { order = "trafficInAsc" } else if req.TrafficInDesc { order = "trafficInDesc" } else if req.TrafficOutAsc { order = "trafficOutAsc" } else if req.TrafficOutDesc { order = "trafficOutDesc" } else if req.LoadAsc { order = "loadAsc" } else if req.LoadDesc { order = "loadDesc" } nodes, err := models.SharedNodeDAO.ListEnabledNodesMatch(tx, req.NodeClusterId, configutils.ToBoolState(req.InstallState), configutils.ToBoolState(req.ActiveState), req.Keyword, req.NodeGroupId, req.NodeRegionId, req.Level, true, order, req.Offset, req.Size) if err != nil { return nil, err } var result = []*pb.Node{} var cacheMap = utils.NewCacheMap() for _, node := range nodes { // 主集群信息 clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId)) if err != nil { return nil, err } // 从集群 secondaryClusters, err := models.SharedNodeClusterDAO.FindEnabledNodeClustersWithIds(tx, node.DecodeSecondaryClusterIds()) if err != nil { return nil, err } var pbSecondaryClusters = []*pb.NodeCluster{} for _, secondaryCluster := range secondaryClusters { pbSecondaryClusters = append(pbSecondaryClusters, &pb.NodeCluster{ Id: int64(secondaryCluster.Id), IsOn: secondaryCluster.IsOn, Name: secondaryCluster.Name, }) } // 安装信息 installStatus, err := node.DecodeInstallStatus() if err != nil { return nil, err } installStatusResult := &pb.NodeInstallStatus{} if installStatus != nil { installStatusResult = &pb.NodeInstallStatus{ IsRunning: installStatus.IsRunning, IsFinished: installStatus.IsFinished, IsOk: installStatus.IsOk, Error: installStatus.Error, ErrorCode: installStatus.ErrorCode, UpdatedAt: installStatus.UpdatedAt, } } // 分组信息 var pbGroup *pb.NodeGroup = nil if node.GroupId > 0 { group, err := models.SharedNodeGroupDAO.FindEnabledNodeGroup(tx, int64(node.GroupId)) if err != nil { return nil, err } if group != nil { pbGroup = &pb.NodeGroup{ Id: int64(group.Id), Name: group.Name, } } } // DNS线路 var pbRoutes = []*pb.DNSRoute{} if dnsDomainId > 0 { routeCodes, err := node.DNSRouteCodesForDomainId(dnsDomainId) if err != nil { return nil, err } for _, routeCode := range routeCodes { for _, route := range domainRoutes { if route.Code == routeCode { pbRoutes = append(pbRoutes, &pb.DNSRoute{ Name: route.Name, Code: route.Code, }) break } } } } else if req.NodeClusterId == 0 { var clusterDomainIds = []int64{} for _, clusterId := range node.AllClusterIds() { clusterDNSInfo, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, cacheMap) if err != nil { return nil, err } if clusterDNSInfo != nil && clusterDNSInfo.DnsDomainId > 0 { clusterDomainIds = append(clusterDomainIds, int64(clusterDNSInfo.DnsDomainId)) } } for domainId, routeCodes := range node.DNSRouteCodes() { if domainId == 0 { continue } if !lists.ContainsInt64(clusterDomainIds, domainId) { continue } for _, routeCode := range routeCodes { routeName, err := dns.SharedDNSDomainDAO.FindDomainRouteName(tx, domainId, routeCode) if err != nil { return nil, err } if len(routeName) > 0 { pbRoutes = append(pbRoutes, &pb.DNSRoute{ Name: routeName, Code: routeCode, }) } } } } // 区域 var pbRegion *pb.NodeRegion = nil if node.RegionId > 0 { region, err := models.SharedNodeRegionDAO.FindEnabledNodeRegion(tx, int64(node.RegionId)) if err != nil { return nil, err } if region != nil { pbRegion = &pb.NodeRegion{ Id: int64(region.Id), IsOn: region.IsOn, Name: region.Name, } } } // 状态 statusJSON, err := models.SharedNodeValueDAO.ComposeNodeStatusJSON(tx, nodeconfigs.NodeRoleNode, int64(node.Id), node.Status) if err != nil { return nil, err } result = append(result, &pb.Node{ Id: int64(node.Id), Name: node.Name, Version: int64(node.Version), IsInstalled: node.IsInstalled, StatusJSON: statusJSON, NodeCluster: &pb.NodeCluster{ Id: int64(node.ClusterId), Name: clusterName, }, SecondaryNodeClusters: pbSecondaryClusters, InstallStatus: installStatusResult, MaxCPU: types.Int32(node.MaxCPU), IsOn: node.IsOn, IsUp: node.IsUp, NodeGroup: pbGroup, NodeRegion: pbRegion, DnsRoutes: pbRoutes, Level: int32(node.Level), }) } return &pb.ListEnabledNodesMatchResponse{ Nodes: result, }, nil } // FindAllEnabledNodesWithNodeClusterId 查找一个集群下的所有节点 func (this *NodeService) FindAllEnabledNodesWithNodeClusterId(ctx context.Context, req *pb.FindAllEnabledNodesWithNodeClusterIdRequest) (*pb.FindAllEnabledNodesWithNodeClusterIdResponse, error) { _, userId, err := this.ValidateAdminAndUser(ctx, false) if err != nil { return nil, err } if userId > 0 { // TODO 检查权限 } var tx = this.NullTx() nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(tx, req.NodeClusterId, req.IncludeSecondary) if err != nil { return nil, err } result := []*pb.Node{} for _, node := range nodes { apiNodeIds := []int64{} if models.IsNotNull(node.ConnectedAPINodes) { err = json.Unmarshal(node.ConnectedAPINodes, &apiNodeIds) if err != nil { return nil, err } } result = append(result, &pb.Node{ Id: int64(node.Id), Name: node.Name, UniqueId: node.UniqueId, Secret: node.Secret, ConnectedAPINodeIds: apiNodeIds, MaxCPU: types.Int32(node.MaxCPU), IsOn: node.IsOn, }) } return &pb.FindAllEnabledNodesWithNodeClusterIdResponse{Nodes: result}, nil } // DeleteNode 删除节点 func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.DisableNode(tx, req.NodeId) if err != nil { return nil, err } // 删除节点相关任务 err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, nodeconfigs.NodeRoleNode, req.NodeId) if err != nil { return nil, err } return this.Success() } // DeleteNodeFromNodeCluster 从集群中删除节点 func (this *NodeService) DeleteNodeFromNodeCluster(ctx context.Context, req *pb.DeleteNodeFromNodeClusterRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.DeleteNodeFromCluster(tx, req.NodeId, req.NodeClusterId) if err != nil { return nil, err } return this.Success() } // UpdateNode 修改节点 func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.UpdateNode(tx, req.NodeId, req.Name, req.NodeClusterId, req.SecondaryNodeClusterIds, req.NodeGroupId, req.NodeRegionId, req.IsOn, int(req.Level), req.LnAddrs, req.EnableIPLists) if err != nil { return nil, err } return this.Success() } // FindEnabledNode 查询单个节点信息 func (this *NodeService) FindEnabledNode(ctx context.Context, req *pb.FindEnabledNodeRequest) (*pb.FindEnabledNodeResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() node, err := models.SharedNodeDAO.FindEnabledNode(tx, req.NodeId) if err != nil { return nil, err } if node == nil { return &pb.FindEnabledNodeResponse{Node: nil}, nil } // 主集群信息 clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId)) if err != nil { return nil, err } var clusterIds = []int64{int64(node.ClusterId)} // 从集群信息 var secondaryPBClusters []*pb.NodeCluster for _, secondaryClusterId := range node.DecodeSecondaryClusterIds() { cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, secondaryClusterId) if err != nil { return nil, err } if cluster == nil { continue } secondaryPBClusters = append(secondaryPBClusters, &pb.NodeCluster{ Id: int64(cluster.Id), IsOn: cluster.IsOn, Name: cluster.Name, }) clusterIds = append(clusterIds, int64(cluster.Id)) } // 认证信息 login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, nodeconfigs.NodeRoleNode, req.NodeId) if err != nil { return nil, err } var respLogin *pb.NodeLogin = nil if login != nil { respLogin = &pb.NodeLogin{ Id: int64(login.Id), Name: login.Name, Type: login.Type, Params: login.Params, } } // 安装信息 installStatus, err := node.DecodeInstallStatus() if err != nil { return nil, err } var installStatusResult = &pb.NodeInstallStatus{} if installStatus != nil { installStatusResult = &pb.NodeInstallStatus{ IsRunning: installStatus.IsRunning, IsFinished: installStatus.IsFinished, IsOk: installStatus.IsOk, Error: installStatus.Error, ErrorCode: installStatus.ErrorCode, UpdatedAt: installStatus.UpdatedAt, } } // 分组信息 var pbGroup *pb.NodeGroup = nil if node.GroupId > 0 { group, err := models.SharedNodeGroupDAO.FindEnabledNodeGroup(tx, int64(node.GroupId)) if err != nil { return nil, err } if group != nil { pbGroup = &pb.NodeGroup{ Id: int64(group.Id), Name: group.Name, } } } // 区域 var pbRegion *pb.NodeRegion = nil if node.RegionId > 0 { region, err := models.SharedNodeRegionDAO.FindEnabledNodeRegion(tx, int64(node.RegionId)) if err != nil { return nil, err } if region != nil { pbRegion = &pb.NodeRegion{ Id: int64(region.Id), IsOn: region.IsOn, Name: region.Name, } } } // 最大硬盘容量 var pbMaxCacheDiskCapacity *pb.SizeCapacity if models.IsNotNull(node.MaxCacheDiskCapacity) { pbMaxCacheDiskCapacity = &pb.SizeCapacity{} err = json.Unmarshal(node.MaxCacheDiskCapacity, pbMaxCacheDiskCapacity) if err != nil { return nil, err } } // 最大内存容量 var pbMaxCacheMemoryCapacity *pb.SizeCapacity if models.IsNotNull(node.MaxCacheMemoryCapacity) { pbMaxCacheMemoryCapacity = &pb.SizeCapacity{} err = json.Unmarshal(node.MaxCacheMemoryCapacity, pbMaxCacheMemoryCapacity) if err != nil { return nil, err } } // 线路 var pbRoutes = []*pb.DNSRoute{} var clusterDomainIds = []int64{} for _, clusterId := range node.AllClusterIds() { clusterDNSInfo, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, nil) if err != nil { return nil, err } if clusterDNSInfo != nil && clusterDNSInfo.DnsDomainId > 0 { clusterDomainIds = append(clusterDomainIds, int64(clusterDNSInfo.DnsDomainId)) } } for domainId, routeCodes := range node.DNSRouteCodes() { if domainId == 0 { continue } if !lists.ContainsInt64(clusterDomainIds, domainId) { continue } for _, routeCode := range routeCodes { routeName, err := dns.SharedDNSDomainDAO.FindDomainRouteName(tx, domainId, routeCode) if err != nil { return nil, err } if len(routeName) > 0 { pbRoutes = append(pbRoutes, &pb.DNSRoute{ Name: routeName, Code: routeCode, }) } } } // 监控状态 statusJSON, err := models.SharedNodeValueDAO.ComposeNodeStatusJSON(tx, nodeconfigs.NodeRoleNode, int64(node.Id), node.Status) if err != nil { return nil, err } return &pb.FindEnabledNodeResponse{Node: &pb.Node{ Id: int64(node.Id), Name: node.Name, StatusJSON: statusJSON, UniqueId: node.UniqueId, Version: int64(node.Version), LatestVersion: int64(node.LatestVersion), Secret: node.Secret, InstallDir: node.InstallDir, IsInstalled: node.IsInstalled, NodeCluster: &pb.NodeCluster{ Id: int64(node.ClusterId), Name: clusterName, }, SecondaryNodeClusters: secondaryPBClusters, NodeLogin: respLogin, InstallStatus: installStatusResult, MaxCPU: types.Int32(node.MaxCPU), IsOn: node.IsOn, IsUp: node.IsUp, NodeGroup: pbGroup, NodeRegion: pbRegion, MaxCacheDiskCapacity: pbMaxCacheDiskCapacity, MaxCacheMemoryCapacity: pbMaxCacheMemoryCapacity, CacheDiskDir: node.CacheDiskDir, CacheDiskSubDirsJSON: node.CacheDiskSubDirs, Level: int32(node.Level), LnAddrs: node.DecodeLnAddrs(), DnsRoutes: pbRoutes, EnableIPLists: node.EnableIPLists, ApiNodeAddrsJSON: node.ApiNodeAddrs, }}, nil } // FindEnabledBasicNode 获取单个节点基本信息 func (this *NodeService) FindEnabledBasicNode(ctx context.Context, req *pb.FindEnabledBasicNodeRequest) (*pb.FindEnabledBasicNodeResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() node, err := models.SharedNodeDAO.FindEnabledBasicNode(tx, req.NodeId) if err != nil { return nil, err } if node == nil { return &pb.FindEnabledBasicNodeResponse{Node: nil}, nil } clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId)) if err != nil { return nil, err } return &pb.FindEnabledBasicNodeResponse{Node: &pb.BasicNode{ Id: int64(node.Id), Name: node.Name, IsOn: node.IsOn, IsUp: node.IsUp, Level: int32(node.Level), NodeCluster: &pb.NodeCluster{ Id: int64(node.ClusterId), Name: clusterName, }, }}, nil } // FindCurrentNodeConfig 组合节点配置 func (this *NodeService) FindCurrentNodeConfig(ctx context.Context, req *pb.FindCurrentNodeConfigRequest) (*pb.FindCurrentNodeConfigResponse, error) { // 校验节点 _, _, nodeId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode) if err != nil { return nil, err } var tx = this.NullTx() // 检查版本号 currentVersion, err := models.SharedNodeDAO.FindNodeVersion(tx, nodeId) if err != nil { return nil, err } if currentVersion == req.Version { return &pb.FindCurrentNodeConfigResponse{IsChanged: false}, nil } clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) if err != nil { return nil, err } var cacheMap = this.findClusterCacheMap(clusterId, req.NodeTaskVersion) nodeConfig, err := models.SharedNodeDAO.ComposeNodeConfig(tx, nodeId, cacheMap) if err != nil { return nil, err } data, err := json.Marshal(nodeConfig) if err != nil { return nil, err } // 压缩 var isCompressed = false if req.Compress { var buf = &bytes.Buffer{} writer := brotli.NewWriterLevel(buf, 5) _, err = writer.Write(data) if err != nil { _ = writer.Close() } else { err = writer.Close() if err == nil { isCompressed = true data = buf.Bytes() buf.Reset() } } } return &pb.FindCurrentNodeConfigResponse{ IsChanged: true, NodeJSON: data, DataSize: int64(len(data)), IsCompressed: isCompressed, Timestamp: time.Now().Unix(), }, nil } // UpdateNodeStatus 更新节点状态 func (this *NodeService) UpdateNodeStatus(ctx context.Context, req *pb.UpdateNodeStatusRequest) (*pb.RPCSuccess, error) { // 校验节点 nodeId, err := this.ValidateNode(ctx) if err != nil { return nil, err } if req.NodeId > 0 { nodeId = req.NodeId } if nodeId <= 0 { return nil, errors.New("'nodeId' should be greater than 0") } var tx = this.NullTx() // 修改时间戳 var nodeStatus = &nodeconfigs.NodeStatus{} err = json.Unmarshal(req.StatusJSON, nodeStatus) if err != nil { return nil, errors.New("decode node status json failed: " + err.Error()) } nodeStatus.UpdatedAt = time.Now().Unix() // 保存 err = models.SharedNodeDAO.UpdateNodeStatus(tx, nodeId, nodeStatus) if err != nil { return nil, err } return this.Success() } // UpdateNodeIsInstalled 修改节点安装状态 func (this *NodeService) UpdateNodeIsInstalled(ctx context.Context, req *pb.UpdateNodeIsInstalledRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.UpdateNodeIsInstalled(tx, req.NodeId, req.IsInstalled) if err != nil { return nil, err } return this.Success() } // InstallNode 安装节点 func (this *NodeService) InstallNode(ctx context.Context, req *pb.InstallNodeRequest) (*pb.InstallNodeResponse, error) { // 校验节点 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } goman.New(func() { err = installers.SharedNodeQueue().InstallNodeProcess(req.NodeId, false) if err != nil { remotelogs.Error("NODE_SERVICE", "install node failed:"+err.Error()) } }) return &pb.InstallNodeResponse{}, nil } // UpgradeNode 升级节点 func (this *NodeService) UpgradeNode(ctx context.Context, req *pb.UpgradeNodeRequest) (*pb.UpgradeNodeResponse, error) { // 校验节点 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.UpdateNodeIsInstalled(tx, req.NodeId, false) if err != nil { return nil, err } // 检查状态 installStatus, err := models.SharedNodeDAO.FindNodeInstallStatus(tx, req.NodeId) if err != nil { return nil, err } if installStatus == nil { installStatus = &models.NodeInstallStatus{} } installStatus.IsOk = false installStatus.IsFinished = false err = models.SharedNodeDAO.UpdateNodeInstallStatus(tx, req.NodeId, installStatus) if err != nil { return nil, err } goman.New(func() { err = installers.SharedNodeQueue().InstallNodeProcess(req.NodeId, true) if err != nil { remotelogs.Error("NODE_SERVICE", "install node:"+err.Error()) } }) return &pb.UpgradeNodeResponse{}, nil } // StartNode 启动节点 func (this *NodeService) StartNode(ctx context.Context, req *pb.StartNodeRequest) (*pb.StartNodeResponse, error) { // 校验节点 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } err = installers.SharedNodeQueue().StartNode(req.NodeId) if err != nil { return &pb.StartNodeResponse{ IsOk: false, Error: err.Error(), }, nil } return &pb.StartNodeResponse{IsOk: true}, nil } // StopNode 停止节点 func (this *NodeService) StopNode(ctx context.Context, req *pb.StopNodeRequest) (*pb.StopNodeResponse, error) { // 校验节点 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } err = installers.SharedNodeQueue().StopNode(req.NodeId) if err != nil { return &pb.StopNodeResponse{ IsOk: false, Error: err.Error(), }, nil } return &pb.StopNodeResponse{IsOk: true}, nil } // UpdateNodeConnectedAPINodes 更改节点连接的API节点信息 func (this *NodeService) UpdateNodeConnectedAPINodes(ctx context.Context, req *pb.UpdateNodeConnectedAPINodesRequest) (*pb.RPCSuccess, error) { // 校验节点 _, _, nodeId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.UpdateNodeConnectedAPINodes(tx, nodeId, req.ApiNodeIds) if err != nil { return nil, errors.Wrap(err) } return this.Success() } // CountAllEnabledNodesWithNodeGrantId 计算使用某个认证的节点数量 func (this *NodeService) CountAllEnabledNodesWithNodeGrantId(ctx context.Context, req *pb.CountAllEnabledNodesWithNodeGrantIdRequest) (*pb.RPCCountResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() count, err := models.SharedNodeDAO.CountAllEnabledNodesWithGrantId(tx, req.NodeGrantId) if err != nil { return nil, err } return this.SuccessCount(count) } // FindAllEnabledNodesWithNodeGrantId 查找使用某个认证的所有节点 func (this *NodeService) FindAllEnabledNodesWithNodeGrantId(ctx context.Context, req *pb.FindAllEnabledNodesWithNodeGrantIdRequest) (*pb.FindAllEnabledNodesWithNodeGrantIdResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithGrantId(tx, req.NodeGrantId) if err != nil { return nil, err } result := []*pb.Node{} for _, node := range nodes { // 集群信息 clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(node.ClusterId)) if err != nil { return nil, err } result = append(result, &pb.Node{ Id: int64(node.Id), Name: node.Name, Version: int64(node.Version), IsInstalled: node.IsInstalled, StatusJSON: node.Status, NodeCluster: &pb.NodeCluster{ Id: int64(node.ClusterId), Name: clusterName, }, IsOn: node.IsOn, }) } return &pb.FindAllEnabledNodesWithNodeGrantIdResponse{Nodes: result}, nil } // CountAllNotInstalledNodesWithNodeClusterId 计算没有安装的节点数量 func (this *NodeService) CountAllNotInstalledNodesWithNodeClusterId(ctx context.Context, req *pb.CountAllNotInstalledNodesWithNodeClusterIdRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() count, err := models.SharedNodeDAO.CountAllNotInstalledNodesWithClusterId(tx, req.NodeClusterId) if err != nil { return nil, err } return this.SuccessCount(count) } // FindAllNotInstalledNodesWithNodeClusterId 列出所有未安装的节点 func (this *NodeService) FindAllNotInstalledNodesWithNodeClusterId(ctx context.Context, req *pb.FindAllNotInstalledNodesWithNodeClusterIdRequest) (*pb.FindAllNotInstalledNodesWithNodeClusterIdResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() nodes, err := models.SharedNodeDAO.FindAllNotInstalledNodesWithClusterId(tx, req.NodeClusterId) if err != nil { return nil, err } result := []*pb.Node{} for _, node := range nodes { // 认证信息 login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, nodeconfigs.NodeRoleNode, int64(node.Id)) if err != nil { return nil, err } var pbLogin *pb.NodeLogin = nil if login != nil { pbLogin = &pb.NodeLogin{ Id: int64(login.Id), Name: login.Name, Type: login.Type, Params: login.Params, } } // IP信息 addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, int64(node.Id), nodeconfigs.NodeRoleNode) if err != nil { return nil, err } pbAddresses := []*pb.NodeIPAddress{} for _, address := range addresses { pbAddresses = append(pbAddresses, &pb.NodeIPAddress{ Id: int64(address.Id), NodeId: int64(address.NodeId), Name: address.Name, Ip: address.Ip, Description: address.Description, State: int64(address.State), Order: int64(address.Order), CanAccess: address.CanAccess, }) } // 安装信息 installStatus, err := node.DecodeInstallStatus() if err != nil { return nil, err } pbInstallStatus := &pb.NodeInstallStatus{} if installStatus != nil { pbInstallStatus = &pb.NodeInstallStatus{ IsRunning: installStatus.IsRunning, IsFinished: installStatus.IsFinished, IsOk: installStatus.IsOk, Error: installStatus.Error, ErrorCode: installStatus.ErrorCode, UpdatedAt: installStatus.UpdatedAt, } } result = append(result, &pb.Node{ Id: int64(node.Id), Name: node.Name, Version: int64(node.Version), IsInstalled: node.IsInstalled, StatusJSON: node.Status, IsOn: node.IsOn, NodeLogin: pbLogin, IpAddresses: pbAddresses, InstallStatus: pbInstallStatus, }) } return &pb.FindAllNotInstalledNodesWithNodeClusterIdResponse{Nodes: result}, nil } // CountAllUpgradeNodesWithNodeClusterId 计算需要升级的节点数量 func (this *NodeService) CountAllUpgradeNodesWithNodeClusterId(ctx context.Context, req *pb.CountAllUpgradeNodesWithNodeClusterIdRequest) (*pb.RPCCountResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() var deployFiles = installers.SharedDeployManager.LoadNodeFiles() total := int64(0) for _, deployFile := range deployFiles { count, err := models.SharedNodeDAO.CountAllLowerVersionNodesWithClusterId(tx, req.NodeClusterId, deployFile.OS, deployFile.Arch, deployFile.Version) if err != nil { return nil, err } total += count } return this.SuccessCount(total) } // FindAllUpgradeNodesWithNodeClusterId 列出所有需要升级的节点 func (this *NodeService) FindAllUpgradeNodesWithNodeClusterId(ctx context.Context, req *pb.FindAllUpgradeNodesWithNodeClusterIdRequest) (*pb.FindAllUpgradeNodesWithNodeClusterIdResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() // 获取当前能升级到的最新版本 deployFiles := installers.SharedDeployManager.LoadNodeFiles() result := []*pb.FindAllUpgradeNodesWithNodeClusterIdResponse_NodeUpgrade{} for _, deployFile := range deployFiles { nodes, err := models.SharedNodeDAO.FindAllLowerVersionNodesWithClusterId(tx, req.NodeClusterId, deployFile.OS, deployFile.Arch, deployFile.Version) if err != nil { return nil, err } for _, node := range nodes { // 认证信息 login, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, nodeconfigs.NodeRoleNode, int64(node.Id)) if err != nil { return nil, err } var pbLogin *pb.NodeLogin = nil if login != nil { pbLogin = &pb.NodeLogin{ Id: int64(login.Id), Name: login.Name, Type: login.Type, Params: login.Params, } } // IP信息 addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(tx, int64(node.Id), nodeconfigs.NodeRoleNode) if err != nil { return nil, err } pbAddresses := []*pb.NodeIPAddress{} for _, address := range addresses { pbAddresses = append(pbAddresses, &pb.NodeIPAddress{ Id: int64(address.Id), NodeId: int64(address.NodeId), Name: address.Name, Ip: address.Ip, Description: address.Description, State: int64(address.State), Order: int64(address.Order), CanAccess: address.CanAccess, }) } // 状态 status, err := node.DecodeStatus() if err != nil { return nil, err } if status == nil { continue } // 安装信息 installStatus, err := node.DecodeInstallStatus() if err != nil { return nil, err } pbInstallStatus := &pb.NodeInstallStatus{} if installStatus != nil { pbInstallStatus = &pb.NodeInstallStatus{ IsRunning: installStatus.IsRunning, IsFinished: installStatus.IsFinished, IsOk: installStatus.IsOk, Error: installStatus.Error, ErrorCode: installStatus.ErrorCode, UpdatedAt: installStatus.UpdatedAt, } } pbNode := &pb.Node{ Id: int64(node.Id), Name: node.Name, Version: int64(node.Version), IsInstalled: node.IsInstalled, StatusJSON: node.Status, IsOn: node.IsOn, IpAddresses: pbAddresses, NodeLogin: pbLogin, InstallStatus: pbInstallStatus, } result = append(result, &pb.FindAllUpgradeNodesWithNodeClusterIdResponse_NodeUpgrade{ Os: deployFile.OS, Arch: deployFile.Arch, OldVersion: status.BuildVersion, NewVersion: deployFile.Version, Node: pbNode, }) } } return &pb.FindAllUpgradeNodesWithNodeClusterIdResponse{ Nodes: result, }, nil } // FindNodeInstallStatus 读取节点安装状态 func (this *NodeService) FindNodeInstallStatus(ctx context.Context, req *pb.FindNodeInstallStatusRequest) (*pb.FindNodeInstallStatusResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() installStatus, err := models.SharedNodeDAO.FindNodeInstallStatus(tx, req.NodeId) if err != nil { return nil, err } if installStatus == nil { return &pb.FindNodeInstallStatusResponse{InstallStatus: nil}, nil } pbInstallStatus := &pb.NodeInstallStatus{ IsRunning: installStatus.IsRunning, IsFinished: installStatus.IsFinished, IsOk: installStatus.IsOk, Error: installStatus.Error, ErrorCode: installStatus.ErrorCode, UpdatedAt: installStatus.UpdatedAt, } return &pb.FindNodeInstallStatusResponse{InstallStatus: pbInstallStatus}, nil } // UpdateNodeLogin 修改节点登录信息 func (this *NodeService) UpdateNodeLogin(ctx context.Context, req *pb.UpdateNodeLoginRequest) (*pb.RPCSuccess, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() if req.NodeLogin.Id <= 0 { _, err := models.SharedNodeLoginDAO.CreateNodeLogin(tx, nodeconfigs.NodeRoleNode, req.NodeId, req.NodeLogin.Name, req.NodeLogin.Type, req.NodeLogin.Params) if err != nil { return nil, err } } err = models.SharedNodeLoginDAO.UpdateNodeLogin(tx, req.NodeLogin.Id, req.NodeLogin.Name, req.NodeLogin.Type, req.NodeLogin.Params) return this.Success() } // CountAllEnabledNodesWithNodeGroupId 计算某个节点分组内的节点数量 func (this *NodeService) CountAllEnabledNodesWithNodeGroupId(ctx context.Context, req *pb.CountAllEnabledNodesWithNodeGroupIdRequest) (*pb.RPCCountResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() count, err := models.SharedNodeDAO.CountAllEnabledNodesWithGroupId(tx, req.NodeGroupId) if err != nil { return nil, err } return this.SuccessCount(count) } // FindAllEnabledNodesDNSWithNodeClusterId 取得某个集群下的所有节点 func (this *NodeService) FindAllEnabledNodesDNSWithNodeClusterId(ctx context.Context, req *pb.FindAllEnabledNodesDNSWithNodeClusterIdRequest) (*pb.FindAllEnabledNodesDNSWithNodeClusterIdResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, req.NodeClusterId, nil) if err != nil { return nil, err } if clusterDNS == nil { return nil, errors.New("not found clusterId '" + numberutils.FormatInt64(req.NodeClusterId) + "'") } var dnsConfig, _ = clusterDNS.DecodeDNSConfig() var dnsDomainId = int64(clusterDNS.DnsDomainId) routes, err := dns.SharedDNSDomainDAO.FindDomainRoutes(tx, dnsDomainId) if err != nil { return nil, err } nodes, err := models.SharedNodeDAO.FindAllEnabledNodesDNSWithClusterId(tx, req.NodeClusterId, true, dnsConfig != nil && dnsConfig.IncludingLnNodes) if err != nil { return nil, err } var result = []*pb.NodeDNSInfo{} for _, node := range nodes { ipAddresses, err := models.SharedNodeIPAddressDAO.FindNodeAccessAndUpIPAddresses(tx, int64(node.Id), nodeconfigs.NodeRoleNode) if err != nil { return nil, err } domainRouteCodes, err := node.DNSRouteCodesForDomainId(dnsDomainId) if err != nil { return nil, err } var pbRoutes = []*pb.DNSRoute{} for _, routeCode := range domainRouteCodes { for _, r := range routes { if r.Code == routeCode { pbRoutes = append(pbRoutes, &pb.DNSRoute{ Name: r.Name, Code: r.Code, }) break } } } for _, ipAddress := range ipAddresses { var ip = ipAddress.DNSIP() if len(ip) == 0 { continue } if net.ParseIP(ip) == nil { continue } result = append(result, &pb.NodeDNSInfo{ Id: int64(node.Id), Name: node.Name, IpAddr: ip, NodeIPAddressId: int64(ipAddress.Id), Routes: pbRoutes, NodeClusterId: req.NodeClusterId, }) } } return &pb.FindAllEnabledNodesDNSWithNodeClusterIdResponse{Nodes: result}, nil } // FindEnabledNodeDNS 查找单个节点的域名解析信息 func (this *NodeService) FindEnabledNodeDNS(ctx context.Context, req *pb.FindEnabledNodeDNSRequest) (*pb.FindEnabledNodeDNSResponse, error) { // 校验请求 _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() node, err := models.SharedNodeDAO.FindEnabledNodeDNS(tx, req.NodeId) if err != nil { return nil, err } if node == nil { return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil } // 查询节点IP地址 var ipAddr string var ipAddrId int64 = 0 if req.NodeIPAddrId > 0 { address, err := models.SharedNodeIPAddressDAO.FindEnabledAddress(tx, req.NodeIPAddrId) if err != nil { return nil, err } if address != nil { ipAddr = address.Ip ipAddrId = int64(address.Id) } } if ipAddrId == 0 { ipAddr, ipAddrId, err = models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddress(tx, int64(node.Id), true, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } } var clusterId = int64(node.ClusterId) if req.NodeClusterId > 0 { clusterId = req.NodeClusterId } clusterDNS, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(tx, clusterId, nil) if err != nil { return nil, err } if clusterDNS == nil { return &pb.FindEnabledNodeDNSResponse{Node: nil}, nil } var dnsDomainId = int64(clusterDNS.DnsDomainId) dnsDomainName, err := dns.SharedDNSDomainDAO.FindDNSDomainName(tx, dnsDomainId) if err != nil { return nil, err } var pbRoutes = []*pb.DNSRoute{} if dnsDomainId > 0 { routeCodes, err := node.DNSRouteCodesForDomainId(dnsDomainId) if err != nil { return nil, err } for _, routeCode := range routeCodes { routeName, err := dns.SharedDNSDomainDAO.FindDomainRouteName(tx, dnsDomainId, routeCode) if err != nil { return nil, err } pbRoutes = append(pbRoutes, &pb.DNSRoute{ Name: routeName, Code: routeCode, }) } } return &pb.FindEnabledNodeDNSResponse{ Node: &pb.NodeDNSInfo{ Id: int64(node.Id), Name: node.Name, IpAddr: ipAddr, NodeIPAddressId: ipAddrId, Routes: pbRoutes, NodeClusterId: clusterId, NodeClusterDNSName: clusterDNS.DnsName, DnsDomainId: dnsDomainId, DnsDomainName: dnsDomainName, }, }, nil } // UpdateNodeDNS 修改节点的DNS解析信息 func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDNSRequest) (*pb.RPCSuccess, error) { // 校验请求 adminId, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() node, err := models.SharedNodeDAO.FindEnabledNodeDNS(tx, req.NodeId) if err != nil { return nil, err } if node == nil { return nil, errors.New("node not found") } var routeCodeMap = node.DNSRouteCodes() if req.DnsDomainId > 0 { if len(req.Routes) > 0 { var m = map[int64][]string{} // domainId => codes for _, route := range req.Routes { var pieces = strings.SplitN(route, "@", 2) if len(pieces) != 2 { continue } var code = pieces[0] var domainId = types.Int64(pieces[1]) m[domainId] = append(m[domainId], code) } for domainId, codes := range m { routeCodeMap[domainId] = codes } } else { delete(routeCodeMap, req.DnsDomainId) } } else { routeCodeMap = map[int64][]string{} if len(req.Routes) > 0 { var m = map[int64][]string{} // domainId => codes for _, route := range req.Routes { var pieces = strings.SplitN(route, "@", 2) if len(pieces) != 2 { continue } var code = pieces[0] var domainId = types.Int64(pieces[1]) m[domainId] = append(m[domainId], code) } for domainId, codes := range m { routeCodeMap[domainId] = codes } } } err = models.SharedNodeDAO.UpdateNodeDNS(tx, req.NodeId, routeCodeMap) if err != nil { return nil, err } // 修改IP if len(req.IpAddr) > 0 { if req.NodeIPAddressId > 0 { // 指定了IP地址ID err = models.SharedNodeIPAddressDAO.UpdateAddressIP(tx, req.NodeIPAddressId, req.IpAddr) if err != nil { return nil, err } } else { // 没有指定IP地址ID ipAddrId, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddressId(tx, req.NodeId, true, nodeconfigs.NodeRoleNode) if err != nil { return nil, err } if ipAddrId > 0 { err = models.SharedNodeIPAddressDAO.UpdateAddressIP(tx, ipAddrId, req.IpAddr) if err != nil { return nil, err } } else { _, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true, true, 0) if err != nil { return nil, err } } } } return this.Success() } // CountAllEnabledNodesWithNodeRegionId 计算某个区域下的节点数量 func (this *NodeService) CountAllEnabledNodesWithNodeRegionId(ctx context.Context, req *pb.CountAllEnabledNodesWithNodeRegionIdRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() count, err := models.SharedNodeDAO.CountAllEnabledNodesWithRegionId(tx, req.NodeRegionId) if err != nil { return nil, err } return this.SuccessCount(count) } // FindEnabledNodesWithIds 根据一组ID获取节点信息 func (this *NodeService) FindEnabledNodesWithIds(ctx context.Context, req *pb.FindEnabledNodesWithIdsRequest) (*pb.FindEnabledNodesWithIdsResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() nodes, err := models.SharedNodeDAO.FindEnabledNodesWithIds(tx, req.NodeIds) if err != nil { return nil, err } pbNodes := []*pb.Node{} for _, node := range nodes { connectedAPINodeIds, err := node.DecodeConnectedAPINodeIds() if err != nil { return nil, err } pbNodes = append(pbNodes, &pb.Node{ Id: int64(node.Id), IsOn: node.IsOn, IsActive: node.IsActive, ConnectedAPINodeIds: connectedAPINodeIds, }) } return &pb.FindEnabledNodesWithIdsResponse{Nodes: pbNodes}, nil } // CheckNodeLatestVersion 检查新版本 func (this *NodeService) CheckNodeLatestVersion(ctx context.Context, req *pb.CheckNodeLatestVersionRequest) (*pb.CheckNodeLatestVersionResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } deployFiles := installers.SharedDeployManager.LoadNodeFiles() for _, file := range deployFiles { if file.OS == req.Os && file.Arch == req.Arch && stringutil.VersionCompare(file.Version, req.CurrentVersion) > 0 { return &pb.CheckNodeLatestVersionResponse{ HasNewVersion: true, NewVersion: file.Version, }, nil } } return &pb.CheckNodeLatestVersionResponse{HasNewVersion: false}, nil } // UpdateNodeUp 设置节点上线状态 func (this *NodeService) UpdateNodeUp(ctx context.Context, req *pb.UpdateNodeUpRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.UpdateNodeUp(tx, req.NodeId, req.IsUp) if err != nil { return nil, err } return this.Success() } // DownloadNodeInstallationFile 下载最新边缘节点安装文件 func (this *NodeService) DownloadNodeInstallationFile(ctx context.Context, req *pb.DownloadNodeInstallationFileRequest) (*pb.DownloadNodeInstallationFileResponse, error) { nodeId, err := this.ValidateNode(ctx) if err != nil { return nil, err } var file = installers.SharedDeployManager.FindNodeFile(req.Os, req.Arch) if file == nil { return &pb.DownloadNodeInstallationFileResponse{}, nil } sum, err := file.Sum() if err != nil { return nil, err } data, offset, err := file.Read(req.ChunkOffset) if err != nil && err != io.EOF { return nil, err } // 增加下载速度监控 installers.SharedUpgradeLimiter.UpdateNodeBytes(nodeconfigs.NodeRoleNode, nodeId, int64(len(data))) return &pb.DownloadNodeInstallationFileResponse{ Sum: sum, Offset: offset, ChunkData: data, Version: file.Version, Filename: filepath.Base(file.Path), }, nil } // UpdateNodeSystem 修改节点系统信息 func (this *NodeService) UpdateNodeSystem(ctx context.Context, req *pb.UpdateNodeSystemRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.UpdateNodeSystem(tx, req.NodeId, req.MaxCPU) if err != nil { return nil, err } return this.Success() } // UpdateNodeCache 修改节点缓存设置 func (this *NodeService) UpdateNodeCache(ctx context.Context, req *pb.UpdateNodeCacheRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() var maxCacheDiskCapacityJSON []byte if req.MaxCacheDiskCapacity != nil { maxCacheDiskCapacityJSON, err = json.Marshal(&shared.SizeCapacity{ Count: req.MaxCacheDiskCapacity.Count, Unit: req.MaxCacheDiskCapacity.Unit, }) if err != nil { return nil, err } } var maxCacheMemoryCapacityJSON []byte if req.MaxCacheMemoryCapacity != nil { maxCacheMemoryCapacityJSON, err = json.Marshal(&shared.SizeCapacity{ Count: req.MaxCacheMemoryCapacity.Count, Unit: req.MaxCacheMemoryCapacity.Unit, }) if err != nil { return nil, err } } // cache sub dirs var cacheSubDirs = []*serverconfigs.CacheDir{} if len(req.CacheDiskSubDirsJSON) > 0 { err = json.Unmarshal(req.CacheDiskSubDirsJSON, &cacheSubDirs) if err != nil { return nil, errors.New("decode 'cacheDiskSubDirsJSON' failed: " + err.Error()) } } err = models.SharedNodeDAO.UpdateNodeCache(tx, req.NodeId, maxCacheDiskCapacityJSON, maxCacheMemoryCapacityJSON, req.CacheDiskDir, cacheSubDirs) if err != nil { return nil, err } return this.Success() } // 获取缓存CacheMap func (this *NodeService) findClusterCacheMap(clusterId int64, version int64) *utils.CacheMap { nodeVersionCacheLocker.Lock() defer nodeVersionCacheLocker.Unlock() if version == 0 { return utils.NewCacheMap() } cache, ok := nodeVersionCacheMap[clusterId] if ok { cacheMap, ok := cache.CacheMap[version] if ok { return cacheMap } // 清除以前版本 for v := range cache.CacheMap { if version-v > 60*time.Second.Nanoseconds() { delete(cache.CacheMap, v) } } // 添加 cacheMap = utils.NewCacheMap() cache.CacheMap[version] = cacheMap return cacheMap } else { var cacheMap = utils.NewCacheMap() cache = &NodeVersionCache{ CacheMap: map[int64]*utils.CacheMap{ version: cacheMap, }} nodeVersionCacheMap[clusterId] = cache return cacheMap } } // FindNodeLevelInfo 读取节点级别信息 func (this *NodeService) FindNodeLevelInfo(ctx context.Context, req *pb.FindNodeLevelInfoRequest) (*pb.FindNodeLevelInfoResponse, error) { nodeId, err := this.ValidateNode(ctx) if err != nil { return nil, err } var tx *dbs.Tx node, err := models.SharedNodeDAO.FindNodeLevelInfo(tx, nodeId) if err != nil { return nil, err } if node == nil { return &pb.FindNodeLevelInfoResponse{}, nil } var result = &pb.FindNodeLevelInfoResponse{ Level: types.Int32(node.Level), } if node.Level == 1 { parentNodes, err := models.SharedNodeDAO.FindParentNodeConfigs(tx, nodeId, int64(node.GroupId), node.AllClusterIds(), types.Int(node.Level)) if err != nil { return nil, err } parentNodesJSON, err := json.Marshal(parentNodes) if err != nil { return nil, err } result.ParentNodesMapJSON = parentNodesJSON } return result, nil } // FindNodeDNSResolver 读取节点DNS Resolver func (this *NodeService) FindNodeDNSResolver(ctx context.Context, req *pb.FindNodeDNSResolverRequest) (*pb.FindNodeDNSResolverResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() config, err := models.SharedNodeDAO.FindNodeDNSResolver(tx, req.NodeId) if err != nil { return nil, err } configJSON, err := json.Marshal(config) if err != nil { return nil, err } return &pb.FindNodeDNSResolverResponse{ DnsResolverJSON: configJSON, }, nil } // UpdateNodeDNSResolver 修改DNS Resolver func (this *NodeService) UpdateNodeDNSResolver(ctx context.Context, req *pb.UpdateNodeDNSResolverRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() var config = nodeconfigs.DefaultDNSResolverConfig() err = json.Unmarshal(req.DnsResolverJSON, config) if err != nil { return nil, err } err = models.SharedNodeDAO.UpdateNodeDNSResolver(tx, req.NodeId, config) if err != nil { return nil, err } return this.Success() } // FindNodeDDoSProtection 获取集群的DDoS设置 func (this *NodeService) FindNodeDDoSProtection(ctx context.Context, req *pb.FindNodeDDoSProtectionRequest) (*pb.FindNodeDDoSProtectionResponse, error) { var nodeId = req.NodeId var isFromNode = false _, err := this.ValidateAdmin(ctx) if err != nil { // 检查是否来自节点 currentNodeId, err2 := this.ValidateNode(ctx) if err2 != nil { return nil, err } if nodeId > 0 && currentNodeId != nodeId { return nil, errors.New("invalid 'nodeId'") } nodeId = currentNodeId isFromNode = true } var tx *dbs.Tx ddosProtection, err := models.SharedNodeDAO.FindNodeDDoSProtection(tx, nodeId) if err != nil { return nil, err } if ddosProtection == nil { ddosProtection = ddosconfigs.DefaultProtectionConfig() } // 组合父级节点配置 // 只有从节点读取配置时才需要组合 if isFromNode { clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) if err != nil { return nil, err } if clusterId > 0 { clusterDDoSProtection, err := models.SharedNodeClusterDAO.FindClusterDDoSProtection(tx, clusterId) if err != nil { return nil, err } if clusterDDoSProtection == nil { clusterDDoSProtection = ddosconfigs.DefaultProtectionConfig() } clusterDDoSProtection.Merge(ddosProtection) ddosProtection = clusterDDoSProtection } } ddosProtectionJSON, err := json.Marshal(ddosProtection) if err != nil { return nil, err } var result = &pb.FindNodeDDoSProtectionResponse{ DdosProtectionJSON: ddosProtectionJSON, } return result, nil } // UpdateNodeDDoSProtection 修改集群的DDoS设置 func (this *NodeService) UpdateNodeDDoSProtection(ctx context.Context, req *pb.UpdateNodeDDoSProtectionRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var ddosProtection = &ddosconfigs.ProtectionConfig{} err = json.Unmarshal(req.DdosProtectionJSON, ddosProtection) if err != nil { return nil, err } var tx *dbs.Tx err = models.SharedNodeDAO.UpdateNodeDDoSProtection(tx, req.NodeId, ddosProtection) if err != nil { return nil, err } return this.Success() } // FindNodeGlobalServerConfig 取得节点的服务全局配置 func (this *NodeService) FindNodeGlobalServerConfig(ctx context.Context, req *pb.FindNodeGlobalServerConfigRequest) (*pb.FindNodeGlobalServerConfigResponse, error) { var nodeId = req.NodeId _, err := this.ValidateAdmin(ctx) if err != nil { // 检查是否来自节点 currentNodeId, err2 := this.ValidateNode(ctx) if err2 != nil { return nil, err } if nodeId > 0 && currentNodeId != nodeId { return nil, errors.New("invalid 'nodeId'") } nodeId = currentNodeId } var tx = this.NullTx() clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) if err != nil { return nil, err } var config *serverconfigs.GlobalServerConfig if clusterId > 0 { config, err = models.SharedNodeClusterDAO.FindClusterGlobalServerConfig(tx, clusterId) if err != nil { return nil, err } } if config == nil { config = serverconfigs.DefaultGlobalServerConfig() } configJSON, err := json.Marshal(config) if err != nil { return nil, err } var result = &pb.FindNodeGlobalServerConfigResponse{ GlobalServerConfigJSON: configJSON, } return result, nil } // FindEnabledNodeConfigInfo 取得节点的配置概要信息 func (this *NodeService) FindEnabledNodeConfigInfo(ctx context.Context, req *pb.FindEnabledNodeConfigInfoRequest) (*pb.FindEnabledNodeConfigInfoResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() var result = &pb.FindEnabledNodeConfigInfoResponse{} node, err := models.SharedNodeDAO.FindEnabledNode(tx, req.NodeId) if err != nil { return nil, err } if node == nil { // 总是返回非空 return result, nil } // dns if len(node.DNSRouteCodes()) > 0 { result.HasDNSInfo = true } // cache if len(node.CacheDiskDir) > 0 { result.HasCacheInfo = true } else { var diskCapacity = node.DecodeMaxCacheDiskCapacity() var memoryCapacity = node.DecodeMaxCacheMemoryCapacity() if (diskCapacity != nil && diskCapacity.IsNotEmpty()) || (memoryCapacity != nil && memoryCapacity.IsNotEmpty()) { result.HasCacheInfo = true } } // thresholds countThresholds, err := models.SharedNodeThresholdDAO.CountAllEnabledThresholds(tx, nodeconfigs.NodeRoleNode, 0, req.NodeId) if err != nil { return nil, err } result.HasThresholds = countThresholds > 0 // ssh nodeLogin, err := models.SharedNodeLoginDAO.FindEnabledNodeLoginWithNodeId(tx, nodeconfigs.NodeRoleNode, req.NodeId) if err != nil { return nil, err } if nodeLogin != nil { sshParams, err := nodeLogin.DecodeSSHParams() if err != nil { return nil, err } if sshParams != nil { result.HasSSH = len(sshParams.Host) > 0 || sshParams.Port > 0 } } // systemSettings if node.MaxCPU > 0 { result.HasSystemSettings = true } else { // dns resolver var dnsResolverConfig = node.DecodeDNSResolver() if dnsResolverConfig != nil { result.HasSystemSettings = dnsResolverConfig.Type != nodeconfigs.DNSResolverTypeDefault } if !result.HasSystemSettings { // api node addresses var apiNodeAddrs = node.DecodeAPINodeAddrs() if len(apiNodeAddrs) > 0 { result.HasSystemSettings = true } } } // ddos protection result.HasDDoSProtection = node.HasDDoSProtection() return result, nil } // CountAllNodeRegionInfo 查找节点区域信息数量 func (this *NodeService) CountAllNodeRegionInfo(ctx context.Context, req *pb.CountAllNodeRegionInfoRequest) (*pb.RPCCountResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() count, err := models.SharedNodeDAO.CountAllNodeRegionInfo(tx, req.NodeRegionId) if err != nil { return nil, err } return this.SuccessCount(count) } // ListNodeRegionInfo 列出单页节点区域信息 func (this *NodeService) ListNodeRegionInfo(ctx context.Context, req *pb.ListNodeRegionInfoRequest) (*pb.ListNodeRegionInfoResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() nodes, err := models.SharedNodeDAO.ListNodeRegionInfo(tx, req.NodeRegionId, req.Offset, req.Size) if err != nil { return nil, err } var pbInfoList = []*pb.ListNodeRegionInfoResponse_Info{} var cacheMap = utils.NewCacheMap() for _, node := range nodes { // region var pbRegion *pb.NodeRegion if node.RegionId > 0 { region, err := models.SharedNodeRegionDAO.FindEnabledNodeRegion(tx, int64(node.RegionId)) if err != nil { return nil, err } if region != nil { pbRegion = &pb.NodeRegion{ Id: int64(region.Id), Name: region.Name, IsOn: region.IsOn, } } } // cluster // 要求必须有cluster var pbCluster *pb.NodeCluster if node.ClusterId <= 0 { continue } cluster, err := models.SharedNodeClusterDAO.FindClusterBasicInfo(tx, int64(node.ClusterId), cacheMap) if err != nil { return nil, err } if cluster == nil { continue } pbCluster = &pb.NodeCluster{ Id: int64(cluster.Id), Name: cluster.Name, IsOn: cluster.IsOn, } pbInfoList = append(pbInfoList, &pb.ListNodeRegionInfoResponse_Info{ Id: int64(node.Id), Name: node.Name, NodeRegion: pbRegion, NodeCluster: pbCluster, }) } return &pb.ListNodeRegionInfoResponse{ InfoList: pbInfoList, }, nil } // UpdateNodeRegionInfo 修改节点区域信息 func (this *NodeService) UpdateNodeRegionInfo(ctx context.Context, req *pb.UpdateNodeRegionInfoRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() err = models.SharedNodeDAO.UpdateNodeRegionId(tx, req.NodeId, req.NodeRegionId) if err != nil { return nil, err } return this.Success() } // FindNodeAPIConfig 查找单个节点的API相关配置 func (this *NodeService) FindNodeAPIConfig(ctx context.Context, req *pb.FindNodeAPIConfigRequest) (*pb.FindNodeAPIConfigResponse, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() node, err := models.SharedNodeDAO.FindNodeAPIConfig(tx, req.NodeId) if err != nil { return nil, err } if node == nil { return &pb.FindNodeAPIConfigResponse{ ApiNodeAddrsJSON: nil, }, nil } return &pb.FindNodeAPIConfigResponse{ ApiNodeAddrsJSON: node.ApiNodeAddrs, }, nil } // UpdateNodeAPIConfig 修改某个节点的API相关配置 func (this *NodeService) UpdateNodeAPIConfig(ctx context.Context, req *pb.UpdateNodeAPIConfigRequest) (*pb.RPCSuccess, error) { _, err := this.ValidateAdmin(ctx) if err != nil { return nil, err } var tx = this.NullTx() var apiNodeAddrs = []*serverconfigs.NetworkAddressConfig{} if len(req.ApiNodeAddrsJSON) > 0 { err = json.Unmarshal(req.ApiNodeAddrsJSON, &apiNodeAddrs) if err != nil { return nil, err } } err = models.SharedNodeDAO.UpdateNodeAPIConfig(tx, req.NodeId, apiNodeAddrs) if err != nil { return nil, err } return this.Success() }