增加节点同步状态提示和任务列表

This commit is contained in:
GoEdgeLab
2021-01-17 16:47:29 +08:00
parent 9a6620d80b
commit b6ae2292c3
23 changed files with 557 additions and 245 deletions

View File

@@ -252,6 +252,10 @@ func (this *RPCClient) LoginRPC() pb.LoginServiceClient {
return pb.NewLoginServiceClient(this.pickConn())
}
func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient {
return pb.NewNodeTaskServiceClient(this.pickConn())
}
// 构造Admin上下文
func (this *RPCClient) Context(adminId int64) context.Context {
ctx := context.Background()

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
_ "github.com/iwind/TeaGo/bootstrap"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
"time"
)
@@ -48,24 +49,32 @@ func (this *SyncClusterTask) loop() error {
return err
}
ctx := rpcClient.Context(0)
resp, err := rpcClient.NodeClusterRPC().FindAllChangedNodeClusters(ctx, &pb.FindAllChangedNodeClustersRequest{})
tasksResp, err := rpcClient.NodeTaskRPC().FindNotifyingNodeTasks(ctx, &pb.FindNotifyingNodeTasksRequest{Size: 100})
if err != nil {
return err
}
nodeIds := []int64{}
taskIds := []int64{}
for _, task := range tasksResp.NodeTasks {
if !lists.ContainsInt64(nodeIds, task.Node.Id) {
nodeIds = append(nodeIds, task.Node.Id)
}
taskIds = append(taskIds, task.Id)
}
if len(nodeIds) == 0 {
return nil
}
_, err = nodeutils.SendMessageToNodeIds(ctx, nodeIds, messageconfigs.MessageCodeNewNodeTask, &messageconfigs.NewNodeTaskMessage{}, 3)
if err != nil {
return err
}
for _, cluster := range resp.NodeClusters {
_, err := rpcClient.NodeRPC().SyncNodesVersionWithCluster(ctx, &pb.SyncNodesVersionWithClusterRequest{
NodeClusterId: cluster.Id,
})
// 设置已通知
_, err = rpcClient.NodeTaskRPC().UpdateNodeTasksNotified(ctx, &pb.UpdateNodeTasksNotifiedRequest{NodeTaskIds: taskIds})
if err != nil {
return err
}
// 发送通知
_, err = nodeutils.SendMessageToCluster(ctx, cluster.Id, messageconfigs.MessageCodeConfigChanged, &messageconfigs.ConfigChangedMessage{}, 10)
if err != nil {
return err
}
}
return nil
}

View File

@@ -1,37 +0,0 @@
package clusters
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/maps"
)
// 检查变更的集群列表
type CheckChangeAction struct {
actionutils.ParentAction
}
func (this *CheckChangeAction) Init() {
this.Nav("", "", "")
}
func (this *CheckChangeAction) RunPost(params struct {
IsNotifying bool
}) {
resp, err := this.RPC().NodeClusterRPC().FindAllChangedNodeClusters(this.AdminContext(), &pb.FindAllChangedNodeClustersRequest{})
if err != nil {
this.ErrorPage(err)
return
}
result := []maps.Map{}
for _, cluster := range resp.NodeClusters {
result = append(result, maps.Map{
"id": cluster.Id,
"name": cluster.Name,
})
}
this.Data["clusters"] = result
this.Success()
}

View File

