From 0992fcbee5ff88392b159c64e76f2936e6aff32b Mon Sep 17 00:00:00 2001 From: GoEdgeLab Date: Tue, 20 Oct 2020 16:45:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=81=A5=E5=BA=B7=E6=A3=80?= =?UTF-8?q?=E6=9F=A5=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1/=E5=81=A5?= =?UTF-8?q?=E5=BA=B7=E6=A3=80=E6=9F=A5=E5=8F=AF=E4=BB=A5=E5=8F=91=E9=80=81?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- go.sum | 2 + internal/db/models/message_dao.go | 128 ++++++++++++++++ internal/db/models/message_dao_test.go | 25 +++ internal/db/models/message_model.go | 40 +++++ internal/db/models/message_model_ext.go | 1 + internal/db/models/sys_setting_dao.go | 23 ++- internal/db/models/sys_setting_dao_test.go | 16 +- internal/tasks/event_looper.go | 7 +- internal/tasks/health_check_cluster_task.go | 145 ++++++++++++++++++ .../tasks/health_check_cluster_task_test.go | 16 ++ internal/tasks/health_check_executor.go | 6 +- internal/tasks/health_check_task.go | 95 ++++++++++++ internal/tasks/message_task.go | 41 +++++ ...og_cleaner.go => node_log_cleaner_task.go} | 15 +- ..._test.go => node_log_cleaner_task_test.go} | 2 +- internal/utils/numberutils/utils.go | 11 ++ 17 files changed, 555 insertions(+), 20 deletions(-) create mode 100644 internal/db/models/message_dao.go create mode 100644 internal/db/models/message_dao_test.go create mode 100644 internal/db/models/message_model.go create mode 100644 internal/db/models/message_model_ext.go create mode 100644 internal/tasks/health_check_cluster_task.go create mode 100644 internal/tasks/health_check_cluster_task_test.go create mode 100644 internal/tasks/health_check_task.go create mode 100644 internal/tasks/message_task.go rename internal/tasks/{node_log_cleaner.go => node_log_cleaner_task.go} (61%) rename internal/tasks/{node_log_cleaner_test.go => node_log_cleaner_task_test.go} (81%) create mode 100644 internal/utils/numberutils/utils.go diff --git a/go.mod b/go.mod index a0d97a5c..0a21513a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/go-sql-driver/mysql v1.5.0 github.com/go-yaml/yaml v2.1.0+incompatible github.com/golang/protobuf v1.4.2 - github.com/iwind/TeaGo v0.0.0-20201013075636-119886e49c04 + github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f github.com/pkg/sftp v1.12.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a google.golang.org/grpc v1.32.0 diff --git a/go.sum b/go.sum index 6faca87d..1dff287d 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,8 @@ github.com/iwind/TeaGo v0.0.0-20201010005321-430e836dee8a h1:sO6uDbQOEe6/tIB3o31 github.com/iwind/TeaGo v0.0.0-20201010005321-430e836dee8a/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/iwind/TeaGo v0.0.0-20201013075636-119886e49c04 h1:QXRSB5x9pJFZLC7dCxDx9CRXAx9en1Uk3QdfzC8wMC8= github.com/iwind/TeaGo v0.0.0-20201013075636-119886e49c04/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= +github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f h1:6Ws2H+eorfVUoMO2jta6A9nIdh8oi5/5LXo/LkAxR+E= +github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= diff --git a/internal/db/models/message_dao.go b/internal/db/models/message_dao.go new file mode 100644 index 00000000..f5c05c65 --- /dev/null +++ b/internal/db/models/message_dao.go @@ -0,0 +1,128 @@ +package models + +import ( + "crypto/md5" + "fmt" + _ "github.com/go-sql-driver/mysql" + "github.com/iwind/TeaGo/Tea" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/types" + timeutil "github.com/iwind/TeaGo/utils/time" + "time" +) + +const ( + MessageStateEnabled = 1 // 已启用 + MessageStateDisabled = 0 // 已禁用 + + MessageLevelInfo = "info" + MessageLevelWarning = "warning" + MessageLevelError = "error" +) + +type MessageType = string + +const ( + MessageTypeHealthCheckFail MessageType = "HealthCheckFail" +) + +type MessageDAO dbs.DAO + +func NewMessageDAO() *MessageDAO { + return dbs.NewDAO(&MessageDAO{ + DAOObject: dbs.DAOObject{ + DB: Tea.Env, + Table: "edgeMessages", + Model: new(Message), + PkName: "id", + }, + }).(*MessageDAO) +} + +var SharedMessageDAO *MessageDAO + +func init() { + dbs.OnReady(func() { + SharedMessageDAO = NewMessageDAO() + }) +} + +// 启用条目 +func (this *MessageDAO) EnableMessage(id int64) error { + _, err := this.Query(). + Pk(id). + Set("state", MessageStateEnabled). + Update() + return err +} + +// 禁用条目 +func (this *MessageDAO) DisableMessage(id int64) error { + _, err := this.Query(). + Pk(id). + Set("state", MessageStateDisabled). + Update() + return err +} + +// 查找启用中的条目 +func (this *MessageDAO) FindEnabledMessage(id int64) (*Message, error) { + result, err := this.Query(). + Pk(id). + Attr("state", MessageStateEnabled). + Find() + if result == nil { + return nil, err + } + return result.(*Message), err +} + +// 创建集群消息 +func (this *MessageDAO) CreateClusterMessage(clusterId int64, messageType MessageType, level string, body string, paramsJSON []byte) error { + _, err := this.createMessage(clusterId, 0, messageType, level, body, paramsJSON) + return err +} + +// 删除某天之前的消息 +func (this *MessageDAO) DeleteMessagesBeforeDay(dayTime time.Time) error { + day := timeutil.Format("Ymd", dayTime) + _, err := this.Query(). + Where("day<:day"). + Param("day", day). + Delete() + return err +} + +// 创建消息 +func (this *MessageDAO) createMessage(clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) (int64, error) { + h := md5.New() + h.Write([]byte(body)) + h.Write(paramsJSON) + hash := fmt.Sprintf("%x", h.Sum(nil)) + + // TODO 检查同样的消息最近是否发送过 + + // 创建新消息 + op := NewMessageOperator() + op.AdminId = 0 // TODO + op.UserId = 0 // TODO + op.ClusterId = clusterId + op.NodeId = nodeId + op.Type = messageType + op.Level = level + op.Body = body + if len(paramsJSON) > 0 { + op.Params = paramsJSON + } + op.IsRead = false + op.State = MessageStateEnabled + op.CreatedAt = time.Now().Unix() + op.Day = timeutil.Format("Ymd") + op.Hash = hash + + _, err := this.Save(op) + if err != nil { + return 0, err + } + return types.Int64(op.Id), nil +} diff --git a/internal/db/models/message_dao_test.go b/internal/db/models/message_dao_test.go new file mode 100644 index 00000000..27539fa5 --- /dev/null +++ b/internal/db/models/message_dao_test.go @@ -0,0 +1,25 @@ +package models + +import ( + _ "github.com/go-sql-driver/mysql" + "testing" + "time" +) + +func TestMessageDAO_CreateClusterMessage(t *testing.T) { + dao := NewMessageDAO() + err := dao.CreateClusterMessage(1, "test", "error", "123", []byte("456")) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} + +func TestMessageDAO_DeleteMessagesBeforeDay(t *testing.T) { + dao := NewMessageDAO() + err := dao.DeleteMessagesBeforeDay(time.Now()) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/db/models/message_model.go b/internal/db/models/message_model.go new file mode 100644 index 00000000..c79305cc --- /dev/null +++ b/internal/db/models/message_model.go @@ -0,0 +1,40 @@ +package models + +// 消息通知 +type Message struct { + Id uint64 `field:"id"` // ID + AdminId uint32 `field:"adminId"` // 管理员ID + UserId uint32 `field:"userId"` // 用户ID + ClusterId uint32 `field:"clusterId"` // 集群ID + NodeId uint32 `field:"nodeId"` // 节点ID + Level string `field:"level"` // 级别 + Body string `field:"body"` // 内容 + Type string `field:"type"` // 消息类型 + Params string `field:"params"` // 额外的参数 + IsRead uint8 `field:"isRead"` // 是否已读 + State uint8 `field:"state"` // 状态 + CreatedAt uint64 `field:"createdAt"` // 创建时间 + Day string `field:"day"` // 日期YYYYMMDD + Hash string `field:"hash"` // 消息内容的Hash +} + +type MessageOperator struct { + Id interface{} // ID + AdminId interface{} // 管理员ID + UserId interface{} // 用户ID + ClusterId interface{} // 集群ID + NodeId interface{} // 节点ID + Level interface{} // 级别 + Body interface{} // 内容 + Type interface{} // 消息类型 + Params interface{} // 额外的参数 + IsRead interface{} // 是否已读 + State interface{} // 状态 + CreatedAt interface{} // 创建时间 + Day interface{} // 日期YYYYMMDD + Hash interface{} // 消息内容的Hash +} + +func NewMessageOperator() *MessageOperator { + return &MessageOperator{} +} diff --git a/internal/db/models/message_model_ext.go b/internal/db/models/message_model_ext.go new file mode 100644 index 00000000..2640e7f9 --- /dev/null +++ b/internal/db/models/message_model_ext.go @@ -0,0 +1 @@ +package models diff --git a/internal/db/models/sys_setting_dao.go b/internal/db/models/sys_setting_dao.go index 8ef37a37..4b636e95 100644 --- a/internal/db/models/sys_setting_dao.go +++ b/internal/db/models/sys_setting_dao.go @@ -5,6 +5,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/types" ) type SysSettingDAO dbs.DAO @@ -80,9 +81,9 @@ func (this *SysSettingDAO) UpdateSetting(codeFormat string, valueJSON []byte, co } // 读取配置 -func (this *SysSettingDAO) ReadSetting(code string, args ...interface{}) (valueJSON []byte, err error) { - if len(args) > 0 { - code = fmt.Sprintf(code, args...) +func (this *SysSettingDAO) ReadSetting(code string, codeFormatArgs ...interface{}) (valueJSON []byte, err error) { + if len(codeFormatArgs) > 0 { + code = fmt.Sprintf(code, codeFormatArgs...) } col, err := this.Query(). Attr("code", code). @@ -90,3 +91,19 @@ func (this *SysSettingDAO) ReadSetting(code string, args ...interface{}) (valueJ FindStringCol("") return []byte(col), err } + +// 对比配置中的数字大小 +func (this *SysSettingDAO) CompareInt64Setting(code string, anotherValue int64) (int8, error) { + valueJSON, err := this.ReadSetting(code) + if err != nil { + return 0, err + } + value := types.Int64(string(valueJSON)) + if value > anotherValue { + return 1, nil + } + if value < anotherValue { + return -1, nil + } + return 0, nil +} diff --git a/internal/db/models/sys_setting_dao_test.go b/internal/db/models/sys_setting_dao_test.go index 571233b8..b4f18eb0 100644 --- a/internal/db/models/sys_setting_dao_test.go +++ b/internal/db/models/sys_setting_dao_test.go @@ -6,12 +6,12 @@ import ( ) func TestSysSettingDAO_UpdateSetting(t *testing.T) { - err := SharedSysSettingDAO.UpdateSetting("hello", []byte(`"world"`)) + err := NewSysSettingDAO().UpdateSetting("hello", []byte(`"world"`)) if err != nil { t.Fatal(err) } - value, err := SharedSysSettingDAO.ReadSetting("hello") + value, err := NewSysSettingDAO().ReadSetting("hello") if err != nil { t.Fatal(err) } @@ -19,14 +19,22 @@ func TestSysSettingDAO_UpdateSetting(t *testing.T) { } func TestSysSettingDAO_UpdateSetting_Args(t *testing.T) { - err := SharedSysSettingDAO.UpdateSetting("hello %d", []byte(`"world 123"`), 123) + err := NewSysSettingDAO().UpdateSetting("hello %d", []byte(`"world 123"`), 123) if err != nil { t.Fatal(err) } - value, err := SharedSysSettingDAO.ReadSetting("hello %d", 123) + value, err := NewSysSettingDAO().ReadSetting("hello %d", 123) if err != nil { t.Fatal(err) } t.Log("value:", string(value)) } + +func TestSysSettingDAO_CompareInt64Setting(t *testing.T) { + i, err := NewSysSettingDAO().CompareInt64Setting("int64", 1024) + if err != nil { + t.Fatal(err) + } + t.Log("result:", i) +} diff --git a/internal/tasks/event_looper.go b/internal/tasks/event_looper.go index 0e6c3583..90707d77 100644 --- a/internal/tasks/event_looper.go +++ b/internal/tasks/event_looper.go @@ -2,13 +2,16 @@ package tasks import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/logs" "time" ) func init() { - looper := NewEventLooper() - go looper.Start() + dbs.OnReady(func() { + looper := NewEventLooper() + go looper.Start() + }) } type EventLooper struct { diff --git a/internal/tasks/health_check_cluster_task.go b/internal/tasks/health_check_cluster_task.go new file mode 100644 index 00000000..d49098b4 --- /dev/null +++ b/internal/tasks/health_check_cluster_task.go @@ -0,0 +1,145 @@ +package tasks + +import ( + "bytes" + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/logs" + "github.com/iwind/TeaGo/maps" + "time" +) + +// 单个集群的健康检查任务 +type HealthCheckClusterTask struct { + clusterId int64 + config *serverconfigs.HealthCheckConfig + ticker *utils.Ticker +} + +// 创建新任务 +func NewHealthCheckClusterTask(clusterId int64, config *serverconfigs.HealthCheckConfig) *HealthCheckClusterTask { + return &HealthCheckClusterTask{ + clusterId: clusterId, + config: config, + } +} + +// 重置配置 +func (this *HealthCheckClusterTask) Reset(config *serverconfigs.HealthCheckConfig) { + // 检查是否有变化 + oldJSON, err := json.Marshal(this.config) + if err != nil { + logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) + return + } + newJSON, err := json.Marshal(config) + if err != nil { + logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) + return + } + if bytes.Compare(oldJSON, newJSON) != 0 { + this.config = config + this.Run() + } +} + +// 执行 +func (this *HealthCheckClusterTask) Run() { + this.Stop() + + if this.config == nil { + return + } + if !this.config.IsOn { + return + } + if this.config.Interval == nil { + return + } + duration := this.config.Interval.Duration() + if duration <= 0 { + return + } + ticker := utils.NewTicker(duration) + go func() { + for ticker.Wait() { + err := this.loop(int64(duration.Seconds())) + if err != nil { + logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) + } + } + }() + this.ticker = ticker +} + +// 停止 +func (this *HealthCheckClusterTask) Stop() { + if this.ticker == nil { + return + } + this.ticker.Stop() + this.ticker = nil +} + +// 单个循环任务 +func (this *HealthCheckClusterTask) loop(seconds int64) error { + // 检查上次运行时间,防止重复运行 + settingKey := "cluster_health_check_%d" + numberutils.FormatInt64(this.clusterId) + timestamp := time.Now().Unix() + c, err := models.SharedSysSettingDAO.CompareInt64Setting(settingKey, timestamp-seconds) + if err != nil { + return err + } + if c > 0 { + return nil + } + + // 记录时间 + err = models.SharedSysSettingDAO.UpdateSetting(settingKey, []byte(numberutils.FormatInt64(timestamp))) + if err != nil { + return err + } + + // 开始运行 + executor := NewHealthCheckExecutor(this.clusterId) + results, err := executor.Run() + if err != nil { + return err + } + + failedResults := []maps.Map{} + for _, result := range results { + if !result.IsOk { + failedResults = append(failedResults, maps.Map{ + "node": maps.Map{ + "id": result.Node.Id, + "name": result.Node.Name, + }, + "isOk": false, + "error": result.Error, + "nodeAddr": result.NodeAddr, + }) + } + } + + if len(failedResults) > 0 { + failedResultsJSON, err := json.Marshal(failedResults) + if err != nil { + return err + } + err = models.NewMessageDAO().CreateClusterMessage(this.clusterId, models.MessageTypeHealthCheckFail, models.MessageLevelError, "有"+numberutils.FormatInt(len(failedResults))+"个节点在健康检查中出现问题", failedResultsJSON) + if err != nil { + return err + } + } + + return nil +} + +// 获取当前配置 +func (this *HealthCheckClusterTask) Config() *serverconfigs.HealthCheckConfig { + return this.config +} diff --git a/internal/tasks/health_check_cluster_task_test.go b/internal/tasks/health_check_cluster_task_test.go new file mode 100644 index 00000000..d07d4244 --- /dev/null +++ b/internal/tasks/health_check_cluster_task_test.go @@ -0,0 +1,16 @@ +package tasks + +import ( + "github.com/iwind/TeaGo/dbs" + "testing" +) + +func TestHealthCheckClusterTask_loop(t *testing.T) { + dbs.NotifyReady() + task := NewHealthCheckClusterTask(10, nil) + err := task.loop(10) + if err != nil { + t.Fatal(err) + } + t.Log("ok") +} diff --git a/internal/tasks/health_check_executor.go b/internal/tasks/health_check_executor.go index 25a11891..51da7d41 100644 --- a/internal/tasks/health_check_executor.go +++ b/internal/tasks/health_check_executor.go @@ -24,7 +24,7 @@ func NewHealthCheckExecutor(clusterId int64) *HealthCheckExecutor { } func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { - cluster, err := models.SharedNodeClusterDAO.FindEnabledNodeCluster(this.clusterId) + cluster, err := models.NewNodeClusterDAO().FindEnabledNodeCluster(this.clusterId) if err != nil { return nil, err } @@ -42,7 +42,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { } results := []*HealthCheckResult{} - nodes, err := models.SharedNodeDAO.FindAllEnabledNodesWithClusterId(this.clusterId) + nodes, err := models.NewNodeDAO().FindAllEnabledNodesWithClusterId(this.clusterId) if err != nil { return nil, err } @@ -54,7 +54,7 @@ func (this *HealthCheckExecutor) Run() ([]*HealthCheckResult, error) { Node: node, } - addresses, err := models.SharedNodeIPAddressDAO.FindAllEnabledAddressesWithNode(int64(node.Id)) + addresses, err := models.NewNodeIPAddressDAO().FindAllEnabledAddressesWithNode(int64(node.Id)) if err != nil { return nil, err } diff --git a/internal/tasks/health_check_task.go b/internal/tasks/health_check_task.go new file mode 100644 index 00000000..96dae9d2 --- /dev/null +++ b/internal/tasks/health_check_task.go @@ -0,0 +1,95 @@ +package tasks + +import ( + "bytes" + "encoding/json" + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" + "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/lists" + "github.com/iwind/TeaGo/logs" + "time" +) + +func init() { + dbs.OnReady(func() { + go NewHealthCheckTask().Run() + }) +} + +type HealthCheckTask struct { + tasksMap map[int64]*HealthCheckClusterTask // taskId => task +} + +func NewHealthCheckTask() *HealthCheckTask { + return &HealthCheckTask{ + tasksMap: map[int64]*HealthCheckClusterTask{}, + } +} + +func (this *HealthCheckTask) Run() { + err := this.loop() + if err != nil { + logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) + } + + ticker := utils.NewTicker(60 * time.Second) + for ticker.Wait() { + err := this.loop() + if err != nil { + logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) + } + } +} + +func (this *HealthCheckTask) loop() error { + clusters, err := models.NewNodeClusterDAO().FindAllEnableClusters() + if err != nil { + return err + } + clusterIds := []int64{} + for _, cluster := range clusters { + clusterIds = append(clusterIds, int64(cluster.Id)) + } + + // 停掉删除的 + for clusterId, task := range this.tasksMap { + if !lists.ContainsInt64(clusterIds, clusterId) { + task.Stop() + delete(this.tasksMap, clusterId) + } + } + + // 启动新的或更新老的 + for _, cluster := range clusters { + clusterId := int64(cluster.Id) + + config := &serverconfigs.HealthCheckConfig{} + if len(cluster.HealthCheck) > 0 && cluster.HealthCheck != "null" { + err = json.Unmarshal([]byte(cluster.HealthCheck), config) + if err != nil { + logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) + continue + } + } + + task, ok := this.tasksMap[clusterId] + if ok { + // 检查是否有变化 + newJSON, _ := json.Marshal(config) + oldJSON, _ := json.Marshal(task.Config()) + if bytes.Compare(oldJSON, newJSON) != 0 { + logs.Println("[TASK][HEALTH_CHECK]update cluster '" + numberutils.FormatInt64(clusterId) + "'") + go task.Reset(config) + } + } else { + task := NewHealthCheckClusterTask(clusterId, config) + this.tasksMap[clusterId] = task + go task.Run() + } + } + + return nil +} diff --git a/internal/tasks/message_task.go b/internal/tasks/message_task.go new file mode 100644 index 00000000..e7aab11f --- /dev/null +++ b/internal/tasks/message_task.go @@ -0,0 +1,41 @@ +package tasks + +import ( + "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/TeaOSLab/EdgeAPI/internal/utils" + "github.com/iwind/TeaGo/dbs" + "github.com/iwind/TeaGo/logs" + "time" +) + +func init() { + dbs.OnReady(func() { + go NewMessageTask().Run() + }) +} + +// 消息相关任务 +type MessageTask struct { +} + +// 获取新对象 +func NewMessageTask() *MessageTask { + return &MessageTask{} +} + +// 运行 +func (this *MessageTask) Run() { + ticker := utils.NewTicker(24 * time.Hour) + for ticker.Wait() { + err := this.loop() + if err != nil { + logs.Println("[TASK][MESSAGE]" + err.Error()) + } + } +} + +// 单次运行 +func (this *MessageTask) loop() error { + dayTime := time.Now().AddDate(0, 0, -30) // TODO 这个30天应该可以在界面上设置 + return models.NewMessageDAO().DeleteMessagesBeforeDay(dayTime) +} diff --git a/internal/tasks/node_log_cleaner.go b/internal/tasks/node_log_cleaner_task.go similarity index 61% rename from internal/tasks/node_log_cleaner.go rename to internal/tasks/node_log_cleaner_task.go index 42f7746d..1e0d952d 100644 --- a/internal/tasks/node_log_cleaner.go +++ b/internal/tasks/node_log_cleaner_task.go @@ -2,26 +2,29 @@ package tasks import ( "github.com/TeaOSLab/EdgeAPI/internal/db/models" + "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/logs" "time" ) func init() { - go NewNodeLogCleaner().Start() + dbs.OnReady(func() { + go NewNodeLogCleanerTask().Start() + }) } // 清理节点日志的工具 -type NodeLogCleaner struct { +type NodeLogCleanerTask struct { duration time.Duration } -func NewNodeLogCleaner() *NodeLogCleaner { - return &NodeLogCleaner{ +func NewNodeLogCleanerTask() *NodeLogCleanerTask { + return &NodeLogCleanerTask{ duration: 24 * time.Hour, } } -func (this *NodeLogCleaner) Start() { +func (this *NodeLogCleanerTask) Start() { ticker := time.NewTicker(this.duration) for range ticker.C { err := this.loop() @@ -31,7 +34,7 @@ func (this *NodeLogCleaner) Start() { } } -func (this *NodeLogCleaner) loop() error { +func (this *NodeLogCleanerTask) loop() error { // TODO 30天这个数值改成可以设置 return models.SharedNodeLogDAO.DeleteExpiredLogs(30) } diff --git a/internal/tasks/node_log_cleaner_test.go b/internal/tasks/node_log_cleaner_task_test.go similarity index 81% rename from internal/tasks/node_log_cleaner_test.go rename to internal/tasks/node_log_cleaner_task_test.go index 98fba778..0d81449a 100644 --- a/internal/tasks/node_log_cleaner_test.go +++ b/internal/tasks/node_log_cleaner_task_test.go @@ -3,7 +3,7 @@ package tasks import "testing" func TestNodeLogCleaner_loop(t *testing.T) { - cleaner := &NodeLogCleaner{} + cleaner := &NodeLogCleanerTask{} err := cleaner.loop() if err != nil { t.Fatal(err) diff --git a/internal/utils/numberutils/utils.go b/internal/utils/numberutils/utils.go new file mode 100644 index 00000000..b7cd4b35 --- /dev/null +++ b/internal/utils/numberutils/utils.go @@ -0,0 +1,11 @@ +package numberutils + +import "strconv" + +func FormatInt64(value int64) string { + return strconv.FormatInt(value, 10) +} + +func FormatInt(value int) string { + return strconv.Itoa(value) +}