增加节点同步状态提示和任务列表

This commit is contained in:
GoEdgeLab
2021-01-17 16:48:00 +08:00
parent 381f83c94c
commit 747cdac7cf
49 changed files with 1959 additions and 580 deletions

View File

@@ -51,7 +51,7 @@ func (this *HTTPAccessLogService) ListHTTPAccessLogs(ctx context.Context, req *p
return nil, errors.New("invalid serverId")
}
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -98,7 +98,7 @@ func (this *HTTPAccessLogService) FindHTTPAccessLog(ctx context.Context, req *pb
// 检查权限
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, int64(accessLog.ServerId), userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, int64(accessLog.ServerId))
if err != nil {
return nil, err
}

View File

@@ -52,6 +52,12 @@ func (this *IPItemService) CreateIPItem(ctx context.Context, req *pb.CreateIPIte
return nil, err
}
// 通知更新
err = models.SharedIPListDAO.NotifyUpdate(tx, req.IpListId, models.NodeTaskTypeIPItemChanged)
if err != nil {
return nil, err
}
return &pb.CreateIPItemResponse{IpItemId: itemId}, nil
}
@@ -81,6 +87,13 @@ func (this *IPItemService) UpdateIPItem(ctx context.Context, req *pb.UpdateIPIte
if err != nil {
return nil, err
}
// 通知更新
err = models.SharedIPItemDAO.NotifyClustersUpdate(tx, req.IpItemId, models.NodeTaskTypeIPItemChanged)
if err != nil {
return nil, err
}
return this.Success()
}
@@ -110,6 +123,13 @@ func (this *IPItemService) DeleteIPItem(ctx context.Context, req *pb.DeleteIPIte
if err != nil {
return nil, err
}
// 通知更新
err = models.SharedIPItemDAO.NotifyClustersUpdate(tx, req.IpItemId, models.NodeTaskTypeIPItemChanged)
if err != nil {
return nil, err
}
return this.Success()
}

View File

@@ -351,6 +351,12 @@ func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeReque
}
}()
// 删除节点相关任务
err = models.SharedNodeTaskDAO.DeleteNodeTasks(tx, req.NodeId)
if err != nil {
return nil, err
}
return this.Success()
}
@@ -577,23 +583,6 @@ func (this *NodeService) UpdateNodeStatus(ctx context.Context, req *pb.UpdateNod
return this.Success()
}
// 同步集群中的节点版本
func (this *NodeService) SyncNodesVersionWithCluster(ctx context.Context, req *pb.SyncNodesVersionWithClusterRequest) (*pb.SyncNodesVersionWithClusterResponse, error) {
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
if err != nil {
return nil, err
}
tx := this.NullTx()
err = models.SharedNodeDAO.SyncNodeVersionsWithCluster(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
return &pb.SyncNodesVersionWithClusterResponse{}, nil
}
// 修改节点安装状态
func (this *NodeService) UpdateNodeIsInstalled(ctx context.Context, req *pb.UpdateNodeIsInstalledRequest) (*pb.RPCSuccess, error) {
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
@@ -1326,3 +1315,32 @@ func (this *NodeService) CountAllEnabledNodesWithNodeRegionId(ctx context.Contex
}
return this.SuccessCount(count)
}
// 根据一组ID获取节点信息
func (this *NodeService) FindEnabledNodesWithIds(ctx context.Context, req *pb.FindEnabledNodesWithIdsRequest) (*pb.FindEnabledNodesWithIdsResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
tx := this.NullTx()
nodes, err := models.SharedNodeDAO.FindEnabledNodesWithIds(tx, req.NodeIds)
if err != nil {
return nil, err
}
pbNodes := []*pb.Node{}
for _, node := range nodes {
connectedAPINodeIds, err := node.DecodeConnectedAPINodeIds()
if err != nil {
return nil, err
}
pbNodes = append(pbNodes, &pb.Node{
Id: int64(node.Id),
IsOn: node.IsOn == 1,
IsActive: node.IsActive == 1,
ConnectedAPINodeIds: connectedAPINodeIds,
})
}
return &pb.FindEnabledNodesWithIdsResponse{Nodes: pbNodes}, nil
}

View File

@@ -83,6 +83,12 @@ func (this *NodeClusterService) DeleteNodeCluster(ctx context.Context, req *pb.D
return nil, err
}
// 删除相关任务
err = models.SharedNodeTaskDAO.DeleteAllClusterTasks(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
return this.Success()
}
@@ -208,44 +214,6 @@ func (this *NodeClusterService) FindAllEnabledNodeClusters(ctx context.Context,
}, nil
}
// 查找所有变更的集群
func (this *NodeClusterService) FindAllChangedNodeClusters(ctx context.Context, req *pb.FindAllChangedNodeClustersRequest) (*pb.FindAllChangedNodeClustersResponse, error) {
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
if err != nil {
return nil, err
}
tx := this.NullTx()
clusterIds, err := models.SharedNodeDAO.FindChangedClusterIds(tx)
if err != nil {
return nil, err
}
if len(clusterIds) == 0 {
return &pb.FindAllChangedNodeClustersResponse{
NodeClusters: []*pb.NodeCluster{},
}, nil
}
result := []*pb.NodeCluster{}
for _, clusterId := range clusterIds {
cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(tx, clusterId)
if err != nil {
return nil, err
}
if cluster == nil {
continue
}
result = append(result, &pb.NodeCluster{
Id: int64(cluster.Id),
Name: cluster.Name,
CreatedAt: int64(cluster.CreatedAt),
UniqueId: cluster.UniqueId,
Secret: cluster.Secret,
})
}
return &pb.FindAllChangedNodeClustersResponse{NodeClusters: result}, nil
}
// 计算所有集群数量
func (this *NodeClusterService) CountAllEnabledNodeClusters(ctx context.Context, req *pb.CountAllEnabledNodeClustersRequest) (*pb.RPCCountResponse, error) {
_, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin)
@@ -659,12 +627,6 @@ func (this *NodeClusterService) UpdateNodeClusterTOA(ctx context.Context, req *p
return nil, err
}
// 增加节点版本号
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
return this.Success()
}
@@ -764,12 +726,6 @@ func (this *NodeClusterService) UpdateNodeClusterHTTPCachePolicyId(ctx context.C
return nil, err
}
// 增加节点版本号
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
return this.Success()
}
@@ -787,12 +743,6 @@ func (this *NodeClusterService) UpdateNodeClusterHTTPFirewallPolicyId(ctx contex
return nil, err
}
// 增加节点版本号
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
return this.Success()
}
@@ -817,12 +767,6 @@ func (this *NodeClusterService) UpdateNodeClusterSystemService(ctx context.Conte
return nil, err
}
// 增加节点版本号
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
return this.Success()
}

