当服务配置变化时创建单个服务通知任务

This commit is contained in:
GoEdgeLab
2022-01-19 22:15:01 +08:00
parent 72668b68e9
commit f01cacc471
16 changed files with 60 additions and 26 deletions

View File

@@ -464,7 +464,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
return err return err
} }
for _, clusterId := range clusterIds { for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeIPItemChanged)
if err != nil { if err != nil {
return err return err
} }
@@ -472,7 +472,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
} else { } else {
clusterIds, err := SharedNodeClusterDAO.FindAllEnabledNodeClusterIds(tx) clusterIds, err := SharedNodeClusterDAO.FindAllEnabledNodeClusterIds(tx)
for _, clusterId := range clusterIds { for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeIPItemChanged)
if err != nil { if err != nil {
return err return err
} }
@@ -524,7 +524,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
if len(resultClusterIds) > 0 { if len(resultClusterIds) > 0 {
for _, clusterId := range resultClusterIds { for _, clusterId := range resultClusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeIPItemChanged)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -284,7 +284,7 @@ func (this *IPListDAO) NotifyUpdate(tx *dbs.Tx, listId int64, taskType NodeTaskT
if len(resultClusterIds) > 0 { if len(resultClusterIds) > 0 {
for _, clusterId := range resultClusterIds { for _, clusterId := range resultClusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, taskType) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, taskType)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -334,7 +334,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool)
return err return err
} }
for _, clusterId := range clusterIds { for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }
@@ -346,7 +346,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool)
return err return err
} }
for _, clusterId := range clusterIds { for _, clusterId := range clusterIds {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -282,7 +282,7 @@ func (this *NSDomainDAO) NotifyUpdate(tx *dbs.Tx, domainId int64) error {
return err return err
} }
if clusterId > 0 { if clusterId > 0 {
return models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeDomainChanged) return models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, models.NSNodeTaskTypeDomainChanged)
} }
return nil return nil

View File

@@ -199,7 +199,7 @@ func (this *NSKeyDAO) NotifyUpdate(tx *dbs.Tx, keyId int64) error {
return err return err
} }
if clusterId > 0 { if clusterId > 0 {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeKeyChanged) err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, models.NSNodeTaskTypeKeyChanged)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -279,7 +279,7 @@ func (this *NSRecordDAO) NotifyUpdate(tx *dbs.Tx, recordId int64) error {
} }
if clusterId > 0 { if clusterId > 0 {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRecordChanged) err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, models.NSNodeTaskTypeRecordChanged)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -259,7 +259,7 @@ func (this *NSRouteDAO) NotifyUpdate(tx *dbs.Tx) error {
return err return err
} }
for _, clusterId := range clusterIds { for _, clusterId := range clusterIds {
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRouteChanged) err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, models.NSNodeTaskTypeRouteChanged)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -901,7 +901,7 @@ func (this *NodeClusterDAO) FindClusterBasicInfo(tx *dbs.Tx, clusterId int64, ca
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新
func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeConfigChanged)
} }
// NotifyDNSUpdate 通知DNS更新 // NotifyDNSUpdate 通知DNS更新

View File

@@ -177,5 +177,5 @@ func (this *NodeClusterMetricItemDAO) ExistsClusterItem(tx *dbs.Tx, clusterId in
// NotifyUpdate 通知更新 // NotifyUpdate 通知更新
func (this *NodeClusterMetricItemDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { func (this *NodeClusterMetricItemDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeConfigChanged)
} }

View File

@@ -1471,7 +1471,7 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
return err return err
} }
if clusterId > 0 { if clusterId > 0 {
return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, NodeTaskTypeConfigChanged, 0) return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, NodeTaskTypeConfigChanged, 0)
} }
return nil return nil
} }

View File