@@ -15,8 +15,6 @@ func init() {
Prefix("/clusters").
Get("", new(IndexAction)).
GetPost("/create", new(CreateAction)).
Post("/sync", new(SyncAction)).
Post("/checkChange", new(CheckChangeAction)).
// 只要登录即可访问的Action
EndHelpers().

View File

@@ -1,44 +0,0 @@
package clusters
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/nodes/nodeutils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// 同步集群
type SyncAction struct {
actionutils.ParentAction
}
func (this *SyncAction) RunPost(params struct{}) {
// TODO 将来可以单独选择某一个集群进行单独的同步
// 所有有变化的集群
clustersResp, err := this.RPC().NodeClusterRPC().FindAllChangedNodeClusters(this.AdminContext(), &pb.FindAllChangedNodeClustersRequest{})
if err != nil {
this.ErrorPage(err)
return
}
clusters := clustersResp.NodeClusters
for _, cluster := range clusters {
_, err := this.RPC().NodeRPC().SyncNodesVersionWithCluster(this.AdminContext(), &pb.SyncNodesVersionWithClusterRequest{
NodeClusterId: cluster.Id,
})
if err != nil {
this.ErrorPage(err)
return
}
// 发送通知
_, err = nodeutils.SendMessageToCluster(this.AdminContext(), cluster.Id, messageconfigs.MessageCodeConfigChanged, &messageconfigs.ConfigChangedMessage{}, 10)
if err != nil {
this.ErrorPage(err)
return
}
}
this.Success()
}

View File

@@ -0,0 +1,23 @@
package tasks
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
type CheckAction struct {
actionutils.ParentAction
}
func (this *CheckAction) RunPost(params struct{}) {
resp, err := this.RPC().NodeTaskRPC().ExistsNodeTasks(this.AdminContext(), &pb.ExistsNodeTasksRequest{})
if err != nil {
this.ErrorPage(err)
return
}
this.Data["isDoing"] = resp.ExistTasks
this.Data["hasError"] = resp.ExistError
this.Success()
}

View File

@@ -0,0 +1,24 @@
package tasks
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
type DeleteAction struct {
actionutils.ParentAction
}
func (this *DeleteAction) RunPost(params struct {
TaskId int64
}) {
defer this.CreateLogInfo("删除同步任务 %d", params.TaskId)
_, err := this.RPC().NodeTaskRPC().DeleteNodeTask(this.AdminContext(), &pb.DeleteNodeTaskRequest{NodeTaskId: params.TaskId})
if err != nil {
this.ErrorPage(err)
return
}
this.Success()
}

View File

@@ -0,0 +1,22 @@
package tasks
import (
"github.com/TeaOSLab/EdgeAdmin/internal/configloaders"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/clusters/clusterutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/helpers"
"github.com/iwind/TeaGo"
)
func init() {
TeaGo.BeforeStart(func(server *TeaGo.Server) {
server.
Helper(helpers.NewUserMustAuth(configloaders.AdminModuleCodeNode)).
Helper(clusterutils.NewClustersHelper()).
Prefix("/clusters/tasks").
GetPost("/listPopup", new(ListPopupAction)).
Post("/check", new(CheckAction)).
Post("/delete", new(DeleteAction)).
EndAll()
})
}

View File

@@ -0,0 +1,67 @@
package tasks
import (
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/actions"
"github.com/iwind/TeaGo/maps"
timeutil "github.com/iwind/TeaGo/utils/time"
)
type ListPopupAction struct {
actionutils.ParentAction
}
func (this *ListPopupAction) Init() {
this.Nav("", "", "")
}
func (this *ListPopupAction) RunGet(params struct{}) {
this.retrieveTasks()
this.Show()
}
func (this *ListPopupAction) RunPost(params struct {
Must *actions.Must
}) {
this.retrieveTasks()
this.Success()
}
func (this *ListPopupAction) retrieveTasks() {
resp, err := this.RPC().NodeTaskRPC().FindNodeClusterTasks(this.AdminContext(), &pb.FindNodeClusterTasksRequest{})
if err != nil {
this.ErrorPage(err)
return
}
countTasks := 0
clusterMaps := []maps.Map{}
for _, cluster := range resp.ClusterTasks {
taskMaps := []maps.Map{}
for _, task := range cluster.NodeTasks {
countTasks++
taskMaps = append(taskMaps, maps.Map{
"id": task.Id,
"type": task.Type,
"node": maps.Map{
"id": task.Node.Id,
"name": task.Node.Name,
},
"isOk": task.IsOk,
"error": task.Error,
"isDone": task.IsDone,
"updatedTime": timeutil.FormatTime("Y-m-d H:i:s", task.UpdatedAt),
})
}
clusterMaps = append(clusterMaps, maps.Map{
"id": cluster.ClusterId,
"name": cluster.ClusterName,
"tasks": taskMaps,
})
}
this.Data["clusters"] = clusterMaps
this.Data["countTasks"] = countTasks
}

View File

@@ -47,6 +47,176 @@ func SendMessageToCluster(ctx context.Context, clusterId int64, code string, msg
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
// TODO 检查是否在线
if len(node.ConnectedAPINodeIds) == 0 {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点尚未连接到API",
})
locker.Unlock()
wg.Done()
continue
}
// 获取API节点信息
apiNodeId := node.ConnectedAPINodeIds[0]
rpcClient, ok := rpcMap[apiNodeId]
if !ok {
apiNodeResp, err := defaultRPCClient.APINodeRPC().FindEnabledAPINode(ctx, &pb.FindEnabledAPINodeRequest{NodeId: apiNodeId})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
if apiNodeResp.Node == nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "无法读取对应的API节点信息API节点ID" + strconv.FormatInt(apiNodeId, 10),
})
locker.Unlock()
wg.Done()
continue
}
apiNode := apiNodeResp.Node
apiRPCClient, err := rpc.NewRPCClient(&configs.APIConfig{
RPC: struct {
Endpoints []string `yaml:"endpoints"`
}{
Endpoints: apiNode.AccessAddrs,
},
NodeId: apiNode.UniqueId,
Secret: apiNode.Secret,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "初始化API节点错误API节点ID" + strconv.FormatInt(apiNodeId, 10) + "" + err.Error(),
})
locker.Unlock()
wg.Done()
continue
}
rpcMap[apiNodeId] = apiRPCClient
rpcClient = apiRPCClient
}
// 发送消息
go func(node *pb.Node) {
defer wg.Done()
result, err := rpcClient.NodeRPC().SendCommandToNode(ctx, &pb.NodeStreamMessage{
NodeId: node.Id,
TimeoutSeconds: timeoutSeconds,
Code: code,
DataJSON: msgJSON,
})
if err != nil {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "API返回错误" + err.Error(),
})
locker.Unlock()
return
}
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: result.IsOk,
Message: result.Message,
})
locker.Unlock()
}(node)
}
wg.Wait()
return
}
// 向一组节点发送命令消息
func SendMessageToNodeIds(ctx context.Context, nodeIds []int64, code string, msg interface{}, timeoutSeconds int32) (results []*MessageResult, err error) {
results = []*MessageResult{}
if len(nodeIds) == 0 {
return
}
msgJSON, err := json.Marshal(msg)
if err != nil {
return results, err
}
defaultRPCClient, err := rpc.SharedRPC()
if err != nil {
return results, err
}
// 获取所有节点
nodesResp, err := defaultRPCClient.NodeRPC().FindEnabledNodesWithIds(ctx, &pb.FindEnabledNodesWithIdsRequest{NodeIds: nodeIds})
if err != nil {
return nil, err
}
nodes := nodesResp.Nodes
if len(nodes) == 0 {
return results, nil
}
rpcMap := map[int64]*rpc.RPCClient{} // apiNodeId => RPCClient
locker := &sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
if !node.IsActive {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点不在线",
})
locker.Unlock()
wg.Done()
continue
}
if !node.IsOn {
if !node.IsActive {
locker.Lock()
results = append(results, &MessageResult{
NodeId: node.Id,
NodeName: node.Name,
IsOK: false,
Message: "节点未启用",
})
locker.Unlock()
wg.Done()
continue
}
}
if len(node.ConnectedAPINodeIds) == 0 {
locker.Lock()
results = append(results, &MessageResult{

View File

@@ -4,7 +4,6 @@ import (
"github.com/TeaOSLab/EdgeAdmin/internal/oplogs"
"github.com/TeaOSLab/EdgeAdmin/internal/utils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/servers/components/waf/ipadmin/ipadminutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/dao"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/actions"
@@ -83,13 +82,6 @@ func (this *CreateIPPopupAction) RunPost(params struct {
}
itemId := createResp.IpItemId
// 发送通知
err = ipadminutils.NotifyUpdateToClustersWithFirewallPolicyId(this.AdminContext(), params.FirewallPolicyId)
if err != nil {
this.ErrorPage(err)
return
}
// 日志
defer this.CreateLog(oplogs.LevelInfo, "在WAF策略 %d 名单中添加IP %d", params.FirewallPolicyId, itemId)

View File

@@ -3,7 +3,6 @@ package ipadmin
import (
"github.com/TeaOSLab/EdgeAdmin/internal/oplogs"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/servers/components/waf/ipadmin/ipadminutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
@@ -26,12 +25,5 @@ func (this *DeleteIPAction) RunPost(params struct {
return
}
// 发送通知
err = ipadminutils.NotifyUpdateToClustersWithFirewallPolicyId(this.AdminContext(), params.FirewallPolicyId)
if err != nil {
this.ErrorPage(err)
return
}
this.Success()
}

View File

@@ -1,28 +0,0 @@
package ipadminutils
import (
"context"
"github.com/TeaOSLab/EdgeAdmin/internal/rpc"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/nodes/nodeutils"
"github.com/TeaOSLab/EdgeCommon/pkg/messageconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
)
// 通知使用此WAF策略的集群更新
func NotifyUpdateToClustersWithFirewallPolicyId(ctx context.Context, firewallPolicyId int64) error {
client, err := rpc.SharedRPC()
if err != nil {
return err
}
resp, err := client.NodeClusterRPC().FindAllEnabledNodeClustersWithHTTPFirewallPolicyId(ctx, &pb.FindAllEnabledNodeClustersWithHTTPFirewallPolicyIdRequest{HttpFirewallPolicyId: firewallPolicyId})
if err != nil {
return err
}
for _, cluster := range resp.NodeClusters {
_, err = nodeutils.SendMessageToCluster(ctx, cluster.Id, messageconfigs.MessageCodeIPListChanged, &messageconfigs.IPListChangedMessage{}, 3)
if err != nil {
return err
}
}
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"github.com/TeaOSLab/EdgeAdmin/internal/oplogs"
"github.com/TeaOSLab/EdgeAdmin/internal/utils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/actionutils"
"github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/servers/components/waf/ipadmin/ipadminutils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/actions"
"github.com/iwind/TeaGo/maps"
@@ -95,12 +94,5 @@ func (this *UpdateIPPopupAction) RunPost(params struct {
return
}
// 发送通知
err = ipadminutils.NotifyUpdateToClustersWithFirewallPolicyId(this.AdminContext(), params.FirewallPolicyId)
if err != nil {
this.ErrorPage(err)
return
}
this.Success()
}

View File

@@ -110,6 +110,7 @@ func (this *userMustAuth) BeforeAction(actionPtr actions.ActionWrapper, paramNam
if !action.Data.Has("teaSubMenu") {
action.Data["teaSubMenu"] = ""
}
action.Data["teaCheckClusterTask"] = configloaders.AllowModule(adminId, configloaders.AdminModuleCodeNode)
// 菜单
action.Data["firstMenuItem"] = ""

View File

@@ -12,6 +12,7 @@ import (
_ "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/clusters/grants"
_ "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/clusters/regions"
_ "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/clusters/regions/items"
_ "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/clusters/tasks"
_ "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/csrf"
_ "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/dashboard"
_ "github.com/TeaOSLab/EdgeAdmin/internal/web/actions/default/db"

View File

@@ -27,12 +27,24 @@
</a>
<div class="right menu">
<!-- 集群同步 -->
<a href="" class="item" v-if="teaCheckClusterTask && doingNodeTasks.isUpdated" @click.prevent="showNodeTasks()">
<span v-if="!doingNodeTasks.isDoing && !doingNodeTasks.hasError"><i class="icon sync"></i>已同步</span>
<span v-if="doingNodeTasks.isDoing && !doingNodeTasks.hasError"><i class="icon sync"></i>正在同步...</span>
<span v-if="doingNodeTasks.hasError" class="red"><i class="icon sync"></i>同步失败</span>
</a>
<!-- 消息 -->
<a href="/messages" class="item" :class="{active:teaMenu == 'message'}"><span :class="{'blink':globalMessageBadge > 0}"><i class="icon bell"></i>消息({{globalMessageBadge}}) </span></a>
<!-- 用户信息 -->
<a href="/settings/profile" class="item">
<i class="icon user" v-if="teaUserAvatar.length == 0"></i>
<img class="avatar" alt="" :src="teaUserAvatar" v-if="teaUserAvatar.length > 0"/>
{{teaUsername}}
</a>
<!-- 退出登录 -->
<a :href="Tea.url('logout')" class="item" title="安全退出登录"><i class="icon sign out"></i>退出</a>
</div>
</div>

View File

@@ -13,6 +13,9 @@ Tea.context(function () {
// 检查消息
this.checkMessages()
// 检查集群节点同步
this.loadNodeTasks();
})
/**
@@ -69,6 +72,36 @@ Tea.context(function () {
if (window.IS_POPUP === true) {
this.success = window.NotifyPopup
}
/**
* 节点同步任务
*/
this.doingNodeTasks = {
isDoing: false,
hasError: false,
isUpdated: false
}
this.loadNodeTasks = function () {
this.$post("/clusters/tasks/check")
.success(function (resp) {
this.doingNodeTasks.isDoing = resp.data.isDoing
this.doingNodeTasks.hasError = resp.data.hasError
this.doingNodeTasks.isUpdated = true
})
.done(function () {
this.$delay(function () {
this.loadNodeTasks()
}, 5000)
})
}
this.showNodeTasks = function () {
teaweb.popup("/clusters/tasks/listPopup", {
height: "24em",
width: "50em"
})
}
});
window.NotifySuccess = function (message, url, params) {

View File

@@ -0,0 +1,6 @@
h3 span {
margin-left: 0.5em;
color: grey;
font-size: 0.6em !important;
}
/*# sourceMappingURL=listPopup.css.map */

View File

@@ -0,0 +1 @@
{"version":3,"sources":["listPopup.less"],"names":[],"mappings":"AAAA,EAAG;EACF,kBAAA;EACA,WAAA;EACA,2BAAA","file":"listPopup.css"}

View File

@@ -0,0 +1,38 @@
{$layout "layout_popup"}
<h3>正在同步的任务<span v-if="countTasks > 0">(共{{countTasks}}个)</span></h3>
<p class="comment" v-if="clusters.length == 0">暂时没有同步的集群。</p>
<div v-if="clusters.length > 0">
<table class="ui table selectable">
<thead>
<tr>
<th>集群</th>
<th>节点</th>
<th>任务</th>
<th>状态</th>
<th>触发时间</th>
<th></th>
</tr>
</thead>
<tbody v-for="cluster in clusters">
<tr v-for="task in cluster.tasks">
<td>{{cluster.name}}</td>
<td>
{{task.node.name}} &nbsp; <a :href="'/clusters/cluster/node?clusterId=' + cluster.id + '&nodeId=' + task.node.id" target="_blank"><i class="icon linkify small grey"></i></a>
</td>
<td>
<span v-if="task.type == 'configChanged'">同步配置</span>
<span v-if="task.type == 'ipItemChanged'">同步IP名单</span>
</td>
<td>
<span v-if="task.isDone" class="red">{{task.error}}</span>
<span v-else>正在同步...</span>
</td>
<td>{{task.updatedTime}}</td>
<td>
<a href="" title="删除" class="remove-btn" @click.prevent="deleteTask(task.id)"><i class="icon remove small grey"></i></a>
</td>
</tr>
</tbody>
</table>
</div>

View File

@@ -0,0 +1,31 @@
Tea.context(function () {
this.$delay(function () {
this.reload()
})
this.reload = function () {
this.$post("$")
.success(function (resp) {
this.countTasks = resp.data.countTasks
this.clusters = resp.data.clusters
})
.done(function () {
this.$delay(function () {
this.reload()
}, 5000)
})
}
this.deleteTask = function (taskId) {
let that = this
teaweb.confirm("确定要删除这个任务吗?", function () {
that.$post(".delete")
.params({
taskId: taskId
})
.success(function () {
teaweb.reload()
})
})
}
})

View File

@@ -0,0 +1,5 @@
h3 span {
margin-left: 0.5em;
color: grey;
font-size: 0.6em !important;
}