View File

@@ -0,0 +1,243 @@
package services
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/dbs"
"time"
)
// 节点同步任务相关服务
type NodeTaskService struct {
BaseService
}
// 获取单节点同步任务
func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNodeTasksRequest) (*pb.FindNodeTasksResponse, error) {
nodeId, err := this.ValidateNode(ctx)
if err != nil {
return nil, err
}
_ = req
var tx = this.NullTx()
tasks, err := models.SharedNodeTaskDAO.FindDoingNodeTasks(tx, nodeId)
if err != nil {
return nil, err
}
pbTasks := []*pb.NodeTask{}
for _, task := range tasks {
pbTasks = append(pbTasks, &pb.NodeTask{
Id: int64(task.Id),
Type: task.Type,
})
}
return &pb.FindNodeTasksResponse{NodeTasks: pbTasks}, nil
}
// 报告同步任务结果
func (this *NodeTaskService) ReportNodeTaskDone(ctx context.Context, req *pb.ReportNodeTaskDoneRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateNode(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
err = models.SharedNodeTaskDAO.UpdateNodeTaskDone(tx, req.NodeTaskId, req.IsOk, req.Error)
if err != nil {
return nil, err
}
return this.Success()
}
// 获取所有正在同步的集群信息
func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.FindNodeClusterTasksRequest) (*pb.FindNodeClusterTasksResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
_ = req
var tx = this.NullTx()
clusterIds, err := models.SharedNodeTaskDAO.FindAllDoingTaskClusterIds(tx)
if err != nil {
return nil, err
}
if len(clusterIds) == 0 {
return &pb.FindNodeClusterTasksResponse{ClusterTasks: []*pb.ClusterTask{}}, nil
}
pbClusterTasks := []*pb.ClusterTask{}
for _, clusterId := range clusterIds {
pbClusterTask := &pb.ClusterTask{}
clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, clusterId)
if err != nil {
return nil, err
}
pbClusterTask.ClusterId = clusterId
pbClusterTask.ClusterName = clusterName
// 错误的节点任务
pbNodeTasks := []*pb.NodeTask{}
// TODO 考虑节点特别多的情形比如只显示前100个
tasks, err := models.SharedNodeTaskDAO.FindAllDoingNodeTasksWithClusterId(tx, clusterId)
if err != nil {
return nil, err
}
for _, task := range tasks {
// 节点
nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(task.NodeId))
if err != nil {
return nil, err
}
// 是否超时N秒内没有更新
if int64(task.UpdatedAt) < time.Now().Unix()-120 {
task.IsDone = 1
task.IsOk = 0
task.Error = "节点响应超时"
}
pbNodeTasks = append(pbNodeTasks, &pb.NodeTask{
Id: int64(task.Id),
Type: task.Type,
IsDone: task.IsDone == 1,
IsOk: task.IsOk == 1,
Error: task.Error,
UpdatedAt: int64(task.UpdatedAt),
Node: &pb.Node{
Id: int64(task.NodeId),
Name: nodeName,
},
})
}
pbClusterTask.NodeTasks = pbNodeTasks
pbClusterTasks = append(pbClusterTasks, pbClusterTask)
}
return &pb.FindNodeClusterTasksResponse{ClusterTasks: pbClusterTasks}, nil
}
// 检查是否有正在执行的任务
func (this *NodeTaskService) ExistsNodeTasks(ctx context.Context, req *pb.ExistsNodeTasksRequest) (*pb.ExistsNodeTasksResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
_ = req
var tx = this.NullTx()
// 是否有任务
existTask, err := models.SharedNodeTaskDAO.ExistsDoingNodeTasks(tx)
if err != nil {
return nil, err
}
// 是否有错误
existError, err := models.SharedNodeTaskDAO.ExistsErrorNodeTasks(tx)
if err != nil {
return nil, err
}
return &pb.ExistsNodeTasksResponse{
ExistTasks: existTask,
ExistError: existError,
}, nil
}
// 删除任务
func (this *NodeTaskService) DeleteNodeTask(ctx context.Context, req *pb.DeleteNodeTaskRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
err = models.SharedNodeTaskDAO.DeleteNodeTask(tx, req.NodeTaskId)
if err != nil {
return nil, err
}
return this.Success()
}
// 计算正在执行的任务数量
func (this *NodeTaskService) CountDoingNodeTasks(ctx context.Context, req *pb.CountDoingNodeTasksRequest) (*pb.RPCCountResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
_ = req
var tx = this.NullTx()
count, err := models.SharedNodeTaskDAO.CountDoingNodeTasks(tx)
if err != nil {
return nil, err
}
return this.SuccessCount(count)
}
// 查找需要通知的任务
func (this *NodeTaskService) FindNotifyingNodeTasks(ctx context.Context, req *pb.FindNotifyingNodeTasksRequest) (*pb.FindNotifyingNodeTasksResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
if req.Size <= 0 {
req.Size = 100
}
if req.Size > 1000 {
req.Size = 1000
}
var tx = this.NullTx()
tasks, err := models.SharedNodeTaskDAO.FindNotifyingNodeTasks(tx, req.Size)
if err != nil {
return nil, err
}
pbTasks := []*pb.NodeTask{}
for _, task := range tasks {
pbTasks = append(pbTasks, &pb.NodeTask{
Id: int64(task.Id),
Type: task.Type,
IsDone: task.IsDone == 1,
IsOk: task.IsOk == 1,
Error: task.Error,
UpdatedAt: int64(task.UpdatedAt),
Node: &pb.Node{Id: int64(task.NodeId)},
})
}
return &pb.FindNotifyingNodeTasksResponse{NodeTasks: pbTasks}, nil
}
// 设置任务已通知
func (this *NodeTaskService) UpdateNodeTasksNotified(ctx context.Context, req *pb.UpdateNodeTasksNotifiedRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
err = this.RunTx(func(tx *dbs.Tx) error {
err = models.SharedNodeTaskDAO.UpdateTasksNotified(tx, req.NodeTaskIds)
return err
})
if err != nil {
return nil, err
}
return this.Success()
}

