mirror of
https://github.com/TeaOSLab/EdgeAPI.git
synced 2026-02-12 20:55:36 +08:00
实现数据看板--用户
This commit is contained in:
@@ -116,6 +116,30 @@ func (this *NodeValueDAO) ListValuesWithClusterId(tx *dbs.Tx, role string, clust
|
||||
return
|
||||
}
|
||||
|
||||
// ListValuesForUserNodes 列出用户节点相关的平均数据
|
||||
func (this *NodeValueDAO) ListValuesForUserNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) {
|
||||
query := this.Query(tx).
|
||||
Attr("role", "user").
|
||||
Attr("item", item).
|
||||
Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt")
|
||||
|
||||
switch timeRange {
|
||||
// TODO 支持更多的时间范围
|
||||
case nodeconfigs.NodeValueRangeMinute:
|
||||
fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的
|
||||
query.Gte("minute", fromMinute)
|
||||
query.Result("minute")
|
||||
query.Group("minute")
|
||||
default:
|
||||
err = errors.New("invalid 'range' value: '" + timeRange + "'")
|
||||
return
|
||||
}
|
||||
|
||||
_, err = query.Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
}
|
||||
|
||||
// SumValues 计算某项参数值
|
||||
func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) {
|
||||
if duration <= 0 {
|
||||
|
||||
@@ -51,18 +51,29 @@ func init() {
|
||||
|
||||
// SaveStats 提交数据
|
||||
func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailyStat) error {
|
||||
var serverUserMap = map[int64]int64{} // serverId => userId
|
||||
for _, stat := range stats {
|
||||
day := timeutil.FormatTime("Ymd", stat.CreatedAt)
|
||||
hour := timeutil.FormatTime("YmdH", stat.CreatedAt)
|
||||
timeFrom := timeutil.FormatTime("His", stat.CreatedAt)
|
||||
timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60-1) // 5分钟
|
||||
|
||||
serverUserId, ok := serverUserMap[stat.ServerId]
|
||||
if !ok {
|
||||
userId, err := SharedServerDAO.FindServerUserId(tx, stat.ServerId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serverUserId = userId
|
||||
}
|
||||
|
||||
_, _, err := this.Query(tx).
|
||||
Param("bytes", stat.Bytes).
|
||||
Param("cachedBytes", stat.CachedBytes).
|
||||
Param("countRequests", stat.CountRequests).
|
||||
Param("countCachedRequests", stat.CountCachedRequests).
|
||||
InsertOrUpdate(maps.Map{
|
||||
"userId": serverUserId,
|
||||
"serverId": stat.ServerId,
|
||||
"regionId": stat.RegionId,
|
||||
"bytes": dbs.SQL("bytes+:bytes"),
|
||||
@@ -94,8 +105,7 @@ func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, regionI
|
||||
query.Attr("regionId", regionId)
|
||||
}
|
||||
return query.Between("day", month+"01", month+"32").
|
||||
Where("serverId IN (SELECT id FROM "+SharedServerDAO.Table+" WHERE userId=:userId)").
|
||||
Param("userId", userId).
|
||||
Attr("userId", userId).
|
||||
SumInt64("bytes", 0)
|
||||
}
|
||||
|
||||
@@ -107,8 +117,7 @@ func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, reg
|
||||
query.Attr("regionId", regionId)
|
||||
}
|
||||
max, err := query.Between("day", month+"01", month+"32").
|
||||
Where("serverId IN (SELECT id FROM "+SharedServerDAO.Table+" WHERE userId=:userId)").
|
||||
Param("userId", userId).
|
||||
Attr("userId", userId).
|
||||
Max("bytes", 0)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -125,8 +134,7 @@ func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId
|
||||
}
|
||||
return query.
|
||||
Attr("day", day).
|
||||
Where("serverId IN (SELECT id FROM "+SharedServerDAO.Table+" WHERE userId=:userId)").
|
||||
Param("userId", userId).
|
||||
Attr("userId", userId).
|
||||
SumInt64("bytes", 0)
|
||||
}
|
||||
|
||||
@@ -139,8 +147,7 @@ func (this *ServerDailyStatDAO) SumUserDailyPeek(tx *dbs.Tx, userId int64, regio
|
||||
}
|
||||
max, err := query.
|
||||
Attr("day", day).
|
||||
Where("serverId IN (SELECT id FROM "+SharedServerDAO.Table+" WHERE userId=:userId)").
|
||||
Param("userId", userId).
|
||||
Attr("userId", userId).
|
||||
Max("bytes", 0)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -304,6 +311,18 @@ func (this *ServerDailyStatDAO) FindHourlyStats(tx *dbs.Tx, serverId int64, hour
|
||||
return
|
||||
}
|
||||
|
||||
// FindTopUserStats 流量排行
|
||||
func (this *ServerDailyStatDAO) FindTopUserStats(tx *dbs.Tx, hourFrom string, hourTo string) (result []*ServerDailyStat, err error) {
|
||||
_, err = this.Query(tx).
|
||||
Result("userId", "SUM(bytes) AS bytes", "SUM(countRequests) AS countRequests").
|
||||
Between("hour", hourFrom, hourTo).
|
||||
Where("userId>0").
|
||||
Group("userId").
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
}
|
||||
|
||||
// Clean 清理历史数据
|
||||
func (this *ServerDailyStatDAO) Clean(tx *dbs.Tx, days int) error {
|
||||
var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days))
|
||||
|
||||
@@ -3,6 +3,7 @@ package models
|
||||
// ServerDailyStat 计费流量统计
|
||||
type ServerDailyStat struct {
|
||||
Id uint64 `field:"id"` // ID
|
||||
UserId uint32 `field:"userId"` // 用户ID
|
||||
ServerId uint32 `field:"serverId"` // 服务ID
|
||||
RegionId uint32 `field:"regionId"` // 区域ID
|
||||
Bytes uint64 `field:"bytes"` // 流量
|
||||
@@ -18,6 +19,7 @@ type ServerDailyStat struct {
|
||||
|
||||
type ServerDailyStatOperator struct {
|
||||
Id interface{} // ID
|
||||
UserId interface{} // 用户ID
|
||||
ServerId interface{} // 服务ID
|
||||
RegionId interface{} // 区域ID
|
||||
Bytes interface{} // 流量
|
||||
|
||||
@@ -1198,6 +1198,18 @@ func (this *ServerDAO) FindServerAdminIdAndUserId(tx *dbs.Tx, serverId int64) (a
|
||||
return int64(one.(*Server).AdminId), int64(one.(*Server).UserId), nil
|
||||
}
|
||||
|
||||
// FindServerUserId 查找服务的用户ID
|
||||
func (this *ServerDAO) FindServerUserId(tx *dbs.Tx, serverId int64) (userId int64, err error) {
|
||||
one, _, err := this.Query(tx).
|
||||
Pk(serverId).
|
||||
Result("userId").
|
||||
FindOne()
|
||||
if err != nil || one == nil {
|
||||
return 0, err
|
||||
}
|
||||
return one.GetInt64("userId"), nil
|
||||
}
|
||||
|
||||
// CheckUserServer 检查用户服务
|
||||
func (this *ServerDAO) CheckUserServer(tx *dbs.Tx, userId int64, serverId int64) error {
|
||||
if serverId <= 0 || userId <= 0 {
|
||||
|
||||
@@ -3,11 +3,14 @@ package models
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
stringutil "github.com/iwind/TeaGo/utils/string"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -97,6 +100,7 @@ func (this *UserDAO) CreateUser(tx *dbs.Tx, username string, password string, fu
|
||||
op.Remark = remark
|
||||
op.Source = source
|
||||
op.ClusterId = clusterId
|
||||
op.Day = timeutil.Format("Ymd")
|
||||
|
||||
op.IsOn = true
|
||||
op.State = UserStateEnabled
|
||||
@@ -289,3 +293,50 @@ func (this *UserDAO) FindUserFeatures(tx *dbs.Tx, userId int64) ([]*UserFeature,
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// SumDailyUsers 获取当天用户数量
|
||||
func (this *UserDAO) SumDailyUsers(tx *dbs.Tx, dayFrom string, dayTo string) (int64, error) {
|
||||
return this.Query(tx).
|
||||
Between("day", dayFrom, dayTo).
|
||||
State(UserStateEnabled).
|
||||
Count()
|
||||
}
|
||||
|
||||
// CountDailyUsers 计算每天用户数
|
||||
func (this *UserDAO) CountDailyUsers(tx *dbs.Tx, dayFrom string, dayTo string) ([]*pb.ComposeUserGlobalBoardResponse_DailyStat, error) {
|
||||
ones, _, err := this.Query(tx).
|
||||
Result("COUNT(*) AS count", "day").
|
||||
Between("day", dayFrom, dayTo).
|
||||
State(UserStateEnabled).
|
||||
Group("day").
|
||||
FindOnes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var m = map[string]*pb.ComposeUserGlobalBoardResponse_DailyStat{} // day => Stat
|
||||
for _, one := range ones {
|
||||
m[one.GetString("day")] = &pb.ComposeUserGlobalBoardResponse_DailyStat{
|
||||
Day: one.GetString("day"),
|
||||
Count: one.GetInt64("count"),
|
||||
}
|
||||
}
|
||||
|
||||
var result = []*pb.ComposeUserGlobalBoardResponse_DailyStat{}
|
||||
days, err := utils.RangeDays(dayFrom, dayTo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, day := range days {
|
||||
stat, ok := m[day]
|
||||
if ok {
|
||||
result = append(result, stat)
|
||||
} else {
|
||||
result = append(result, &pb.ComposeUserGlobalBoardResponse_DailyStat{
|
||||
Day: day,
|
||||
Count: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package models
|
||||
|
||||
// 用户
|
||||
// User 用户
|
||||
type User struct {
|
||||
Id uint32 `field:"id"` // ID
|
||||
IsOn uint8 `field:"isOn"` // 是否启用
|
||||
@@ -13,6 +13,7 @@ type User struct {
|
||||
Email string `field:"email"` // 邮箱地址
|
||||
AvatarFileId uint64 `field:"avatarFileId"` // 头像文件ID
|
||||
CreatedAt uint64 `field:"createdAt"` // 创建时间
|
||||
Day string `field:"day"` // YYYYMMDD
|
||||
UpdatedAt uint64 `field:"updatedAt"` // 修改时间
|
||||
State uint8 `field:"state"` // 状态
|
||||
Source string `field:"source"` // 来源
|
||||
@@ -32,6 +33,7 @@ type UserOperator struct {
|
||||
Email interface{} // 邮箱地址
|
||||
AvatarFileId interface{} // 头像文件ID
|
||||
CreatedAt interface{} // 创建时间
|
||||
Day interface{} // YYYYMMDD
|
||||
UpdatedAt interface{} // 修改时间
|
||||
State interface{} // 状态
|
||||
Source interface{} // 来源
|
||||
|
||||
@@ -265,3 +265,11 @@ func (this *UserNodeDAO) CountAllLowerVersionNodes(tx *dbs.Tx, version string) (
|
||||
Param("version", utils.VersionToLong(version)).
|
||||
Count()
|
||||
}
|
||||
|
||||
// CountOfflineNodes 计算离线节点数量
|
||||
func (this *UserNodeDAO) CountOfflineNodes(tx *dbs.Tx) (int64, error) {
|
||||
return this.Query(tx).
|
||||
State(UserNodeStateEnabled).
|
||||
Where("status IS NULL OR JSON_EXTRACT(status, '$.updatedAt')<UNIX_TIMESTAMP()-120").
|
||||
Count()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user