配置更新时立即向集群节点发消息

This commit is contained in:
GoEdgeLab
2020-10-09 12:03:32 +08:00
parent b64089335e
commit c1a4083a87
13 changed files with 36 additions and 39 deletions

View File

@@ -0,0 +1,65 @@
package clusters
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/maps"
"time"
)
// 检查变更的集群列表
type CheckChangeAction struct {
actionutils.ParentAction
}
func (this *CheckChangeAction) Init() {
this.Nav("", "", "")
}
func (this *CheckChangeAction) RunPost(params struct {
IsNotifying bool
}) {
timeout := time.NewTimer(55 * time.Second) // 比客户端提前结束,避免在客户端产生一个请求错误
this.Data["clusters"] = []interface{}{}
Loop:
for {
select {
case <-this.Request.Context().Done():
break Loop
case <-timeout.C:
break Loop
default:
// 继续
}
resp, err := this.RPC().NodeClusterRPC().FindAllChangedNodeClusters(this.AdminContext(), &pb.FindAllChangedNodeClustersRequest{})
if err != nil {
this.ErrorPage(err)
return
}
result := []maps.Map{}
for _, cluster := range resp.Clusters {
result = append(result, maps.Map{
"id": cluster.Id,
"name": cluster.Name,
})
}
// 从提醒到提醒消失
if len(result) == 0 && params.IsNotifying {
break
}
this.Data["clusters"] = result
if len(result) > 0 {
break
}
time.Sleep(1 * time.Second)
}
this.Success()
}

View File

@@ -13,6 +13,8 @@ func init() {
Prefix("/clusters").
Get("", new(IndexAction)).
GetPost("/create", new(CreateAction)).
Post("/sync", new(SyncAction)).
Post("/checkChange", new(CheckChangeAction)).
EndAll()
})
}

View File

@@ -0,0 +1,44 @@
package clusters
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/nodes/nodeutils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// 同步集群
type SyncAction struct {
actionutils.ParentAction
}
func (this *SyncAction) RunPost(params struct{}) {
// TODO 将来可以单独选择某一个集群进行单独的同步
// 所有有变化的集群
clustersResp, err := this.RPC().NodeClusterRPC().FindAllChangedNodeClusters(this.AdminContext(), &pb.FindAllChangedNodeClustersRequest{})
if err != nil {
this.ErrorPage(err)
return
}
clusters := clustersResp.Clusters
for _, cluster := range clusters {
_, err := this.RPC().NodeRPC().SyncNodesVersionWithCluster(this.AdminContext(), &pb.SyncNodesVersionWithClusterRequest{
ClusterId: cluster.Id,
})
if err != nil {
this.ErrorPage(err)
return
}
// 发送通知
_, err = nodeutils.SendMessageToCluster(this.AdminContext(), cluster.Id, messageconfigs.MessageCodeConfigChanged, &messageconfigs.ConfigChangedMessage{}, 10)
if err != nil {
this.ErrorPage(err)
return
}
}
this.Success()
}