@@ -49,7 +49,7 @@ func init() {
} }
// CreateNodeTask 创建单个节点任务 // CreateNodeTask 创建单个节点任务
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, taskType NodeTaskType, version int64) error { func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, taskType NodeTaskType, version int64) error {
if clusterId <= 0 || nodeId <= 0 { if clusterId <= 0 || nodeId <= 0 {
return nil return nil
} }
@@ -60,6 +60,7 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64
"role": role, "role": role,
"clusterId": clusterId, "clusterId": clusterId,
"nodeId": nodeId, "nodeId": nodeId,
"serverId": serverId,
"type": taskType, "type": taskType,
"uniqueId": uniqueId, "uniqueId": uniqueId,
"updatedAt": updatedAt, "updatedAt": updatedAt,
@@ -80,17 +81,18 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64
} }
// CreateClusterTask 创建集群任务 // CreateClusterTask 创建集群任务
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, taskType NodeTaskType) error { func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, serverId int64, taskType NodeTaskType) error {
if clusterId <= 0 { if clusterId <= 0 {
return nil return nil
} }
uniqueId := role + "@" + types.String(clusterId) + "@cluster@" + taskType uniqueId := role + "@" + types.String(clusterId) + "@" + types.String(serverId) + "@cluster@" + taskType
updatedAt := time.Now().Unix() updatedAt := time.Now().Unix()
_, _, err := this.Query(tx). _, _, err := this.Query(tx).
InsertOrUpdate(maps.Map{ InsertOrUpdate(maps.Map{
"role": role, "role": role,
"clusterId": clusterId, "clusterId": clusterId,
"serverId": serverId,
"nodeId": 0, "nodeId": 0,
"type": taskType, "type": taskType,
"uniqueId": uniqueId, "uniqueId": uniqueId,
@@ -112,7 +114,7 @@ func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId in
} }
// ExtractNodeClusterTask 分解边缘节点集群任务 // ExtractNodeClusterTask 分解边缘节点集群任务
func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error { func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, serverId int64, taskType NodeTaskType) error {
nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes) nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes)
if err != nil { if err != nil {
return err return err
@@ -131,7 +133,7 @@ func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, tas
var version = time.Now().UnixNano() var version = time.Now().UnixNano()
for _, nodeId := range nodeIds { for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, taskType, version) err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, serverId, taskType, version)
if err != nil { if err != nil {
return err return err
} }
@@ -170,7 +172,7 @@ func (this *NodeTaskDAO) ExtractNSClusterTask(tx *dbs.Tx, clusterId int64, taskT
var version = time.Now().UnixNano() var version = time.Now().UnixNano()
for _, nodeId := range nodeIds { for _, nodeId := range nodeIds {
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, taskType, version) err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, 0, taskType, version)
if err != nil { if err != nil {
return err return err
} }
@@ -202,7 +204,8 @@ func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx, role string) error {
clusterId := int64(one.(*NodeTask).ClusterId) clusterId := int64(one.(*NodeTask).ClusterId)
switch role { switch role {
case nodeconfigs.NodeRoleNode: case nodeconfigs.NodeRoleNode:
err = this.ExtractNodeClusterTask(tx, clusterId, one.(*NodeTask).Type) var nodeTask = one.(*NodeTask)
err = this.ExtractNodeClusterTask(tx, clusterId, int64(nodeTask.ServerId), nodeTask.Type)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -6,6 +6,7 @@ type NodeTask struct {
Role string `field:"role"` // 节点角色 Role string `field:"role"` // 节点角色
NodeId uint32 `field:"nodeId"` // 节点ID NodeId uint32 `field:"nodeId"` // 节点ID
ClusterId uint32 `field:"clusterId"` // 集群ID ClusterId uint32 `field:"clusterId"` // 集群ID
ServerId uint32 `field:"serverId"` // 服务ID
Type string `field:"type"` // 任务类型 Type string `field:"type"` // 任务类型
UniqueId string `field:"uniqueId"` // 唯一IDnodeId@type UniqueId string `field:"uniqueId"` // 唯一IDnodeId@type
UpdatedAt uint64 `field:"updatedAt"` // 修改时间 UpdatedAt uint64 `field:"updatedAt"` // 修改时间
@@ -21,6 +22,7 @@ type NodeTaskOperator struct {
Role interface{} // 节点角色 Role interface{} // 节点角色
NodeId interface{} // 节点ID NodeId interface{} // 节点ID
ClusterId interface{} // 集群ID ClusterId interface{} // 集群ID
ServerId interface{} // 服务ID
Type interface{} // 任务类型 Type interface{} // 任务类型
UniqueId interface{} // 唯一IDnodeId@type UniqueId interface{} // 唯一IDnodeId@type
UpdatedAt interface{} // 修改时间 UpdatedAt interface{} // 修改时间

View File

@@ -195,5 +195,5 @@ func (this *NSClusterDAO) FindClusterRecursion(tx *dbs.Tx, clusterId int64) ([]b
// NotifyUpdate 通知更改 // NotifyUpdate 通知更改
func (this *NSClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { func (this *NSClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, NSNodeTaskTypeConfigChanged) return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, NSNodeTaskTypeConfigChanged)
} }

View File

@@ -1500,11 +1500,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC
} }
if oldClusterId > 0 { if oldClusterId > 0 {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeConfigChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, 0, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeIPItemChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, 0, NodeTaskTypeIPItemChanged)
if err != nil { if err != nil {
return err return err
} }
@@ -1515,11 +1515,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC
} }
if newClusterId > 0 { if newClusterId > 0 {
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeConfigChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, 0, NodeTaskTypeConfigChanged)
if err != nil { if err != nil {
return err return err
} }
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeIPItemChanged) err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, 0, NodeTaskTypeIPItemChanged)
if err != nil { if err != nil {
return err return err
} }
@@ -2260,7 +2260,7 @@ func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
if clusterId == 0 { if clusterId == 0 {
return nil return nil
} }
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged) return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, serverId, NodeTaskTypeConfigChanged)
} }
// NotifyDNSUpdate 通知DNS更新 // NotifyDNSUpdate 通知DNS更新

