diff --git a/internal/db/models/node_value_dao.go b/internal/db/models/node_value_dao.go new file mode 100644 index 00000000..01651dc5 --- /dev/null +++ b/internal/db/models/node_value_dao.go @@ -0,0 +1,91 @@ +package models + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/maps" + timeutil "github.com/iwind/TeaGo/utils/time" + "time" +) + +type NodeValueDAO dbs.DAO + +func NewNodeValueDAO() *NodeValueDAO { + return dbs.NewDAO(&NodeValueDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeNodeValues", + Model: new(NodeValue), + PkName: "id", + }, + }).(*NodeValueDAO) +} + +var SharedNodeValueDAO *NodeValueDAO + +func init() { + dbs.OnReady(func() { + SharedNodeValueDAO = NewNodeValueDAO() + }) +} + +// CreateValue 创建值 +func (this *NodeValueDAO) CreateValue(tx *dbs.Tx, role NodeRole, nodeId int64, item string, valueJSON []byte, createdAt int64) error { + day := timeutil.FormatTime("Ymd", createdAt) + hour := timeutil.FormatTime("YmdH", createdAt) + minute := timeutil.FormatTime("YmdHi", createdAt) + + return this.Query(tx). + InsertOrUpdateQuickly(maps.Map{ + "role": role, + "nodeId": nodeId, + "item": item, + "value": valueJSON, + "createdAt": createdAt, + "day": day, + "hour": hour, + "minute": minute, + }, maps.Map{ + "value": valueJSON, + }) +} + +// DeleteExpiredValues 清除数据 +func (this *NodeValueDAO) DeleteExpiredValues(tx *dbs.Tx) error { + // 删除N天之前的所有数据 + expiredDays := 100 + day := timeutil.Format("Ymd", time.Now().AddDate(0, 0, -expiredDays)) + _, err := this.Query(tx). + Where("day<:day"). + Param("day", day). + Delete() + if err != nil { + return err + } + return nil +} + +// ListValues 列出最近的的数据 +func (this *NodeValueDAO) ListValues(tx *dbs.Tx, role string, nodeId int64, item string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) { + query := this.Query(tx). + Attr("role", role). + Attr("nodeId", nodeId). + Attr("item", item) + + switch timeRange { + // TODO 支持更多的时间范围 + case nodeconfigs.NodeValueRangeMinute: + fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的 + query.Gte("minute", fromMinute) + default: + err = errors.New("invalid 'range' value: '" + timeRange + "'") + return + } + + _, err = query.Slice(&result). + FindAll() + return +} diff --git a/internal/db/models/node_value_dao_test.go b/internal/db/models/node_value_dao_test.go new file mode 100644 index 00000000..b43d975c --- /dev/null +++ b/internal/db/models/node_value_dao_test.go @@ -0,0 +1,21 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + _ "github.com/iwind/TeaGo/bootstrap" + "github.com/iwind/TeaGo/maps" + "testing" + "time" +) + +func TestNodeValueDAO_CreateValue(t *testing.T) { + dao := NewNodeValueDAO() + m := maps.Map{ + "hello": "world12344", + } + err := dao.CreateValue(nil, NodeRoleNode, 1, "test", m.AsJSON(), time.Now().Unix()) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/db/models/node_value_model.go b/internal/db/models/node_value_model.go new file mode 100644 index 00000000..8a5acb8e --- /dev/null +++ b/internal/db/models/node_value_model.go @@ -0,0 +1,30 @@ +package models + +// NodeValue 节点监控数据 +type NodeValue struct { + Id uint64 `field:"id"` // ID + NodeId uint32 `field:"nodeId"` // 节点ID + Role string `field:"role"` // 节点角色 + Item string `field:"item"` // 监控项 + Value string `field:"value"` // 数据 + CreatedAt uint64 `field:"createdAt"` // 创建时间 + Day string `field:"day"` // 日期 + Hour string `field:"hour"` // 小时 + Minute string `field:"minute"` // 分钟 +} + +type NodeValueOperator struct { + Id interface{} // ID + NodeId interface{} // 节点ID + Role interface{} // 节点角色 + Item interface{} // 监控项 + Value interface{} // 数据 + CreatedAt interface{} // 创建时间 + Day interface{} // 日期 + Hour interface{} // 小时 + Minute interface{} // 分钟 +} + +func NewNodeValueOperator() *NodeValueOperator { + return &NodeValueOperator{} +} diff --git a/internal/db/models/node_value_model_ext.go b/internal/db/models/node_value_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/node_value_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/nodes/api_node.go b/internal/nodes/api_node.go index 6c8def8b..ad33a337 100644 --- a/internal/nodes/api_node.go +++ b/internal/nodes/api_node.go @@ -250,6 +250,7 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err pb.RegisterUserAccessKeyServiceServer(rpcServer, &services.UserAccessKeyService{}) pb.RegisterSysLockerServiceServer(rpcServer, &services.SysLockerService{}) pb.RegisterNodeTaskServiceServer(rpcServer, &services.NodeTaskService{}) + pb.RegisterNodeValueServiceServer(rpcServer, &services.NodeValueService{}) pb.RegisterDBServiceServer(rpcServer, &services.DBService{}) pb.RegisterServerRegionCityMonthlyStatServiceServer(rpcServer, &services.ServerRegionCityMonthlyStatService{}) pb.RegisterServerRegionCountryMonthlyStatServiceServer(rpcServer, &services.ServerRegionCountryMonthlyStatService{}) @@ -288,7 +289,7 @@ func (this *APINode) checkDB() error { return err } else { if i%10 == 0 { // 这让提示不会太多 - logs.Println("[API_NODE]reconnecting to database (" + fmt.Sprintf("%.1f", float32(i * 100)/float32(maxTries+1)) + "%) ...") + logs.Println("[API_NODE]reconnecting to database (" + fmt.Sprintf("%.1f", float32(i*100)/float32(maxTries+1)) + "%) ...") } time.Sleep(1 * time.Second) } diff --git a/internal/rpc/services/service_node_value.go b/internal/rpc/services/service_node_value.go new file mode 100644 index 00000000..4fdde4c6 --- /dev/null +++ b/internal/rpc/services/service_node_value.go @@ -0,0 +1,53 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package services + +import ( + "context" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" +) + +type NodeValueService struct { + BaseService +} + +// CreateNodeValue 记录数据 +func (this *NodeValueService) CreateNodeValue(ctx context.Context, req *pb.CreateNodeValueRequest) (*pb.RPCSuccess, error) { + role, nodeId, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeNode) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + err = models.SharedNodeValueDAO.CreateValue(tx, role, nodeId, req.Item, req.ValueJSON, req.CreatedAt) + if err != nil { + return nil, err + } + + return this.Success() +} + +// ListNodeValues 读取数据 +func (this *NodeValueService) ListNodeValues(ctx context.Context, req *pb.ListNodeValuesRequest) (*pb.ListNodeValuesResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + var tx = this.NullTx() + values, err := models.SharedNodeValueDAO.ListValues(tx, req.Role, req.NodeId, req.Item, req.Range) + if err != nil { + return nil, err + } + pbValues := []*pb.NodeValue{} + for _, value := range values { + pbValues = append(pbValues, &pb.NodeValue{ + ValueJSON: []byte(value.Value), + CreatedAt: int64(value.CreatedAt), + }) + } + + return &pb.ListNodeValuesResponse{NodeValues: pbValues}, nil +} diff --git a/internal/tasks/monitor_item_value_task.go b/internal/tasks/monitor_item_value_task.go new file mode 100644 index 00000000..62fbcbb1 --- /dev/null +++ b/internal/tasks/monitor_item_value_task.go @@ -0,0 +1,43 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package tasks + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "time" +) + +func init() { + dbs.OnReady(func() { + go NewMonitorItemValueTask().Start() + }) +} + +// MonitorItemValueTask 节点监控数值任务 +type MonitorItemValueTask struct { +} + +// NewMonitorItemValueTask 获取新对象 +func NewMonitorItemValueTask() *MonitorItemValueTask { + return &MonitorItemValueTask{} +} + +func (this *MonitorItemValueTask) Start() { + ticker := time.NewTicker(24 * time.Hour) + if Tea.IsTesting() { + ticker = time.NewTicker(1 * time.Minute) + } + for range ticker.C { + err := this.Loop() + if err != nil { + remotelogs.Error("MonitorItemValueTask", err.Error()) + } + } +} + +func (this *MonitorItemValueTask) Loop() error { + return models.SharedNodeValueDAO.DeleteExpiredValues(nil) +} diff --git a/internal/tasks/monitor_item_value_task_test.go b/internal/tasks/monitor_item_value_task_test.go new file mode 100644 index 00000000..c7f3e460 --- /dev/null +++ b/internal/tasks/monitor_item_value_task_test.go @@ -0,0 +1,19 @@ +// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. + +package tasks + +import ( + "github.com/iwind/TeaGo/dbs" + "testing" +) + +func TestMonitorItemValueTask_Loop(t *testing.T) { + dbs.NotifyReady() + + task := NewMonitorItemValueTask() + err := task.Loop() + if err != nil { + t.Fatal(err) + } + t.Log("ok") +}