diff --git a/internal/db/models/node_ip_address_dao.go b/internal/db/models/node_ip_address_dao.go index 3bc7c3d7..9d6bcbdd 100644 --- a/internal/db/models/node_ip_address_dao.go +++ b/internal/db/models/node_ip_address_dao.go @@ -103,7 +103,7 @@ func (this *NodeIPAddressDAO) FindAddressName(tx *dbs.Tx, id int64) (string, err } // CreateAddress 创建IP地址 -func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool, thresholdsJSON []byte) (addressId int64, err error) { +func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId int64, role nodeconfigs.NodeRole, name string, ip string, canAccess bool) (addressId int64, err error) { if len(role) == 0 { role = nodeconfigs.NodeRoleNode } @@ -115,12 +115,6 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in op.Ip = ip op.CanAccess = canAccess - if len(thresholdsJSON) > 0 { - op.Thresholds = thresholdsJSON - } else { - op.Thresholds = "[]" - } - op.State = NodeIPAddressStateEnabled addressId, err = this.SaveInt64(tx, op) if err != nil { @@ -142,7 +136,7 @@ func (this *NodeIPAddressDAO) CreateAddress(tx *dbs.Tx, adminId int64, nodeId in } // UpdateAddress 修改IP地址 -func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool, thresholdsJSON []byte) (err error) { +func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId int64, name string, ip string, canAccess bool, isOn bool) (err error) { if addressId <= 0 { return errors.New("invalid addressId") } @@ -154,12 +148,6 @@ func (this *NodeIPAddressDAO) UpdateAddress(tx *dbs.Tx, adminId int64, addressId op.CanAccess = canAccess op.IsOn = isOn - if len(thresholdsJSON) > 0 { - op.Thresholds = thresholdsJSON - } else { - op.Thresholds = "[]" - } - op.State = NodeIPAddressStateEnabled // 恢复状态 err = this.Save(tx, op) if err != nil { @@ -401,6 +389,18 @@ func (this *NodeIPAddressDAO) UpdateAddressConnectivity(tx *dbs.Tx, addrId int64 UpdateQuickly() } +// UpdateAddressIsUp 设置IP地址在线状态 +func (this *NodeIPAddressDAO) UpdateAddressIsUp(tx *dbs.Tx, addressId int64, isUp bool) error { + var err = this.Query(tx). + Pk(addressId). + Set("isUp", isUp). + UpdateQuickly() + if err != nil { + return err + } + return this.NotifyUpdate(tx, addressId) +} + // NotifyUpdate 通知更新 func (this *NodeIPAddressDAO) NotifyUpdate(tx *dbs.Tx, addressId int64) error { address, err := this.Query(tx). diff --git a/internal/db/models/node_ip_address_dao_test.go b/internal/db/models/node_ip_address_dao_test.go index 97c24b56..25c9f8bf 100644 --- a/internal/db/models/node_ip_address_dao_test.go +++ b/internal/db/models/node_ip_address_dao_test.go @@ -1,5 +1,31 @@ +//go:build plus +// +build plus + package models import ( + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/dbs" + "testing" ) + +func TestNodeIPAddressDAO_FireThresholds(t *testing.T) { + dbs.NotifyReady() + + var tx *dbs.Tx + var nodeId int64 = 126 + node, err := SharedNodeDAO.FindEnabledNode(tx, nodeId) + if err != nil { + t.Fatal(err) + } + if node == nil { + t.Log("node not found") + return + } + err = SharedNodeIPAddressDAO.FireThresholds(tx, nodeconfigs.NodeRoleNode, nodeId) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/db/models/node_ip_address_model.go b/internal/db/models/node_ip_address_model.go index 297ba02e..8f5e998b 100644 --- a/internal/db/models/node_ip_address_model.go +++ b/internal/db/models/node_ip_address_model.go @@ -2,34 +2,34 @@ package models // NodeIPAddress 节点IP地址 type NodeIPAddress struct { - Id uint32 `field:"id"` // ID - NodeId uint32 `field:"nodeId"` // 节点ID - Role string `field:"role"` // 节点角色 - Name string `field:"name"` // 名称 - Ip string `field:"ip"` // IP地址 - Description string `field:"description"` // 描述 - State uint8 `field:"state"` // 状态 - Order uint32 `field:"order"` // 排序 - CanAccess uint8 `field:"canAccess"` // 是否可以访问 - IsOn uint8 `field:"isOn"` // 是否启用 - IsUp uint8 `field:"isUp"` // 是否上线 - Thresholds string `field:"thresholds"` // 上线阈值 + Id uint32 `field:"id"` // ID + NodeId uint32 `field:"nodeId"` // 节点ID + Role string `field:"role"` // 节点角色 + Name string `field:"name"` // 名称 + Ip string `field:"ip"` // IP地址 + Description string `field:"description"` // 描述 + State uint8 `field:"state"` // 状态 + Order uint32 `field:"order"` // 排序 + CanAccess uint8 `field:"canAccess"` // 是否可以访问 + IsOn uint8 `field:"isOn"` // 是否启用 + IsUp uint8 `field:"isUp"` // 是否上线 + //Thresholds string `field:"thresholds"` // 上线阈值 Connectivity string `field:"connectivity"` // 连通性状态 } type NodeIPAddressOperator struct { - Id interface{} // ID - NodeId interface{} // 节点ID - Role interface{} // 节点角色 - Name interface{} // 名称 - Ip interface{} // IP地址 - Description interface{} // 描述 - State interface{} // 状态 - Order interface{} // 排序 - CanAccess interface{} // 是否可以访问 - IsOn interface{} // 是否启用 - IsUp interface{} // 是否上线 - Thresholds interface{} // 上线阈值 + Id interface{} // ID + NodeId interface{} // 节点ID + Role interface{} // 节点角色 + Name interface{} // 名称 + Ip interface{} // IP地址 + Description interface{} // 描述 + State interface{} // 状态 + Order interface{} // 排序 + CanAccess interface{} // 是否可以访问 + IsOn interface{} // 是否启用 + IsUp interface{} // 是否上线 + //Thresholds interface{} // 上线阈值 Connectivity interface{} // 连通性状态 } diff --git a/internal/db/models/node_ip_address_model_ext.go b/internal/db/models/node_ip_address_model_ext.go index 77d28d2b..77bff77a 100644 --- a/internal/db/models/node_ip_address_model_ext.go +++ b/internal/db/models/node_ip_address_model_ext.go @@ -4,22 +4,8 @@ import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" - "github.com/iwind/TeaGo/logs" ) -func (this *NodeIPAddress) DecodeThresholds() []*nodeconfigs.NodeValueThresholdConfig { - var result = []*nodeconfigs.NodeValueThresholdConfig{} - if len(this.Thresholds) == 0 { - return result - } - err := json.Unmarshal([]byte(this.Thresholds), &result) - if err != nil { - // 不处理错误 - logs.Error(err) - } - return result -} - func (this *NodeIPAddress) DecodeConnectivity() *nodeconfigs.Connectivity { var connectivity = &nodeconfigs.Connectivity{} if len(this.Connectivity) > 0 { diff --git a/internal/db/models/node_ip_address_threshold_dao.go b/internal/db/models/node_ip_address_threshold_dao.go new file mode 100644 index 00000000..5012790d --- /dev/null +++ b/internal/db/models/node_ip_address_threshold_dao.go @@ -0,0 +1,236 @@ +package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" +) + +const ( + NodeIPAddressThresholdStateEnabled = 1 // 已启用 + NodeIPAddressThresholdStateDisabled = 0 // 已禁用 +) + +type NodeIPAddressThresholdDAO dbs.DAO + +func NewNodeIPAddressThresholdDAO() *NodeIPAddressThresholdDAO { + return dbs.NewDAO(&NodeIPAddressThresholdDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeNodeIPAddressThresholds", + Model: new(NodeIPAddressThreshold), + PkName: "id", + }, + }).(*NodeIPAddressThresholdDAO) +} + +var SharedNodeIPAddressThresholdDAO *NodeIPAddressThresholdDAO + +func init() { + dbs.OnReady(func() { + SharedNodeIPAddressThresholdDAO = NewNodeIPAddressThresholdDAO() + }) +} + +// EnableNodeIPAddressThreshold 启用条目 +func (this *NodeIPAddressThresholdDAO) EnableNodeIPAddressThreshold(tx *dbs.Tx, id uint64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", NodeIPAddressThresholdStateEnabled). + Update() + return err +} + +// DisableNodeIPAddressThreshold 禁用条目 +func (this *NodeIPAddressThresholdDAO) DisableNodeIPAddressThreshold(tx *dbs.Tx, id int64) error { + _, err := this.Query(tx). + Pk(id). + Set("state", NodeIPAddressThresholdStateDisabled). + Update() + return err +} + +// FindEnabledNodeIPAddressThreshold 查找启用中的条目 +func (this *NodeIPAddressThresholdDAO) FindEnabledNodeIPAddressThreshold(tx *dbs.Tx, id uint64) (*NodeIPAddressThreshold, error) { + result, err := this.Query(tx). + Pk(id). + Attr("state", NodeIPAddressThresholdStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*NodeIPAddressThreshold), err +} + +// FindAllEnabledThresholdsWithAddrId 查找所有阈值 +func (this *NodeIPAddressThresholdDAO) FindAllEnabledThresholdsWithAddrId(tx *dbs.Tx, addrId int64) (result []*NodeIPAddressThreshold, err error) { + _, err = this.Query(tx). + Attr("addressId", addrId). + State(NodeIPAddressThresholdStateEnabled). + AscPk(). + Desc("order"). + Slice(&result). + FindAll() + if err != nil { + return nil, err + } + + // 过滤参数 + for _, threshold := range result { + err := this.formatThreshold(tx, threshold) + if err != nil { + return nil, err + } + } + + return +} + +// CountAllEnabledThresholdsWithAddrId 计算所有阈值数量 +func (this *NodeIPAddressThresholdDAO) CountAllEnabledThresholdsWithAddrId(tx *dbs.Tx, addrId int64) (int64, error) { + return this.Query(tx). + Attr("addressId", addrId). + State(NodeIPAddressThresholdStateEnabled). + Count() +} + +// FindThresholdNotifiedAt 查找上次通知时间 +func (this *NodeIPAddressThresholdDAO) FindThresholdNotifiedAt(tx *dbs.Tx, thresholdId int64) (int64, error) { + return this.Query(tx). + Pk(thresholdId). + Result("notifiedAt"). + FindInt64Col(0) +} + +// UpdateThresholdNotifiedAt 设置上次通知时间 +func (this *NodeIPAddressThresholdDAO) UpdateThresholdNotifiedAt(tx *dbs.Tx, thresholdId int64, timestamp int64) error { + return this.Query(tx). + Pk(thresholdId). + Set("notifiedAt", timestamp). + UpdateQuickly() +} + +// CreateThreshold 创建阈值 +func (this *NodeIPAddressThresholdDAO) CreateThreshold(tx *dbs.Tx, addressId int64, items []*nodeconfigs.NodeValueThresholdItemConfig, actions []*nodeconfigs.NodeValueThresholdActionConfig, order int) (int64, error) { + if addressId <= 0 { + return 0, errors.New("invalid addressId") + } + var op = NewNodeIPAddressThresholdOperator() + op.Order = order + op.AddressId = addressId + + if len(items) > 0 { + itemsJSON, err := json.Marshal(items) + if err != nil { + return 0, err + } + op.Items = itemsJSON + } else { + op.Items = "[]" + } + + if len(actions) > 0 { + actionsJSON, err := json.Marshal(actions) + if err != nil { + return 0, err + } + op.Actions = actionsJSON + } else { + op.Actions = "[]" + } + + op.State = NodeIPAddressThresholdStateEnabled + return this.SaveInt64(tx, op) +} + +// UpdateThreshold 修改阈值 +func (this *NodeIPAddressThresholdDAO) UpdateThreshold(tx *dbs.Tx, thresholdId int64, items []*nodeconfigs.NodeValueThresholdItemConfig, actions []*nodeconfigs.NodeValueThresholdActionConfig, order int) error { + if thresholdId <= 0 { + return errors.New("invalid thresholdId") + } + var op = NewNodeIPAddressThresholdOperator() + + op.State = NodeIPAddressThresholdStateEnabled // 恢复状态 + if order >= 0 { + op.Order = order + } + + op.Id = thresholdId + + if len(items) > 0 { + itemsJSON, err := json.Marshal(items) + if err != nil { + return err + } + op.Items = itemsJSON + } else { + op.Items = "[]" + } + + if len(actions) > 0 { + actionsJSON, err := json.Marshal(actions) + if err != nil { + return err + } + op.Actions = actionsJSON + } else { + op.Actions = "[]" + } + + return this.Save(tx, op) +} + +// DisableAllThresholdsWithAddrId 禁用所有阈值 +func (this *NodeIPAddressThresholdDAO) DisableAllThresholdsWithAddrId(tx *dbs.Tx, addrId int64) error { + return this.Query(tx). + Attr("addressId", addrId). + Set("state", NodeIPAddressThresholdStateDisabled). + UpdateQuickly() +} + +// 格式化阈值 +func (this *NodeIPAddressThresholdDAO) formatThreshold(tx *dbs.Tx, threshold *NodeIPAddressThreshold) error { + if len(threshold.Items) == 0 { + return nil + } + var items = threshold.DecodeItems() + for _, item := range items { + if item.Item == nodeconfigs.IPAddressThresholdItemConnectivity { + if item.Options == nil { + continue + } + var groups = item.Options.GetSlice("groups") + if len(groups) > 0 { + var newGroups = []maps.Map{} + for _, groupOne := range groups { + var groupMap = maps.NewMap(groupOne) + var groupId = groupMap.GetInt64("id") + group, err := SharedReportNodeGroupDAO.FindEnabledReportNodeGroup(tx, groupId) + if err != nil { + return err + } + if group == nil { + continue + } + newGroups = append(newGroups, maps.Map{ + "id": group.Id, + "name": group.Name, + }) + } + item.Options["groups"] = newGroups + } + } + } + + itemsJSON, err := json.Marshal(items) + if err != nil { + return err + } + threshold.Items = string(itemsJSON) + + return nil +} diff --git a/internal/db/models/node_ip_address_threshold_dao_test.go b/internal/db/models/node_ip_address_threshold_dao_test.go new file mode 100644 index 00000000..224e9db7 --- /dev/null +++ b/internal/db/models/node_ip_address_threshold_dao_test.go @@ -0,0 +1,6 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" +) diff --git a/internal/db/models/node_ip_address_threshold_model.go b/internal/db/models/node_ip_address_threshold_model.go new file mode 100644 index 00000000..b588300a --- /dev/null +++ b/internal/db/models/node_ip_address_threshold_model.go @@ -0,0 +1,26 @@ +package models + +// NodeIPAddressThreshold IP地址阈值 +type NodeIPAddressThreshold struct { + Id uint64 `field:"id"` // ID + AddressId uint64 `field:"addressId"` // IP地址ID + Items string `field:"items"` // 阈值条目 + Actions string `field:"actions"` // 动作 + NotifiedAt uint64 `field:"notifiedAt"` // 上次通知时间 + State uint8 `field:"state"` // 状态 + Order uint32 `field:"order"` // 排序 +} + +type NodeIPAddressThresholdOperator struct { + Id interface{} // ID + AddressId interface{} // IP地址ID + Items interface{} // 阈值条目 + Actions interface{} // 动作 + NotifiedAt interface{} // 上次通知时间 + State interface{} // 状态 + Order interface{} // 排序 +} + +func NewNodeIPAddressThresholdOperator() *NodeIPAddressThresholdOperator { + return &NodeIPAddressThresholdOperator{} +} diff --git a/internal/db/models/node_ip_address_threshold_model_ext.go b/internal/db/models/node_ip_address_threshold_model_ext.go new file mode 100644 index 00000000..8e117075 --- /dev/null +++ b/internal/db/models/node_ip_address_threshold_model_ext.go @@ -0,0 +1,31 @@ +package models + +import ( + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" +) + +func (this *NodeIPAddressThreshold) DecodeItems() (result []*nodeconfigs.NodeValueThresholdItemConfig) { + if len(this.Items) == 0 { + return + } + + err := json.Unmarshal([]byte(this.Items), &result) + if err != nil { + remotelogs.Error("NodeIPAddressThreshold", "decode items: "+err.Error()) + } + return +} + +func (this *NodeIPAddressThreshold) DecodeActions() (result []*nodeconfigs.NodeValueThresholdActionConfig) { + if len(this.Actions) == 0 { + return + } + + err := json.Unmarshal([]byte(this.Actions), &result) + if err != nil { + remotelogs.Error("NodeIPAddressThreshold", "decode actions: "+err.Error()) + } + return +} diff --git a/internal/db/models/report_result_dao.go b/internal/db/models/report_result_dao.go index 9a9c0689..b415625b 100644 --- a/internal/db/models/report_result_dao.go +++ b/internal/db/models/report_result_dao.go @@ -7,6 +7,7 @@ import ( "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/maps" + "github.com/iwind/TeaGo/types" "time" ) @@ -163,26 +164,41 @@ func (this *ReportResultDAO) FindAvgLevelWithTarget(tx *dbs.Tx, taskType reporte } // FindConnectivityWithTarget 获取某个对象的连通率 -func (this *ReportResultDAO) FindConnectivityWithTarget(tx *dbs.Tx, taskType reporterconfigs.TaskType, targetId int64) (float64, error) { - // 已汇报数据的数量 - total, err := this.Query(tx). +func (this *ReportResultDAO) FindConnectivityWithTarget(tx *dbs.Tx, taskType reporterconfigs.TaskType, targetId int64, groupId int64) (float64, error) { + var query = this.Query(tx). Attr("type", taskType). - Attr("targetId", targetId). - Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1)"). + Attr("targetId", targetId) + if groupId > 0 { + query.Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1 AND JSON_CONTAINS(groupIds, :groupIdString))"). + Param("groupIdString", types.String(groupId)) + } else { + query.Where("reportNodeId IN (SELECT id FROM " + SharedReportNodeDAO.Table + " WHERE state=1 AND isOn=1)") + } + + // 已汇报数据的数量 + total, err := query. Gt("updatedAt", time.Now().Unix()-600). Count() if err != nil { return 0, err } if total == 0 { - return 0, nil + return 1, nil } // 连通的数量 - countConnected, err := this.Query(tx). + var connectedQuery = this.Query(tx). Attr("type", taskType). - Attr("targetId", targetId). - Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1)"). + Attr("targetId", targetId) + + if groupId > 0 { + connectedQuery.Where("reportNodeId IN (SELECT id FROM "+SharedReportNodeDAO.Table+" WHERE state=1 AND isOn=1 AND JSON_CONTAINS(groupIds, :groupIdString))"). + Param("groupIdString", types.String(groupId)) + } else { + connectedQuery.Where("reportNodeId IN (SELECT id FROM " + SharedReportNodeDAO.Table + " WHERE state=1 AND isOn=1)") + } + + countConnected, err := connectedQuery. Attr("isOk", true). Gt("updatedAt", time.Now().Unix()-600). Count() diff --git a/internal/nodes/api_node_services.go b/internal/nodes/api_node_services.go index 5da59d0f..b4c2287b 100644 --- a/internal/nodes/api_node_services.go +++ b/internal/nodes/api_node_services.go @@ -53,6 +53,11 @@ func (this *APINode) registerServices(server *grpc.Server) { pb.RegisterNodeIPAddressLogServiceServer(server, instance) this.rest(instance) } + { + instance := this.serviceInstance(&services.NodeIPAddressThresholdService{}).(*services.NodeIPAddressThresholdService) + pb.RegisterNodeIPAddressThresholdServiceServer(server, instance) + this.rest(instance) + } { instance := this.serviceInstance(&services.APINodeService{}).(*services.APINodeService) pb.RegisterAPINodeServiceServer(server, instance) diff --git a/internal/rpc/services/service_node.go b/internal/rpc/services/service_node.go index 4696ce7d..879bbcad 100644 --- a/internal/rpc/services/service_node.go +++ b/internal/rpc/services/service_node.go @@ -1412,7 +1412,7 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN return nil, err } } else { - _, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true, nil) + _, err = models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, nodeconfigs.NodeRoleNode, "DNS IP", req.IpAddr, true) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_node_ip_address.go b/internal/rpc/services/service_node_ip_address.go index b9de5a34..6d09dc9f 100644 --- a/internal/rpc/services/service_node_ip_address.go +++ b/internal/rpc/services/service_node_ip_address.go @@ -21,7 +21,7 @@ func (this *NodeIPAddressService) CreateNodeIPAddress(ctx context.Context, req * tx := this.NullTx() - addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess, req.ThresholdsJSON) + addressId, err := models.SharedNodeIPAddressDAO.CreateAddress(tx, adminId, req.NodeId, req.Role, req.Name, req.Ip, req.CanAccess) if err != nil { return nil, err } @@ -39,7 +39,7 @@ func (this *NodeIPAddressService) UpdateNodeIPAddress(ctx context.Context, req * tx := this.NullTx() - err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, adminId, req.NodeIPAddressId, req.Name, req.Ip, req.CanAccess, req.IsOn, req.ThresholdsJSON) + err = models.SharedNodeIPAddressDAO.UpdateAddress(tx, adminId, req.NodeIPAddressId, req.Name, req.Ip, req.CanAccess, req.IsOn) if err != nil { return nil, err } @@ -119,18 +119,17 @@ func (this *NodeIPAddressService) FindEnabledNodeIPAddress(ctx context.Context, var result *pb.NodeIPAddress = nil if address != nil { result = &pb.NodeIPAddress{ - Id: int64(address.Id), - NodeId: int64(address.NodeId), - Role: address.Role, - Name: address.Name, - Ip: address.Ip, - Description: address.Description, - State: int64(address.State), - Order: int64(address.Order), - CanAccess: address.CanAccess == 1, - IsOn: address.IsOn == 1, - IsUp: address.IsUp == 1, - ThresholdsJSON: []byte(address.Thresholds), + Id: int64(address.Id), + NodeId: int64(address.NodeId), + Role: address.Role, + Name: address.Name, + Ip: address.Ip, + Description: address.Description, + State: int64(address.State), + Order: int64(address.Order), + CanAccess: address.CanAccess == 1, + IsOn: address.IsOn == 1, + IsUp: address.IsUp == 1, } } @@ -155,18 +154,17 @@ func (this *NodeIPAddressService) FindAllEnabledIPAddressesWithNodeId(ctx contex result := []*pb.NodeIPAddress{} for _, address := range addresses { result = append(result, &pb.NodeIPAddress{ - Id: int64(address.Id), - NodeId: int64(address.NodeId), - Role: address.Role, - Name: address.Name, - Ip: address.Ip, - Description: address.Description, - State: int64(address.State), - Order: int64(address.Order), - CanAccess: address.CanAccess == 1, - IsOn: address.IsOn == 1, - IsUp: address.IsUp == 1, - ThresholdsJSON: []byte(address.Thresholds), + Id: int64(address.Id), + NodeId: int64(address.NodeId), + Role: address.Role, + Name: address.Name, + Ip: address.Ip, + Description: address.Description, + State: int64(address.State), + Order: int64(address.Order), + CanAccess: address.CanAccess == 1, + IsOn: address.IsOn == 1, + IsUp: address.IsUp == 1, }) } @@ -207,16 +205,15 @@ func (this *NodeIPAddressService) ListEnabledIPAddresses(ctx context.Context, re var pbAddrs = []*pb.NodeIPAddress{} for _, addr := range addresses { pbAddrs = append(pbAddrs, &pb.NodeIPAddress{ - Id: int64(addr.Id), - NodeId: int64(addr.NodeId), - Role: addr.Role, - Name: addr.Name, - Ip: addr.Ip, - Description: addr.Description, - CanAccess: addr.CanAccess == 1, - IsOn: addr.IsOn == 1, - IsUp: addr.IsUp == 1, - ThresholdsJSON: []byte(addr.Thresholds), + Id: int64(addr.Id), + NodeId: int64(addr.NodeId), + Role: addr.Role, + Name: addr.Name, + Ip: addr.Ip, + Description: addr.Description, + CanAccess: addr.CanAccess == 1, + IsOn: addr.IsOn == 1, + IsUp: addr.IsUp == 1, }) } return &pb.ListEnabledIPAddressesResponse{NodeIPAddresses: pbAddrs}, nil diff --git a/internal/rpc/services/service_node_ip_address_threshold.go b/internal/rpc/services/service_node_ip_address_threshold.go new file mode 100644 index 00000000..e24d9c11 --- /dev/null +++ b/internal/rpc/services/service_node_ip_address_threshold.go @@ -0,0 +1,171 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package services + +import ( + "context" + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" +) + +// NodeIPAddressThresholdService IP阈值相关服务 +type NodeIPAddressThresholdService struct { + BaseService +} + +// CreateNodeIPAddressThreshold 创建阈值 +func (this *NodeIPAddressThresholdService) CreateNodeIPAddressThreshold(ctx context.Context, req *pb.CreateNodeIPAddressThresholdRequest) (*pb.CreateNodeIPAddressThresholdResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + var items = []*nodeconfigs.NodeValueThresholdItemConfig{} + if len(req.ItemsJSON) > 0 { + err = json.Unmarshal(req.ItemsJSON, &items) + if err != nil { + return nil, errors.New("decode items failed: " + err.Error()) + } + } + + var actions = []*nodeconfigs.NodeValueThresholdActionConfig{} + if len(req.ActionsJSON) > 0 { + err = json.Unmarshal(req.ActionsJSON, &actions) + if err != nil { + return nil, errors.New("decode actions failed: " + err.Error()) + } + } + + thresholdId, err := models.SharedNodeIPAddressThresholdDAO.CreateThreshold(tx, req.NodeIPAddressId, items, actions, 0) + if err != nil { + return nil, err + } + return &pb.CreateNodeIPAddressThresholdResponse{NodeIPAddressThresholdId: thresholdId}, nil +} + +// UpdateNodeIPAddressThreshold 修改阈值 +func (this *NodeIPAddressThresholdService) UpdateNodeIPAddressThreshold(ctx context.Context, req *pb.UpdateNodeIPAddressThresholdRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + var items = []*nodeconfigs.NodeValueThresholdItemConfig{} + if len(req.ItemsJSON) > 0 { + err = json.Unmarshal(req.ItemsJSON, &items) + if err != nil { + return nil, errors.New("decode items failed: " + err.Error()) + } + } + + var actions = []*nodeconfigs.NodeValueThresholdActionConfig{} + if len(req.ActionsJSON) > 0 { + err = json.Unmarshal(req.ActionsJSON, &actions) + if err != nil { + return nil, errors.New("decode actions failed: " + err.Error()) + } + } + err = models.SharedNodeIPAddressThresholdDAO.UpdateThreshold(tx, req.NodeIPAddressThresholdId, items, actions, -1) + if err != nil { + return nil, err + } + return this.Success() +} + +// DeleteNodeIPAddressThreshold 删除阈值 +func (this *NodeIPAddressThresholdService) DeleteNodeIPAddressThreshold(ctx context.Context, req *pb.DeleteNodeIPAddressThresholdRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedNodeIPAddressThresholdDAO.DisableNodeIPAddressThreshold(tx, req.NodeIPAddressThresholdId) + if err != nil { + return nil, err + } + return this.Success() +} + +// FindAllEnabledNodeIPAddressThresholds 查找IP的所有阈值 +func (this *NodeIPAddressThresholdService) FindAllEnabledNodeIPAddressThresholds(ctx context.Context, req *pb.FindAllEnabledNodeIPAddressThresholdsRequest) (*pb.FindAllEnabledNodeIPAddressThresholdsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + thresholds, err := models.SharedNodeIPAddressThresholdDAO.FindAllEnabledThresholdsWithAddrId(tx, req.NodeIPAddressId) + if err != nil { + return nil, err + } + + var pbThresholds = []*pb.NodeIPAddressThreshold{} + for _, threshold := range thresholds { + pbThresholds = append(pbThresholds, &pb.NodeIPAddressThreshold{ + Id: int64(threshold.Id), + ItemsJSON: []byte(threshold.Items), + ActionsJSON: []byte(threshold.Actions), + }) + } + return &pb.FindAllEnabledNodeIPAddressThresholdsResponse{NodeIPAddressThresholds: pbThresholds}, nil +} + +// CountAllEnabledNodeIPAddressThresholds 计算IP阈值的数量 +func (this *NodeIPAddressThresholdService) CountAllEnabledNodeIPAddressThresholds(ctx context.Context, req *pb.CountAllEnabledNodeIPAddressThresholdsRequest) (*pb.RPCCountResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + count, err := models.SharedNodeIPAddressThresholdDAO.CountAllEnabledThresholdsWithAddrId(tx, req.NodeIPAddressId) + if err != nil { + return nil, err + } + return this.SuccessCount(count) +} + +// UpdateAllNodeIPAddressThresholds 批量更新阈值 +func (this *NodeIPAddressThresholdService) UpdateAllNodeIPAddressThresholds(ctx context.Context, req *pb.UpdateAllNodeIPAddressThresholdsRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + + var thresholds = []*nodeconfigs.NodeValueThresholdConfig{} + err = json.Unmarshal(req.NodeIPAddressThresholdsJSON, &thresholds) + if err != nil { + return nil, errors.New("decode thresholds failed: " + err.Error()) + } + + err = models.SharedNodeIPAddressThresholdDAO.DisableAllThresholdsWithAddrId(tx, req.NodeIPAddressId) + if err != nil { + return nil, err + } + if len(thresholds) == 0 { + return this.Success() + } + + var count = len(thresholds) + for index, threshold := range thresholds { + var order = count - index + if threshold.Id > 0 { + err = models.SharedNodeIPAddressThresholdDAO.UpdateThreshold(tx, threshold.Id, threshold.Items, threshold.Actions, order) + } else { + _, err = models.SharedNodeIPAddressThresholdDAO.CreateThreshold(tx, req.NodeIPAddressId, threshold.Items, threshold.Actions, order) + } + if err != nil { + return nil, err + } + } + + return this.Success() +} diff --git a/internal/setup/sql_upgrade.go b/internal/setup/sql_upgrade.go index 47d91372..d09b36ef 100644 --- a/internal/setup/sql_upgrade.go +++ b/internal/setup/sql_upgrade.go @@ -367,8 +367,7 @@ func upgradeV0_3_0(db *dbs.DB) error { // v0.3.1 func upgradeV0_3_1(db *dbs.DB) error { - // 忽略错误 + // 清空域名统计,已使用分表代替 _, _ = db.Exec("TRUNCATE table edgeServerDomainHourlyStats") - return nil } diff --git a/internal/setup/sql_upgrade_test.go b/internal/setup/sql_upgrade_test.go index 6a5f1994..060269ba 100644 --- a/internal/setup/sql_upgrade_test.go +++ b/internal/setup/sql_upgrade_test.go @@ -20,3 +20,20 @@ func TestUpgradeSQLData(t *testing.T) { } t.Log("ok") } + + +func TestUpgradeSQLData_v1_3_1(t *testing.T) { + db, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{ + Driver: "mysql", + Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge_new?charset=utf8mb4&timeout=30s", + Prefix: "edge", + }) + if err != nil { + t.Fatal(err) + } + err = upgradeV0_3_1(db) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} \ No newline at end of file