实现数据看板-DNS

This commit is contained in:
GoEdgeLab
2021-07-11 21:44:08 +08:00
parent 85f85345aa
commit 13629b497d
12 changed files with 508 additions and 3 deletions

View File

@@ -75,6 +75,15 @@ func (this *NSNodeDAO) FindEnabledNSNode(tx *dbs.Tx, id int64) (*NSNode, error)
return result.(*NSNode), err return result.(*NSNode), err
} }
// FindEnabledNSNodeName 查找节点名称
func (this *NSNodeDAO) FindEnabledNSNodeName(tx *dbs.Tx, nodeId int64) (string, error) {
return this.Query(tx).
Pk(nodeId).
State(NSNodeStateEnabled).
Result("name").
FindStringCol("")
}
// FindAllEnabledNodesWithClusterId 查找一个集群下的所有节点 // FindAllEnabledNodesWithClusterId 查找一个集群下的所有节点
func (this *NSNodeDAO) FindAllEnabledNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*NSNode, err error) { func (this *NSNodeDAO) FindAllEnabledNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*NSNode, err error) {
_, err = this.Query(tx). _, err = this.Query(tx).
@@ -94,6 +103,15 @@ func (this *NSNodeDAO) CountAllEnabledNodes(tx *dbs.Tx) (int64, error) {
Count() Count()
} }
// CountAllOfflineNodes 计算离线节点数量
func (this *NSNodeDAO) CountAllOfflineNodes(tx *dbs.Tx) (int64, error) {
return this.Query(tx).
State(NSNodeStateEnabled).
Where("(status IS NULL OR JSON_EXTRACT(status, '$.updatedAt')<UNIX_TIMESTAMP()-120)").
Where("clusterId IN (SELECT id FROM " + SharedNSClusterDAO.Table + " WHERE state=1)").
Count()
}
// CountAllEnabledNodesMatch 计算满足条件的节点数量 // CountAllEnabledNodesMatch 计算满足条件的节点数量
func (this *NSNodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx, clusterId int64, installState configutils.BoolState, activeState configutils.BoolState, keyword string) (int64, error) { func (this *NSNodeDAO) CountAllEnabledNodesMatch(tx *dbs.Tx, clusterId int64, installState configutils.BoolState, activeState configutils.BoolState, keyword string) (int64, error) {
query := this.Query(tx) query := this.Query(tx)

View File

@@ -112,6 +112,7 @@ func (this *NSRecordDAO) CreateRecord(tx *dbs.Tx, domainId int64, description st
return this.SaveInt64(tx, op) return this.SaveInt64(tx, op)
} }
// UpdateRecord 修改记录
func (this *NSRecordDAO) UpdateRecord(tx *dbs.Tx, recordId int64, description string, name string, dnsType dnsconfigs.RecordType, value string, ttl int32, routeIds []int64) error { func (this *NSRecordDAO) UpdateRecord(tx *dbs.Tx, recordId int64, description string, name string, dnsType dnsconfigs.RecordType, value string, ttl int32, routeIds []int64) error {
if recordId <= 0 { if recordId <= 0 {
return errors.New("invalid recordId") return errors.New("invalid recordId")
@@ -145,7 +146,8 @@ func (this *NSRecordDAO) UpdateRecord(tx *dbs.Tx, recordId int64, description st
return this.Save(tx, op) return this.Save(tx, op)
} }
func (this *NSRecordDAO) CountAllEnabledRecords(tx *dbs.Tx, domainId int64, dnsType dnsconfigs.RecordType, keyword string, routeId int64) (int64, error) { // CountAllEnabledDomainRecords 计算域名中记录数量
func (this *NSRecordDAO) CountAllEnabledDomainRecords(tx *dbs.Tx, domainId int64, dnsType dnsconfigs.RecordType, keyword string, routeId int64) (int64, error) {
query := this.Query(tx). query := this.Query(tx).
Attr("domainId", domainId). Attr("domainId", domainId).
State(NSRecordStateEnabled) State(NSRecordStateEnabled)
@@ -162,6 +164,15 @@ func (this *NSRecordDAO) CountAllEnabledRecords(tx *dbs.Tx, domainId int64, dnsT
return query.Count() return query.Count()
} }
// CountAllEnabledRecords 计算所有记录数量
func (this *NSRecordDAO) CountAllEnabledRecords(tx *dbs.Tx) (int64, error) {
return this.Query(tx).
Where("domainId IN (SELECT id FROM " + SharedNSDomainDAO.Table + " WHERE state=1)").
State(NSRecordStateEnabled).
Count()
}
// ListEnabledRecords 列出单页记录
func (this *NSRecordDAO) ListEnabledRecords(tx *dbs.Tx, domainId int64, dnsType dnsconfigs.RecordType, keyword string, routeId int64, offset int64, size int64) (result []*NSRecord, err error) { func (this *NSRecordDAO) ListEnabledRecords(tx *dbs.Tx, domainId int64, dnsType dnsconfigs.RecordType, keyword string, routeId int64, offset int64, size int64) (result []*NSRecord, err error) {
query := this.Query(tx). query := this.Query(tx).
Attr("domainId", domainId). Attr("domainId", domainId).

View File

@@ -0,0 +1,167 @@
package nameservers
import (
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
timeutil "github.com/iwind/TeaGo/utils/time"
"time"
)
type NSRecordHourlyStatDAO dbs.DAO
func init() {
dbs.OnReadyDone(func() {
// 清理数据任务
var ticker = time.NewTicker(24 * time.Hour)
go func() {
for range ticker.C {
err := SharedNSRecordHourlyStatDAO.Clean(nil, 60) // 只保留60天
if err != nil {
remotelogs.Error("NodeClusterTrafficDailyStatDAO", "clean expired data failed: "+err.Error())
}
}
}()
})
}
func NewNSRecordHourlyStatDAO() *NSRecordHourlyStatDAO {
return dbs.NewDAO(&NSRecordHourlyStatDAO{
DAOObject: dbs.DAOObject{
DB: Tea.Env,
Table: "edgeNSRecordHourlyStats",
Model: new(NSRecordHourlyStat),
PkName: "id",
},
}).(*NSRecordHourlyStatDAO)
}
var SharedNSRecordHourlyStatDAO *NSRecordHourlyStatDAO
func init() {
dbs.OnReady(func() {
SharedNSRecordHourlyStatDAO = NewNSRecordHourlyStatDAO()
})
}
// IncreaseHourlyStat 增加统计数据
func (this *NSRecordHourlyStatDAO) IncreaseHourlyStat(tx *dbs.Tx, clusterId int64, nodeId int64, hour string, domainId int64, recordId int64, countRequests int64, bytes int64) error {
if len(hour) != 10 {
return errors.New("invalid hour '" + hour + "'")
}
return this.Query(tx).
Param("countRequests", countRequests).
Param("bytes", bytes).
InsertOrUpdateQuickly(maps.Map{
"clusterId": clusterId,
"nodeId": nodeId,
"domainId": domainId,
"recordId": recordId,
"day": hour[:8],
"hour": hour,
"countRequests": countRequests,
"bytes": bytes,
}, maps.Map{
"countRequests": dbs.SQL("countRequests+:countRequests"),
"bytes": dbs.SQL("bytes+:bytes"),
})
}
// FindHourlyStats 按小时统计
func (this *NSRecordHourlyStatDAO) FindHourlyStats(tx *dbs.Tx, hourFrom string, hourTo string) (result []*NSRecordHourlyStat, err error) {
ones, err := this.Query(tx).
Result("hour", "SUM(countRequests) AS countRequests", "SUM(bytes) AS bytes").
Between("hour", hourFrom, hourTo).
Group("hour").
FindAll()
if err != nil {
return nil, err
}
var m = map[string]*NSRecordHourlyStat{} // hour => *NSRecordHourlyStat
for _, one := range ones {
m[one.(*NSRecordHourlyStat).Hour] = one.(*NSRecordHourlyStat)
}
hours, err := utils.RangeHours(hourFrom, hourTo)
if err != nil {
return nil, err
}
for _, hour := range hours {
stat, ok := m[hour]
if ok {
result = append(result, stat)
} else {
result = append(result, &NSRecordHourlyStat{
Hour: hour,
})
}
}
return
}
// FindDailyStats 按天统计
func (this *NSRecordHourlyStatDAO) FindDailyStats(tx *dbs.Tx, dayFrom string, dayTo string) (result []*NSRecordHourlyStat, err error) {
ones, err := this.Query(tx).
Result("day", "SUM(countRequests) AS countRequests", "SUM(bytes) AS bytes").
Between("day", dayFrom, dayTo).
Group("day").
FindAll()
if err != nil {
return nil, err
}
var m = map[string]*NSRecordHourlyStat{} // day => *NSRecordHourlyStat
for _, one := range ones {
m[one.(*NSRecordHourlyStat).Day] = one.(*NSRecordHourlyStat)
}
days, err := utils.RangeDays(dayFrom, dayTo)
if err != nil {
return nil, err
}
for _, day := range days {
stat, ok := m[day]
if ok {
result = append(result, stat)
} else {
result = append(result, &NSRecordHourlyStat{
Day: day,
})
}
}
return
}
// ListTopNodes 节点排行
func (this *NSRecordHourlyStatDAO) ListTopNodes(tx *dbs.Tx, hourFrom string, hourTo string, size int64) (result []*NSRecordHourlyStat, err error) {
_, err = this.Query(tx).
Result("MIN(clusterId) AS clusterId", "nodeId", "SUM(countRequests) AS countRequests", "SUM(bytes) AS bytes").
Between("hour", hourFrom, hourTo).
Group("nodeId").
Limit(size).
Slice(&result).
FindAll()
return
}
// ListTopDomains 域名排行
func (this *NSRecordHourlyStatDAO) ListTopDomains(tx *dbs.Tx, hourFrom string, hourTo string, size int64) (result []*NSRecordHourlyStat, err error) {
_, err = this.Query(tx).
Result("domainId", "SUM(countRequests) AS countRequests", "SUM(bytes) AS bytes").
Between("hour", hourFrom, hourTo).
Group("domainId").
Limit(size).
Slice(&result).
FindAll()
return
}
// Clean 清理历史数据
func (this *NSRecordHourlyStatDAO) Clean(tx *dbs.Tx, days int) error {
var hour = timeutil.Format("Ymd00", time.Now().AddDate(0, 0, -days))
_, err := this.Query(tx).
Lt("hour", hour).
Delete()
return err
}

View File

@@ -0,0 +1,6 @@
package nameservers
import (
_ "github.com/go-sql-driver/mysql"
_ "github.com/iwind/TeaGo/bootstrap"
)

View File

@@ -0,0 +1,30 @@
package nameservers
// NSRecordHourlyStat NS记录统计
type NSRecordHourlyStat struct {
Id uint64 `field:"id"` // ID
ClusterId uint32 `field:"clusterId"` // 集群ID
NodeId uint32 `field:"nodeId"` // 节点ID
DomainId uint32 `field:"domainId"` // 域名ID
RecordId uint64 `field:"recordId"` // 记录ID
Day string `field:"day"` // YYYYMMDD
Hour string `field:"hour"` // YYYYMMDDHH
CountRequests uint32 `field:"countRequests"` // 请求数
Bytes uint64 `field:"bytes"` // 流量
}
type NSRecordHourlyStatOperator struct {
Id interface{} // ID
ClusterId interface{} // 集群ID
NodeId interface{} // 节点ID
DomainId interface{} // 域名ID
RecordId interface{} // 记录ID
Day interface{} // YYYYMMDD
Hour interface{} // YYYYMMDDHH
CountRequests interface{} // 请求数
Bytes interface{} // 流量
}
func NewNSRecordHourlyStatOperator() *NSRecordHourlyStatOperator {
return &NSRecordHourlyStatOperator{}
}

View File

@@ -0,0 +1 @@
package nameservers

View File

@@ -140,6 +140,31 @@ func (this *NodeValueDAO) ListValuesForUserNodes(tx *dbs.Tx, item string, key st
return return
} }
// ListValuesForNSNodes 列出用户节点相关的平均数据
func (this *NodeValueDAO) ListValuesForNSNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
query := this.Query(tx).
Attr("role", "dns").
Attr("item", item).
Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
switch timeRange {
// TODO 支持更多的时间范围
case nodeconfigs.NodeValueRangeMinute:
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
query.Gte("minute", fromMinute)
query.Result("minute")
query.Group("minute")
default:
err = errors.New("invalid 'range' value: '" + timeRange + "'")
return
}
_, err = query.Slice(&result).
FindAll()
return
}
// SumValues 计算某项参数值 // SumValues 计算某项参数值
func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) { func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
if duration <= 0 { if duration <= 0 {

View File

@@ -270,6 +270,6 @@ func (this *UserNodeDAO) CountAllLowerVersionNodes(tx *dbs.Tx, version string) (
func (this *UserNodeDAO) CountOfflineNodes(tx *dbs.Tx) (int64, error) { func (this *UserNodeDAO) CountOfflineNodes(tx *dbs.Tx) (int64, error) {
return this.Query(tx). return this.Query(tx).
State(UserNodeStateEnabled). State(UserNodeStateEnabled).
Where("status IS NULL OR JSON_EXTRACT(status, '$.updatedAt')<UNIX_TIMESTAMP()-120"). Where("(status IS NULL OR JSON_EXTRACT(status, '$.updatedAt')<UNIX_TIMESTAMP()-120)").
Count() Count()
} }

View File

@@ -438,6 +438,16 @@ func (this *APINode) registerServices(server *grpc.Server) {
pb.RegisterNSAccessLogServiceServer(server, instance) pb.RegisterNSAccessLogServiceServer(server, instance)
this.rest(instance) this.rest(instance)
} }
{
instance := this.serviceInstance(&nameservers.NSRecordHourlyStatService{}).(*nameservers.NSRecordHourlyStatService)
pb.RegisterNSRecordHourlyStatServiceServer(server, instance)
this.rest(instance)
}
{
instance := this.serviceInstance(&nameservers.NSService{}).(*nameservers.NSService)
pb.RegisterNSServiceServer(server, instance)
this.rest(instance)
}
{ {
instance := this.serviceInstance(&services.HTTPAuthPolicyService{}).(*services.HTTPAuthPolicyService) instance := this.serviceInstance(&services.HTTPAuthPolicyService{}).(*services.HTTPAuthPolicyService)
pb.RegisterHTTPAuthPolicyServiceServer(server, instance) pb.RegisterHTTPAuthPolicyServiceServer(server, instance)

View File

@@ -0,0 +1,188 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nameservers
import (
"context"
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/rpc/services"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"time"
)
// NSService 域名服务
type NSService struct {
services.BaseService
}
// ComposeNSBoard 组合看板数据
func (this *NSService) ComposeNSBoard(ctx context.Context, req *pb.ComposeNSBoardRequest) (*pb.ComposeNSBoardResponse, error) {
_, err := this.ValidateAdmin(ctx, 0)
if err != nil {
return nil, err
}
var tx = this.NullTx()
var result = &pb.ComposeNSBoardResponse{}
// 域名
countDomains, err := nameservers.SharedNSDomainDAO.CountAllEnabledDomains(tx, 0, 0, "")
if err != nil {
return nil, err
}
result.CountNSDomains = countDomains
// 记录
countRecords, err := nameservers.SharedNSRecordDAO.CountAllEnabledRecords(tx)
if err != nil {
return nil, err
}
result.CountNSRecords = countRecords
// 集群数
countClusters, err := nameservers.SharedNSClusterDAO.CountAllEnabledClusters(tx)
if err != nil {
return nil, err
}
result.CountNSClusters = countClusters
// 节点数
countNodes, err := nameservers.SharedNSNodeDAO.CountAllEnabledNodes(tx)
if err != nil {
return nil, err
}
result.CountNSNodes = countNodes
// 离线节点数
countOfflineNodes, err := nameservers.SharedNSNodeDAO.CountAllOfflineNodes(tx)
if err != nil {
return nil, err
}
result.CountOfflineNSNodes = countOfflineNodes
// 按小时统计
hourFrom := timeutil.Format("YmdH", time.Now().Add(-23*time.Hour))
hourTo := timeutil.Format("YmdH")
hourlyStats, err := nameservers.SharedNSRecordHourlyStatDAO.FindHourlyStats(tx, hourFrom, hourTo)
if err != nil {
return nil, err
}
for _, stat := range hourlyStats {
result.HourlyTrafficStats = append(result.HourlyTrafficStats, &pb.ComposeNSBoardResponse_HourlyTrafficStat{
Hour: stat.Hour,
Bytes: int64(stat.Bytes),
CountRequests: int64(stat.CountRequests),
})
}
// 按天统计
dayFrom := timeutil.Format("Ymd", time.Now().AddDate(0, 0, -14))
dayTo := timeutil.Format("Ymd")
dailyStats, err := nameservers.SharedNSRecordHourlyStatDAO.FindDailyStats(tx, dayFrom, dayTo)
if err != nil {
return nil, err
}
for _, stat := range dailyStats {
result.DailyTrafficStats = append(result.DailyTrafficStats, &pb.ComposeNSBoardResponse_DailyTrafficStat{
Day: stat.Day,
Bytes: int64(stat.Bytes),
CountRequests: int64(stat.CountRequests),
})
}
// 域名排行
topDomainStats, err := nameservers.SharedNSRecordHourlyStatDAO.ListTopDomains(tx, hourFrom, hourTo, 10)
if err != nil {
return nil, err
}
for _, stat := range topDomainStats {
domainName, err := nameservers.SharedNSDomainDAO.FindNSDomainName(tx, int64(stat.DomainId))
if err != nil {
return nil, err
}
if len(domainName) == 0 {
continue
}
result.TopNSDomainStats = append(result.TopNSDomainStats, &pb.ComposeNSBoardResponse_DomainStat{
NsDomainId: int64(stat.DomainId),
NsDomainName: domainName,
CountRequests: int64(stat.CountRequests),
Bytes: int64(stat.Bytes),
})
}
// 节点排行
topNodeStats, err := nameservers.SharedNSRecordHourlyStatDAO.ListTopNodes(tx, hourFrom, hourTo, 10)
if err != nil {
return nil, err
}
for _, stat := range topNodeStats {
nodeName, err := nameservers.SharedNSNodeDAO.FindEnabledNSNodeName(tx, int64(stat.NodeId))
if err != nil {
return nil, err
}
if len(nodeName) == 0 {
continue
}
result.TopNSNodeStats = append(result.TopNSNodeStats, &pb.ComposeNSBoardResponse_NodeStat{
NsClusterId: int64(stat.ClusterId),
NsNodeId: int64(stat.NodeId),
NsNodeName: nodeName,
CountRequests: int64(stat.CountRequests),
Bytes: int64(stat.Bytes),
})
}
// CPU、内存、负载
cpuValues, err := models.SharedNodeValueDAO.ListValuesForNSNodes(tx, nodeconfigs.NodeValueItemCPU, "usage", nodeconfigs.NodeValueRangeMinute)
if err != nil {
return nil, err
}
for _, v := range cpuValues {
valueJSON, err := json.Marshal(types.Float32(v.Value))
if err != nil {
return nil, err
}
result.CpuNodeValues = append(result.CpuNodeValues, &pb.NodeValue{
ValueJSON: valueJSON,
CreatedAt: int64(v.CreatedAt),
})
}
memoryValues, err := models.SharedNodeValueDAO.ListValuesForNSNodes(tx, nodeconfigs.NodeValueItemMemory, "usage", nodeconfigs.NodeValueRangeMinute)
if err != nil {
return nil, err
}
for _, v := range memoryValues {
valueJSON, err := json.Marshal(types.Float32(v.Value))
if err != nil {
return nil, err
}
result.MemoryNodeValues = append(result.MemoryNodeValues, &pb.NodeValue{
ValueJSON: valueJSON,
CreatedAt: int64(v.CreatedAt),
})
}
loadValues, err := models.SharedNodeValueDAO.ListValuesForNSNodes(tx, nodeconfigs.NodeValueItemLoad, "load5m", nodeconfigs.NodeValueRangeMinute)
if err != nil {
return nil, err
}
for _, v := range loadValues {
valueJSON, err := json.Marshal(types.Float32(v.Value))
if err != nil {
return nil, err
}
result.LoadNodeValues = append(result.LoadNodeValues, &pb.NodeValue{
ValueJSON: valueJSON,
CreatedAt: int64(v.CreatedAt),
})
}
return result, nil
}

View File

@@ -69,7 +69,7 @@ func (this *NSRecordService) CountAllEnabledNSRecords(ctx context.Context, req *
} }
var tx = this.NullTx() var tx = this.NullTx()
count, err := nameservers.SharedNSRecordDAO.CountAllEnabledRecords(tx, req.NsDomainId, req.Type, req.Keyword, req.NsRouteId) count, err := nameservers.SharedNSRecordDAO.CountAllEnabledDomainRecords(tx, req.NsDomainId, req.Type, req.Keyword, req.NsRouteId)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -0,0 +1,49 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nameservers
import (
"context"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/nameservers"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/rpc/services"
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
timeutil "github.com/iwind/TeaGo/utils/time"
)
// NSRecordHourlyStatService NS记录小时统计
type NSRecordHourlyStatService struct {
services.BaseService
}
// UploadNSRecordHourlyStats 上传统计
func (this *NSRecordHourlyStatService) UploadNSRecordHourlyStats(ctx context.Context, req *pb.UploadNSRecordHourlyStatsRequest) (*pb.RPCSuccess, error) {
_, nodeId, _, err := rpcutils.ValidateRequest(ctx, rpcutils.UserTypeDNS)
if err != nil {
return nil, err
}
if nodeId <= 0 {
return nil, errors.New("invalid nodeId")
}
if len(req.Stats) == 0 {
return this.Success()
}
var tx = this.NullTx()
clusterId, err := nameservers.SharedNSNodeDAO.FindNodeClusterId(tx, nodeId)
if err != nil {
return nil, err
}
// 增加小时统计
for _, stat := range req.Stats {
err := nameservers.SharedNSRecordHourlyStatDAO.IncreaseHourlyStat(tx, clusterId, nodeId, timeutil.FormatTime("YmdH", stat.CreatedAt), stat.NsDomainId, stat.NsRecordId, stat.CountRequests, stat.Bytes)
if err != nil {
return nil, err
}
}
return this.Success()
}