多个API节点时选择一个作为主节点/优化任务相关代码

This commit is contained in:
GoEdgeLab
2022-04-23 12:32:30 +08:00
parent 7a961f5a24
commit 777732d98d
35 changed files with 467 additions and 350 deletions

View File

@@ -7,7 +7,6 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients"
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients/dnstypes"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
@@ -21,41 +20,42 @@ import (
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewDNSTaskExecutor().Start()
NewDNSTaskExecutor(10 * time.Second).Start()
})
})
}
// DNSTaskExecutor DNS任务执行器
type DNSTaskExecutor struct {
BaseTask
ticker *time.Ticker
}
func NewDNSTaskExecutor() *DNSTaskExecutor {
return &DNSTaskExecutor{}
func NewDNSTaskExecutor(duration time.Duration) *DNSTaskExecutor {
return &DNSTaskExecutor{
ticker: time.NewTicker(duration),
}
}
func (this *DNSTaskExecutor) Start() {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
err := this.LoopWithLocker(10)
for range this.ticker.C {
err := this.Loop()
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
this.logErr("DNSTaskExecutor", err.Error())
}
}
}
func (this *DNSTaskExecutor) LoopWithLocker(seconds int64) error {
ok, err := models.SharedSysLockerDAO.Lock(nil, "dns_task_executor", seconds-1) // 假设执行时间为1秒
if err != nil {
return err
}
if !ok {
func (this *DNSTaskExecutor) Loop() error {
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
return this.Loop()
return this.loop()
}
func (this *DNSTaskExecutor) Loop() error {
func (this *DNSTaskExecutor) loop() error {
tasks, err := dnsmodels.SharedDNSTaskDAO.FindAllDoingTasks(nil)
if err != nil {
return err
@@ -118,7 +118,7 @@ func (this *DNSTaskExecutor) doServer(taskId int64, oldClusterId int64, serverId
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
this.logErr("DNSTaskExecutor", err.Error())
}
}
}()
@@ -275,7 +275,7 @@ func (this *DNSTaskExecutor) doNode(taskId int64, nodeId int64) error {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
this.logErr("DNSTaskExecutor", err.Error())
}
}
}()
@@ -314,7 +314,7 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error {
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
this.logErr("DNSTaskExecutor", err.Error())
}
}
}()
@@ -519,7 +519,7 @@ func (this *DNSTaskExecutor) doClusterRemove(taskId int64, clusterId int64, doma
if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
this.logErr("DNSTaskExecutor", err.Error())
}
}
}()
@@ -603,7 +603,7 @@ func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) erro
if taskId > 0 {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId)
if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error())
this.logErr("DNSTaskExecutor", err.Error())
}
}
}
@@ -634,7 +634,7 @@ func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) erro
manager := dnsclients.FindProvider(provider.Type)
if manager == nil {
remotelogs.Error("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
this.logErr("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
isOk = true
return nil
}
@@ -711,7 +711,7 @@ func (this *DNSTaskExecutor) findDNSManagerWithDomainId(tx *dbs.Tx, domainId int
var manager = dnsclients.FindProvider(provider.Type)
if manager == nil {
remotelogs.Error("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
this.logErr("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
return nil, nil, nil
}
params, err := provider.DecodeAPIParams()

View File

@@ -1,15 +1,17 @@
package tasks
package tasks_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
"time"
)
func TestDNSTaskExecutor_Loop(t *testing.T) {
dbs.NotifyReady()
executor := NewDNSTaskExecutor()
err := executor.Loop()
var task = tasks.NewDNSTaskExecutor(10 * time.Second)
err := task.Loop()
if err != nil {
t.Fatal(err)
}

View File

@@ -4,50 +4,41 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time"
)
func init() {
dbs.OnReadyDone(func() {
looper := NewEventLooper()
goman.New(func() {
looper.Start()
NewEventLooper(2 * time.Second).Start()
})
})
}
// EventLooper 事件相关处理程序
type EventLooper struct {
BaseTask
ticker *time.Ticker
}
func NewEventLooper() *EventLooper {
return &EventLooper{}
func NewEventLooper(duration time.Duration) *EventLooper {
return &EventLooper{
ticker: time.NewTicker(duration),
}
}
func (this *EventLooper) Start() {
ticker := time.NewTicker(2 * time.Second)
for range ticker.C {
err := this.loop()
for range this.ticker.C {
err := this.Loop()
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
this.logErr("EventLooper", err.Error())
}
}
}
func (this *EventLooper) loop() error {
lockerKey := "eventLooper"
isOk, err := models.SharedSysLockerDAO.Lock(nil, lockerKey, 3600)
if err != nil {
return err
}
defer func() {
err = models.SharedSysLockerDAO.Unlock(nil, lockerKey)
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
}
}()
if !isOk {
func (this *EventLooper) Loop() error {
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
@@ -58,12 +49,12 @@ func (this *EventLooper) loop() error {
for _, eventOne := range events {
event, err := eventOne.DecodeEvent()
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
this.logErr("EventLooper", err.Error())
continue
}
err = event.Run()
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
this.logErr("EventLooper", err.Error())
continue
}
err = models.SharedSysEventDAO.DeleteEvent(nil, int64(eventOne.Id))

View File

@@ -9,14 +9,14 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
"time"
)
// HealthCheckClusterTask 单个集群的健康检查任务
type HealthCheckClusterTask struct {
BaseTask
clusterId int64
config *serverconfigs.HealthCheckConfig
ticker *utils.Ticker
@@ -37,12 +37,12 @@ func (this *HealthCheckClusterTask) Reset(config *serverconfigs.HealthCheckConfi
// 检查是否有变化
oldJSON, err := json.Marshal(this.config)
if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
this.logErr("HealthCheckClusterTask", err.Error())
return
}
newJSON, err := json.Marshal(config)
if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
this.logErr("HealthCheckClusterTask", err.Error())
return
}
if bytes.Compare(oldJSON, newJSON) != 0 {
@@ -71,9 +71,9 @@ func (this *HealthCheckClusterTask) Run() {
ticker := utils.NewTicker(duration)
goman.New(func() {
for ticker.Wait() {
err := this.loop(int64(duration.Seconds()))
err := this.Loop()
if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
this.logErr("HealthCheckClusterTask", err.Error())
}
}
})
@@ -89,33 +89,21 @@ func (this *HealthCheckClusterTask) Stop() {
this.ticker = nil
}
// 单个循环任务
func (this *HealthCheckClusterTask) loop(seconds int64) error {
// 检查上次运行时间,防止重复运行
settingKey := systemconfigs.SettingCodeClusterHealthCheck + "Loop" + numberutils.FormatInt64(this.clusterId)
timestamp := time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-seconds)
if err != nil {
return err
}
if c > 0 {
// Loop 单个循环任务
func (this *HealthCheckClusterTask) Loop() error {
// 检查是否为主节点
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
// 开始运行
executor := NewHealthCheckExecutor(this.clusterId)
var executor = NewHealthCheckExecutor(this.clusterId)
results, err := executor.Run()
if err != nil {
return err
}
failedResults := []maps.Map{}
var failedResults = []maps.Map{}
for _, result := range results {
if !result.IsOk {
failedResults = append(failedResults, maps.Map{
@@ -139,7 +127,7 @@ func (this *HealthCheckClusterTask) loop(seconds int64) error {
if err != nil {
return err
}
message := "有" + numberutils.FormatInt(len(failedResults)) + "个节点在健康检查中出现问题"
var message = "有" + numberutils.FormatInt(len(failedResults)) + "个节点在健康检查中出现问题"
err = models.NewMessageDAO().CreateClusterMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, models.MessageTypeHealthCheckFailed, models.MessageLevelError, message, message, failedResultsJSON)
if err != nil {
return err

View File

@@ -1,14 +1,15 @@
package tasks
package tasks_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
)
func TestHealthCheckClusterTask_loop(t *testing.T) {
func TestHealthCheckClusterTask_Loop(t *testing.T) {
dbs.NotifyReady()
task := NewHealthCheckClusterTask(10, nil)
err := task.loop(10)
var task = tasks.NewHealthCheckClusterTask(10, nil)
err := task.Loop()
if err != nil {
t.Fatal(err)
}

View File

@@ -7,7 +7,6 @@ import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
@@ -25,6 +24,8 @@ import (
)
type HealthCheckExecutor struct {
BaseTask
clusterId int64
}
@@ -154,7 +155,7 @@ func (this *HealthCheckExecutor) runNode(healthCheckConfig *serverconfigs.Health
if teaconst.IsPlus {
isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown, healthCheckConfig.AutoDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
this.logErr("HealthCheckExecutor", err.Error())
return
}
@@ -175,14 +176,14 @@ func (this *HealthCheckExecutor) runNode(healthCheckConfig *serverconfigs.Health
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), messageType, messageLevel, message, message, nil, false)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
this.logErr("HealthCheckExecutor", err.Error())
return
}
// 触发阈值
err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id))
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
this.logErr("HealthCheckExecutor", err.Error())
return
}
}
@@ -195,7 +196,7 @@ func (this *HealthCheckExecutor) runNode(healthCheckConfig *serverconfigs.Health
if healthCheckConfig.AutoDown {
isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error())
this.logErr("HealthCheckExecutor", err.Error())
} else if isChanged {
// 通知恢复或下线
if result.IsOk {

View File

@@ -1,10 +1,11 @@
//go:build plus
// +build plus
package tasks
package tasks_test
import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
)
@@ -13,7 +14,7 @@ func TestHealthCheckExecutor_Run(t *testing.T) {
teaconst.IsPlus = true
dbs.NotifyReady()
executor := NewHealthCheckExecutor(35)
executor := tasks.NewHealthCheckExecutor(35)
results, err := executor.Run()
if err != nil {
t.Fatal(err)

View File

@@ -5,50 +5,52 @@ import (
"encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
"time"
)
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewHealthCheckTask().Run()
NewHealthCheckTask(1 * time.Minute).Start()
})
})
}
// HealthCheckTask 节点健康检查任务
type HealthCheckTask struct {
BaseTask
ticker *time.Ticker
tasksMap map[int64]*HealthCheckClusterTask // taskId => task
}
func NewHealthCheckTask() *HealthCheckTask {
func NewHealthCheckTask(duration time.Duration) *HealthCheckTask {
return &HealthCheckTask{
ticker: time.NewTicker(duration),
tasksMap: map[int64]*HealthCheckClusterTask{},
}
}
func (this *HealthCheckTask) Run() {
err := this.loop()
func (this *HealthCheckTask) Start() {
err := this.Loop()
if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
this.logErr("HealthCheckTask", err.Error())
}
ticker := utils.NewTicker(60 * time.Second)
for ticker.Wait() {
err := this.loop()
for range this.ticker.C {
err := this.Loop()
if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
this.logErr("HealthCheckTask", err.Error())
}
}
}
func (this *HealthCheckTask) loop() error {
func (this *HealthCheckTask) Loop() error {
clusters, err := models.NewNodeClusterDAO().FindAllEnableClusters(nil)
if err != nil {
return err
@@ -74,7 +76,7 @@ func (this *HealthCheckTask) loop() error {
if len(cluster.HealthCheck) > 0 {
err = json.Unmarshal(cluster.HealthCheck, config)
if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error())
this.logErr("HealthCheckTask", err.Error())
continue
}
}
@@ -85,7 +87,7 @@ func (this *HealthCheckTask) loop() error {
newJSON, _ := json.Marshal(config)
oldJSON, _ := json.Marshal(task.Config())
if bytes.Compare(oldJSON, newJSON) != 0 {
logs.Println("[TASK][HEALTH_CHECK]update cluster '" + numberutils.FormatInt64(clusterId) + "'")
remotelogs.Println("TASK", "[HealthCheckTask]update cluster '"+numberutils.FormatInt64(clusterId)+"'")
goman.New(func() {
task.Reset(config)
})

View File

@@ -5,49 +5,53 @@ import (
"fmt"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time"
)
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewLogTask().Run()
NewLogTask(24*time.Hour, 1*time.Minute).Start()
})
})
}
type LogTask struct {
BaseTask
cleanTicker *time.Ticker
monitorTicker *time.Ticker
}
func NewLogTask() *LogTask {
return &LogTask{}
func NewLogTask(cleanDuration time.Duration, monitorDuration time.Duration) *LogTask {
return &LogTask{
cleanTicker: time.NewTicker(cleanDuration),
monitorTicker: time.NewTicker(monitorDuration),
}
}
func (this *LogTask) Run() {
func (this *LogTask) Start() {
goman.New(func() {
this.runClean()
this.RunClean()
})
goman.New(func() {
this.runMonitor()
this.RunMonitor()
})
}
func (this *LogTask) runClean() {
var ticker = utils.NewTicker(24 * time.Hour)
for ticker.Wait() {
err := this.loopClean()
func (this *LogTask) RunClean() {
for range this.cleanTicker.C {
err := this.LoopClean()
if err != nil {
logs.Println("[TASK][LOG]" + err.Error())
this.logErr("LogTask", err.Error())
}
}
}
func (this *LogTask) loopClean() error {
func (this *LogTask) LoopClean() error {
var configKey = "adminLogConfig"
valueJSON, err := models.SharedSysSettingDAO.ReadSetting(nil, configKey)
if err != nil {
@@ -71,34 +75,21 @@ func (this *LogTask) loopClean() error {
return nil
}
func (this *LogTask) runMonitor() {
var ticker = utils.NewTicker(1 * time.Minute)
for ticker.Wait() {
err := this.loopMonitor(60)
func (this *LogTask) RunMonitor() {
for range this.monitorTicker.C {
err := this.LoopMonitor()
if err != nil {
logs.Println("[TASK][LOG]" + err.Error())
this.logErr("LogTask", err.Error())
}
}
}
func (this *LogTask) loopMonitor(seconds int64) error {
// 检查上次运行时间,防止重复运行
var settingKey = "logTaskMonitorLoop"
var timestamp = time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-seconds)
if err != nil {
return err
}
if c > 0 {
func (this *LogTask) LoopMonitor() error {
// 检查是否为主节点
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
var configKey = "adminLogConfig"
valueJSON, err := models.SharedSysSettingDAO.ReadSetting(nil, configKey)
if err != nil {

View File

@@ -1,26 +1,28 @@
package tasks
package tasks_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
"time"
)
func TestLogTask_loopClean(t *testing.T) {
func TestLogTask_LoopClean(t *testing.T) {
dbs.NotifyReady()
task := NewLogTask()
err := task.loopClean()
var task = tasks.NewLogTask(24*time.Hour, 1*time.Minute)
err := task.LoopClean()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestLogTask_loopMonitor(t *testing.T) {
func TestLogTask_LoopMonitor(t *testing.T) {
dbs.NotifyReady()
task := NewLogTask()
err := task.loopMonitor(10)
var task = tasks.NewLogTask(24*time.Hour, 1*time.Minute)
err := task.LoopMonitor()
if err != nil {
t.Fatal(err)
}

View File

@@ -3,42 +3,44 @@ package tasks
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time"
)
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewMessageTask().Run()
NewMessageTask(24 * time.Hour).Start()
})
})
}
// MessageTask 消息相关任务
type MessageTask struct {
BaseTask
ticker *time.Ticker
}
// NewMessageTask 获取新对象
func NewMessageTask() *MessageTask {
return &MessageTask{}
func NewMessageTask(duration time.Duration) *MessageTask {
return &MessageTask{
ticker: time.NewTicker(duration),
}
}
// Run 运行
func (this *MessageTask) Run() {
ticker := utils.NewTicker(24 * time.Hour)
for ticker.Wait() {
err := this.loop()
// Start 开始运行
func (this *MessageTask) Start() {
for range this.ticker.C {
err := this.Loop()
if err != nil {
logs.Println("[TASK][MESSAGE]" + err.Error())
this.logErr("MessageTask", err.Error())
}
}
}
// 单次运行
func (this *MessageTask) loop() error {
// Loop 单次运行
func (this *MessageTask) Loop() error {
dayTime := time.Now().AddDate(0, 0, -30) // TODO 这个30天应该可以在界面上设置
return models.NewMessageDAO().DeleteMessagesBeforeDay(nil, dayTime)
}

View File

@@ -5,7 +5,6 @@ package tasks
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
"time"
@@ -14,29 +13,35 @@ import (
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewMonitorItemValueTask().Start()
NewMonitorItemValueTask(1 * time.Hour).Start()
})
})
}
// MonitorItemValueTask 节点监控数值任务
type MonitorItemValueTask struct {
BaseTask
ticker *time.Ticker
}
// NewMonitorItemValueTask 获取新对象
func NewMonitorItemValueTask() *MonitorItemValueTask {
return &MonitorItemValueTask{}
}
func (this *MonitorItemValueTask) Start() {
ticker := time.NewTicker(1 * time.Hour)
func NewMonitorItemValueTask(duration time.Duration) *MonitorItemValueTask {
var ticker = time.NewTicker(duration)
if Tea.IsTesting() {
ticker = time.NewTicker(1 * time.Minute)
}
for range ticker.C {
return &MonitorItemValueTask{
ticker: ticker,
}
}
func (this *MonitorItemValueTask) Start() {
for range this.ticker.C {
err := this.Loop()
if err != nil {
remotelogs.Error("MonitorItemValueTask", err.Error())
this.logErr("MonitorItemValueTask", err.Error())
}
}
}

View File

@@ -1,16 +1,18 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package tasks
package tasks_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
"time"
)
func TestMonitorItemValueTask_Loop(t *testing.T) {
dbs.NotifyReady()
task := NewMonitorItemValueTask()
var task = tasks.NewMonitorItemValueTask(1 * time.Minute)
err := task.Loop()
if err != nil {
t.Fatal(err)

View File

@@ -4,40 +4,40 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time"
)
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewNodeLogCleanerTask().Start()
NewNodeLogCleanerTask(24 * time.Hour).Start()
})
})
}
// NodeLogCleanerTask 清理节点日志的任务
type NodeLogCleanerTask struct {
duration time.Duration
BaseTask
ticker *time.Ticker
}
func NewNodeLogCleanerTask() *NodeLogCleanerTask {
func NewNodeLogCleanerTask(duration time.Duration) *NodeLogCleanerTask {
return &NodeLogCleanerTask{
duration: 24 * time.Hour,
ticker: time.NewTicker(duration),
}
}
func (this *NodeLogCleanerTask) Start() {
ticker := time.NewTicker(this.duration)
for range ticker.C {
err := this.loop()
for range this.ticker.C {
err := this.Loop()
if err != nil {
logs.Println("[TASK]" + err.Error())
this.logErr("NodeLogCleanerTask", err.Error())
}
}
}
func (this *NodeLogCleanerTask) loop() error {
func (this *NodeLogCleanerTask) Loop() error {
// 删除 N天 以前的info日志
err := models.SharedNodeLogDAO.DeleteExpiredLogsWithLevel(nil, "info", 3)
if err != nil {

View File

@@ -1,15 +1,17 @@
package tasks
package tasks_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
"time"
)
func TestNodeLogCleaner_loop(t *testing.T) {
dbs.NotifyReady()
cleaner := &NodeLogCleanerTask{}
err := cleaner.loop()
var cleaner = tasks.NewNodeLogCleanerTask(24 * time.Hour)
err := cleaner.Loop()
if err != nil {
t.Fatal(err)
}

View File

@@ -3,11 +3,8 @@ package tasks
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
"strings"
"time"
@@ -15,63 +12,51 @@ import (
func init() {
dbs.OnReadyDone(func() {
var task = NewNodeMonitorTask(60)
var ticker = time.NewTicker(60 * time.Second)
goman.New(func() {
for range ticker.C {
err := task.loop()
if err != nil {
logs.Println("[TASK][NODE_MONITOR]" + err.Error())
}
}
NewNodeMonitorTask(1 * time.Minute).Start()
})
})
}
// NodeMonitorTask 边缘节点监控任务
type NodeMonitorTask struct {
intervalSeconds int
BaseTask
ticker *time.Ticker
inactiveMap map[string]int // cluster@nodeId => count
notifiedMap map[int64]int64 // nodeId => timestamp
}
func NewNodeMonitorTask(intervalSeconds int) *NodeMonitorTask {
func NewNodeMonitorTask(duration time.Duration) *NodeMonitorTask {
return &NodeMonitorTask{
intervalSeconds: intervalSeconds,
inactiveMap: map[string]int{},
notifiedMap: map[int64]int64{},
ticker: time.NewTicker(duration),
inactiveMap: map[string]int{},
notifiedMap: map[int64]int64{},
}
}
func (this *NodeMonitorTask) Run() {
func (this *NodeMonitorTask) Start() {
for range this.ticker.C {
err := this.Loop()
if err != nil {
this.logErr("NodeMonitorTask", err.Error())
}
}
}
func (this *NodeMonitorTask) loop() error {
// 检查上次运行时间,防止重复运行
settingKey := systemconfigs.SettingCodeNodeMonitor + "Loop"
timestamp := time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-int64(this.intervalSeconds))
if err != nil {
return err
}
if c > 0 {
func (this *NodeMonitorTask) Loop() error {
// 检查是否为主节点
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
clusters, err := models.SharedNodeClusterDAO.FindAllEnableClusters(nil)
if err != nil {
return err
}
for _, cluster := range clusters {
err := this.monitorCluster(cluster)
err := this.MonitorCluster(cluster)
if err != nil {
return err
}
@@ -80,8 +65,8 @@ func (this *NodeMonitorTask) loop() error {
return nil
}
func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error {
clusterId := int64(cluster.Id)
func (this *NodeMonitorTask) MonitorCluster(cluster *models.NodeCluster) error {
var clusterId = int64(cluster.Id)
// 检查离线节点
inactiveNodes, err := models.SharedNodeDAO.FindAllInactiveNodesWithClusterId(nil, clusterId)

View File

@@ -1,16 +1,18 @@
package tasks
package tasks_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
"time"
)
func TestNodeMonitorTask_loop(t *testing.T) {
dbs.NotifyReady()
var task = NewNodeMonitorTask(60)
err := task.loop()
var task = tasks.NewNodeMonitorTask(60 * time.Second)
err := task.Loop()
if err != nil {
t.Fatal(err)
}
@@ -19,9 +21,9 @@ func TestNodeMonitorTask_loop(t *testing.T) {
func TestNodeMonitorTask_Monitor(t *testing.T) {
dbs.NotifyReady()
var task = NewNodeMonitorTask(60)
var task = tasks.NewNodeMonitorTask(60 * time.Second)
for i := 0; i < 5; i++ {
err := task.monitorCluster(&models.NodeCluster{
err := task.MonitorCluster(&models.NodeCluster{
Id: 42,
})
if err != nil {

View File

@@ -5,49 +5,49 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time"
)
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewNodeTaskExtractor().Start()
NewNodeTaskExtractor(10 * time.Second).Start()
})
})
}
// NodeTaskExtractor 节点任务
type NodeTaskExtractor struct {
BaseTask
ticker *time.Ticker
}
func NewNodeTaskExtractor() *NodeTaskExtractor {
return &NodeTaskExtractor{}
func NewNodeTaskExtractor(duration time.Duration) *NodeTaskExtractor {
return &NodeTaskExtractor{
ticker: time.NewTicker(duration),
}
}
func (this *NodeTaskExtractor) Start() {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
for range this.ticker.C {
err := this.Loop()
if err != nil {
logs.Println("[TASK][NODE_TASK_EXTRACTOR]" + err.Error())
this.logErr("NodeTaskExtractor", err.Error())
}
}
}
func (this *NodeTaskExtractor) Loop() error {
ok, err := models.SharedSysLockerDAO.Lock(nil, "node_task_extractor", 10-1) // 假设执行时间为1秒
if err != nil {
return err
}
if !ok {
// 检查是否为主节点
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
// 这里不解锁是为了让任务N秒钟之内只运行一次
for _, role := range []string{nodeconfigs.NodeRoleNode, nodeconfigs.NodeRoleDNS} {
err = models.SharedNodeTaskDAO.ExtractAllClusterTasks(nil, role)
err := models.SharedNodeTaskDAO.ExtractAllClusterTasks(nil, role)
if err != nil {
return err
}

View File

@@ -3,62 +3,47 @@ package tasks
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time"
)
func init() {
dbs.OnReadyDone(func() {
task := NewNSNodeMonitorTask(60)
ticker := time.NewTicker(60 * time.Second)
goman.New(func() {
for range ticker.C {
err := task.loop()
if err != nil {
logs.Println("[TASK][NS_NODE_MONITOR]" + err.Error())
}
}
NewNSNodeMonitorTask(1 * time.Minute).Start()
})
})
}
// NSNodeMonitorTask 边缘节点监控任务
type NSNodeMonitorTask struct {
intervalSeconds int
BaseTask
ticker *time.Ticker
}
func NewNSNodeMonitorTask(intervalSeconds int) *NSNodeMonitorTask {
func NewNSNodeMonitorTask(duration time.Duration) *NSNodeMonitorTask {
return &NSNodeMonitorTask{
intervalSeconds: intervalSeconds,
ticker: time.NewTicker(duration),
}
}
func (this *NSNodeMonitorTask) Run() {
func (this *NSNodeMonitorTask) Start() {
for range this.ticker.C {
err := this.Loop()
if err != nil {
this.logErr("NS_NODE_MONITOR", err.Error())
}
}
}
func (this *NSNodeMonitorTask) loop() error {
// 检查上次运行时间,防止重复运行
settingKey := systemconfigs.SettingCodeNSNodeMonitor + "Loop"
timestamp := time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-int64(this.intervalSeconds))
if err != nil {
return err
}
if c > 0 {
func (this *NSNodeMonitorTask) Loop() error {
// 检查是否为主节点
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
clusters, err := models.SharedNSClusterDAO.FindAllEnabledClusters(nil)
if err != nil {
return err

View File

@@ -7,7 +7,6 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
timeutil "github.com/iwind/TeaGo/utils/time"
"regexp"
"strings"
@@ -16,27 +15,30 @@ import (
func init() {
dbs.OnReadyDone(func() {
task := NewServerAccessLogCleaner()
goman.New(func() {
task.Start()
NewServerAccessLogCleaner(12 * time.Hour).Start()
})
})
}
// ServerAccessLogCleaner 服务访问日志自动清理
type ServerAccessLogCleaner struct {
BaseTask
ticker *time.Ticker
}
func NewServerAccessLogCleaner() *ServerAccessLogCleaner {
return &ServerAccessLogCleaner{}
func NewServerAccessLogCleaner(duration time.Duration) *ServerAccessLogCleaner {
return &ServerAccessLogCleaner{
ticker: time.NewTicker(duration),
}
}
func (this *ServerAccessLogCleaner) Start() {
ticker := time.NewTicker(12 * time.Hour)
for range ticker.C {
for range this.ticker.C {
err := this.Loop()
if err != nil {
logs.Println("[TASK][ServerAccessLogCleaner]Error: " + err.Error())
this.logErr("[TASK][ServerAccessLogCleaner]", err.Error())
}
}
}

View File

@@ -4,12 +4,13 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
"time"
)
func TestServerAccessLogCleaner_Loop(t *testing.T) {
dbs.NotifyReady()
task := tasks.NewServerAccessLogCleaner()
var task = tasks.NewServerAccessLogCleaner(24 * time.Hour)
err := task.Loop()
if err != nil {
t.Fatal(err)

View File

@@ -4,9 +4,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/acme"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps"
timeutil "github.com/iwind/TeaGo/utils/time"
"strconv"
@@ -16,50 +14,41 @@ import (
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewSSLCertExpireCheckExecutor().Start()
NewSSLCertExpireCheckExecutor(1 * time.Hour).Start()
})
})
}
// SSLCertExpireCheckExecutor 证书检查任务
type SSLCertExpireCheckExecutor struct {
BaseTask
ticker *time.Ticker
}
func NewSSLCertExpireCheckExecutor() *SSLCertExpireCheckExecutor {
return &SSLCertExpireCheckExecutor{}
func NewSSLCertExpireCheckExecutor(duration time.Duration) *SSLCertExpireCheckExecutor {
return &SSLCertExpireCheckExecutor{
ticker: time.NewTicker(duration),
}
}
// Start 启动任务
func (this *SSLCertExpireCheckExecutor) Start() {
seconds := int64(3600)
ticker := time.NewTicker(time.Duration(seconds) * time.Second)
for range ticker.C {
err := this.loop(seconds)
for range this.ticker.C {
err := this.Loop()
if err != nil {
logs.Println("[ERROR][SSLCertExpireCheckExecutor]" + err.Error())
this.logErr("SSLCertExpireCheckExecutor", err.Error())
}
}
}
// 单次执行
func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error {
// 检查上次运行时间,防止重复运行
settingKey := "sslCertExpiringCheckLoop"
timestamp := time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-seconds)
if err != nil {
return err
}
if c > 0 {
// Loop 单次执行
func (this *SSLCertExpireCheckExecutor) Loop() error {
// 检查是否为主节点
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
// 查找需要自动更新的证书
// 30, 14 ... 是到期的天数
for _, days := range []int{30, 14, 7} {

View File

@@ -1,6 +1,7 @@
package tasks
package tasks_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
timeutil "github.com/iwind/TeaGo/utils/time"
"testing"
@@ -16,8 +17,8 @@ func TestSSLCertExpireCheckExecutor_loop(t *testing.T) {
t.Log("3 days later: ", timeutil.FormatTime("Y-m-d", time.Now().Unix()+3*86400), time.Now().Unix()+3*86400)
t.Log("today: ", timeutil.FormatTime("Y-m-d", time.Now().Unix()), time.Now().Unix())
executor := NewSSLCertExpireCheckExecutor()
err := executor.loop(0)
var task = tasks.NewSSLCertExpireCheckExecutor(1 * time.Hour)
err := task.Loop()
if err != nil {
t.Fatal(err)
}

View File

@@ -23,41 +23,38 @@ import (
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewSSLCertUpdateOCSPTask().Start()
NewSSLCertUpdateOCSPTask(1 * time.Minute).Start()
})
})
}
type SSLCertUpdateOCSPTask struct {
BaseTask
ticker *time.Ticker
httpClient *http.Client
}
func NewSSLCertUpdateOCSPTask() *SSLCertUpdateOCSPTask {
func NewSSLCertUpdateOCSPTask(duration time.Duration) *SSLCertUpdateOCSPTask {
return &SSLCertUpdateOCSPTask{
ticker: time.NewTicker(1 * time.Minute),
ticker: time.NewTicker(duration),
httpClient: utils.SharedHttpClient(5 * time.Second),
}
}
func (this *SSLCertUpdateOCSPTask) Start() {
for range this.ticker.C {
err := this.Loop(true)
err := this.Loop()
if err != nil {
remotelogs.Error("SSLCertUpdateOCSPTask", err.Error())
this.logErr("SSLCertUpdateOCSPTask", err.Error())
}
}
}
func (this *SSLCertUpdateOCSPTask) Loop(checkLock bool) error {
if checkLock {
ok, err := models.SharedSysLockerDAO.Lock(nil, "ssl_cert_update_ocsp_task", 60-1) // 假设执行时间为1秒
if err != nil {
return err
}
if !ok {
return nil
}
func (this *SSLCertUpdateOCSPTask) Loop() error {
// 检查是否为主节点
if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return nil
}
var tx *dbs.Tx

View File

@@ -6,13 +6,14 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs"
"testing"
"time"
)
func TestSSLCertUpdateOCSPTask_Loop(t *testing.T) {
dbs.NotifyReady()
var task = tasks.NewSSLCertUpdateOCSPTask()
err := task.Loop(false)
var task = tasks.NewSSLCertUpdateOCSPTask(1 * time.Minute)
err := task.Loop()
if err != nil {
t.Fatal(err)
}

View File

@@ -0,0 +1,12 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package tasks
import "github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
type BaseTask struct {
}
func (this *BaseTask) logErr(taskType string, errString string) {
remotelogs.Error("TASK", "run '"+taskType+"' failed: "+errString)
}

View File

@@ -0,0 +1,9 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package tasks
type TaskInterface interface {
Start() error
Loop() error
Stop() error
}