diff --git a/internal/db/models/metrics/metric_item_dao.go b/internal/db/models/metric_item_dao.go similarity index 66% rename from internal/db/models/metrics/metric_item_dao.go rename to internal/db/models/metric_item_dao.go index fbf2eaa6..5ca62128 100644 --- a/internal/db/models/metrics/metric_item_dao.go +++ b/internal/db/models/metric_item_dao.go @@ -1,4 +1,4 @@ -package metrics +package models import ( "encoding/json" @@ -8,6 +8,8 @@ import ( "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/types" + "sort" + "strings" ) const ( @@ -46,12 +48,27 @@ func (this *MetricItemDAO) EnableMetricItem(tx *dbs.Tx, id int64) error { } // DisableMetricItem 禁用条目 -func (this *MetricItemDAO) DisableMetricItem(tx *dbs.Tx, id int64) error { +func (this *MetricItemDAO) DisableMetricItem(tx *dbs.Tx, itemId int64) error { _, err := this.Query(tx). - Pk(id). + Pk(itemId). Set("state", MetricItemStateDisabled). Update() - return err + if err != nil { + return err + } + + // 通知更新 + err = this.NotifyUpdate(tx, itemId) + if err != nil { + return err + } + + // 删除统计数据 + err = SharedMetricStatDAO.DeleteItemStats(tx, itemId) + if err != nil { + return err + } + return nil } // FindEnabledMetricItem 查找启用中的条目 @@ -76,6 +93,8 @@ func (this *MetricItemDAO) FindMetricItemName(tx *dbs.Tx, id int64) (string, err // CreateItem 创建指标 func (this *MetricItemDAO) CreateItem(tx *dbs.Tx, code string, category string, name string, keys []string, period int32, periodUnit string, value string) (int64, error) { + sort.Strings(keys) + op := NewMetricItemOperator() op.Code = code op.Category = category @@ -102,6 +121,23 @@ func (this *MetricItemDAO) UpdateItem(tx *dbs.Tx, itemId int64, name string, key if itemId <= 0 { return errors.New("invalid itemId") } + + sort.Strings(keys) + + // 是否有变化 + oldItem, err := this.FindEnabledMetricItem(tx, itemId) + if err != nil { + return err + } + if oldItem == nil { + return nil + } + var versionChanged = false + if strings.Join(oldItem.DecodeKeys(), "&") != strings.Join(keys, "&") || types.Int32(oldItem.Period) != period || oldItem.PeriodUnit != periodUnit || oldItem.Value != value { + versionChanged = true + } + + // 保存 op := NewMetricItemOperator() op.Id = itemId op.Name = name @@ -118,7 +154,31 @@ func (this *MetricItemDAO) UpdateItem(tx *dbs.Tx, itemId int64, name string, key op.PeriodUnit = periodUnit op.Value = value op.IsOn = isOn - return this.Save(tx, op) + if versionChanged { + op.Version = dbs.SQL("version+1") + } + err = this.Save(tx, op) + if err != nil { + return err + } + + // 通知更新 + if versionChanged || (oldItem.IsOn == 0 && isOn) || (oldItem.IsOn == 1 && !isOn) { + err := this.NotifyUpdate(tx, itemId) + if err != nil { + return err + } + } + + // 删除旧数据 + if versionChanged { + err := SharedMetricStatDAO.DeleteOldItemStats(tx, itemId, types.Int32(oldItem.Version+1)) + if err != nil { + return err + } + } + + return nil } // CountEnabledItems 计算指标的数量 @@ -168,7 +228,35 @@ func (this *MetricItemDAO) ComposeItemConfig(tx *dbs.Tx, itemId int64) (*serverc Category: item.Category, Value: item.Value, Keys: item.DecodeKeys(), + Version: types.Int(item.Version), } return config, nil } + +// FindItemVersion 获取指标的版本号 +func (this *MetricItemDAO) FindItemVersion(tx *dbs.Tx, itemId int64) (int32, error) { + version, err := this.Query(tx). + Pk(itemId). + Result("version"). + FindIntCol(0) + if err != nil { + return 0, err + } + return types.Int32(version), nil +} + +// NotifyUpdate 通知更新 +func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error { + clusterIds, err := SharedNodeClusterMetricItemDAO.FindAllClusterIdsWithItemId(tx, itemId) + if err != nil { + return err + } + for _, clusterId := range clusterIds { + err = SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/db/models/metrics/metric_item_dao_test.go b/internal/db/models/metric_item_dao_test.go similarity index 84% rename from internal/db/models/metrics/metric_item_dao_test.go rename to internal/db/models/metric_item_dao_test.go index aaa4baf7..224e9db7 100644 --- a/internal/db/models/metrics/metric_item_dao_test.go +++ b/internal/db/models/metric_item_dao_test.go @@ -1,4 +1,4 @@ -package metrics +package models import ( _ "github.com/go-sql-driver/mysql" diff --git a/internal/db/models/metrics/metric_item_model.go b/internal/db/models/metric_item_model.go similarity index 92% rename from internal/db/models/metrics/metric_item_model.go rename to internal/db/models/metric_item_model.go index 457307bc..35802e19 100644 --- a/internal/db/models/metrics/metric_item_model.go +++ b/internal/db/models/metric_item_model.go @@ -1,4 +1,4 @@ -package metrics +package models // MetricItem 指标定义 type MetricItem struct { @@ -14,6 +14,7 @@ type MetricItem struct { PeriodUnit string `field:"periodUnit"` // 周期单位 Value string `field:"value"` // 值运算 State uint8 `field:"state"` // 状态 + Version uint32 `field:"version"` // 版本号 } type MetricItemOperator struct { @@ -29,6 +30,7 @@ type MetricItemOperator struct { PeriodUnit interface{} // 周期单位 Value interface{} // 值运算 State interface{} // 状态 + Version interface{} // 版本号 } func NewMetricItemOperator() *MetricItemOperator { diff --git a/internal/db/models/metrics/metric_item_model_ext.go b/internal/db/models/metric_item_model_ext.go similarity index 71% rename from internal/db/models/metrics/metric_item_model_ext.go rename to internal/db/models/metric_item_model_ext.go index 025f4aac..b2968c15 100644 --- a/internal/db/models/metrics/metric_item_model_ext.go +++ b/internal/db/models/metric_item_model_ext.go @@ -1,7 +1,10 @@ -package metrics +package models -import "encoding/json" +import ( + "encoding/json" +) +// DecodeKeys 解析Key func (this *MetricItem) DecodeKeys() []string { var result []string if len(this.Keys) > 0 { diff --git a/internal/db/models/metric_stat_dao.go b/internal/db/models/metric_stat_dao.go new file mode 100644 index 00000000..846e862b --- /dev/null +++ b/internal/db/models/metric_stat_dao.go @@ -0,0 +1,99 @@ +package models + +import ( + "encoding/json" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" + "strconv" +) + +type MetricStatDAO dbs.DAO + +func NewMetricStatDAO() *MetricStatDAO { + return dbs.NewDAO(&MetricStatDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMetricStats", + Model: new(MetricStat), + PkName: "id", + }, + }).(*MetricStatDAO) +} + +var SharedMetricStatDAO *MetricStatDAO + +func init() { + dbs.OnReady(func() { + SharedMetricStatDAO = NewMetricStatDAO() + }) +} + +// CreateStat 创建统计数据 +func (this *MetricStatDAO) CreateStat(tx *dbs.Tx, hash string, clusterId int64, nodeId int64, serverId int64, itemId int64, keys []string, value float64, time string, version int) error { + hash += "@" + strconv.FormatInt(nodeId, 10) + var keysString string + if len(keys) > 0 { + keysJSON, err := json.Marshal(keys) + if err != nil { + return err + } + keysString = string(keysJSON) + } else { + keysString = "[]" + } + return this.Query(tx). + Param("value", value). + InsertOrUpdateQuickly(maps.Map{ + "hash": hash, + "clusterId": clusterId, + "nodeId": nodeId, + "serverId": serverId, + "itemId": itemId, + "value": value, + "time": time, + "version": version, + "keys": keysString, + }, maps.Map{ + "value": value, + }) +} + +// DeleteOldItemStats 删除以前版本的统计数据 +func (this *MetricStatDAO) DeleteOldItemStats(tx *dbs.Tx, itemId int64, version int32) error { + _, err := this.Query(tx). + Attr("itemId", itemId). + Where("version<:version"). + Delete() + return err +} + +// DeleteItemStats 删除某个指标相关的统计数据 +func (this *MetricStatDAO) DeleteItemStats(tx *dbs.Tx, itemId int64) error { + _, err := this.Query(tx). + Attr("itemId", itemId). + Delete() + return err +} + +// CountItemStats 计算统计数据数量 +func (this *MetricStatDAO) CountItemStats(tx *dbs.Tx, itemId int64, version int32) (int64, error) { + return this.Query(tx). + Attr("itemId", itemId). + Attr("version", version). + Count() +} + +// ListItemStats 列出单页统计数据 +func (this *MetricStatDAO) ListItemStats(tx *dbs.Tx, itemId int64, version int32, offset int64, size int64) (result []*MetricStat, err error) { + _, err = this.Query(tx). + Attr("itemId", itemId). + Attr("version", version). + Offset(offset). + Limit(size). + DescPk(). + Slice(&result). + FindAll() + return +} diff --git a/internal/db/models/metrics/metric_stat_dao_test.go b/internal/db/models/metric_stat_dao_test.go similarity index 84% rename from internal/db/models/metrics/metric_stat_dao_test.go rename to internal/db/models/metric_stat_dao_test.go index aaa4baf7..224e9db7 100644 --- a/internal/db/models/metrics/metric_stat_dao_test.go +++ b/internal/db/models/metric_stat_dao_test.go @@ -1,4 +1,4 @@ -package metrics +package models import ( _ "github.com/go-sql-driver/mysql" diff --git a/internal/db/models/metrics/metric_stat_model.go b/internal/db/models/metric_stat_model.go similarity index 63% rename from internal/db/models/metrics/metric_stat_model.go rename to internal/db/models/metric_stat_model.go index ce2672df..cb7c3735 100644 --- a/internal/db/models/metrics/metric_stat_model.go +++ b/internal/db/models/metric_stat_model.go @@ -1,26 +1,30 @@ -package metrics +package models // MetricStat 指标统计数据 type MetricStat struct { Id uint64 `field:"id"` // ID + Hash string `field:"hash"` // Hash值 ClusterId uint32 `field:"clusterId"` // 集群ID NodeId uint32 `field:"nodeId"` // 节点ID ServerId uint32 `field:"serverId"` // 服务ID ItemId uint64 `field:"itemId"` // 指标 - KeyId uint64 `field:"keyId"` // 指标键ID + Keys string `field:"keys"` // 键值 Value float64 `field:"value"` // 数值 - Minute string `field:"minute"` // 分钟值YYYYMMDDHHII + Time string `field:"time"` // 分钟值YYYYMMDDHHII + Version uint32 `field:"version"` // 版本号 } type MetricStatOperator struct { Id interface{} // ID + Hash interface{} // Hash值 ClusterId interface{} // 集群ID NodeId interface{} // 节点ID ServerId interface{} // 服务ID ItemId interface{} // 指标 - KeyId interface{} // 指标键ID + Keys interface{} // 键值 Value interface{} // 数值 - Minute interface{} // 分钟值YYYYMMDDHHII + Time interface{} // 分钟值YYYYMMDDHHII + Version interface{} // 版本号 } func NewMetricStatOperator() *MetricStatOperator { diff --git a/internal/db/models/metric_stat_model_ext.go b/internal/db/models/metric_stat_model_ext.go new file mode 100644 index 00000000..32a5118b --- /dev/null +++ b/internal/db/models/metric_stat_model_ext.go @@ -0,0 +1,12 @@ +package models + +import "encoding/json" + +// DecodeKeys 解析Key +func (this *MetricStat) DecodeKeys() []string { + var result []string + if len(this.Keys) > 0 { + _ = json.Unmarshal([]byte(this.Keys), &result) + } + return result +} diff --git a/internal/db/models/metrics/metric_key_dao.go b/internal/db/models/metrics/metric_key_dao.go deleted file mode 100644 index a479d509..00000000 --- a/internal/db/models/metrics/metric_key_dao.go +++ /dev/null @@ -1,28 +0,0 @@ -package metrics - -import ( - _ "github.com/go-sql-driver/mysql" - "github.com/iwind/TeaGo/Tea" - "github.com/iwind/TeaGo/dbs" -) - -type MetricKeyDAO dbs.DAO - -func NewMetricKeyDAO() *MetricKeyDAO { - return dbs.NewDAO(&MetricKeyDAO{ - DAOObject: dbs.DAOObject{ - DB: Tea.Env, - Table: "edgeMetricKeys", - Model: new(MetricKey), - PkName: "id", - }, - }).(*MetricKeyDAO) -} - -var SharedMetricKeyDAO *MetricKeyDAO - -func init() { - dbs.OnReady(func() { - SharedMetricKeyDAO = NewMetricKeyDAO() - }) -} diff --git a/internal/db/models/metrics/metric_key_dao_test.go b/internal/db/models/metrics/metric_key_dao_test.go deleted file mode 100644 index aaa4baf7..00000000 --- a/internal/db/models/metrics/metric_key_dao_test.go +++ /dev/null @@ -1,6 +0,0 @@ -package metrics - -import ( - _ "github.com/go-sql-driver/mysql" - _ "github.com/iwind/TeaGo/bootstrap" -) diff --git a/internal/db/models/metrics/metric_key_model.go b/internal/db/models/metrics/metric_key_model.go deleted file mode 100644 index 263e2a7f..00000000 --- a/internal/db/models/metrics/metric_key_model.go +++ /dev/null @@ -1,20 +0,0 @@ -package metrics - -// MetricKey 指标键值 -type MetricKey struct { - Id uint64 `field:"id"` // ID - ItemId uint64 `field:"itemId"` // 指标ID - Value string `field:"value"` // 值 - Hash string `field:"hash"` // 对值进行Hash -} - -type MetricKeyOperator struct { - Id interface{} // ID - ItemId interface{} // 指标ID - Value interface{} // 值 - Hash interface{} // 对值进行Hash -} - -func NewMetricKeyOperator() *MetricKeyOperator { - return &MetricKeyOperator{} -} diff --git a/internal/db/models/metrics/metric_key_model_ext.go b/internal/db/models/metrics/metric_key_model_ext.go deleted file mode 100644 index 1abe097a..00000000 --- a/internal/db/models/metrics/metric_key_model_ext.go +++ /dev/null @@ -1 +0,0 @@ -package metrics diff --git a/internal/db/models/metrics/metric_stat_dao.go b/internal/db/models/metrics/metric_stat_dao.go deleted file mode 100644 index a4f2c8f9..00000000 --- a/internal/db/models/metrics/metric_stat_dao.go +++ /dev/null @@ -1,28 +0,0 @@ -package metrics - -import ( - _ "github.com/go-sql-driver/mysql" - "github.com/iwind/TeaGo/Tea" - "github.com/iwind/TeaGo/dbs" -) - -type MetricStatDAO dbs.DAO - -func NewMetricStatDAO() *MetricStatDAO { - return dbs.NewDAO(&MetricStatDAO{ - DAOObject: dbs.DAOObject{ - DB: Tea.Env, - Table: "edgeMetricStats", - Model: new(MetricStat), - PkName: "id", - }, - }).(*MetricStatDAO) -} - -var SharedMetricStatDAO *MetricStatDAO - -func init() { - dbs.OnReady(func() { - SharedMetricStatDAO = NewMetricStatDAO() - }) -} diff --git a/internal/db/models/metrics/metric_stat_model_ext.go b/internal/db/models/metrics/metric_stat_model_ext.go deleted file mode 100644 index 1abe097a..00000000 --- a/internal/db/models/metrics/metric_stat_model_ext.go +++ /dev/null @@ -1 +0,0 @@ -package metrics diff --git a/internal/db/models/node_cluster_dao.go b/internal/db/models/node_cluster_dao.go index 09b3779c..a40c6827 100644 --- a/internal/db/models/node_cluster_dao.go +++ b/internal/db/models/node_cluster_dao.go @@ -85,9 +85,9 @@ func (this *NodeClusterDAO) FindEnabledClusterIdWithUniqueId(tx *dbs.Tx, uniqueI } // FindNodeClusterName 根据主键查找名称 -func (this *NodeClusterDAO) FindNodeClusterName(tx *dbs.Tx, id int64) (string, error) { +func (this *NodeClusterDAO) FindNodeClusterName(tx *dbs.Tx, clusterId int64) (string, error) { return this.Query(tx). - Pk(id). + Pk(clusterId). Result("name"). FindStringCol("") } diff --git a/internal/db/models/node_cluster_metric_item_dao.go b/internal/db/models/node_cluster_metric_item_dao.go index 2d9c7f64..b2e4789d 100644 --- a/internal/db/models/node_cluster_metric_item_dao.go +++ b/internal/db/models/node_cluster_metric_item_dao.go @@ -65,12 +65,16 @@ func (this *NodeClusterMetricItemDAO) FindEnabledNodeClusterMetricItem(tx *dbs.T // DisableClusterItem 禁用某个集群的指标 func (this *NodeClusterMetricItemDAO) DisableClusterItem(tx *dbs.Tx, clusterId int64, itemId int64) error { - return this.Query(tx). + err := this.Query(tx). Attr("clusterId", clusterId). Attr("itemId", itemId). State(NodeClusterMetricItemStateEnabled). Set("state", NodeClusterMetricItemStateDisabled). UpdateQuickly() + if err != nil { + return err + } + return this.NotifyUpdate(tx, clusterId) } // EnableClusterItem 启用某个集群的指标 @@ -82,20 +86,59 @@ func (this *NodeClusterMetricItemDAO) EnableClusterItem(tx *dbs.Tx, clusterId in op.ClusterId = clusterId op.ItemId = itemId op.IsOn = true - return this.Save(tx, op) + err := this.Save(tx, op) + if err != nil { + return err + } + return this.NotifyUpdate(tx, clusterId) } // FindAllClusterItems 查找某个集群的指标 -func (this *NodeClusterMetricItemDAO) FindAllClusterItems(tx *dbs.Tx, clusterId int64) (result []*NodeClusterMetricItem, err error) { - _, err = this.Query(tx). +// category 不填写即表示获取所有指标 +func (this *NodeClusterMetricItemDAO) FindAllClusterItems(tx *dbs.Tx, clusterId int64, category string) (result []*NodeClusterMetricItem, err error) { + var query = this.Query(tx). Attr("clusterId", clusterId). - State(NodeClusterMetricItemStateEnabled). + State(NodeClusterMetricItemStateEnabled) + if len(category) > 0 { + query.Where("itemId IN (SELECT id FROM "+SharedMetricItemDAO.Table+" WHERE category=:category)"). + Param("category", category) + } + _, err = query. DescPk(). Slice(&result). FindAll() return } +// FindAllClusterItemIds 查找某个集群的指标Ids +func (this *NodeClusterMetricItemDAO) FindAllClusterItemIds(tx *dbs.Tx, clusterId int64) (result []int64, err error) { + ones, err := this.Query(tx). + Attr("clusterId", clusterId). + State(NodeClusterMetricItemStateEnabled). + Result("itemId"). + DescPk(). + FindAll() + for _, one := range ones { + result = append(result, int64(one.(*NodeClusterMetricItem).ItemId)) + } + return +} + +// FindAllClusterIdsWithItemId 查找使用某个指标的所有集群IDs +func (this *NodeClusterMetricItemDAO) FindAllClusterIdsWithItemId(tx *dbs.Tx, itemId int64) (clusterIds []int64, err error) { + ones, err := this.Query(tx). + Attr("itemId", itemId). + Result("clusterId"). + FindAll() + if err != nil { + return nil, err + } + for _, one := range ones { + clusterIds = append(clusterIds, int64(one.(*NodeClusterMetricItem).ClusterId)) + } + return +} + // CountAllClusterItems 计算集群中指标数量 func (this *NodeClusterMetricItemDAO) CountAllClusterItems(tx *dbs.Tx, clusterId int64) (int64, error) { return this.Query(tx). @@ -103,3 +146,17 @@ func (this *NodeClusterMetricItemDAO) CountAllClusterItems(tx *dbs.Tx, clusterId State(NodeClusterMetricItemStateEnabled). Count() } + +// ExistsClusterItem 是否存在 +func (this *NodeClusterMetricItemDAO) ExistsClusterItem(tx *dbs.Tx, clusterId int64, itemId int64) (bool, error) { + return this.Query(tx). + Attr("clusterId", clusterId). + Attr("itemId", itemId). + State(NodeClusterMetricItemStateEnabled). + Exist() +} + +// NotifyUpdate 通知更新 +func (this *NodeClusterMetricItemDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error { + return SharedNodeTaskDAO.CreateClusterTask(tx, clusterId, NodeTaskTypeConfigChanged) +} diff --git a/internal/db/models/node_dao.go b/internal/db/models/node_dao.go index d8a7cfca..e43d510f 100644 --- a/internal/db/models/node_dao.go +++ b/internal/db/models/node_dao.go @@ -641,7 +641,21 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64) (*nodeconfigs.N } // 指标 - + metricItemIds, err := SharedNodeClusterMetricItemDAO.FindAllClusterItemIds(tx, int64(node.ClusterId)) + if err != nil { + return nil, err + } + var metricItems = []*serverconfigs.MetricItemConfig{} + for _, itemId := range metricItemIds { + itemConfig, err := SharedMetricItemDAO.ComposeItemConfig(tx, itemId) + if err != nil { + return nil, err + } + if itemConfig != nil { + metricItems = append(metricItems, itemConfig) + } + } + config.MetricItems = metricItems return config, nil } diff --git a/internal/nodes/api_node_services.go b/internal/nodes/api_node_services.go index 54288121..295a7259 100644 --- a/internal/nodes/api_node_services.go +++ b/internal/nodes/api_node_services.go @@ -453,6 +453,11 @@ func (this *APINode) registerServices(server *grpc.Server) { pb.RegisterNodeClusterMetricItemServiceServer(server, instance) this.rest(instance) } + { + instance := this.serviceInstance(&services.MetricStatService{}).(*services.MetricStatService) + pb.RegisterMetricStatServiceServer(server, instance) + this.rest(instance) + } // TODO check service names for serviceName := range server.GetServiceInfo() { diff --git a/internal/rpc/services/service_base.go b/internal/rpc/services/service_base.go index 57449171..d6e93867 100644 --- a/internal/rpc/services/service_base.go +++ b/internal/rpc/services/service_base.go @@ -207,6 +207,11 @@ func (this *BaseService) SuccessCount(count int64) (*pb.RPCCountResponse, error) return &pb.RPCCountResponse{Count: count}, nil } +// Exists 返回是否存在 +func (this *BaseService) Exists(b bool) (*pb.RPCExists, error) { + return &pb.RPCExists{Exists: b}, nil +} + // PermissionError 返回权限错误 func (this *BaseService) PermissionError() error { return errors.New("Permission Denied") diff --git a/internal/rpc/services/service_metric_item.go b/internal/rpc/services/service_metric_item.go index ca8bd836..cc15b6da 100644 --- a/internal/rpc/services/service_metric_item.go +++ b/internal/rpc/services/service_metric_item.go @@ -4,7 +4,7 @@ package services import ( "context" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/metrics" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/types" ) @@ -22,7 +22,7 @@ func (this *MetricItemService) CreateMetricItem(ctx context.Context, req *pb.Cre } var tx = this.NullTx() - itemId, err := metrics.SharedMetricItemDAO.CreateItem(tx, req.Code, req.Category, req.Name, req.Keys, req.Period, req.PeriodUnit, req.Value) + itemId, err := models.SharedMetricItemDAO.CreateItem(tx, req.Code, req.Category, req.Name, req.Keys, req.Period, req.PeriodUnit, req.Value) if err != nil { return nil, err } @@ -37,7 +37,7 @@ func (this *MetricItemService) UpdateMetricItem(ctx context.Context, req *pb.Upd } var tx = this.NullTx() - err = metrics.SharedMetricItemDAO.UpdateItem(tx, req.MetricItemId, req.Name, req.Keys, req.Period, req.PeriodUnit, req.Value, req.IsOn) + err = models.SharedMetricItemDAO.UpdateItem(tx, req.MetricItemId, req.Name, req.Keys, req.Period, req.PeriodUnit, req.Value, req.IsOn) if err != nil { return nil, err } @@ -52,7 +52,7 @@ func (this *MetricItemService) FindEnabledMetricItem(ctx context.Context, req *p } var tx = this.NullTx() - item, err := metrics.SharedMetricItemDAO.FindEnabledMetricItem(tx, req.MetricItemId) + item, err := models.SharedMetricItemDAO.FindEnabledMetricItem(tx, req.MetricItemId) if err != nil { return nil, err } @@ -80,7 +80,7 @@ func (this *MetricItemService) CountAllEnabledMetricItems(ctx context.Context, r } var tx = this.NullTx() - count, err := metrics.SharedMetricItemDAO.CountEnabledItems(tx, req.Category) + count, err := models.SharedMetricItemDAO.CountEnabledItems(tx, req.Category) if err != nil { return nil, err } @@ -95,7 +95,7 @@ func (this *MetricItemService) ListEnabledMetricItems(ctx context.Context, req * } var tx = this.NullTx() - items, err := metrics.SharedMetricItemDAO.ListEnabledItems(tx, req.Category, req.Offset, req.Size) + items, err := models.SharedMetricItemDAO.ListEnabledItems(tx, req.Category, req.Offset, req.Size) if err != nil { return nil, err } @@ -124,7 +124,7 @@ func (this *MetricItemService) DeleteMetricItem(ctx context.Context, req *pb.Del } var tx = this.NullTx() - err = metrics.SharedMetricItemDAO.DisableMetricItem(tx, req.MetricItemId) + err = models.SharedMetricItemDAO.DisableMetricItem(tx, req.MetricItemId) if err != nil { return nil, err } diff --git a/internal/rpc/services/service_metric_stat.go b/internal/rpc/services/service_metric_stat.go new file mode 100644 index 00000000..1c1c30a5 --- /dev/null +++ b/internal/rpc/services/service_metric_stat.go @@ -0,0 +1,108 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" + "github.com/iwind/TeaGo/types" +) + +// MetricStatService 指标统计数据相关服务 +type MetricStatService struct { + BaseService +} + +// UploadMetricStats 上传统计数据 +func (this *MetricStatService) UploadMetricStats(ctx context.Context, req *pb.UploadMetricStatsRequest) (*pb.RPCSuccess, error) { + nodeId, err := this.ValidateNode(ctx) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + clusterId, err := models.SharedNodeDAO.FindNodeClusterId(tx, nodeId) + if err != nil { + return nil, err + } + + for _, stat := range req.MetricStats { + err := models.SharedMetricStatDAO.CreateStat(tx, stat.Hash, clusterId, nodeId, stat.ServerId, stat.ItemId, stat.Keys, float64(stat.Value), stat.Time, int(stat.Version)) + if err != nil { + return nil, err + } + } + + return this.Success() +} + +// CountMetricStats 计算指标数据数量 +func (this *MetricStatService) CountMetricStats(ctx context.Context, req *pb.CountMetricStatsRequest) (*pb.RPCCountResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + var tx = this.NullTx() + version, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId) + if err != nil { + return nil, err + } + count, err := models.SharedMetricStatDAO.CountItemStats(tx, req.MetricItemId, version) + if err != nil { + return nil, err + } + return this.SuccessCount(count) +} + +// ListMetricStats 读取单页指标数据 +func (this *MetricStatService) ListMetricStats(ctx context.Context, req *pb.ListMetricStatsRequest) (*pb.ListMetricStatsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + var tx = this.NullTx() + version, err := models.SharedMetricItemDAO.FindItemVersion(tx, req.MetricItemId) + if err != nil { + return nil, err + } + stats, err := models.SharedMetricStatDAO.ListItemStats(tx, req.MetricItemId, version, req.Offset, req.Size) + if err != nil { + return nil, err + } + var pbStats []*pb.MetricStat + for _, stat := range stats { + // cluster + clusterName, err := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(stat.ClusterId)) + if err != nil { + return nil, err + } + + // node + nodeName, err := models.SharedNodeDAO.FindNodeName(tx, int64(stat.NodeId)) + if err != nil { + return nil, err + } + + // server + serverName, err := models.SharedServerDAO.FindEnabledServerName(tx, int64(stat.ServerId)) + if err != nil { + return nil, err + } + + pbStats = append(pbStats, &pb.MetricStat{ + Id: int64(stat.Id), + Hash: stat.Hash, + ServerId: int64(stat.ServerId), + ItemId: int64(stat.ItemId), + Keys: stat.DecodeKeys(), + Value: float32(stat.Value), + Time: stat.Time, + Version: types.Int32(stat.Version), + NodeCluster: &pb.NodeCluster{Id: int64(stat.ClusterId), Name: clusterName}, + Node: &pb.Node{Id: int64(stat.NodeId), Name: nodeName}, + Server: &pb.Server{Id: int64(stat.ServerId), Name: serverName}, + }) + } + return &pb.ListMetricStatsResponse{MetricStats: pbStats}, nil +} diff --git a/internal/rpc/services/service_node_cluster_metric_item.go b/internal/rpc/services/service_node_cluster_metric_item.go index e804a718..00202eee 100644 --- a/internal/rpc/services/service_node_cluster_metric_item.go +++ b/internal/rpc/services/service_node_cluster_metric_item.go @@ -5,7 +5,6 @@ package services import ( "context" "github.com/TeaOSLab/EdgeAPI/internal/db/models" - "github.com/TeaOSLab/EdgeAPI/internal/db/models/metrics" "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" "github.com/iwind/TeaGo/types" ) @@ -59,13 +58,13 @@ func (this *NodeClusterMetricItemService) FindAllNodeClusterMetricItems(ctx cont } var tx = this.NullTx() - clusterItems, err := models.SharedNodeClusterMetricItemDAO.FindAllClusterItems(tx, req.NodeClusterId, req.Category) + clusterItems, err := models.SharedNodeClusterMetricItemDAO.FindAllClusterItems(tx, req.NodeClusterId, req.Category) if err != nil { return nil, err } var pbItems = []*pb.MetricItem{} for _, clusterItem := range clusterItems { - item, err := metrics.SharedMetricItemDAO.FindEnabledMetricItem(tx, int64(clusterItem.ItemId)) + item, err := models.SharedMetricItemDAO.FindEnabledMetricItem(tx, int64(clusterItem.ItemId)) if err != nil { return nil, err }