View File

@@ -39,6 +39,7 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode
Type: task.Type, Type: task.Type,
Version: int64(task.Version), Version: int64(task.Version),
IsPrimary: primaryNodeId == nodeId, IsPrimary: primaryNodeId == nodeId,
ServerId: int64(task.ServerId),
}) })
} }
@@ -137,6 +138,7 @@ func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.F
IsOk: task.IsOk == 1, IsOk: task.IsOk == 1,
Error: task.Error, Error: task.Error,
UpdatedAt: int64(task.UpdatedAt), UpdatedAt: int64(task.UpdatedAt),
ServerId: int64(task.ServerId),
Node: &pb.Node{ Node: &pb.Node{
Id: int64(task.NodeId), Id: int64(task.NodeId),
Name: nodeName, Name: nodeName,
@@ -261,6 +263,7 @@ func (this *NodeTaskService) FindNotifyingNodeTasks(ctx context.Context, req *pb
Error: task.Error, Error: task.Error,
UpdatedAt: int64(task.UpdatedAt), UpdatedAt: int64(task.UpdatedAt),
Node: &pb.Node{Id: int64(task.NodeId)}, Node: &pb.Node{Id: int64(task.NodeId)},
ServerId: int64(task.ServerId),
}) })
} }

View File

@@ -1924,3 +1924,29 @@ func (this *ServerService) FindServerUserPlan(ctx context.Context, req *pb.FindS
}, },
}, nil }, nil
} }
// ComposeServerConfig 获取服务配置
func (this *ServerService) ComposeServerConfig(ctx context.Context, req *pb.ComposeServerConfigRequest) (*pb.ComposeServerConfigResponse, error) {
_, err := this.ValidateNode(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
serverConfig, err := models.SharedServerDAO.ComposeServerConfigWithServerId(tx, req.ServerId, true)
if err != nil {
if err == models.ErrNotFound {
return &pb.ComposeServerConfigResponse{ServerConfigJSON: nil}, nil
}
return nil, err
}
if serverConfig == nil {
return &pb.ComposeServerConfigResponse{ServerConfigJSON: nil}, nil
}
configJSON, err := json.Marshal(serverConfig)
if err != nil {
return nil, err
}
return &pb.ComposeServerConfigResponse{ServerConfigJSON: configJSON}, nil
}