diff --git a/internal/db/models/log_dao.go b/internal/db/models/log_dao.go index 23e4c113..5aa155f6 100644 --- a/internal/db/models/log_dao.go +++ b/internal/db/models/log_dao.go @@ -1,12 +1,15 @@ package models import ( + "github.com/TeaOSLab/EdgeAPI/internal/errors" _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" "regexp" "strings" + "time" ) type LogDAO dbs.DAO @@ -88,6 +91,43 @@ func (this *LogDAO) ListLogs(offset int64, size int64, dayFrom string, dayTo str return } +// 物理删除日志 +func (this *LogDAO) DeleteLogPermanently(logId int64) error { + if logId <= 0 { + return errors.New("invalid logId") + } + _, err := this.Delete(logId) + return err +} + +// 物理删除所有日志 +func (this *LogDAO) DeleteAllLogsPermanently() error { + _, err := this.Query(). + Delete() + return err +} + +// 物理删除某些天之前的日志 +func (this *LogDAO) DeleteLogsPermanentlyBeforeDays(days int) error { + if days <= 0 { + days = 0 + } + untilDay := timeutil.Format("Ymd", time.Now().AddDate(0, 0, -days)) + _, err := this.Query(). + Lte("day", untilDay). + Delete() + return err +} + +// 计算当前日志容量大小 +func (this *LogDAO) SumLogsSize() (int64, error) { + col, err := this.Instance.FindCol(0, "SELECT DATA_LENGTH FROM information_schema.TABLES WHERE TABLE_SCHEMA=? AND TABLE_NAME=? LIMIT 1", this.Instance.Name(), this.Table) + if err != nil { + return 0, err + } + return types.Int64(col), nil +} + // 格式化日期 func (this *LogDAO) formatDay(day string) string { if !regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`).MatchString(day) { diff --git a/internal/db/models/log_dao_test.go b/internal/db/models/log_dao_test.go index 97c24b56..7ed61979 100644 --- a/internal/db/models/log_dao_test.go +++ b/internal/db/models/log_dao_test.go @@ -2,4 +2,16 @@ package models import ( _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/dbs" + "testing" ) + +func TestLogDAO_SumLogsSize(t *testing.T) { + dbs.NotifyReady() + + size, err := SharedLogDAO.SumLogsSize() + if err != nil { + t.Fatal(err) + } + t.Log("size:", size) +} diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go index b9950fdf..b403d880 100644 --- a/internal/db/models/message_dao.go +++ b/internal/db/models/message_dao.go @@ -34,6 +34,7 @@ const ( MessageTypeSSLCertExpiring MessageType = "SSLCertExpiring" // SSL证书即将过期 MessageTypeSSLCertACMETaskFailed MessageType = "SSLCertACMETaskFailed" // SSL证书任务执行失败 MessageTypeSSLCertACMETaskSuccess MessageType = "SSLCertACMETaskSuccess" // SSL证书任务执行成功 + MessageTypeLogCapacityOverflow MessageType = "LogCapacityOverflow" // 日志超出最大限制 ) type MessageDAO dbs.DAO diff --git a/internal/rpc/services/service_log.go b/internal/rpc/services/service_log.go index c3fab2e9..a99939cb 100644 --- a/internal/rpc/services/service_log.go +++ b/internal/rpc/services/service_log.go @@ -87,3 +87,81 @@ func (this *LogService) ListLogs(ctx context.Context, req *pb.ListLogsRequest) ( return &pb.ListLogsResponse{Logs: result}, nil } + +// 删除单条 +func (this *LogService) DeleteLogPermanently(ctx context.Context, req *pb.DeleteLogPermanentlyRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + // TODO 校验权限 + + // 执行物理删除 + err = models.SharedLogDAO.DeleteLogPermanently(req.LogId) + if err != nil { + return nil, err + } + + return this.Success() +} + +// 批量删除 +func (this *LogService) DeleteLogsPermanently(ctx context.Context, req *pb.DeleteLogsPermanentlyRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + // TODO 校验权限 + + // 执行物理删除 + for _, logId := range req.LogIds { + err = models.SharedLogDAO.DeleteLogPermanently(logId) + if err != nil { + return nil, err + } + } + + return this.Success() +} + +// 清理日志 +func (this *LogService) CleanLogsPermanently(ctx context.Context, req *pb.CleanLogsPermanentlyRequest) (*pb.RPCSuccess, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + // TODO 校验权限 + + if req.ClearAll { + err = models.SharedLogDAO.DeleteAllLogsPermanently() + if err != nil { + return nil, err + } + } else if req.Days > 0 { + err = models.SharedLogDAO.DeleteLogsPermanentlyBeforeDays(int(req.Days)) + if err != nil { + return nil, err + } + } + + return this.Success() +} + +// 计算日志容量大小 +func (this *LogService) SumLogsSize(ctx context.Context, req *pb.SumLogsSizeRequest) (*pb.SumLogsResponse, error) { + _, err := this.ValidateAdmin(ctx, 0) + if err != nil { + return nil, err + } + + // TODO 校验权限 + + size, err := models.SharedLogDAO.SumLogsSize() + if err != nil { + return nil, err + } + return &pb.SumLogsResponse{SizeBytes: size}, nil +} diff --git a/internal/tasks/health_check_cluster_task.go b/internal/tasks/health_check_cluster_task.go index c68bd589..8bb8dc78 100644 --- a/internal/tasks/health_check_cluster_task.go +++ b/internal/tasks/health_check_cluster_task.go @@ -87,7 +87,7 @@ func (this *HealthCheckClusterTask) Stop() { // 单个循环任务 func (this *HealthCheckClusterTask) loop(seconds int64) error { // 检查上次运行时间,防止重复运行 - settingKey := models.SettingCodeClusterHealthCheck + numberutils.FormatInt64(this.clusterId) + settingKey := models.SettingCodeClusterHealthCheck + "Loop" + numberutils.FormatInt64(this.clusterId) timestamp := time.Now().Unix() c, err := models.SharedSysSettingDAO.CompareInt64Setting(settingKey, timestamp-seconds) if err != nil { diff --git a/internal/tasks/log_task.go b/internal/tasks/log_task.go new file mode 100644 index 00000000..5d9e84e0 --- /dev/null +++ b/internal/tasks/log_task.go @@ -0,0 +1,158 @@ +package tasks + +import ( + "encoding/json" + "fmt" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" + "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/logs" + "time" +) + +func init() { + dbs.OnReady(func() { + go NewLogTask().Run() + }) +} + +type LogTask struct { +} + +func NewLogTask() *LogTask { + return &LogTask{} +} + +func (this *LogTask) Run() { + go this.runClean() + go this.runMonitor() +} + +func (this *LogTask) runClean() { + ticker := utils.NewTicker(24 * time.Hour) + for ticker.Wait() { + err := this.loopClean(86400) + if err != nil { + logs.Println("[TASK][LOG]" + err.Error()) + } + } +} + +func (this *LogTask) loopClean(seconds int64) error { + // 检查上次运行时间,防止重复运行 + settingKey := "logTaskCleanLoop" + timestamp := time.Now().Unix() + c, err := models.SharedSysSettingDAO.CompareInt64Setting(settingKey, timestamp-seconds) + if err != nil { + return err + } + if c > 0 { + return nil + } + + // 记录时间 + err = models.SharedSysSettingDAO.UpdateSetting(settingKey, []byte(numberutils.FormatInt64(timestamp))) + if err != nil { + return err + } + + configKey := "adminLogConfig" + valueJSON, err := models.SharedSysSettingDAO.ReadSetting(configKey) + if err != nil { + return err + } + if len(valueJSON) == 0 { + return nil + } + + config := &systemconfigs.LogConfig{} + err = json.Unmarshal(valueJSON, config) + if err != nil { + return err + } + if config.Days > 0 { + err = models.SharedLogDAO.DeleteLogsPermanentlyBeforeDays(config.Days) + if err != nil { + return err + } + } + return nil +} + +func (this *LogTask) runMonitor() { + ticker := utils.NewTicker(1 * time.Minute) + for ticker.Wait() { + err := this.loopClean(60) + if err != nil { + logs.Println("[TASK][LOG]" + err.Error()) + } + } +} + +func (this *LogTask) loopMonitor(seconds int64) error { + // 检查上次运行时间,防止重复运行 + settingKey := "logTaskMonitorLoop" + timestamp := time.Now().Unix() + c, err := models.SharedSysSettingDAO.CompareInt64Setting(settingKey, timestamp-seconds) + if err != nil { + return err + } + if c > 0 { + return nil + } + + // 记录时间 + err = models.SharedSysSettingDAO.UpdateSetting(settingKey, []byte(numberutils.FormatInt64(timestamp))) + if err != nil { + return err + } + + configKey := "adminLogConfig" + valueJSON, err := models.SharedSysSettingDAO.ReadSetting(configKey) + if err != nil { + return err + } + if len(valueJSON) == 0 { + return nil + } + + config := &systemconfigs.LogConfig{} + err = json.Unmarshal(valueJSON, config) + if err != nil { + return err + } + + if config.Capacity != nil { + capacityBytes := config.Capacity.Bytes() + if capacityBytes > 0 { + sumBytes, err := models.SharedLogDAO.SumLogsSize() + if err != nil { + return err + } + if sumBytes > capacityBytes { + err := models.SharedMessageDAO.CreateMessage(0, 0, models.MessageTypeLogCapacityOverflow, models.MessageLevelError, "日志用量已经超出最大限制,当前的用量为"+this.formatBytes(sumBytes)+",而设置的最大容量为"+this.formatBytes(capacityBytes)+"。", nil) + if err != nil { + return err + } + } + } + } + + return nil +} + +func (this *LogTask) formatBytes(bytes int64) string { + sizeHuman := "" + if bytes < 1024 { + sizeHuman = numberutils.FormatInt64(bytes) + "字节" + } else if bytes < 1024*1024 { + sizeHuman = fmt.Sprintf("%.2fK", float64(bytes)/1024) + } else if bytes < 1024*1024*1024 { + sizeHuman = fmt.Sprintf("%.2fM", float64(bytes)/1024/1024) + } else { + sizeHuman = fmt.Sprintf("%.2fG", float64(bytes)/1024/1024/1024) + } + return sizeHuman +} diff --git a/internal/tasks/log_task_test.go b/internal/tasks/log_task_test.go new file mode 100644 index 00000000..ba03f433 --- /dev/null +++ b/internal/tasks/log_task_test.go @@ -0,0 +1,28 @@ +package tasks + +import ( + "github.com/iwind/TeaGo/dbs" + "testing" +) + +func TestLogTask_loopClean(t *testing.T) { + dbs.NotifyReady() + + task := NewLogTask() + err := task.loopClean(5) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestLogTask_loopMonitor(t *testing.T) { + dbs.NotifyReady() + + task := NewLogTask() + err := task.loopMonitor(10) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/tasks/node_monitor_task.go b/internal/tasks/node_monitor_task.go index e012f95f..c32a9540 100644 --- a/internal/tasks/node_monitor_task.go +++ b/internal/tasks/node_monitor_task.go @@ -40,7 +40,7 @@ func (this *NodeMonitorTask) Run() { func (this *NodeMonitorTask) loop() error { // 检查上次运行时间,防止重复运行 - settingKey := models.SettingCodeNodeMonitor + settingKey := models.SettingCodeNodeMonitor + "Loop" timestamp := time.Now().Unix() c, err := models.SharedSysSettingDAO.CompareInt64Setting(settingKey, timestamp-int64(this.intervalSeconds)) if err != nil { diff --git a/internal/tasks/ssl_cert_expire_check_executor.go b/internal/tasks/ssl_cert_expire_check_executor.go index 23559993..cfbc5c2e 100644 --- a/internal/tasks/ssl_cert_expire_check_executor.go +++ b/internal/tasks/ssl_cert_expire_check_executor.go @@ -40,7 +40,7 @@ func (this *SSLCertExpireCheckExecutor) Start() { // 单次执行 func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { // 检查上次运行时间,防止重复运行 - settingKey := "sslCertExpiringCheck" + settingKey := "sslCertExpiringCheckLoop" timestamp := time.Now().Unix() c, err := models.SharedSysSettingDAO.CompareInt64Setting(settingKey, timestamp-seconds) if err != nil {