From 9cf217bcdf14744425a8aa9c8704c8c787864d73 Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Sun, 15 Nov 2020 21:17:42 +0800 Subject: [PATCH] =?UTF-8?q?=E8=8A=82=E7=82=B9=E6=A0=B9=E6=8D=AE=E5=81=A5?= =?UTF-8?q?=E5=BA=B7=E6=A3=80=E6=9F=A5=E8=87=AA=E5=8A=A8=E4=B8=8A=E4=B8=8B?= =?UTF-8?q?=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/db/models/message_dao.go | 5 +- internal/db/models/node_cluster_dao.go | 30 +- internal/db/models/node_cluster_model.go | 2 + internal/db/models/node_cluster_model_ext.go | 22 ++ internal/db/models/node_dao.go | 56 +++ internal/db/models/node_dao_test.go | 10 + internal/db/models/node_model.go | 6 + internal/events/events.go | 5 + internal/rpc/services/service_dns_domain.go | 321 +++++++++--------- internal/rpc/services/service_node.go | 119 ++++++- internal/rpc/services/service_node_cluster.go | 24 +- internal/tasks/health_check_cluster_task.go | 3 +- internal/tasks/health_check_executor.go | 30 +- 13 files changed, 450 insertions(+), 183 deletions(-) create mode 100644 internal/events/events.go diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index 247b6341..2a721fd4 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -24,8 +24,9 @@ const ( type MessageType = string const ( - MessageTypeHealthCheckFail MessageType = "HealthCheckFail" - MessageTypeNodeInactive MessageType = "NodeInactive" + MessageTypeHealthCheckFailed MessageType = "HealthCheckFailed" + MessageTypeNodeInactive MessageType = "NodeInactive" + MessageTypeClusterDNSSyncFailed MessageType = "ClusterDNSSyncFailed" ) type MessageDAO dbs.DAO diff --git a/internal/db/models/node_cluster_dao.go b/internal/db/models/node_cluster_dao.go index 2d9e53c9..50ad7779 100644 --- a/internal/db/models/node_cluster_dao.go +++ b/internal/db/models/node_cluster_dao.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" + "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" _ "github.com/go-sql-driver/mysql" @@ -116,8 +117,20 @@ func (this *NodeClusterDAO) CreateCluster(name string, grantId int64, installDir op.Name = name op.GrantId = grantId op.InstallDir = installDir + + // DNS设置 op.DnsDomainId = dnsDomainId op.DnsName = dnsName + dnsConfig := &dnsconfigs.ClusterDNSConfig{ + NodesAutoSync: true, + ServersAutoSync: true, + } + dnsJSON, err := json.Marshal(dnsConfig) + if err != nil { + return 0, err + } + op.Dns = dnsJSON + op.UseAllAPINodes = 1 op.ApiNodes = "[]" op.UniqueId = uniqueId @@ -352,7 +365,7 @@ func (this *NodeClusterDAO) FindClusterGrantId(clusterId int64) (int64, error) { func (this *NodeClusterDAO) FindClusterDNSInfo(clusterId int64) (*NodeCluster, error) { one, err := this.Query(). Pk(clusterId). - Result("id", "name", "dnsName", "dnsDomainId"). + Result("id", "name", "dnsName", "dnsDomainId", "dns"). Find() if err != nil { return nil, err @@ -374,7 +387,7 @@ func (this *NodeClusterDAO) ExistClusterDNSName(dnsName string, excludeClusterId } // 修改集群DNS相关信息 -func (this *NodeClusterDAO) UpdateClusterDNS(clusterId int64, dnsName string, dnsDomainId int64) error { +func (this *NodeClusterDAO) UpdateClusterDNS(clusterId int64, dnsName string, dnsDomainId int64, nodesAutoSync bool, serversAutoSync bool) error { if clusterId <= 0 { return errors.New("invalid clusterId") } @@ -382,7 +395,18 @@ func (this *NodeClusterDAO) UpdateClusterDNS(clusterId int64, dnsName string, dn op.Id = clusterId op.DnsName = dnsName op.DnsDomainId = dnsDomainId - _, err := this.Save(op) + + dnsConfig := &dnsconfigs.ClusterDNSConfig{ + NodesAutoSync: nodesAutoSync, + ServersAutoSync: serversAutoSync, + } + dnsJSON, err := json.Marshal(dnsConfig) + if err != nil { + return err + } + op.Dns = dnsJSON + + _, err = this.Save(op) return err } diff --git a/internal/db/models/node_cluster_model.go b/internal/db/models/node_cluster_model.go index 3d6dc667..27fcd8a8 100644 --- a/internal/db/models/node_cluster_model.go +++ b/internal/db/models/node_cluster_model.go @@ -19,6 +19,7 @@ type NodeCluster struct { HealthCheck string `field:"healthCheck"` // 健康检查 DnsName string `field:"dnsName"` // DNS名称 DnsDomainId uint32 `field:"dnsDomainId"` // 域名ID + Dns string `field:"dns"` // DNS配置 } type NodeClusterOperator struct { @@ -39,6 +40,7 @@ type NodeClusterOperator struct { HealthCheck interface{} // 健康检查 DnsName interface{} // DNS名称 DnsDomainId interface{} // 域名ID + Dns interface{} // DNS配置 } func NewNodeClusterOperator() *NodeClusterOperator { diff --git a/internal/db/models/node_cluster_model_ext.go b/internal/db/models/node_cluster_model_ext.go index 2640e7f9..98b747c6 100644 --- a/internal/db/models/node_cluster_model_ext.go +++ b/internal/db/models/node_cluster_model_ext.go @@ -1 +1,23 @@ package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" +) + +// 解析DNS配置 +func (this *NodeCluster) DecodeDNSConfig() (*dnsconfigs.ClusterDNSConfig, error) { + if len(this.Dns) == 0 || this.Dns == "null" { + // 一定要返回一个默认的值,防止产生nil + return &dnsconfigs.ClusterDNSConfig{ + NodesAutoSync: false, + ServersAutoSync: false, + }, nil + } + dnsConfig := &dnsconfigs.ClusterDNSConfig{} + err := json.Unmarshal([]byte(this.Dns), &dnsConfig) + if err != nil { + return nil, err + } + return dnsConfig, nil +} diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index 60d55e36..536a5eb0 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -604,6 +604,7 @@ func (this *NodeDAO) FindAllEnabledNodesDNSWithClusterId(clusterId int64) (resul State(NodeStateEnabled). Attr("clusterId", clusterId). Attr("isOn", true). + Attr("isUp", true). Result("id", "name", "dnsRoutes", "isOn"). DescPk(). Slice(&result). @@ -644,6 +645,61 @@ func (this *NodeDAO) UpdateNodeDNS(nodeId int64, routes map[int64]string) error return err } +// 计算节点上线|下线状态 +func (this *NodeDAO) UpdateNodeUp(nodeId int64, isUp bool, maxUp int, maxDown int) (changed bool, err error) { + if nodeId <= 0 { + return false, errors.New("invalid nodeId") + } + one, err := this.Query(). + Pk(nodeId). + Result("isUp", "countUp", "countDown"). + Find() + if err != nil { + return false, err + } + if one == nil { + return false, nil + } + oldIsUp := one.(*Node).IsUp == 1 + + // 如果新老状态一致,则不做任何事情 + if oldIsUp == isUp { + return isUp, nil + } + + countUp := int(one.(*Node).CountUp) + countDown := int(one.(*Node).CountDown) + + op := NewNodeOperator() + op.Id = nodeId + + if isUp { + countUp++ + countDown = 0 + + if countUp >= maxUp { + changed = true + op.IsUp = true + } + } else { + countDown++ + countUp = 0 + + if countDown >= maxDown { + changed = true + op.IsUp = false + } + } + + op.CountUp = countUp + op.CountDown = countDown + _, err = this.Save(op) + if err != nil { + return false, err + } + return +} + // 生成唯一ID func (this *NodeDAO) genUniqueId() (string, error) { for { diff --git a/internal/db/models/node_dao_test.go b/internal/db/models/node_dao_test.go index 27378fb8..c1078458 100644 --- a/internal/db/models/node_dao_test.go +++ b/internal/db/models/node_dao_test.go @@ -2,6 +2,7 @@ package models import ( _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/dbs" "testing" ) @@ -20,3 +21,12 @@ func TestNodeDAO_FindChangedClusterIds(t *testing.T) { } t.Log(clusterIds) } + +func TestNodeDAO_UpdateNodeUp(t *testing.T) { + dbs.NotifyReady() + isChanged, err := SharedNodeDAO.UpdateNodeUp(57, false, 3, 3) + if err != nil { + t.Fatal(err) + } + t.Log("changed:", isChanged) +} diff --git a/internal/db/models/node_model.go b/internal/db/models/node_model.go index 51f85d42..4eb80997 100644 --- a/internal/db/models/node_model.go +++ b/internal/db/models/node_model.go @@ -6,6 +6,9 @@ type Node struct { AdminId uint32 `field:"adminId"` // 管理员ID UserId uint32 `field:"userId"` // 用户ID IsOn uint8 `field:"isOn"` // 是否启用 + IsUp uint8 `field:"isUp"` // 是否在线 + CountUp uint32 `field:"countUp"` // 连续在线次数 + CountDown uint32 `field:"countDown"` // 连续下线次数 UniqueId string `field:"uniqueId"` // 节点ID Secret string `field:"secret"` // 密钥 Name string `field:"name"` // 节点名 @@ -31,6 +34,9 @@ type NodeOperator struct { AdminId interface{} // 管理员ID UserId interface{} // 用户ID IsOn interface{} // 是否启用 + IsUp interface{} // 是否在线 + CountUp interface{} // 连续在线次数 + CountDown interface{} // 连续下线次数 UniqueId interface{} // 节点ID Secret interface{} // 密钥 Name interface{} // 节点名 diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 00000000..a233d1db --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,5 @@ +package events + +// 节点更新事件 +// TODO 改成事件 +var NodeDNSChanges = make(chan int64, 128) diff --git a/internal/rpc/services/service_dns_domain.go b/internal/rpc/services/service_dns_domain.go index 3931b9ac..b75cc8f8 100644 --- a/internal/rpc/services/service_dns_domain.go +++ b/internal/rpc/services/service_dns_domain.go @@ -232,164 +232,7 @@ func (this *DNSDomainService) SyncDNSDomainData(ctx context.Context, req *pb.Syn if err != nil { return nil, err } - - // 查询集群信息 - clusters := []*models.NodeCluster{} - if req.NodeClusterId > 0 { - cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(req.NodeClusterId) - if err != nil { - return nil, err - } - if cluster == nil { - return &pb.SyncDNSDomainDataResponse{ - IsOk: false, - Error: "找不到要同步的集群", - ShouldFix: false, - }, nil - } - if int64(cluster.DnsDomainId) != req.DnsDomainId { - return &pb.SyncDNSDomainDataResponse{ - IsOk: false, - Error: "集群设置的域名和参数不符", - ShouldFix: false, - }, nil - } - clusters = append(clusters, cluster) - } else { - clusters, err = models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(req.DnsDomainId) - if err != nil { - return nil, err - } - } - - // 域名信息 - domain, err := models.SharedDNSDomainDAO.FindEnabledDNSDomain(req.DnsDomainId) - if err != nil { - return nil, err - } - if domain == nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "找不到要操作的域名"}, nil - } - domainId := int64(domain.Id) - domainName := domain.Name - - // 服务商信息 - provider, err := models.SharedDNSProviderDAO.FindEnabledDNSProvider(int64(domain.ProviderId)) - if err != nil { - return nil, err - } - if provider == nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "域名没有设置服务商"}, nil - } - apiParams := maps.Map{} - if len(provider.ApiParams) > 0 && provider.ApiParams != "null" { - err = json.Unmarshal([]byte(provider.ApiParams), &apiParams) - if err != nil { - return nil, err - } - } - - // 开始同步 - manager := dnsclients.FindProvider(provider.Type) - if manager == nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "目前不支持'" + provider.Type + "'"}, nil - } - err = manager.Auth(apiParams) - if err != nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "调用API认证失败:" + err.Error()}, nil - } - - // 更新线路 - routes, err := manager.GetRoutes(domainName) - if err != nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "获取线路失败:" + err.Error()}, nil - } - routesJSON, err := json.Marshal(routes) - if err != nil { - return nil, err - } - err = models.SharedDNSDomainDAO.UpdateDomainRoutes(domainId, routesJSON) - if err != nil { - return nil, err - } - - // 检查集群设置 - for _, cluster := range clusters { - issues, err := models.SharedNodeClusterDAO.CheckClusterDNS(cluster) - if err != nil { - return nil, err - } - if len(issues) > 0 { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "发现问题需要修复", ShouldFix: true}, nil - } - } - - // 所有记录 - records, err := manager.GetRecords(domainName) - if err != nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "获取域名解析记录失败:" + err.Error()}, nil - } - recordsJSON, err := json.Marshal(records) - if err != nil { - return nil, err - } - err = models.SharedDNSDomainDAO.UpdateDomainRecords(domainId, recordsJSON) - if err != nil { - return nil, err - } - - // 对比变化 - allChanges := []maps.Map{} - for _, cluster := range clusters { - changes, _, _, _, _, err := this.findClusterDNSChanges(cluster, records, domainName) - if err != nil { - return nil, err - } - allChanges = append(allChanges, changes...) - } - for _, change := range allChanges { - action := change.GetString("action") - record := change.Get("record").(*dnsclients.Record) - - if len(record.Route) == 0 { - record.Route = manager.DefaultRoute() - } - - switch action { - case "create": - err = manager.AddRecord(domainName, record) - if err != nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "创建域名记录失败:" + err.Error()}, nil - } - case "delete": - err = manager.DeleteRecord(domainName, record) - if err != nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "删除域名记录失败:" + err.Error()}, nil - } - } - - //logs.Println(action, record.Name, record.Type, record.Value, record.Route) - } - - // 重新更新记录 - if len(allChanges) > 0 { - records, err := manager.GetRecords(domainName) - if err != nil { - return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "重新获取域名解析记录失败:" + err.Error()}, nil - } - recordsJSON, err := json.Marshal(records) - if err != nil { - return nil, err - } - err = models.SharedDNSDomainDAO.UpdateDomainRecords(domainId, recordsJSON) - if err != nil { - return nil, err - } - } - - return &pb.SyncDNSDomainDataResponse{ - IsOk: true, - }, nil + return this.syncClusterDNS(req) } // 查看支持的线路 @@ -631,3 +474,165 @@ func (this *DNSDomainService) findClusterDNSChanges(cluster *models.NodeCluster, return } + +// 执行同步 +func (this *DNSDomainService) syncClusterDNS(req *pb.SyncDNSDomainDataRequest) (*pb.SyncDNSDomainDataResponse, error) { + // 查询集群信息 + var err error + clusters := []*models.NodeCluster{} + if req.NodeClusterId > 0 { + cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(req.NodeClusterId) + if err != nil { + return nil, err + } + if cluster == nil { + return &pb.SyncDNSDomainDataResponse{ + IsOk: false, + Error: "找不到要同步的集群", + ShouldFix: false, + }, nil + } + if int64(cluster.DnsDomainId) != req.DnsDomainId { + return &pb.SyncDNSDomainDataResponse{ + IsOk: false, + Error: "集群设置的域名和参数不符", + ShouldFix: false, + }, nil + } + clusters = append(clusters, cluster) + } else { + clusters, err = models.SharedNodeClusterDAO.FindAllEnabledClustersWithDNSDomainId(req.DnsDomainId) + if err != nil { + return nil, err + } + } + + // 域名信息 + domain, err := models.SharedDNSDomainDAO.FindEnabledDNSDomain(req.DnsDomainId) + if err != nil { + return nil, err + } + if domain == nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "找不到要操作的域名"}, nil + } + domainId := int64(domain.Id) + domainName := domain.Name + + // 服务商信息 + provider, err := models.SharedDNSProviderDAO.FindEnabledDNSProvider(int64(domain.ProviderId)) + if err != nil { + return nil, err + } + if provider == nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "域名没有设置服务商"}, nil + } + apiParams := maps.Map{} + if len(provider.ApiParams) > 0 && provider.ApiParams != "null" { + err = json.Unmarshal([]byte(provider.ApiParams), &apiParams) + if err != nil { + return nil, err + } + } + + // 开始同步 + manager := dnsclients.FindProvider(provider.Type) + if manager == nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "目前不支持'" + provider.Type + "'"}, nil + } + err = manager.Auth(apiParams) + if err != nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "调用API认证失败:" + err.Error()}, nil + } + + // 更新线路 + routes, err := manager.GetRoutes(domainName) + if err != nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "获取线路失败:" + err.Error()}, nil + } + routesJSON, err := json.Marshal(routes) + if err != nil { + return nil, err + } + err = models.SharedDNSDomainDAO.UpdateDomainRoutes(domainId, routesJSON) + if err != nil { + return nil, err + } + + // 检查集群设置 + for _, cluster := range clusters { + issues, err := models.SharedNodeClusterDAO.CheckClusterDNS(cluster) + if err != nil { + return nil, err + } + if len(issues) > 0 { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "发现问题需要修复", ShouldFix: true}, nil + } + } + + // 所有记录 + records, err := manager.GetRecords(domainName) + if err != nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "获取域名解析记录失败:" + err.Error()}, nil + } + recordsJSON, err := json.Marshal(records) + if err != nil { + return nil, err + } + err = models.SharedDNSDomainDAO.UpdateDomainRecords(domainId, recordsJSON) + if err != nil { + return nil, err + } + + // 对比变化 + allChanges := []maps.Map{} + for _, cluster := range clusters { + changes, _, _, _, _, err := this.findClusterDNSChanges(cluster, records, domainName) + if err != nil { + return nil, err + } + allChanges = append(allChanges, changes...) + } + for _, change := range allChanges { + action := change.GetString("action") + record := change.Get("record").(*dnsclients.Record) + + if len(record.Route) == 0 { + record.Route = manager.DefaultRoute() + } + + switch action { + case "create": + err = manager.AddRecord(domainName, record) + if err != nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "创建域名记录失败:" + err.Error()}, nil + } + case "delete": + err = manager.DeleteRecord(domainName, record) + if err != nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "删除域名记录失败:" + err.Error()}, nil + } + } + + //logs.Println(action, record.Name, record.Type, record.Value, record.Route) + } + + // 重新更新记录 + if len(allChanges) > 0 { + records, err := manager.GetRecords(domainName) + if err != nil { + return &pb.SyncDNSDomainDataResponse{IsOk: false, Error: "重新获取域名解析记录失败:" + err.Error()}, nil + } + recordsJSON, err := json.Marshal(records) + if err != nil { + return nil, err + } + err = models.SharedDNSDomainDAO.UpdateDomainRecords(domainId, recordsJSON) + if err != nil { + return nil, err + } + } + + return &pb.SyncDNSDomainDataResponse{ + IsOk: true, + }, nil +} diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 88ed4eb3..69ccaa4d 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -5,15 +5,32 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/events" "github.com/TeaOSLab/EdgeAPI/internal/installers" rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/types" ) +func init() { + dbs.OnReady(func() { + go func() { + service := &NodeService{} + for nodeId := range events.NodeDNSChanges { + logs.Println("change dns: ", nodeId) + err := service.notifyNodeDNSChanged(nodeId) + if err != nil { + logs.Println("[ERROR]change node dns: " + err.Error()) + } + } + }() + }) +} + // 边缘节点相关服务 type NodeService struct { } @@ -38,6 +55,24 @@ func (this *NodeService) CreateNode(ctx context.Context, req *pb.CreateNodeReque } } + // 保存DNS相关 + if req.DnsDomainId > 0 && len(req.DnsRoute) > 0 { + err = models.SharedNodeDAO.UpdateNodeDNS(nodeId, map[int64]string{ + req.DnsDomainId: req.DnsRoute, + }) + if err != nil { + return nil, err + } + } + + // 同步DNS + go func() { + err := this.notifyNodeDNSChanged(nodeId) + if err != nil { + logs.Println("sync node DNS error: " + err.Error()) + } + }() + return &pb.CreateNodeResponse{ NodeId: nodeId, }, nil @@ -173,6 +208,7 @@ func (this *NodeService) ListEnabledNodesMatch(ctx context.Context, req *pb.List InstallStatus: installStatusResult, MaxCPU: types.Int32(node.MaxCPU), IsOn: node.IsOn == 1, + IsUp: node.IsUp == 1, Group: pbGroup, }) } @@ -216,8 +252,8 @@ func (this *NodeService) FindAllEnabledNodesWithClusterId(ctx context.Context, r return &pb.FindAllEnabledNodesWithClusterIdResponse{Nodes: result}, nil } -// 禁用节点 -func (this *NodeService) DisableNode(ctx context.Context, req *pb.DisableNodeRequest) (*pb.DisableNodeResponse, error) { +// 删除节点 +func (this *NodeService) DeleteNode(ctx context.Context, req *pb.DeleteNodeRequest) (*pb.RPCSuccess, error) { _, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeAdmin) if err != nil { return nil, err @@ -228,7 +264,15 @@ func (this *NodeService) DisableNode(ctx context.Context, req *pb.DisableNodeReq return nil, err } - return &pb.DisableNodeResponse{}, nil + // 同步DNS + go func() { + err := this.notifyNodeDNSChanged(req.NodeId) + if err != nil { + logs.Println("sync node DNS error: " + err.Error()) + } + }() + + return rpcutils.Success() } // 修改节点 @@ -262,6 +306,15 @@ func (this *NodeService) UpdateNode(ctx context.Context, req *pb.UpdateNodeReque } } + // 同步DNS + go func() { + // TODO 只有状态变化的时候才需要同步 + err := this.notifyNodeDNSChanged(req.NodeId) + if err != nil { + logs.Println("sync node DNS error: " + err.Error()) + } + }() + return &pb.RPCSuccess{}, nil } @@ -502,6 +555,15 @@ func (this *NodeService) StartNode(ctx context.Context, req *pb.StartNodeRequest Error: err.Error(), }, nil } + + // 同步DNS + go func() { + err := this.notifyNodeDNSChanged(req.NodeId) + if err != nil { + logs.Println("sync node DNS error: " + err.Error()) + } + }() + return &pb.StartNodeResponse{IsOk: true}, nil } @@ -520,6 +582,15 @@ func (this *NodeService) StopNode(ctx context.Context, req *pb.StopNodeRequest) Error: err.Error(), }, nil } + + // 同步DNS + go func() { + err := this.notifyNodeDNSChanged(req.NodeId) + if err != nil { + logs.Println("sync node DNS error: " + err.Error()) + } + }() + return &pb.StopNodeResponse{IsOk: true}, nil } @@ -1015,3 +1086,45 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN return rpcutils.Success() } + +// 自动同步DNS状态 +func (this *NodeService) notifyNodeDNSChanged(nodeId int64) error { + clusterId, err := models.SharedNodeDAO.FindNodeClusterId(nodeId) + if err != nil { + return err + } + dnsInfo, err := models.SharedNodeClusterDAO.FindClusterDNSInfo(clusterId) + if err != nil { + return err + } + if dnsInfo == nil { + return nil + } + if len(dnsInfo.DnsName) == 0 || dnsInfo.DnsDomainId == 0 { + return nil + } + dnsConfig, err := dnsInfo.DecodeDNSConfig() + if err != nil { + return err + } + if !dnsConfig.NodesAutoSync { + return nil + } + + // 执行同步 + domainService := &DNSDomainService{} + resp, err := domainService.syncClusterDNS(&pb.SyncDNSDomainDataRequest{ + DnsDomainId: int64(dnsInfo.DnsDomainId), + NodeClusterId: clusterId, + }) + if err != nil { + return err + } + if !resp.IsOk { + err = models.SharedMessageDAO.CreateClusterMessage(clusterId, models.MessageTypeClusterDNSSyncFailed, models.LevelError, "集群DNS同步失败:"+resp.Error, nil) + if err != nil { + logs.Println("[NODE_SERVICE]" + err.Error()) + } + } + return nil +} diff --git a/internal/rpc/services/service_node_cluster.go b/internal/rpc/services/service_node_cluster.go index 7e54a875..0e70af45 100644 --- a/internal/rpc/services/service_node_cluster.go +++ b/internal/rpc/services/service_node_cluster.go @@ -373,11 +373,19 @@ func (this *NodeClusterService) FindEnabledNodeClusterDNS(ctx context.Context, r Provider: nil, }, nil } + + dnsConfig, err := dnsInfo.DecodeDNSConfig() + if err != nil { + return nil, err + } + if dnsInfo.DnsDomainId == 0 { return &pb.FindEnabledNodeClusterDNSResponse{ - Name: dnsInfo.DnsName, - Domain: nil, - Provider: nil, + Name: dnsInfo.DnsName, + Domain: nil, + Provider: nil, + NodesAutoSync: dnsConfig.NodesAutoSync, + ServersAutoSync: dnsConfig.ServersAutoSync, }, nil } @@ -414,9 +422,11 @@ func (this *NodeClusterService) FindEnabledNodeClusterDNS(ctx context.Context, r } return &pb.FindEnabledNodeClusterDNSResponse{ - Name: dnsInfo.DnsName, - Domain: pbDomain, - Provider: pbProvider, + Name: dnsInfo.DnsName, + Domain: pbDomain, + Provider: pbProvider, + NodesAutoSync: dnsConfig.NodesAutoSync, + ServersAutoSync: dnsConfig.ServersAutoSync, }, nil } @@ -473,7 +483,7 @@ func (this *NodeClusterService) UpdateNodeClusterDNS(ctx context.Context, req *p return nil, err } - err = models.SharedNodeClusterDAO.UpdateClusterDNS(req.NodeClusterId, req.DnsName, req.DnsDomainId) + err = models.SharedNodeClusterDAO.UpdateClusterDNS(req.NodeClusterId, req.DnsName, req.DnsDomainId, req.NodesAutoSync, req.ServersAutoSync) if err != nil { return nil, err } diff --git a/internal/tasks/health_check_cluster_task.go b/internal/tasks/health_check_cluster_task.go index 1b83c2c7..c68bd589 100644 --- a/internal/tasks/health_check_cluster_task.go +++ b/internal/tasks/health_check_cluster_task.go @@ -130,7 +130,8 @@ func (this *HealthCheckClusterTask) loop(seconds int64) error { if err != nil { return err } - err = models.NewMessageDAO().CreateClusterMessage(this.clusterId, models.MessageTypeHealthCheckFail, models.MessageLevelError, "有"+numberutils.FormatInt(len(failedResults))+"个节点在健康检查中出现问题", failedResultsJSON) + message := "有" + numberutils.FormatInt(len(failedResults)) + "个节点在健康检查中出现问题" + err = models.NewMessageDAO().CreateClusterMessage(this.clusterId, models.MessageTypeHealthCheckFailed, models.MessageLevelError, message, failedResultsJSON) if err != nil { return err } diff --git a/internal/tasks/health_check_executor.go b/internal/tasks/health_check_executor.go index 51da7d41..d5ebebb0 100644 --- a/internal/tasks/health_check_executor.go +++ b/internal/tasks/health_check_executor.go @@ -5,8 +5,10 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/events" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/logs" "github.com/iwind/TeaGo/types" "net/http" "strconv" @@ -54,20 +56,14 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { Node: node, } - addresses, err := models.NewNodeIPAddressDAO().FindAllEnabledAddressesWithNode(int64(node.Id)) + ipAddr, err := models.NewNodeIPAddressDAO().FindFirstNodeIPAddress(int64(node.Id)) if err != nil { return nil, err } - accessAddresses := []string{} - for _, addr := range addresses { - if addr.CanAccess == 1 { - accessAddresses = append(accessAddresses, addr.Ip) - } - } - if len(accessAddresses) == 0 { + if len(ipAddr) == 0 { result.Error = "no ip address can be used" } else { - result.NodeAddr = accessAddresses[0] + result.NodeAddr = ipAddr } results = append(results, result) @@ -130,6 +126,22 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { time.Sleep(tryDelay) } } + + // 修改节点状态 + if healthCheckConfig.AutoDown { + isChanged, err := models.SharedNodeDAO.UpdateNodeUp(int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown) + if err != nil { + logs.Println("[HEALTH_CHECK]" + err.Error()) + } else if isChanged { + // 通知更新 + select { + case events.NodeDNSChanges <- int64(result.Node.Id): + default: + + } + } + } + wg.Done() default: return