diff --git a/internal/const/const.go b/internal/const/const.go index 46de635f..1fb7a500 100644 --- a/internal/const/const.go +++ b/internal/const/const.go @@ -19,7 +19,7 @@ const ( // 其他节点版本号,用来检测是否有需要升级的节点 NodeVersion = "0.2.5" - UserNodeVersion = "0.0.9" + UserNodeVersion = "0.0.10" AuthorityNodeVersion = "0.0.2" MonitorNodeVersion = "0.0.2" DNSNodeVersion = "0.0.2" diff --git a/internal/db/models/node_value_dao.go b/internal/db/models/node_value_dao.go index 2dcd11b5..56b8cc72 100644 --- a/internal/db/models/node_value_dao.go +++ b/internal/db/models/node_value_dao.go @@ -116,6 +116,30 @@ func (this *NodeValueDAO) ListValuesWithClusterId(tx *dbs.Tx, role string, clust return } +// ListValuesForUserNodes 列出用户节点相关的平均数据 +func (this *NodeValueDAO) ListValuesForUserNodes(tx *dbs.Tx, item string, key string, timeRange nodeconfigs.NodeValueRange) (result []*NodeValue, err error) { + query := this.Query(tx). + Attr("role", "user"). + Attr("item", item). + Result("AVG(JSON_EXTRACT(value, '$." + key + "')) AS value, MIN(createdAt) AS createdAt") + + switch timeRange { + // TODO 支持更多的时间范围 + case nodeconfigs.NodeValueRangeMinute: + fromMinute := timeutil.FormatTime("YmdHi", time.Now().Unix()-3600) // 一个小时之前的 + query.Gte("minute", fromMinute) + query.Result("minute") + query.Group("minute") + default: + err = errors.New("invalid 'range' value: '" + timeRange + "'") + return + } + + _, err = query.Slice(&result). + FindAll() + return +} + // SumValues 计算某项参数值 func (this *NodeValueDAO) SumValues(tx *dbs.Tx, role string, nodeId int64, item string, param string, method nodeconfigs.NodeValueSumMethod, duration int32, durationUnit nodeconfigs.NodeValueDurationUnit) (float64, error) { if duration <= 0 { diff --git a/internal/db/models/server_daily_stat_dao.go b/internal/db/models/server_daily_stat_dao.go index 9760a213..32f10dba 100644 --- a/internal/db/models/server_daily_stat_dao.go +++ b/internal/db/models/server_daily_stat_dao.go @@ -51,18 +51,29 @@ func init() { // SaveStats 提交数据 func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailyStat) error { + var serverUserMap = map[int64]int64{} // serverId => userId for _, stat := range stats { day := timeutil.FormatTime("Ymd", stat.CreatedAt) hour := timeutil.FormatTime("YmdH", stat.CreatedAt) timeFrom := timeutil.FormatTime("His", stat.CreatedAt) timeTo := timeutil.FormatTime("His", stat.CreatedAt+5*60-1) // 5分钟 + serverUserId, ok := serverUserMap[stat.ServerId] + if !ok { + userId, err := SharedServerDAO.FindServerUserId(tx, stat.ServerId) + if err != nil { + return err + } + serverUserId = userId + } + _, _, err := this.Query(tx). Param("bytes", stat.Bytes). Param("cachedBytes", stat.CachedBytes). Param("countRequests", stat.CountRequests). Param("countCachedRequests", stat.CountCachedRequests). InsertOrUpdate(maps.Map{ + "userId": serverUserId, "serverId": stat.ServerId, "regionId": stat.RegionId, "bytes": dbs.SQL("bytes+:bytes"), @@ -94,8 +105,7 @@ func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, regionI query.Attr("regionId", regionId) } return query.Between("day", month+"01", month+"32"). - Where("serverId IN (SELECT id FROM "+SharedServerDAO.Table+" WHERE userId=:userId)"). - Param("userId", userId). + Attr("userId", userId). SumInt64("bytes", 0) } @@ -107,8 +117,7 @@ func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, reg query.Attr("regionId", regionId) } max, err := query.Between("day", month+"01", month+"32"). - Where("serverId IN (SELECT id FROM "+SharedServerDAO.Table+" WHERE userId=:userId)"). - Param("userId", userId). + Attr("userId", userId). Max("bytes", 0) if err != nil { return 0, err @@ -125,8 +134,7 @@ func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId } return query. Attr("day", day). - Where("serverId IN (SELECT id FROM "+SharedServerDAO.Table+" WHERE userId=:userId)"). - Param("userId", userId). + Attr("userId", userId). SumInt64("bytes", 0) } @@ -139,8 +147,7 @@ func (this *ServerDailyStatDAO) SumUserDailyPeek(tx *dbs.Tx, userId int64, regio } max, err := query. Attr("day", day). - Where("serverId IN (SELECT id FROM "+SharedServerDAO.Table+" WHERE userId=:userId)"). - Param("userId", userId). + Attr("userId", userId). Max("bytes", 0) if err != nil { return 0, err @@ -304,6 +311,18 @@ func (this *ServerDailyStatDAO) FindHourlyStats(tx *dbs.Tx, serverId int64, hour return } +// FindTopUserStats 流量排行 +func (this *ServerDailyStatDAO) FindTopUserStats(tx *dbs.Tx, hourFrom string, hourTo string) (result []*ServerDailyStat, err error) { + _, err = this.Query(tx). + Result("userId", "SUM(bytes) AS bytes", "SUM(countRequests) AS countRequests"). + Between("hour", hourFrom, hourTo). + Where("userId>0"). + Group("userId"). + Slice(&result). + FindAll() + return +} + // Clean 清理历史数据 func (this *ServerDailyStatDAO) Clean(tx *dbs.Tx, days int) error { var day = timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days)) diff --git a/internal/db/models/server_daily_stat_model.go b/internal/db/models/server_daily_stat_model.go index 1d1c96d2..61592b4d 100644 --- a/internal/db/models/server_daily_stat_model.go +++ b/internal/db/models/server_daily_stat_model.go @@ -3,6 +3,7 @@ package models // ServerDailyStat 计费流量统计 type ServerDailyStat struct { Id uint64 `field:"id"` // ID + UserId uint32 `field:"userId"` // 用户ID ServerId uint32 `field:"serverId"` // 服务ID RegionId uint32 `field:"regionId"` // 区域ID Bytes uint64 `field:"bytes"` // 流量 @@ -18,6 +19,7 @@ type ServerDailyStat struct { type ServerDailyStatOperator struct { Id interface{} // ID + UserId interface{} // 用户ID ServerId interface{} // 服务ID RegionId interface{} // 区域ID Bytes interface{} // 流量 diff --git a/internal/db/models/server_dao.go b/internal/db/models/server_dao.go index c7c97a12..6259602a 100644 --- a/internal/db/models/server_dao.go +++ b/internal/db/models/server_dao.go @@ -1198,6 +1198,18 @@ func (this *ServerDAO) FindServerAdminIdAndUserId(tx *dbs.Tx, serverId int64) (a return int64(one.(*Server).AdminId), int64(one.(*Server).UserId), nil } +// FindServerUserId 查找服务的用户ID +func (this *ServerDAO) FindServerUserId(tx *dbs.Tx, serverId int64) (userId int64, err error) { + one, _, err := this.Query(tx). + Pk(serverId). + Result("userId"). + FindOne() + if err != nil || one == nil { + return 0, err + } + return one.GetInt64("userId"), nil +} + // CheckUserServer 检查用户服务 func (this *ServerDAO) CheckUserServer(tx *dbs.Tx, userId int64, serverId int64) error { if serverId <= 0 || userId <= 0 { diff --git a/internal/db/models/user_dao.go b/internal/db/models/user_dao.go index e407f97f..62d99931 100644 --- a/internal/db/models/user_dao.go +++ b/internal/db/models/user_dao.go @@ -3,11 +3,14 @@ package models import ( "encoding/json" "github.com/TeaOSLab/EdgeAPI/internal/errors" + "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/types" stringutil "github.com/iwind/TeaGo/utils/string" + timeutil "github.com/iwind/TeaGo/utils/time" ) const ( @@ -97,6 +100,7 @@ func (this *UserDAO) CreateUser(tx *dbs.Tx, username string, password string, fu op.Remark = remark op.Source = source op.ClusterId = clusterId + op.Day = timeutil.Format("Ymd") op.IsOn = true op.State = UserStateEnabled @@ -289,3 +293,50 @@ func (this *UserDAO) FindUserFeatures(tx *dbs.Tx, userId int64) ([]*UserFeature, return result, nil } + +// SumDailyUsers 获取当天用户数量 +func (this *UserDAO) SumDailyUsers(tx *dbs.Tx, dayFrom string, dayTo string) (int64, error) { + return this.Query(tx). + Between("day", dayFrom, dayTo). + State(UserStateEnabled). + Count() +} + +// CountDailyUsers 计算每天用户数 +func (this *UserDAO) CountDailyUsers(tx *dbs.Tx, dayFrom string, dayTo string) ([]*pb.ComposeUserGlobalBoardResponse_DailyStat, error) { + ones, _, err := this.Query(tx). + Result("COUNT(*) AS count", "day"). + Between("day", dayFrom, dayTo). + State(UserStateEnabled). + Group("day"). + FindOnes() + if err != nil { + return nil, err + } + var m = map[string]*pb.ComposeUserGlobalBoardResponse_DailyStat{} // day => Stat + for _, one := range ones { + m[one.GetString("day")] = &pb.ComposeUserGlobalBoardResponse_DailyStat{ + Day: one.GetString("day"), + Count: one.GetInt64("count"), + } + } + + var result = []*pb.ComposeUserGlobalBoardResponse_DailyStat{} + days, err := utils.RangeDays(dayFrom, dayTo) + if err != nil { + return nil, err + } + for _, day := range days { + stat, ok := m[day] + if ok { + result = append(result, stat) + } else { + result = append(result, &pb.ComposeUserGlobalBoardResponse_DailyStat{ + Day: day, + Count: 0, + }) + } + } + + return result, nil +} diff --git a/internal/db/models/user_model.go b/internal/db/models/user_model.go index 8df5d524..b4741ecb 100644 --- a/internal/db/models/user_model.go +++ b/internal/db/models/user_model.go @@ -1,6 +1,6 @@ package models -// 用户 +// User 用户 type User struct { Id uint32 `field:"id"` // ID IsOn uint8 `field:"isOn"` // 是否启用 @@ -13,6 +13,7 @@ type User struct { Email string `field:"email"` // 邮箱地址 AvatarFileId uint64 `field:"avatarFileId"` // 头像文件ID CreatedAt uint64 `field:"createdAt"` // 创建时间 + Day string `field:"day"` // YYYYMMDD UpdatedAt uint64 `field:"updatedAt"` // 修改时间 State uint8 `field:"state"` // 状态 Source string `field:"source"` // 来源 @@ -32,6 +33,7 @@ type UserOperator struct { Email interface{} // 邮箱地址 AvatarFileId interface{} // 头像文件ID CreatedAt interface{} // 创建时间 + Day interface{} // YYYYMMDD UpdatedAt interface{} // 修改时间 State interface{} // 状态 Source interface{} // 来源 diff --git a/internal/db/models/user_node_dao.go b/internal/db/models/user_node_dao.go index ce2bb087..193e808d 100644 --- a/internal/db/models/user_node_dao.go +++ b/internal/db/models/user_node_dao.go @@ -265,3 +265,11 @@ func (this *UserNodeDAO) CountAllLowerVersionNodes(tx *dbs.Tx, version string) ( Param("version", utils.VersionToLong(version)). Count() } + +// CountOfflineNodes 计算离线节点数量 +func (this *UserNodeDAO) CountOfflineNodes(tx *dbs.Tx) (int64, error) { + return this.Query(tx). + State(UserNodeStateEnabled). + Where("status IS NULL OR JSON_EXTRACT(status, '$.updatedAt') 600 { // 请求超过10分钟认为超时 - return UserTypeNone, 0, errors.New("authenticate timeout, please check your system clock") + return UserTypeNone, 0, 0, errors.New("authenticate timeout, please check your system clock") } t := m.GetString("type") if len(userTypes) > 0 && !lists.ContainsString(userTypes, t) { - return UserTypeNone, 0, errors.New("not supported node type: '" + t + "'") + return UserTypeNone, 0, 0, errors.New("not supported node type: '" + t + "'") } switch apiToken.Role { @@ -141,47 +141,67 @@ func ValidateRequest(ctx context.Context, userTypes ...UserType) (userType UserT // TODO 需要检查集群是否已经删除 nodeIntId, err := models.SharedNodeDAO.FindEnabledNodeIdWithUniqueIdCacheable(nil, nodeId) if err != nil { - return UserTypeNode, 0, errors.New("context: " + err.Error()) + return UserTypeNode, 0, 0, errors.New("context: " + err.Error()) } if nodeIntId <= 0 { - return UserTypeNode, 0, errors.New("context: not found node with id '" + nodeId + "'") + return UserTypeNode, 0, 0, errors.New("context: not found node with id '" + nodeId + "'") } nodeUserId = nodeIntId + resultNodeId = nodeIntId case UserTypeCluster: clusterId, err := models.SharedNodeClusterDAO.FindEnabledClusterIdWithUniqueId(nil, nodeId) if err != nil { - return UserTypeCluster, 0, errors.New("context: " + err.Error()) + return UserTypeCluster, 0, 0, errors.New("context: " + err.Error()) } if clusterId <= 0 { - return UserTypeCluster, 0, errors.New("context: not found cluster with id '" + nodeId + "'") + return UserTypeCluster, 0, 0, errors.New("context: not found cluster with id '" + nodeId + "'") } nodeUserId = clusterId + resultNodeId = clusterId case UserTypeUser: + nodeIntId, err := models.SharedUserNodeDAO.FindEnabledUserNodeIdWithUniqueId(nil, nodeId) + if err != nil { + return UserTypeNode, 0, 0, errors.New("context: " + err.Error()) + } + if nodeIntId <= 0 { + return UserTypeNode, 0, 0, errors.New("context: not found node with id '" + nodeId + "'") + } + resultNodeId = nodeIntId case UserTypeMonitor: + nodeIntId, err := models.SharedMonitorNodeDAO.FindEnabledMonitorNodeIdWithUniqueId(nil, nodeId) + if err != nil { + return UserTypeNode, 0, 0, errors.New("context: " + err.Error()) + } + if nodeIntId <= 0 { + return UserTypeNode, 0, 0, errors.New("context: not found node with id '" + nodeId + "'") + } + resultNodeId = nodeIntId case UserTypeAuthority: nodeIntId, err := authority.SharedAuthorityNodeDAO.FindEnabledAuthorityNodeIdWithUniqueId(nil, nodeId) if err != nil { - return UserTypeNode, 0, errors.New("context: " + err.Error()) + return UserTypeNode, 0, 0, errors.New("context: " + err.Error()) } if nodeIntId <= 0 { - return UserTypeNode, 0, errors.New("context: not found node with id '" + nodeId + "'") + return UserTypeNode, 0, 0, errors.New("context: not found node with id '" + nodeId + "'") } nodeUserId = nodeIntId + resultNodeId = nodeIntId case UserTypeDNS: nodeIntId, err := nameservers.SharedNSNodeDAO.FindEnabledNodeIdWithUniqueId(nil, nodeId) if err != nil { - return UserTypeNode, 0, errors.New("context: " + err.Error()) + return UserTypeNode, nodeIntId, 0, errors.New("context: " + err.Error()) } if nodeIntId <= 0 { - return UserTypeNode, 0, errors.New("context: not found node with id '" + nodeId + "'") + return UserTypeNode, nodeIntId, 0, errors.New("context: not found node with id '" + nodeId + "'") } nodeUserId = nodeIntId + resultNodeId = nodeIntId } if nodeUserId > 0 { - return t, nodeUserId, nil + return t, resultNodeId, nodeUserId, nil } else { - return t, m.GetInt64("userId"), nil + return t, resultNodeId, m.GetInt64("userId"), nil } } diff --git a/internal/setup/sql_data.go b/internal/setup/sql_data.go index 9d2e0394..691ff5c8 100644 --- a/internal/setup/sql_data.go +++ b/internal/setup/sql_data.go @@ -33,9 +33,12 @@ var upgradeFuncs = []*upgradeVersion{ { "0.0.10", upgradeV0_0_10, }, + { + "0.2.5", upgradeV0_2_5, + }, } -// 升级SQL数据 +// UpgradeSQLData 升级SQL数据 func UpgradeSQLData(db *dbs.DB) error { version, err := db.FindCol(0, "SELECT version FROM edgeVersions") if err != nil { @@ -203,3 +206,12 @@ func upgradeV0_0_10(db *dbs.DB) error { return nil } + +// v0.2.5 +func upgradeV0_2_5(db *dbs.DB) error { + _, err := db.Exec("UPDATE edgeUsers SET day=FROM_UNIXTIME(createdAt,'%Y%m%d')") + if err != nil { + return err + } + return nil +}