View File

@@ -84,12 +84,6 @@ func (this *ServerService) CreateServer(ctx context.Context, req *pb.CreateServe
return nil, err
}
// 更新节点版本
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
return &pb.CreateServerResponse{ServerId: serverId}, nil
}
@@ -132,20 +126,6 @@ func (this *ServerService) UpdateServerBasic(ctx context.Context, req *pb.Update
}()
}
// 更新老的节点版本
if req.NodeClusterId != int64(server.ClusterId) {
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, int64(server.ClusterId))
if err != nil {
return nil, err
}
}
// 更新新的节点版本
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
return this.Success()
}
@@ -159,7 +139,7 @@ func (this *ServerService) UpdateServerIsOn(ctx context.Context, req *pb.UpdateS
tx := this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -182,7 +162,7 @@ func (this *ServerService) UpdateServerHTTP(ctx context.Context, req *pb.UpdateS
tx := this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -208,7 +188,7 @@ func (this *ServerService) UpdateServerHTTPS(ctx context.Context, req *pb.Update
tx := this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -326,7 +306,7 @@ func (this *ServerService) UpdateServerWeb(ctx context.Context, req *pb.UpdateSe
tx := this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -352,7 +332,7 @@ func (this *ServerService) UpdateServerReverseProxy(ctx context.Context, req *pb
tx := this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -377,7 +357,7 @@ func (this *ServerService) FindServerNames(ctx context.Context, req *pb.FindServ
tx := this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -625,33 +605,18 @@ func (this *ServerService) DeleteServer(ctx context.Context, req *pb.DeleteServe
tx := this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
}
// 查找服务
server, err := models.SharedServerDAO.FindEnabledServer(tx, req.ServerId)
if err != nil {
return nil, err
}
if server == nil {
return nil, errors.New("can not find the server")
}
// 禁用服务
err = models.SharedServerDAO.DisableServer(tx, req.ServerId)
if err != nil {
return nil, err
}
// 更新节点版本
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, int64(server.ClusterId))
if err != nil {
return nil, err
}
return this.Success()
}
@@ -667,7 +632,7 @@ func (this *ServerService) FindEnabledServer(ctx context.Context, req *pb.FindEn
// 检查权限
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -769,7 +734,7 @@ func (this *ServerService) FindEnabledServerConfig(ctx context.Context, req *pb.
// 检查权限
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -802,7 +767,7 @@ func (this *ServerService) FindEnabledServerType(ctx context.Context, req *pb.Fi
// 检查权限
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -880,7 +845,7 @@ func (this *ServerService) FindAndInitServerWebConfig(ctx context.Context, req *
tx := this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -1022,10 +987,16 @@ func (this *ServerService) NotifyServersChange(ctx context.Context, req *pb.Noti
tx := this.NullTx()
err = models.SharedSysEventDAO.CreateEvent(tx, models.NewServerChangeEvent())
clusterIds, err := models.SharedNodeClusterDAO.FindAllEnableClusterIds(tx)
if err != nil {
return nil, err
}
for _, clusterId := range clusterIds {
err = models.SharedNodeClusterDAO.NotifyUpdate(tx, clusterId)
if err != nil {
return nil, err
}
}
return &pb.NotifyServersChangeResponse{}, nil
}
@@ -1167,7 +1138,7 @@ func (this *ServerService) CheckUserServer(ctx context.Context, req *pb.CheckUse
tx := this.NullTx()
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -1217,7 +1188,7 @@ func (this *ServerService) FindEnabledUserServerBasic(ctx context.Context, req *
var tx = this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}
@@ -1250,7 +1221,7 @@ func (this *ServerService) UpdateEnabledUserServerBasic(ctx context.Context, req
var tx = this.NullTx()
if userId > 0 {
err = models.SharedServerDAO.CheckUserServer(tx, req.ServerId, userId)
err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId)
if err != nil {
return nil, err
}

View File

@@ -57,16 +57,6 @@ func (this *UserService) UpdateUser(ctx context.Context, req *pb.UpdateUserReque
if err != nil {
return nil, err
}
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, oldClusterId)
if err != nil {
return nil, err
}
err = models.SharedNodeDAO.IncreaseAllNodesLatestVersionMatch(tx, req.NodeClusterId)
if err != nil {
return nil, err
}
}
return this.Success()
@@ -81,6 +71,18 @@ func (this *UserService) DeleteUser(ctx context.Context, req *pb.DeleteUserReque
tx := this.NullTx()
// 删除其下的Server
serverIds, err := models.SharedServerDAO.FindAllEnabledServerIdsWithUserId(tx, req.UserId)
if err != nil {
return nil, err
}
for _, serverId := range serverIds {
err := models.SharedServerDAO.DisableServer(tx, serverId)
if err != nil {
return nil, err
}
}
_, err = models.SharedUserDAO.DisableUser(tx, req.UserId)
if err != nil {
return nil, err