Files
EdgeAPI/internal/rpc/services/service_http_cache_task.go
GoEdgeLab 5a17ae9d79 v1.4.1
2024-07-27 14:15:25 +08:00

445 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright 2022 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cloud .
package services
import (
"context"
"time"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeCommon/pkg/userconfigs"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
)
// HTTPCacheTaskService 缓存任务管理
type HTTPCacheTaskService struct {
BaseService
}
// CreateHTTPCacheTask 创建任务
func (this *HTTPCacheTaskService) CreateHTTPCacheTask(ctx context.Context, req *pb.CreateHTTPCacheTaskRequest) (*pb.CreateHTTPCacheTaskResponse, error) {
_, userId, err := this.ValidateAdminAndUser(ctx, true)
if err != nil {
return nil, err
}
var tx = this.NullTx()
// 检查操作类型
if len(req.Type) == 0 {
return nil, errors.New("require 'type' parameter")
}
if req.Type != models.HTTPCacheTaskTypePurge && req.Type != models.HTTPCacheTaskTypeFetch {
return nil, errors.New("'type' must be 'purge' or 'fetch'")
}
// 检查Key类型
if len(req.KeyType) == 0 {
return nil, errors.New("require 'keyType' parameter")
}
if req.KeyType != "key" && req.KeyType != "prefix" {
return nil, errors.New("'keyType' must be 'key' or 'prefix'")
}
// 预热只能是Key
if req.Type == models.HTTPCacheTaskTypeFetch && req.KeyType != "key" {
return nil, errors.New("'keyType' should be 'key' when fetching cache")
}
// 检查key是否为空
if len(req.Keys) == 0 {
return nil, errors.New("'keys' should not be empty")
}
// 检查Key数量
var clusterId int64
if userId > 0 {
// 限制单次
var maxKeysPerTask = userconfigs.MaxCacheKeysPerTask
var maxKeysPerDay = userconfigs.MaxCacheKeysPerDay
serverConfig, err := models.SharedSysSettingDAO.ReadUserServerConfig(tx)
if err != nil {
return nil, err
}
if serverConfig != nil {
switch req.Type {
case models.HTTPCacheTaskTypePurge:
if serverConfig.HTTPCacheTaskPurgeConfig != nil {
if serverConfig.HTTPCacheTaskPurgeConfig.MaxKeysPerTask > 0 {
maxKeysPerTask = serverConfig.HTTPCacheTaskPurgeConfig.MaxKeysPerTask
}
if serverConfig.HTTPCacheTaskPurgeConfig.MaxKeysPerDay > 0 {
maxKeysPerDay = serverConfig.HTTPCacheTaskPurgeConfig.MaxKeysPerDay
}
}
case models.HTTPCacheTaskTypeFetch:
if serverConfig.HTTPCacheTaskFetchConfig != nil {
if serverConfig.HTTPCacheTaskFetchConfig.MaxKeysPerTask > 0 {
maxKeysPerTask = serverConfig.HTTPCacheTaskFetchConfig.MaxKeysPerTask
}
if serverConfig.HTTPCacheTaskFetchConfig.MaxKeysPerDay > 0 {
maxKeysPerDay = serverConfig.HTTPCacheTaskFetchConfig.MaxKeysPerDay
}
}
}
}
if maxKeysPerTask > 0 && len(req.Keys) > types.Int(maxKeysPerTask) {
return nil, errors.New("too many keys in task (current:" + types.String(len(req.Keys)) + ", max:" + types.String(maxKeysPerTask) + ")")
}
if maxKeysPerDay > 0 {
countInDay, err := models.SharedHTTPCacheTaskKeyDAO.CountUserTasksInDay(tx, userId, timeutil.Format("Ymd"), req.Type)
if err != nil {
return nil, err
}
if types.Int(countInDay)+len(req.Keys) > types.Int(maxKeysPerDay) {
return nil, errors.New("too many keys in today (current:" + types.String(types.Int(countInDay)+len(req.Keys)) + ", max:" + types.String(maxKeysPerDay) + ")")
}
}
clusterId, err = models.SharedUserDAO.FindUserClusterId(tx, userId)
if err != nil {
return nil, err
}
}
// 创建任务
taskId, err := models.SharedHTTPCacheTaskDAO.CreateTask(tx, userId, req.Type, req.KeyType, "")
if err != nil {
return nil, err
}
var countKeys = 0
var domainMap = map[string]*models.Server{} // domain name => *Server
for _, key := range req.Keys {
if len(key) == 0 {
continue
}
// 获取域名
var domain = utils.ParseDomainFromKey(key)
if len(domain) == 0 {
continue
}
// 查询所在集群
server, ok := domainMap[domain]
if !ok {
server, err = models.SharedServerDAO.FindEnabledServerWithDomain(tx, userId, domain)
if err != nil {
return nil, err
}
if server == nil {
continue
}
domainMap[domain] = server
}
// 检查用户
if userId > 0 {
if int64(server.UserId) != userId {
continue
}
}
var serverClusterId = int64(server.ClusterId)
if serverClusterId == 0 {
if clusterId > 0 {
serverClusterId = clusterId
} else {
continue
}
}
_, err = models.SharedHTTPCacheTaskKeyDAO.CreateKey(tx, taskId, key, req.Type, req.KeyType, serverClusterId)
if err != nil {
return nil, err
}
countKeys++
}
if countKeys == 0 {
// 如果没有有效的Key则直接完成
err = models.SharedHTTPCacheTaskDAO.UpdateTaskStatus(tx, taskId, true, true)
} else {
err = models.SharedHTTPCacheTaskDAO.UpdateTaskReady(tx, taskId)
}
if err != nil {
return nil, err
}
return &pb.CreateHTTPCacheTaskResponse{
HttpCacheTaskId: taskId,
CountKeys: int64(countKeys),
}, nil
}
// CountHTTPCacheTasks 计算任务数量
func (this *HTTPCacheTaskService) CountHTTPCacheTasks(ctx context.Context, req *pb.CountHTTPCacheTasksRequest) (*pb.RPCCountResponse, error) {
_, userId, err := this.ValidateAdminAndUser(ctx, true)
if err != nil {
return nil, err
}
var tx = this.NullTx()
count, err := models.SharedHTTPCacheTaskDAO.CountTasks(tx, userId)
if err != nil {
return nil, err
}
return this.SuccessCount(count)
}
// CountDoingHTTPCacheTasks 计算正在执行的任务数量
func (this *HTTPCacheTaskService) CountDoingHTTPCacheTasks(ctx context.Context, req *pb.CountDoingHTTPCacheTasksRequest) (*pb.RPCCountResponse, error) {
_, userId, err := this.ValidateAdminAndUser(ctx, true)
if err != nil {
return nil, err
}
var tx = this.NullTx()
count, err := models.SharedHTTPCacheTaskDAO.CountDoingTasks(tx, userId)
if err != nil {
return nil, err
}
return this.SuccessCount(count)
}
// ListHTTPCacheTasks 列出单页任务
func (this *HTTPCacheTaskService) ListHTTPCacheTasks(ctx context.Context, req *pb.ListHTTPCacheTasksRequest) (*pb.ListHTTPCacheTasksResponse, error) {
_, userId, err := this.ValidateAdminAndUser(ctx, true)
if err != nil {
return nil, err
}
var isFromUser = userId > 0
var tx = this.NullTx()
tasks, err := models.SharedHTTPCacheTaskDAO.ListTasks(tx, userId, req.Offset, req.Size)
if err != nil {
return nil, err
}
var pbTasks = []*pb.HTTPCacheTask{}
var cacheMap = utils.NewCacheMap()
for _, task := range tasks {
var taskId = int64(task.Id)
// 查询所属用户
var pbUser = &pb.User{}
if task.UserId > 0 {
var taskUserId = int64(task.UserId)
if taskUserId > 0 {
taskUser, err := models.SharedUserDAO.FindEnabledUser(tx, taskUserId, cacheMap)
if err != nil {
return nil, err
}
if taskUser == nil {
// 找不到用户就删除
err = models.SharedHTTPCacheTaskDAO.DisableHTTPCacheTask(tx, taskUserId)
if err != nil {
return nil, err
}
} else {
pbUser = &pb.User{
Id: int64(taskUser.Id),
Username: taskUser.Username,
Fullname: taskUser.Fullname,
}
}
}
}
// 对用户而言超过Ns自动认为已完成
const timeoutSeconds = 300
if isFromUser && !task.IsDone && time.Now().Unix()-int64(task.CreatedAt) > timeoutSeconds {
task.IsOk = true
task.IsDone = true
task.DoneAt = task.CreatedAt + timeoutSeconds
}
pbTasks = append(pbTasks, &pb.HTTPCacheTask{
Id: taskId,
UserId: int64(task.UserId),
Type: task.Type,
KeyType: task.KeyType,
CreatedAt: int64(task.CreatedAt),
DoneAt: int64(task.DoneAt),
IsDone: task.IsDone,
IsOk: task.IsOk,
Description: task.Description,
User: pbUser,
HttpCacheTaskKeys: nil,
})
}
return &pb.ListHTTPCacheTasksResponse{
HttpCacheTasks: pbTasks,
}, nil
}
// FindEnabledHTTPCacheTask 查找单个任务
func (this *HTTPCacheTaskService) FindEnabledHTTPCacheTask(ctx context.Context, req *pb.FindEnabledHTTPCacheTaskRequest) (*pb.FindEnabledHTTPCacheTaskResponse, error) {
_, userId, err := this.ValidateAdminAndUser(ctx, true)
if err != nil {
return nil, err
}
var isFromUser = userId > 0
var tx = this.NullTx()
if userId > 0 {
err = models.SharedHTTPCacheTaskDAO.CheckUserTask(tx, userId, req.HttpCacheTaskId)
if err != nil {
return nil, err
}
}
task, err := models.SharedHTTPCacheTaskDAO.FindEnabledHTTPCacheTask(tx, req.HttpCacheTaskId)
if err != nil {
return nil, err
}
if task == nil {
return &pb.FindEnabledHTTPCacheTaskResponse{HttpCacheTask: nil}, nil
}
// 对用户而言超过Ns自动认为已完成
const timeoutSeconds = 300
if isFromUser && !task.IsDone && time.Now().Unix()-int64(task.CreatedAt) > timeoutSeconds {
task.IsOk = true
task.IsDone = true
task.DoneAt = task.CreatedAt + timeoutSeconds
}
// 查询所属用户
var pbUser = &pb.User{}
if task.UserId > 0 {
var taskUserId = int64(task.UserId)
if taskUserId > 0 {
taskUser, err := models.SharedUserDAO.FindEnabledUser(tx, taskUserId, nil)
if err != nil {
return nil, err
}
if taskUser == nil {
// 找不到用户就删除
err = models.SharedHTTPCacheTaskDAO.DisableHTTPCacheTask(tx, taskUserId)
if err != nil {
return nil, err
}
} else {
pbUser = &pb.User{
Id: int64(taskUser.Id),
Username: taskUser.Username,
Fullname: taskUser.Fullname,
}
}
}
}
// Keys
keys, err := models.SharedHTTPCacheTaskKeyDAO.FindAllTaskKeys(tx, req.HttpCacheTaskId)
if err != nil {
return nil, err
}
var pbKeys = []*pb.HTTPCacheTaskKey{}
for _, key := range keys {
// 对用户而言超过Ns自动认为已完成
if isFromUser && task.IsDone {
key.IsDone = true
key.Errors = nil
}
// 集群信息
var pbNodeCluster *pb.NodeCluster
if !isFromUser && key.ClusterId > 0 {
clusterName, findClusterErr := models.SharedNodeClusterDAO.FindNodeClusterName(tx, int64(key.ClusterId))
if findClusterErr != nil {
return nil, findClusterErr
}
pbNodeCluster = &pb.NodeCluster{
Id: int64(key.ClusterId),
Name: clusterName,
}
}
pbKeys = append(pbKeys, &pb.HTTPCacheTaskKey{
Id: int64(key.Id),
TaskId: int64(key.TaskId),
Key: key.Key,
KeyType: key.KeyType,
IsDone: key.IsDone,
IsDoing: !key.IsDone && len(key.DecodeNodes()) > 0,
ErrorsJSON: key.Errors,
NodeCluster: pbNodeCluster,
})
}
return &pb.FindEnabledHTTPCacheTaskResponse{
HttpCacheTask: &pb.HTTPCacheTask{
Id: int64(task.Id),
UserId: int64(task.UserId),
Type: task.Type,
KeyType: task.KeyType,
CreatedAt: int64(task.CreatedAt),
DoneAt: int64(task.DoneAt),
IsDone: task.IsDone,
IsOk: task.IsOk,
User: pbUser,
HttpCacheTaskKeys: pbKeys,
},
}, nil
}
// DeleteHTTPCacheTask 删除任务
func (this *HTTPCacheTaskService) DeleteHTTPCacheTask(ctx context.Context, req *pb.DeleteHTTPCacheTaskRequest) (*pb.RPCSuccess, error) {
_, userId, err := this.ValidateAdminAndUser(ctx, true)
if err != nil {
return nil, err
}
var tx = this.NullTx()
if userId > 0 {
err = models.SharedHTTPCacheTaskDAO.CheckUserTask(tx, userId, req.HttpCacheTaskId)
if err != nil {
return nil, err
}
}
err = models.SharedHTTPCacheTaskDAO.DisableHTTPCacheTask(tx, req.HttpCacheTaskId)
if err != nil {
return nil, err
}
return this.Success()
}
// ResetHTTPCacheTask 重置任务状态
// 只允许管理员重置,用于调试
func (this *HTTPCacheTaskService) ResetHTTPCacheTask(ctx context.Context, req *pb.ResetHTTPCacheTaskRequest) (*pb.RPCSuccess, error) {
_, err := this.ValidateAdmin(ctx)
if err != nil {
return nil, err
}
var tx = this.NullTx()
// 重置任务
err = models.SharedHTTPCacheTaskDAO.ResetTask(tx, req.HttpCacheTaskId)
if err != nil {
return nil, err
}
// 重置任务下的Key
err = models.SharedHTTPCacheTaskKeyDAO.ResetCacheKeysWithTaskId(tx, req.HttpCacheTaskId)
if err != nil {
return nil, err
}
return this.Success()
}