diff --git a/internal/db/models/http_web_dao.go b/internal/db/models/http_web_dao.go index 015ff1d1..877e54ee 100644 --- a/internal/db/models/http_web_dao.go +++ b/internal/db/models/http_web_dao.go @@ -732,6 +732,32 @@ func (this *HTTPWebDAO) UpdateWebStat(tx *dbs.Tx, webId int64, statJSON []byte) return this.NotifyUpdate(tx, webId) } +// CopyWebStats 拷贝统计配置 +func (this *HTTPWebDAO) CopyWebStats(tx *dbs.Tx, fromWebId int64, toWebIds []int64) error { + if fromWebId <= 0 || len(toWebIds) == 0 { + return nil + } + + statJSON, err := this.Query(tx). + Pk(fromWebId). + Result("stat"). + FindJSONCol() + if err != nil { + return err + } + + // 暂时不处理 + if len(statJSON) == 0 { + return nil + } + + return this.Query(tx). + Pk(toWebIds). + Reuse(false). + Set("stat", statJSON). + UpdateQuickly() +} + // UpdateWebCache 更改缓存配置 func (this *HTTPWebDAO) UpdateWebCache(tx *dbs.Tx, webId int64, cacheJSON []byte) error { if webId <= 0 { diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 99a31ef5..3164293c 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -1003,6 +1003,13 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, dataMap *shared DataMap: dataMap, } + // 待更新服务ID + updatingServerListId, err := SharedUpdatingServerListDAO.FindLatestId(tx) + if err != nil { + return nil, err + } + config.UpdatingServerListId = updatingServerListId + // API节点IP apiNodeIPs, err := SharedAPINodeDAO.FindAllEnabledAPIAccessIPs(tx, cacheMap) if err != nil { diff --git a/internal/db/models/node_task_dao.go b/internal/db/models/node_task_dao.go index 8c8a3e21..73ddaa01 100644 --- a/internal/db/models/node_task_dao.go +++ b/internal/db/models/node_task_dao.go @@ -25,6 +25,7 @@ const ( NodeTaskTypeNodeLevelChanged NodeTaskType = "nodeLevelChanged" // 节点级别变化 NodeTaskTypeUserServersStateChanged NodeTaskType = "userServersStateChanged" // 用户服务状态变化 NodeTaskTypeUAMPolicyChanged NodeTaskType = "uamPolicyChanged" // UAM策略变化 + NodeTaskTypeUpdatingServers NodeTaskType = "updatingServers" // 更新一组服务 // NS相关 @@ -237,6 +238,7 @@ func (this *NodeTaskDAO) FindDoingNodeTasks(tx *dbs.Tx, role string, nodeId int6 var query = this.Query(tx). Attr("role", role). Attr("nodeId", nodeId). + UseIndex("nodeId"). Asc("version") if version > 0 { query.Lt("LENGTH(version)", 19) // 兼容以往版本 diff --git a/internal/db/models/node_task_dao_test.go b/internal/db/models/node_task_dao_test.go index e3f723e8..b99abb4d 100644 --- a/internal/db/models/node_task_dao_test.go +++ b/internal/db/models/node_task_dao_test.go @@ -1,17 +1,19 @@ -package models +package models_test import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/dbs" "testing" + "time" ) func TestNodeTaskDAO_CreateNodeTask(t *testing.T) { dbs.NotifyReady() var tx *dbs.Tx - err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, 0, NodeTaskTypeConfigChanged) + err := models.SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, 0, models.NodeTaskTypeConfigChanged) if err != nil { t.Fatal(err) } @@ -22,7 +24,7 @@ func TestNodeTaskDAO_CreateClusterTask(t *testing.T) { dbs.NotifyReady() var tx *dbs.Tx - err := SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, 1, 0, 0, NodeTaskTypeConfigChanged) + err := models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, 1, 0, 0, models.NodeTaskTypeConfigChanged) if err != nil { t.Fatal(err) } @@ -33,9 +35,22 @@ func TestNodeTaskDAO_ExtractClusterTask(t *testing.T) { dbs.NotifyReady() var tx *dbs.Tx - err := SharedNodeTaskDAO.ExtractNodeClusterTask(tx, 1, 0, 0, NodeTaskTypeConfigChanged) + err := models.SharedNodeTaskDAO.ExtractNodeClusterTask(tx, 1, 0, 0, models.NodeTaskTypeConfigChanged) if err != nil { t.Fatal(err) } t.Log("ok") } + +func TestNodeTaskDAO_FindDoingNodeTasks(t *testing.T) { + var tx *dbs.Tx + var dao = models.NewNodeTaskDAO() + var before = time.Now() + defer func() { + t.Log(time.Since(before).Seconds()*1000, "ms") + }() + _, err := dao.FindDoingNodeTasks(tx, "node", 48, 0) + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/db/models/server_dao.go b/internal/db/models/server_dao.go index 596cb048..c4b649d3 100644 --- a/internal/db/models/server_dao.go +++ b/internal/db/models/server_dao.go @@ -210,6 +210,15 @@ func (this *ServerDAO) CreateServer(tx *dbs.Tx, if IsNotNull(udpJSON) { op.Udp = udpJSON } + + // 如果没有Web配置,则创建之 + if webId <= 0 { + webId, err = SharedHTTPWebDAO.CreateWeb(tx, 0, userId, nil) + if err != nil { + return 0, err + } + } + op.WebId = webId if IsNotNull(reverseProxyJSON) { op.ReverseProxy = reverseProxyJSON @@ -1043,8 +1052,10 @@ func (this *ServerDAO) ComposeServerConfig(tx *dbs.Tx, server *Server, ignoreCer config.UserId = int64(server.UserId) config.Type = server.Type config.IsOn = server.IsOn - config.Name = server.Name - config.Description = server.Description + if !forNode { + config.Name = server.Name + config.Description = server.Description + } var groupConfig *serverconfigs.ServerGroupConfig for _, groupId := range server.DecodeGroupIds() { @@ -1452,7 +1463,7 @@ func (this *ServerDAO) CountEnabledServersWithWebIds(tx *dbs.Tx, webIds []int64) Count() } -// FindAllEnabledServersWithWebIds 查找使用某个缓存策略的所有服务 +// FindAllEnabledServersWithWebIds 通过WebId查找服务 func (this *ServerDAO) FindAllEnabledServersWithWebIds(tx *dbs.Tx, webIds []int64) (result []*Server, err error) { if len(webIds) == 0 { return @@ -1460,6 +1471,7 @@ func (this *ServerDAO) FindAllEnabledServersWithWebIds(tx *dbs.Tx, webIds []int6 _, err = this.Query(tx). State(ServerStateEnabled). Attr("webId", webIds). + UseIndex("webId"). Reuse(false). AscPk(). Slice(&result). @@ -2796,6 +2808,20 @@ func (this *ServerDAO) UpdateServerName(tx *dbs.Tx, serverId int64, name string) UpdateQuickly() } +// FindEnabledServersWithIds 根据ID查找一组服务 +func (this *ServerDAO) FindEnabledServersWithIds(tx *dbs.Tx, serverIds []int64) (result []*Server, err error) { + if len(serverIds) == 0 { + return + } + _, err = this.Query(tx). + Pk(serverIds). + Reuse(false). + State(ServerStateEnabled). + Slice(&result). + FindAll() + return +} + // NotifyUpdate 同步服务所在的集群 func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error { // 创建任务 diff --git a/internal/db/models/server_dao_copy.go b/internal/db/models/server_dao_copy.go new file mode 100644 index 00000000..3f19459d --- /dev/null +++ b/internal/db/models/server_dao_copy.go @@ -0,0 +1,173 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package models + +import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/dbs" +) + +// 服务基本信息 +type clusterServerList struct { + clusterId int64 + serverIds []int64 +} + +// CopyServerConfigToServers 拷贝服务配置到一组服务 +func (this *ServerDAO) CopyServerConfigToServers(tx *dbs.Tx, fromServerId int64, toServerIds []int64, configCode serverconfigs.ConfigCode) error { + if fromServerId <= 0 { + return nil + } + if len(toServerIds) == 0 { + return nil + } + + webId, err := SharedServerDAO.FindServerWebId(tx, fromServerId) + if err != nil { + return err + } + + clusterServers, toWebIds, err := this.findServerClusterIdsAndWebIds(tx, toServerIds) + if err != nil { + return err + } + if len(clusterServers) == 0 { + return nil + } + + switch configCode { + case serverconfigs.ConfigCodeStat: // 统计 + if webId <= 0 { + return nil + } + + err = SharedHTTPWebDAO.CopyWebStats(tx, webId, toWebIds) + if err != nil { + return err + } + } + + // 通知更新 + for _, serverList := range clusterServers { + err = SharedUpdatingServerListDAO.CreateList(tx, serverList.clusterId, serverList.serverIds) + if err != nil { + return err + } + + err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, serverList.clusterId, 0, 0, NodeTaskTypeUpdatingServers) + if err != nil { + return err + } + } + + return nil +} + +// 查找一组服务的集群和WebId信息 +func (this *ServerDAO) findServerClusterIdsAndWebIds(tx *dbs.Tx, serverIds []int64) (clusterServers []*clusterServerList, webIds []int64, err error) { + if len(serverIds) == 0 { + return + } + + ones, err := this.Query(tx). + Result("id", "webId", "clusterId"). + Pk(serverIds). + Reuse(false). + FindAll() + if err != nil { + return nil, nil, err + } + + var clusterMap = map[int64]*clusterServerList{} // clusterId => servers + + for _, one := range ones { + var server = one.(*Server) + var clusterId = int64(server.ClusterId) + if clusterId <= 0 { + continue + } + + serverList, ok := clusterMap[clusterId] + if ok { + serverList.serverIds = append(serverList.serverIds, int64(server.Id)) + } else { + clusterMap[clusterId] = &clusterServerList{ + clusterId: clusterId, + serverIds: []int64{int64(server.Id)}, + } + } + + var webId = int64(server.WebId) + if webId > 0 { + webIds = append(webIds, webId) + } + } + + for _, serverList := range clusterMap { + clusterServers = append(clusterServers, serverList) + } + + return +} + +// CopyServerConfigToGroups 拷贝服务配置到分组 +func (this *ServerDAO) CopyServerConfigToGroups(tx *dbs.Tx, fromServerId int64, groupIds []int64, configCode string) error { + if len(groupIds) == 0 { + return nil + } + + var serverIds = []int64{} + for _, groupId := range groupIds { + ones, err := this.Query(tx). + ResultPk(). + State(ServerStateEnabled). + Where("JSON_CONTAINS(groupIds, :groupId)"). + Param("groupId", groupId). + FindAll() + if err != nil { + return err + } + for _, one := range ones { + serverIds = append(serverIds, int64(one.(*Server).Id)) + } + } + + return this.CopyServerConfigToServers(tx, fromServerId, serverIds, configCode) +} + +// CopyServerConfigToCluster 拷贝服务配置到集群 +func (this *ServerDAO) CopyServerConfigToCluster(tx *dbs.Tx, fromServerId int64, clusterId int64, configCode string) error { + ones, err := this.Query(tx). + ResultPk(). + State(ServerStateEnabled). + Attr("clusterId", clusterId). + UseIndex("clusterId"). + FindAll() + if err != nil { + return err + } + var serverIds = []int64{} + for _, one := range ones { + serverIds = append(serverIds, int64(one.(*Server).Id)) + } + return this.CopyServerConfigToServers(tx, fromServerId, serverIds, configCode) +} + +// CopyServerConfigToUser 拷贝服务配置到用户 +func (this *ServerDAO) CopyServerConfigToUser(tx *dbs.Tx, fromServerId int64, userId int64, configCode string) error { + ones, err := this.Query(tx). + ResultPk(). + State(ServerStateEnabled). + Attr("userId", userId). + UseIndex("userId"). + FindAll() + if err != nil { + return err + } + var serverIds = []int64{} + for _, one := range ones { + serverIds = append(serverIds, int64(one.(*Server).Id)) + } + return this.CopyServerConfigToServers(tx, fromServerId, serverIds, configCode) +} diff --git a/internal/db/models/server_dao_copy_test.go b/internal/db/models/server_dao_copy_test.go new file mode 100644 index 00000000..08b7d5e8 --- /dev/null +++ b/internal/db/models/server_dao_copy_test.go @@ -0,0 +1,22 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package models_test + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/dbs" + "testing" +) + +func TestServerDAO_CopyServerConfigToServers(t *testing.T) { + dbs.NotifyReady() + + var tx *dbs.Tx + var dao = models.NewServerDAO() + + err := dao.CopyServerConfigToServers(tx, 10170, []int64{23, 10171}, serverconfigs.ConfigCodeStat) + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/db/models/server_dao_test.go b/internal/db/models/server_dao_test.go index 7dc77fe8..4aecf445 100644 --- a/internal/db/models/server_dao_test.go +++ b/internal/db/models/server_dao_test.go @@ -332,6 +332,16 @@ func TestServerDAO_UpdateServerBandwidth(t *testing.T) { } } +func TestServerDAO_FindEnabledServersWithIds(t *testing.T) { + var dao = models.NewServerDAO() + var tx *dbs.Tx + servers, err := dao.FindEnabledServersWithIds(tx, []int64{23, 1071}) + if err != nil { + t.Fatal(err) + } + t.Log(len(servers), "servers") +} + func BenchmarkServerDAO_CountAllEnabledServers(b *testing.B) { models.SharedServerDAO = models.NewServerDAO() diff --git a/internal/db/models/updating_server_list_dao.go b/internal/db/models/updating_server_list_dao.go new file mode 100644 index 00000000..cfe8bb02 --- /dev/null +++ b/internal/db/models/updating_server_list_dao.go @@ -0,0 +1,108 @@ +package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/goman" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/types" + stringutil "github.com/iwind/TeaGo/utils/string" + timeutil "github.com/iwind/TeaGo/utils/time" + "sort" + "time" +) + +type UpdatingServerListDAO dbs.DAO + +func init() { + dbs.OnReadyDone(func() { + var ticker = time.NewTicker(24 * time.Hour) + goman.New(func() { + for range ticker.C { + err := SharedUpdatingServerListDAO.CleanExpiredLists(nil, 7) + if err != nil { + remotelogs.Error("UpdatingServerListDAO", "CleanExpiredLists(): "+err.Error()) + } + } + }) + }) +} + +func NewUpdatingServerListDAO() *UpdatingServerListDAO { + return dbs.NewDAO(&UpdatingServerListDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeUpdatingServerLists", + Model: new(UpdatingServerList), + PkName: "id", + }, + }).(*UpdatingServerListDAO) +} + +var SharedUpdatingServerListDAO *UpdatingServerListDAO + +func init() { + dbs.OnReady(func() { + SharedUpdatingServerListDAO = NewUpdatingServerListDAO() + }) +} + +// CreateList 创建待更新的服务列表 +func (this *UpdatingServerListDAO) CreateList(tx *dbs.Tx, clusterId int64, serverIds []int64) error { + if clusterId <= 0 || len(serverIds) == 0 { + return nil + } + + sort.Slice(serverIds, func(i, j int) bool { + return serverIds[i] < serverIds[j] + }) + + serverIdsJSON, err := json.Marshal(serverIds) + if err != nil { + return err + } + + var uniqueId = stringutil.Md5(types.String(clusterId) + "@" + string(serverIdsJSON)) + _, _, err = this.Query(tx). + Set("uniqueId", uniqueId). + Set("serverIds", serverIdsJSON). + Set("clusterId", clusterId). + Set("day", timeutil.Format("Ymd")). + Replace() // 使用Replace,让ID可以自增 + return err +} + +// FindLists 查找待更新服务列表 +func (this *UpdatingServerListDAO) FindLists(tx *dbs.Tx, clusterIds []int64, lastId int64) (result []*UpdatingServerList, err error) { + if len(clusterIds) == 0 { + return + } + _, err = this.Query(tx). + Attr("clusterId", clusterIds). // 即使clusterIds数量是变化的,这里也不需要使用Reuse(false),因为clusterIds通常数量有限 + Gt("id", lastId). + Asc(). // 非常重要 + Slice(&result). + FindAll() + return +} + +// FindLatestId 读取最新的ID +// 不需要区分集群 +func (this *UpdatingServerListDAO) FindLatestId(tx *dbs.Tx) (int64, error) { + return this.Query(tx). + ResultPk(). + DescPk(). + FindInt64Col(0) +} + +// CleanExpiredLists 清除过期列表 +func (this *UpdatingServerListDAO) CleanExpiredLists(tx *dbs.Tx, days int) error { + if days <= 0 { + days = 7 + } + return this.Query(tx). + Lt("day", timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days))). + DeleteQuickly() +} diff --git a/internal/db/models/updating_server_list_dao_test.go b/internal/db/models/updating_server_list_dao_test.go new file mode 100644 index 00000000..6595eab6 --- /dev/null +++ b/internal/db/models/updating_server_list_dao_test.go @@ -0,0 +1,6 @@ +package models_test + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/updating_server_list_model.go b/internal/db/models/updating_server_list_model.go new file mode 100644 index 00000000..85aa36e1 --- /dev/null +++ b/internal/db/models/updating_server_list_model.go @@ -0,0 +1,24 @@ +package models + +import "github.com/iwind/TeaGo/dbs" + +// UpdatingServerList 待更新服务列表 +type UpdatingServerList struct { + Id uint64 `field:"id"` // ID + ClusterId uint32 `field:"clusterId"` // 集群ID + UniqueId string `field:"uniqueId"` // 唯一ID + ServerIds dbs.JSON `field:"serverIds"` // 服务IDs + Day string `field:"day"` // 创建日期 +} + +type UpdatingServerListOperator struct { + Id any // ID + ClusterId any // 集群ID + UniqueId any // 唯一ID + ServerIds any // 服务IDs + Day any // 创建日期 +} + +func NewUpdatingServerListOperator() *UpdatingServerListOperator { + return &UpdatingServerListOperator{} +} diff --git a/internal/db/models/updating_server_list_model_ext.go b/internal/db/models/updating_server_list_model_ext.go new file mode 100644 index 00000000..cfe92c99 --- /dev/null +++ b/internal/db/models/updating_server_list_model_ext.go @@ -0,0 +1,20 @@ +package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" +) + +func (this *UpdatingServerList) DecodeServerIds() []int64 { + if len(this.ServerIds) == 0 { + return nil + } + + var serverIds = []int64{} + err := json.Unmarshal(this.ServerIds, &serverIds) + if err != nil { + remotelogs.Error("UpdatingServerList", "DecodeServerIds(): "+err.Error()) + } + + return serverIds +} diff --git a/internal/nodes/api_node_services.go b/internal/nodes/api_node_services.go index d47ebece..b87ca9cd 100644 --- a/internal/nodes/api_node_services.go +++ b/internal/nodes/api_node_services.go @@ -559,6 +559,12 @@ func (this *APINode) registerServices(server *grpc.Server) { this.rest(instance) } + { + var instance = this.serviceInstance(&services.UpdatingServerListService{}).(*services.UpdatingServerListService) + pb.RegisterUpdatingServerListServiceServer(server, instance) + this.rest(instance) + } + APINodeServicesRegister(this, server) // TODO check service names diff --git a/internal/rpc/services/service_server.go b/internal/rpc/services/service_server.go index 67c50b42..1953c483 100644 --- a/internal/rpc/services/service_server.go +++ b/internal/rpc/services/service_server.go @@ -2277,3 +2277,89 @@ func (this *ServerService) UpdateServerName(ctx context.Context, req *pb.UpdateS return this.Success() } + +// CopyServerConfig 在服务之间复制配置 +func (this *ServerService) CopyServerConfig(ctx context.Context, req *pb.CopyServerConfigRequest) (*pb.RPCSuccess, error) { + adminId, userId, err := this.ValidateAdminAndUser(ctx, true) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + + if req.ServerId <= 0 { + return nil, errors.New("invalid 'serverId'") + } + + // 检查权限 + if userId > 0 { + err = models.SharedServerDAO.CheckUserServer(tx, userId, req.ServerId) + if err != nil { + return nil, err + } + } + + switch req.TargetType { + case "servers": + // 检查权限 + if len(req.TargetServerIds) == 0 { + return this.Success() + } + if userId > 0 { + for _, targetServerId := range req.TargetServerIds { + err = models.SharedServerDAO.CheckUserServer(tx, userId, targetServerId) + if err != nil { + return nil, err + } + } + } + err = models.SharedServerDAO.CopyServerConfigToServers(tx, req.ServerId, req.TargetServerIds, req.ConfigCode) + if err != nil { + return nil, err + } + case "groups": + // 检查权限 + if len(req.TargetServerGroupIds) == 0 { + return this.Success() + } + if userId > 0 { + for _, targetGroupId := range req.TargetServerGroupIds { + err = models.SharedServerGroupDAO.CheckUserGroup(tx, userId, targetGroupId) + if err != nil { + return nil, err + } + } + } + err = models.SharedServerDAO.CopyServerConfigToGroups(tx, req.ServerId, req.TargetServerGroupIds, req.ConfigCode) + if err != nil { + return nil, err + } + case "cluster": + // 检查权限 + if adminId <= 0 { + return nil, this.PermissionError() + } + if req.TargetClusterId <= 0 { + return this.Success() + } + err = models.SharedServerDAO.CopyServerConfigToCluster(tx, req.ServerId, req.TargetClusterId, req.ConfigCode) + if err != nil { + return nil, err + } + case "user": + if userId == 0 { + userId, err = models.SharedServerDAO.FindServerUserId(tx, req.ServerId) + if err != nil { + return nil, err + } + + // 此时如果用户为0,则同步到未分配用户的服务 + } + err = models.SharedServerDAO.CopyServerConfigToUser(tx, req.ServerId, req.TargetUserId, req.ConfigCode) + if err != nil { + return nil, err + } + } + + return this.Success() +} diff --git a/internal/rpc/services/service_updating_server_list.go b/internal/rpc/services/service_updating_server_list.go new file mode 100644 index 00000000..895a1c1f --- /dev/null +++ b/internal/rpc/services/service_updating_server_list.go @@ -0,0 +1,90 @@ +// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn . + +package services + +import ( + "context" + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" +) + +// UpdatingServerListService 待更新服务列表服务 +type UpdatingServerListService struct { + BaseService +} + +// FindUpdatingServerLists 查找要更新的服务配置 +func (this *UpdatingServerListService) FindUpdatingServerLists(ctx context.Context, req *pb.FindUpdatingServerListsRequest) (*pb.FindUpdatingServerListsResponse, error) { + nodeId, err := this.ValidateNode(ctx) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + clusterIds, err := models.SharedNodeDAO.FindEnabledAndOnNodeClusterIds(tx, nodeId) + if err != nil { + return nil, err + } + + lists, err := models.SharedUpdatingServerListDAO.FindLists(tx, clusterIds, req.LastId) + if err != nil { + return nil, err + } + if len(lists) == 0 { + return &pb.FindUpdatingServerListsResponse{ + MaxId: req.LastId, + }, nil + } + + var serverIdMap = map[int64]bool{} + var serverIds = []int64{} + var maxId int64 + for _, list := range lists { + if int64(list.Id) > maxId { + maxId = int64(list.Id) + } + + for _, serverId := range list.DecodeServerIds() { + if !serverIdMap[serverId] { + serverIdMap[serverId] = true + serverIds = append(serverIds, serverId) + } + } + } + + if len(serverIds) == 0 { + return &pb.FindUpdatingServerListsResponse{ + MaxId: req.LastId, + }, nil + } + + servers, err := models.SharedServerDAO.FindEnabledServersWithIds(tx, serverIds) + if err != nil { + return nil, err + } + var serverConfigs = []*serverconfigs.ServerConfig{} + var cacheMap = utils.NewCacheMap() + for _, server := range servers { + serverConfig, err := models.SharedServerDAO.ComposeServerConfig(tx, server, false, nil, cacheMap, true, false) + if err != nil { + return nil, err + } + if serverConfig == nil { + continue + } + serverConfigs = append(serverConfigs, serverConfig) + } + + serversJSON, err := json.Marshal(serverConfigs) + if err != nil { + return nil, err + } + + return &pb.FindUpdatingServerListsResponse{ + ServersJSON: serversJSON, + MaxId: maxId, + }, nil +} diff --git a/internal/setup/sql.json b/internal/setup/sql.json index e7af7336..2156470d 100644 --- a/internal/setup/sql.json +++ b/internal/setup/sql.json @@ -782,7 +782,7 @@ "name": "edgeAdmins", "engine": "InnoDB", "charset": "utf8mb4_general_ci", - "definition": "CREATE TABLE `edgeAdmins` (\n `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `isOn` tinyint(1) unsigned DEFAULT '1' COMMENT '是否启用',\n `username` varchar(64) DEFAULT NULL COMMENT '用户名',\n `password` varchar(32) DEFAULT NULL COMMENT '密码',\n `fullname` varchar(64) DEFAULT NULL COMMENT '全名',\n `isSuper` tinyint(1) unsigned DEFAULT '0' COMMENT '是否为超级管理员',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `updatedAt` bigint(11) unsigned DEFAULT '0' COMMENT '修改时间',\n `state` tinyint(1) unsigned DEFAULT '1' COMMENT '状态',\n `modules` json DEFAULT NULL COMMENT '允许的模块',\n `canLogin` tinyint(1) unsigned DEFAULT '1' COMMENT '是否可以登录',\n `theme` varchar(64) DEFAULT NULL COMMENT '模板设置',\n PRIMARY KEY (`id`),\n KEY `username` (`username`) USING BTREE\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='管理员'", + "definition": "CREATE TABLE `edgeAdmins` (\n `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `isOn` tinyint(1) unsigned DEFAULT '1' COMMENT '是否启用',\n `username` varchar(64) DEFAULT NULL COMMENT '用户名',\n `password` varchar(32) DEFAULT NULL COMMENT '密码',\n `fullname` varchar(64) DEFAULT NULL COMMENT '全名',\n `isSuper` tinyint(1) unsigned DEFAULT '0' COMMENT '是否为超级管理员',\n `createdAt` bigint(11) unsigned DEFAULT '0' COMMENT '创建时间',\n `updatedAt` bigint(11) unsigned DEFAULT '0' COMMENT '修改时间',\n `state` tinyint(1) unsigned DEFAULT '1' COMMENT '状态',\n `modules` json DEFAULT NULL COMMENT '允许的模块',\n `canLogin` tinyint(1) unsigned DEFAULT '1' COMMENT '是否可以登录',\n `theme` varchar(64) DEFAULT NULL COMMENT '模板设置',\n PRIMARY KEY (`id`),\n KEY `username` (`username`) USING BTREE,\n KEY `password` (`password`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='管理员'", "fields": [ { "name": "id", @@ -841,6 +841,10 @@ { "name": "username", "definition": "KEY `username` (`username`) USING BTREE" + }, + { + "name": "password", + "definition": "KEY `password` (`password`) USING BTREE" } ], "records": [] @@ -190143,6 +190147,53 @@ ], "records": [] }, + { + "name": "edgeUpdatingServerLists", + "engine": "InnoDB", + "charset": "utf8mb4_general_ci", + "definition": "CREATE TABLE `edgeUpdatingServerLists` (\n `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',\n `clusterId` int(11) unsigned DEFAULT '0' COMMENT '集群ID',\n `uniqueId` varchar(32) DEFAULT NULL COMMENT '唯一ID',\n `serverIds` json DEFAULT NULL COMMENT '服务IDs',\n `day` varchar(8) DEFAULT NULL COMMENT '创建日期',\n PRIMARY KEY (`id`),\n UNIQUE KEY `uniqueId` (`uniqueId`),\n KEY `clusterId` (`clusterId`),\n KEY `day` (`day`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='待更新服务列表'", + "fields": [ + { + "name": "id", + "definition": "bigint(20) unsigned auto_increment COMMENT 'ID'" + }, + { + "name": "clusterId", + "definition": "int(11) unsigned DEFAULT '0' COMMENT '集群ID'" + }, + { + "name": "uniqueId", + "definition": "varchar(32) COMMENT '唯一ID'" + }, + { + "name": "serverIds", + "definition": "json COMMENT '服务IDs'" + }, + { + "name": "day", + "definition": "varchar(8) COMMENT '创建日期'" + } + ], + "indexes": [ + { + "name": "PRIMARY", + "definition": "UNIQUE KEY `PRIMARY` (`id`) USING BTREE" + }, + { + "name": "uniqueId", + "definition": "UNIQUE KEY `uniqueId` (`uniqueId`) USING BTREE" + }, + { + "name": "clusterId", + "definition": "KEY `clusterId` (`clusterId`) USING BTREE" + }, + { + "name": "day", + "definition": "KEY `day` (`day`) USING BTREE" + } + ], + "records": [] + }, { "name": "edgeUserADInstances", "engine": "InnoDB",