多个API节点时选择一个作为主节点/优化任务相关代码

This commit is contained in:
刘祥超
2022-04-23 12:32:30 +08:00
parent 773f3e1a7e
commit 89c1edc9ee
35 changed files with 467 additions and 350 deletions

View File

@@ -3,6 +3,8 @@ package models
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/TeaOSLab/EdgeAPI/internal/configs"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
@@ -49,7 +51,10 @@ func (this *APINodeDAO) EnableAPINode(tx *dbs.Tx, id int64) error {
Pk(id). Pk(id).
Set("state", APINodeStateEnabled). Set("state", APINodeStateEnabled).
Update() Update()
return err if err != nil {
return err
}
return this.NotifyUpdate(tx, id)
} }
// DisableAPINode 禁用条目 // DisableAPINode 禁用条目
@@ -58,7 +63,10 @@ func (this *APINodeDAO) DisableAPINode(tx *dbs.Tx, id int64) error {
Pk(id). Pk(id).
Set("state", APINodeStateDisabled). Set("state", APINodeStateDisabled).
Update() Update()
return err if err != nil {
return err
}
return this.NotifyUpdate(tx, id)
} }
// FindEnabledAPINode 查找启用中的条目 // FindEnabledAPINode 查找启用中的条目
@@ -149,16 +157,33 @@ func (this *APINodeDAO) CreateAPINode(tx *dbs.Tx, name string, description strin
return return
} }
err = this.NotifyUpdate(tx, types.Int64(op.Id))
if err != nil {
remotelogs.Error("API_NODE_DAO", err.Error())
}
return types.Int64(op.Id), nil return types.Int64(op.Id), nil
} }
// UpdateAPINode 修改API节点 // UpdateAPINode 修改API节点
func (this *APINodeDAO) UpdateAPINode(tx *dbs.Tx, nodeId int64, name string, description string, httpJSON []byte, httpsJSON []byte, restIsOn bool, restHTTPJSON []byte, restHTTPSJSON []byte, accessAddrsJSON []byte, isOn bool) error { func (this *APINodeDAO) UpdateAPINode(tx *dbs.Tx, nodeId int64, name string, description string, httpJSON []byte, httpsJSON []byte, restIsOn bool, restHTTPJSON []byte, restHTTPSJSON []byte, accessAddrsJSON []byte, isOn bool, isPrimary bool) error {
if nodeId <= 0 { if nodeId <= 0 {
return errors.New("invalid nodeId") return errors.New("invalid nodeId")
} }
op := NewAPINodeOperator() // 取消别的Primary
if isPrimary {
err := this.Query(tx).
Neq("id", nodeId).
Attr("isPrimary", true).
Set("isPrimary", false).
UpdateQuickly()
if err != nil {
return err
}
}
var op = NewAPINodeOperator()
op.Id = nodeId op.Id = nodeId
op.Name = name op.Name = name
op.Description = description op.Description = description
@@ -191,8 +216,13 @@ func (this *APINodeDAO) UpdateAPINode(tx *dbs.Tx, nodeId int64, name string, des
op.AccessAddrs = "[]" op.AccessAddrs = "[]"
} }
op.IsPrimary = isPrimary
err := this.Save(tx, op) err := this.Save(tx, op)
return err if err != nil {
return err
}
return this.NotifyUpdate(tx, nodeId)
} }
// FindAllEnabledAPINodes 列出所有可用API节点 // FindAllEnabledAPINodes 列出所有可用API节点
@@ -294,23 +324,6 @@ func (this *APINodeDAO) UpdateAPINodeStatus(tx *dbs.Tx, apiNodeId int64, statusJ
return err return err
} }
// 生成唯一ID
func (this *APINodeDAO) genUniqueId(tx *dbs.Tx) (string, error) {
for {
uniqueId := rands.HexString(32)
ok, err := this.Query(tx).
Attr("uniqueId", uniqueId).
Exist()
if err != nil {
return "", err
}
if ok {
continue
}
return uniqueId, nil
}
}
// CountAllLowerVersionNodes 计算所有节点中低于某个版本的节点数量 // CountAllLowerVersionNodes 计算所有节点中低于某个版本的节点数量
func (this *APINodeDAO) CountAllLowerVersionNodes(tx *dbs.Tx, version string) (int64, error) { func (this *APINodeDAO) CountAllLowerVersionNodes(tx *dbs.Tx, version string) (int64, error) {
return this.Query(tx). return this.Query(tx).
@@ -384,3 +397,114 @@ func (this *APINodeDAO) FindAllEnabledAPIAccessIPs(tx *dbs.Tx, cacheMap *utils.C
return result, nil return result, nil
} }
// CheckAPINodeIsPrimary 检查当前节点是否为Primary节点
func (this *APINodeDAO) CheckAPINodeIsPrimary(tx *dbs.Tx) (bool, error) {
config, err := configs.SharedAPIConfig()
if err != nil {
return false, err
}
isPrimary, err := this.Query(tx).
State(APINodeStateEnabled).
Attr("uniqueId", config.NodeId).
Attr("isPrimary", true).
Exist()
if err != nil {
return false, err
}
if isPrimary {
return true, nil
}
// 检查是否有别的Primary节点
count, err := this.Query(tx).
State(APINodeStateEnabled).
Attr("isOn", true).
Attr("isPrimary", true).
Count()
if err != nil {
return false, err
}
if count == 0 {
err = this.ResetPrimaryAPINode(tx)
if err != nil {
return false, err
}
return true, nil
}
return false, nil
}
// CheckAPINodeIsPrimaryWithoutErr 检查当前节点是否为Primary节点并忽略错误
func (this *APINodeDAO) CheckAPINodeIsPrimaryWithoutErr() bool {
b, err := this.CheckAPINodeIsPrimary(nil)
return b && err == nil
}
// ResetPrimaryAPINode 重置Primary节点
func (this *APINodeDAO) ResetPrimaryAPINode(tx *dbs.Tx) error {
// 当前是否有Primary节点
apiNode, err := this.Query(tx).
State(APINodeStateEnabled).
Attr("isOn", true).
Attr("isPrimary", true).
Find()
if err != nil {
return err
}
if apiNode == nil {
// 选择一个作为Primary
// TODO 将来需要考虑API节点离线的情况
apiNodeId, err := this.Query(tx).
State(APINodeStateEnabled).
Attr("isOn", true).
ResultPk().
FindInt64Col(0)
if err != nil {
return err
}
if apiNodeId > 0 {
err = this.Query(tx).
Pk(apiNodeId).
Set("isPrimary", true).
UpdateQuickly()
if err != nil {
return err
}
}
}
return nil
}
// NotifyUpdate 通知变更
func (this *APINodeDAO) NotifyUpdate(tx *dbs.Tx, apiNodeId int64) error {
// suppress IDE warning
_ = apiNodeId
err := this.ResetPrimaryAPINode(tx)
if err != nil {
return err
}
return nil
}
// 生成唯一ID
func (this *APINodeDAO) genUniqueId(tx *dbs.Tx) (string, error) {
for {
uniqueId := rands.HexString(32)
ok, err := this.Query(tx).
Attr("uniqueId", uniqueId).
Exist()
if err != nil {
return "", err
}
if ok {
continue
}
return uniqueId, nil
}
}

View File

@@ -34,6 +34,16 @@ func TestAPINodeDAO_FindAllEnabledAPIAccessIPs(t *testing.T) {
t.Log(NewAPINodeDAO().FindAllEnabledAPIAccessIPs(nil, cacheMap)) t.Log(NewAPINodeDAO().FindAllEnabledAPIAccessIPs(nil, cacheMap))
} }
func TestAPINodeDAO_CheckAPINodeIsPrimary(t *testing.T) {
var dao = NewAPINodeDAO()
t.Log(dao.CheckAPINodeIsPrimary(nil))
}
func TestAPINodeDAO_ResetPrimaryAPINode(t *testing.T) {
var dao = NewAPINodeDAO()
t.Log(dao.ResetPrimaryAPINode(nil))
}
func BenchmarkAPINodeDAO_New(b *testing.B) { func BenchmarkAPINodeDAO_New(b *testing.B) {
runtime.GOMAXPROCS(1) runtime.GOMAXPROCS(1)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {

View File

@@ -23,6 +23,7 @@ type APINode struct {
AdminId uint32 `field:"adminId"` // 管理员ID AdminId uint32 `field:"adminId"` // 管理员ID
Weight uint32 `field:"weight"` // 权重 Weight uint32 `field:"weight"` // 权重
Status dbs.JSON `field:"status"` // 运行状态 Status dbs.JSON `field:"status"` // 运行状态
IsPrimary bool `field:"isPrimary"` // 是否为主API节点
} }
type APINodeOperator struct { type APINodeOperator struct {
@@ -45,6 +46,7 @@ type APINodeOperator struct {
AdminId interface{} // 管理员ID AdminId interface{} // 管理员ID
Weight interface{} // 权重 Weight interface{} // 权重
Status interface{} // 运行状态 Status interface{} // 运行状态
IsPrimary interface{} // 是否为主API节点
} }
func NewAPINodeOperator() *APINodeOperator { func NewAPINodeOperator() *APINodeOperator {

View File

@@ -264,17 +264,20 @@ func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddress(tx *dbs.Tx, nodeId in
} }
// FindFirstNodeAccessIPAddressId 查找节点的第一个可访问的IP地址ID // FindFirstNodeAccessIPAddressId 查找节点的第一个可访问的IP地址ID
func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddressId(tx *dbs.Tx, nodeId int64, role nodeconfigs.NodeRole) (int64, error) { func (this *NodeIPAddressDAO) FindFirstNodeAccessIPAddressId(tx *dbs.Tx, nodeId int64, mustUp bool, role nodeconfigs.NodeRole) (int64, error) {
if len(role) == 0 { if len(role) == 0 {
role = nodeconfigs.NodeRoleNode role = nodeconfigs.NodeRoleNode
} }
return this.Query(tx). var query = this.Query(tx).
Attr("nodeId", nodeId). Attr("nodeId", nodeId).
Attr("role", role). Attr("role", role).
State(NodeIPAddressStateEnabled). State(NodeIPAddressStateEnabled).
Attr("canAccess", true). Attr("canAccess", true).
Attr("isOn", true). Attr("isOn", true)
Attr("isUp", true). if mustUp {
query.Attr("isUp", true)
}
return query.
Desc("order"). Desc("order").
AscPk(). AscPk().
Result("id"). Result("id").

View File

@@ -9,6 +9,6 @@ import (
func TestNodeIPAddressDAO_FindFirstNodeAccessIPAddress(t *testing.T) { func TestNodeIPAddressDAO_FindFirstNodeAccessIPAddress(t *testing.T) {
var dao = NewNodeIPAddressDAO() var dao = NewNodeIPAddressDAO()
t.Log(dao.FindFirstNodeAccessIPAddress(nil, 48, nodeconfigs.NodeRoleNode)) t.Log(dao.FindFirstNodeAccessIPAddress(nil, 48, true, nodeconfigs.NodeRoleNode))
t.Log(dao.FindFirstNodeAccessIPAddressId(nil, 48, nodeconfigs.NodeRoleNode)) t.Log(dao.FindFirstNodeAccessIPAddressId(nil, 48, true, nodeconfigs.NodeRoleNode))
} }

View File

@@ -39,7 +39,7 @@ func (this *APINodeService) UpdateAPINode(ctx context.Context, req *pb.UpdateAPI
tx := this.NullTx() tx := this.NullTx()
err = models.SharedAPINodeDAO.UpdateAPINode(tx, req.ApiNodeId, req.Name, req.Description, req.HttpJSON, req.HttpsJSON, req.RestIsOn, req.RestHTTPJSON, req.RestHTTPSJSON, req.AccessAddrsJSON, req.IsOn) err = models.SharedAPINodeDAO.UpdateAPINode(tx, req.ApiNodeId, req.Name, req.Description, req.HttpJSON, req.HttpsJSON, req.RestIsOn, req.RestHTTPJSON, req.RestHTTPSJSON, req.AccessAddrsJSON, req.IsOn, req.IsPrimary)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -97,6 +97,7 @@ func (this *APINodeService) FindAllEnabledAPINodes(ctx context.Context, req *pb.
HttpsJSON: node.Https, HttpsJSON: node.Https,
AccessAddrsJSON: node.AccessAddrs, AccessAddrsJSON: node.AccessAddrs,
AccessAddrs: accessAddrs, AccessAddrs: accessAddrs,
IsPrimary: node.IsPrimary,
}) })
} }
@@ -174,6 +175,7 @@ func (this *APINodeService) ListEnabledAPINodes(ctx context.Context, req *pb.Lis
AccessAddrsJSON: node.AccessAddrs, AccessAddrsJSON: node.AccessAddrs,
AccessAddrs: accessAddrs, AccessAddrs: accessAddrs,
StatusJSON: node.Status, StatusJSON: node.Status,
IsPrimary: node.IsPrimary,
}) })
} }
@@ -218,6 +220,7 @@ func (this *APINodeService) FindEnabledAPINode(ctx context.Context, req *pb.Find
RestHTTPSJSON: node.RestHTTPS, RestHTTPSJSON: node.RestHTTPS,
AccessAddrsJSON: node.AccessAddrs, AccessAddrsJSON: node.AccessAddrs,
AccessAddrs: accessAddrs, AccessAddrs: accessAddrs,
IsPrimary: node.IsPrimary,
} }
return &pb.FindEnabledAPINodeResponse{ApiNode: result}, nil return &pb.FindEnabledAPINodeResponse{ApiNode: result}, nil
} }
@@ -270,6 +273,7 @@ func (this *APINodeService) FindCurrentAPINode(ctx context.Context, req *pb.Find
AccessAddrsJSON: node.AccessAddrs, AccessAddrsJSON: node.AccessAddrs,
AccessAddrs: accessAddrs, AccessAddrs: accessAddrs,
StatusJSON: nil, StatusJSON: nil,
IsPrimary: node.IsPrimary,
}}, nil }}, nil
} }

View File

@@ -1489,7 +1489,7 @@ func (this *NodeService) UpdateNodeDNS(ctx context.Context, req *pb.UpdateNodeDN
// 修改IP // 修改IP
if len(req.IpAddr) > 0 { if len(req.IpAddr) > 0 {
ipAddrId, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddressId(tx, req.NodeId, nodeconfigs.NodeRoleNode) ipAddrId, err := models.SharedNodeIPAddressDAO.FindFirstNodeAccessIPAddressId(tx, req.NodeId, true, nodeconfigs.NodeRoleNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }

File diff suppressed because one or more lines are too long

View File

@@ -7,7 +7,6 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients" "github.com/TeaOSLab/EdgeAPI/internal/dnsclients"
"github.com/TeaOSLab/EdgeAPI/internal/dnsclients/dnstypes" "github.com/TeaOSLab/EdgeAPI/internal/dnsclients/dnstypes"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
@@ -21,41 +20,42 @@ import (
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewDNSTaskExecutor().Start() NewDNSTaskExecutor(10 * time.Second).Start()
}) })
}) })
} }
// DNSTaskExecutor DNS任务执行器 // DNSTaskExecutor DNS任务执行器
type DNSTaskExecutor struct { type DNSTaskExecutor struct {
BaseTask
ticker *time.Ticker
} }
func NewDNSTaskExecutor() *DNSTaskExecutor { func NewDNSTaskExecutor(duration time.Duration) *DNSTaskExecutor {
return &DNSTaskExecutor{} return &DNSTaskExecutor{
ticker: time.NewTicker(duration),
}
} }
func (this *DNSTaskExecutor) Start() { func (this *DNSTaskExecutor) Start() {
ticker := time.NewTicker(10 * time.Second) for range this.ticker.C {
for range ticker.C { err := this.Loop()
err := this.LoopWithLocker(10)
if err != nil { if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
} }
} }
func (this *DNSTaskExecutor) LoopWithLocker(seconds int64) error { func (this *DNSTaskExecutor) Loop() error {
ok, err := models.SharedSysLockerDAO.Lock(nil, "dns_task_executor", seconds-1) // 假设执行时间为1秒 if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
if err != nil {
return err
}
if !ok {
return nil return nil
} }
return this.Loop()
return this.loop()
} }
func (this *DNSTaskExecutor) Loop() error { func (this *DNSTaskExecutor) loop() error {
tasks, err := dnsmodels.SharedDNSTaskDAO.FindAllDoingTasks(nil) tasks, err := dnsmodels.SharedDNSTaskDAO.FindAllDoingTasks(nil)
if err != nil { if err != nil {
return err return err
@@ -118,7 +118,7 @@ func (this *DNSTaskExecutor) doServer(taskId int64, oldClusterId int64, serverId
if isOk { if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId)
if err != nil { if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
} }
}() }()
@@ -275,7 +275,7 @@ func (this *DNSTaskExecutor) doNode(taskId int64, nodeId int64) error {
if isOk { if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil { if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
} }
}() }()
@@ -314,7 +314,7 @@ func (this *DNSTaskExecutor) doCluster(taskId int64, clusterId int64) error {
if isOk { if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil { if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
} }
}() }()
@@ -519,7 +519,7 @@ func (this *DNSTaskExecutor) doClusterRemove(taskId int64, clusterId int64, doma
if isOk { if isOk {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(nil, taskId)
if err != nil { if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
} }
}() }()
@@ -603,7 +603,7 @@ func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) erro
if taskId > 0 { if taskId > 0 {
err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId) err := dnsmodels.SharedDNSTaskDAO.UpdateDNSTaskDone(tx, taskId)
if err != nil { if err != nil {
remotelogs.Error("DNSTaskExecutor", err.Error()) this.logErr("DNSTaskExecutor", err.Error())
} }
} }
} }
@@ -634,7 +634,7 @@ func (this *DNSTaskExecutor) doDomainWithTask(taskId int64, domainId int64) erro
manager := dnsclients.FindProvider(provider.Type) manager := dnsclients.FindProvider(provider.Type)
if manager == nil { if manager == nil {
remotelogs.Error("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'") this.logErr("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
isOk = true isOk = true
return nil return nil
} }
@@ -711,7 +711,7 @@ func (this *DNSTaskExecutor) findDNSManagerWithDomainId(tx *dbs.Tx, domainId int
var manager = dnsclients.FindProvider(provider.Type) var manager = dnsclients.FindProvider(provider.Type)
if manager == nil { if manager == nil {
remotelogs.Error("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'") this.logErr("DNSTaskExecutor", "unsupported dns provider type '"+provider.Type+"'")
return nil, nil, nil return nil, nil, nil
} }
params, err := provider.DecodeAPIParams() params, err := provider.DecodeAPIParams()

View File

@@ -1,15 +1,17 @@
package tasks package tasks_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
"time"
) )
func TestDNSTaskExecutor_Loop(t *testing.T) { func TestDNSTaskExecutor_Loop(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
executor := NewDNSTaskExecutor() var task = tasks.NewDNSTaskExecutor(10 * time.Second)
err := executor.Loop() err := task.Loop()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -4,50 +4,41 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time" "time"
) )
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
looper := NewEventLooper()
goman.New(func() { goman.New(func() {
looper.Start() NewEventLooper(2 * time.Second).Start()
}) })
}) })
} }
// EventLooper 事件相关处理程序 // EventLooper 事件相关处理程序
type EventLooper struct { type EventLooper struct {
BaseTask
ticker *time.Ticker
} }
func NewEventLooper() *EventLooper { func NewEventLooper(duration time.Duration) *EventLooper {
return &EventLooper{} return &EventLooper{
ticker: time.NewTicker(duration),
}
} }
func (this *EventLooper) Start() { func (this *EventLooper) Start() {
ticker := time.NewTicker(2 * time.Second) for range this.ticker.C {
for range ticker.C { err := this.Loop()
err := this.loop()
if err != nil { if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error()) this.logErr("EventLooper", err.Error())
} }
} }
} }
func (this *EventLooper) loop() error { func (this *EventLooper) Loop() error {
lockerKey := "eventLooper" if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
isOk, err := models.SharedSysLockerDAO.Lock(nil, lockerKey, 3600)
if err != nil {
return err
}
defer func() {
err = models.SharedSysLockerDAO.Unlock(nil, lockerKey)
if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error())
}
}()
if !isOk {
return nil return nil
} }
@@ -58,12 +49,12 @@ func (this *EventLooper) loop() error {
for _, eventOne := range events { for _, eventOne := range events {
event, err := eventOne.DecodeEvent() event, err := eventOne.DecodeEvent()
if err != nil { if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error()) this.logErr("EventLooper", err.Error())
continue continue
} }
err = event.Run() err = event.Run()
if err != nil { if err != nil {
logs.Println("[EVENT_LOOPER]" + err.Error()) this.logErr("EventLooper", err.Error())
continue continue
} }
err = models.SharedSysEventDAO.DeleteEvent(nil, int64(eventOne.Id)) err = models.SharedSysEventDAO.DeleteEvent(nil, int64(eventOne.Id))

View File

@@ -9,14 +9,14 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
"time" "time"
) )
// HealthCheckClusterTask 单个集群的健康检查任务 // HealthCheckClusterTask 单个集群的健康检查任务
type HealthCheckClusterTask struct { type HealthCheckClusterTask struct {
BaseTask
clusterId int64 clusterId int64
config *serverconfigs.HealthCheckConfig config *serverconfigs.HealthCheckConfig
ticker *utils.Ticker ticker *utils.Ticker
@@ -37,12 +37,12 @@ func (this *HealthCheckClusterTask) Reset(config *serverconfigs.HealthCheckConfi
// 检查是否有变化 // 检查是否有变化
oldJSON, err := json.Marshal(this.config) oldJSON, err := json.Marshal(this.config)
if err != nil { if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) this.logErr("HealthCheckClusterTask", err.Error())
return return
} }
newJSON, err := json.Marshal(config) newJSON, err := json.Marshal(config)
if err != nil { if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) this.logErr("HealthCheckClusterTask", err.Error())
return return
} }
if bytes.Compare(oldJSON, newJSON) != 0 { if bytes.Compare(oldJSON, newJSON) != 0 {
@@ -71,9 +71,9 @@ func (this *HealthCheckClusterTask) Run() {
ticker := utils.NewTicker(duration) ticker := utils.NewTicker(duration)
goman.New(func() { goman.New(func() {
for ticker.Wait() { for ticker.Wait() {
err := this.loop(int64(duration.Seconds())) err := this.Loop()
if err != nil { if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) this.logErr("HealthCheckClusterTask", err.Error())
} }
} }
}) })
@@ -89,33 +89,21 @@ func (this *HealthCheckClusterTask) Stop() {
this.ticker = nil this.ticker = nil
} }
// 单个循环任务 // Loop 单个循环任务
func (this *HealthCheckClusterTask) loop(seconds int64) error { func (this *HealthCheckClusterTask) Loop() error {
// 检查上次运行时间,防止重复运行 // 检查是否为主节点
settingKey := systemconfigs.SettingCodeClusterHealthCheck + "Loop" + numberutils.FormatInt64(this.clusterId) if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
timestamp := time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-seconds)
if err != nil {
return err
}
if c > 0 {
return nil return nil
} }
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
// 开始运行 // 开始运行
executor := NewHealthCheckExecutor(this.clusterId) var executor = NewHealthCheckExecutor(this.clusterId)
results, err := executor.Run() results, err := executor.Run()
if err != nil { if err != nil {
return err return err
} }
failedResults := []maps.Map{} var failedResults = []maps.Map{}
for _, result := range results { for _, result := range results {
if !result.IsOk { if !result.IsOk {
failedResults = append(failedResults, maps.Map{ failedResults = append(failedResults, maps.Map{
@@ -139,7 +127,7 @@ func (this *HealthCheckClusterTask) loop(seconds int64) error {
if err != nil { if err != nil {
return err return err
} }
message := "有" + numberutils.FormatInt(len(failedResults)) + "个节点在健康检查中出现问题" var message = "有" + numberutils.FormatInt(len(failedResults)) + "个节点在健康检查中出现问题"
err = models.NewMessageDAO().CreateClusterMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, models.MessageTypeHealthCheckFailed, models.MessageLevelError, message, message, failedResultsJSON) err = models.NewMessageDAO().CreateClusterMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, models.MessageTypeHealthCheckFailed, models.MessageLevelError, message, message, failedResultsJSON)
if err != nil { if err != nil {
return err return err

View File

@@ -1,14 +1,15 @@
package tasks package tasks_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
) )
func TestHealthCheckClusterTask_loop(t *testing.T) { func TestHealthCheckClusterTask_Loop(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
task := NewHealthCheckClusterTask(10, nil) var task = tasks.NewHealthCheckClusterTask(10, nil)
err := task.loop(10) err := task.Loop()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -7,7 +7,6 @@ import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/errors" "github.com/TeaOSLab/EdgeAPI/internal/errors"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils" "github.com/TeaOSLab/EdgeCommon/pkg/configutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
@@ -25,6 +24,8 @@ import (
) )
type HealthCheckExecutor struct { type HealthCheckExecutor struct {
BaseTask
clusterId int64 clusterId int64
} }
@@ -154,7 +155,7 @@ func (this *HealthCheckExecutor) runNode(healthCheckConfig *serverconfigs.Health
if teaconst.IsPlus { if teaconst.IsPlus {
isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown, healthCheckConfig.AutoDown) isChanged, err := models.SharedNodeIPAddressDAO.UpdateAddressHealthCount(nil, result.NodeAddrId, result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown, healthCheckConfig.AutoDown)
if err != nil { if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) this.logErr("HealthCheckExecutor", err.Error())
return return
} }
@@ -175,14 +176,14 @@ func (this *HealthCheckExecutor) runNode(healthCheckConfig *serverconfigs.Health
err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), messageType, messageLevel, message, message, nil, false) err = models.NewMessageDAO().CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, this.clusterId, int64(result.Node.Id), messageType, messageLevel, message, message, nil, false)
if err != nil { if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) this.logErr("HealthCheckExecutor", err.Error())
return return
} }
// 触发阈值 // 触发阈值
err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id)) err = models.SharedNodeIPAddressDAO.FireThresholds(nil, nodeconfigs.NodeRoleNode, int64(result.Node.Id))
if err != nil { if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) this.logErr("HealthCheckExecutor", err.Error())
return return
} }
} }
@@ -195,7 +196,7 @@ func (this *HealthCheckExecutor) runNode(healthCheckConfig *serverconfigs.Health
if healthCheckConfig.AutoDown { if healthCheckConfig.AutoDown {
isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown) isChanged, err := models.SharedNodeDAO.UpdateNodeUpCount(nil, int64(result.Node.Id), result.IsOk, healthCheckConfig.CountUp, healthCheckConfig.CountDown)
if err != nil { if err != nil {
remotelogs.Error("HEALTH_CHECK_EXECUTOR", err.Error()) this.logErr("HealthCheckExecutor", err.Error())
} else if isChanged { } else if isChanged {
// 通知恢复或下线 // 通知恢复或下线
if result.IsOk { if result.IsOk {

View File

@@ -1,10 +1,11 @@
//go:build plus //go:build plus
// +build plus // +build plus
package tasks package tasks_test
import ( import (
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
) )
@@ -13,7 +14,7 @@ func TestHealthCheckExecutor_Run(t *testing.T) {
teaconst.IsPlus = true teaconst.IsPlus = true
dbs.NotifyReady() dbs.NotifyReady()
executor := NewHealthCheckExecutor(35) executor := tasks.NewHealthCheckExecutor(35)
results, err := executor.Run() results, err := executor.Run()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@@ -5,50 +5,52 @@ import (
"encoding/json" "encoding/json"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/lists" "github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/logs"
"time" "time"
) )
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewHealthCheckTask().Run() NewHealthCheckTask(1 * time.Minute).Start()
}) })
}) })
} }
// HealthCheckTask 节点健康检查任务 // HealthCheckTask 节点健康检查任务
type HealthCheckTask struct { type HealthCheckTask struct {
BaseTask
ticker *time.Ticker
tasksMap map[int64]*HealthCheckClusterTask // taskId => task tasksMap map[int64]*HealthCheckClusterTask // taskId => task
} }
func NewHealthCheckTask() *HealthCheckTask { func NewHealthCheckTask(duration time.Duration) *HealthCheckTask {
return &HealthCheckTask{ return &HealthCheckTask{
ticker: time.NewTicker(duration),
tasksMap: map[int64]*HealthCheckClusterTask{}, tasksMap: map[int64]*HealthCheckClusterTask{},
} }
} }
func (this *HealthCheckTask) Run() { func (this *HealthCheckTask) Start() {
err := this.loop() err := this.Loop()
if err != nil { if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) this.logErr("HealthCheckTask", err.Error())
} }
ticker := utils.NewTicker(60 * time.Second) for range this.ticker.C {
for ticker.Wait() { err := this.Loop()
err := this.loop()
if err != nil { if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) this.logErr("HealthCheckTask", err.Error())
} }
} }
} }
func (this *HealthCheckTask) loop() error { func (this *HealthCheckTask) Loop() error {
clusters, err := models.NewNodeClusterDAO().FindAllEnableClusters(nil) clusters, err := models.NewNodeClusterDAO().FindAllEnableClusters(nil)
if err != nil { if err != nil {
return err return err
@@ -74,7 +76,7 @@ func (this *HealthCheckTask) loop() error {
if len(cluster.HealthCheck) > 0 { if len(cluster.HealthCheck) > 0 {
err = json.Unmarshal(cluster.HealthCheck, config) err = json.Unmarshal(cluster.HealthCheck, config)
if err != nil { if err != nil {
logs.Println("[TASK][HEALTH_CHECK]" + err.Error()) this.logErr("HealthCheckTask", err.Error())
continue continue
} }
} }
@@ -85,7 +87,7 @@ func (this *HealthCheckTask) loop() error {
newJSON, _ := json.Marshal(config) newJSON, _ := json.Marshal(config)
oldJSON, _ := json.Marshal(task.Config()) oldJSON, _ := json.Marshal(task.Config())
if bytes.Compare(oldJSON, newJSON) != 0 { if bytes.Compare(oldJSON, newJSON) != 0 {
logs.Println("[TASK][HEALTH_CHECK]update cluster '" + numberutils.FormatInt64(clusterId) + "'") remotelogs.Println("TASK", "[HealthCheckTask]update cluster '"+numberutils.FormatInt64(clusterId)+"'")
goman.New(func() { goman.New(func() {
task.Reset(config) task.Reset(config)
}) })

View File

@@ -5,49 +5,53 @@ import (
"fmt" "fmt"
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils" "github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time" "time"
) )
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewLogTask().Run() NewLogTask(24*time.Hour, 1*time.Minute).Start()
}) })
}) })
} }
type LogTask struct { type LogTask struct {
BaseTask
cleanTicker *time.Ticker
monitorTicker *time.Ticker
} }
func NewLogTask() *LogTask { func NewLogTask(cleanDuration time.Duration, monitorDuration time.Duration) *LogTask {
return &LogTask{} return &LogTask{
cleanTicker: time.NewTicker(cleanDuration),
monitorTicker: time.NewTicker(monitorDuration),
}
} }
func (this *LogTask) Run() { func (this *LogTask) Start() {
goman.New(func() { goman.New(func() {
this.runClean() this.RunClean()
}) })
goman.New(func() { goman.New(func() {
this.runMonitor() this.RunMonitor()
}) })
} }
func (this *LogTask) runClean() { func (this *LogTask) RunClean() {
var ticker = utils.NewTicker(24 * time.Hour) for range this.cleanTicker.C {
for ticker.Wait() { err := this.LoopClean()
err := this.loopClean()
if err != nil { if err != nil {
logs.Println("[TASK][LOG]" + err.Error()) this.logErr("LogTask", err.Error())
} }
} }
} }
func (this *LogTask) loopClean() error { func (this *LogTask) LoopClean() error {
var configKey = "adminLogConfig" var configKey = "adminLogConfig"
valueJSON, err := models.SharedSysSettingDAO.ReadSetting(nil, configKey) valueJSON, err := models.SharedSysSettingDAO.ReadSetting(nil, configKey)
if err != nil { if err != nil {
@@ -71,34 +75,21 @@ func (this *LogTask) loopClean() error {
return nil return nil
} }
func (this *LogTask) runMonitor() { func (this *LogTask) RunMonitor() {
var ticker = utils.NewTicker(1 * time.Minute) for range this.monitorTicker.C {
for ticker.Wait() { err := this.LoopMonitor()
err := this.loopMonitor(60)
if err != nil { if err != nil {
logs.Println("[TASK][LOG]" + err.Error()) this.logErr("LogTask", err.Error())
} }
} }
} }
func (this *LogTask) loopMonitor(seconds int64) error { func (this *LogTask) LoopMonitor() error {
// 检查上次运行时间,防止重复运行 // 检查是否为主节点
var settingKey = "logTaskMonitorLoop" if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
var timestamp = time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-seconds)
if err != nil {
return err
}
if c > 0 {
return nil return nil
} }
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
var configKey = "adminLogConfig" var configKey = "adminLogConfig"
valueJSON, err := models.SharedSysSettingDAO.ReadSetting(nil, configKey) valueJSON, err := models.SharedSysSettingDAO.ReadSetting(nil, configKey)
if err != nil { if err != nil {

View File

@@ -1,26 +1,28 @@
package tasks package tasks_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
"time"
) )
func TestLogTask_loopClean(t *testing.T) { func TestLogTask_LoopClean(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
task := NewLogTask() var task = tasks.NewLogTask(24*time.Hour, 1*time.Minute)
err := task.loopClean() err := task.LoopClean()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Log("ok") t.Log("ok")
} }
func TestLogTask_loopMonitor(t *testing.T) { func TestLogTask_LoopMonitor(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
task := NewLogTask() var task = tasks.NewLogTask(24*time.Hour, 1*time.Minute)
err := task.loopMonitor(10) err := task.LoopMonitor()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -3,42 +3,44 @@ package tasks
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time" "time"
) )
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewMessageTask().Run() NewMessageTask(24 * time.Hour).Start()
}) })
}) })
} }
// MessageTask 消息相关任务 // MessageTask 消息相关任务
type MessageTask struct { type MessageTask struct {
BaseTask
ticker *time.Ticker
} }
// NewMessageTask 获取新对象 // NewMessageTask 获取新对象
func NewMessageTask() *MessageTask { func NewMessageTask(duration time.Duration) *MessageTask {
return &MessageTask{} return &MessageTask{
ticker: time.NewTicker(duration),
}
} }
// Run 运行 // Start 开始运行
func (this *MessageTask) Run() { func (this *MessageTask) Start() {
ticker := utils.NewTicker(24 * time.Hour) for range this.ticker.C {
for ticker.Wait() { err := this.Loop()
err := this.loop()
if err != nil { if err != nil {
logs.Println("[TASK][MESSAGE]" + err.Error()) this.logErr("MessageTask", err.Error())
} }
} }
} }
// 单次运行 // Loop 单次运行
func (this *MessageTask) loop() error { func (this *MessageTask) Loop() error {
dayTime := time.Now().AddDate(0, 0, -30) // TODO 这个30天应该可以在界面上设置 dayTime := time.Now().AddDate(0, 0, -30) // TODO 这个30天应该可以在界面上设置
return models.NewMessageDAO().DeleteMessagesBeforeDay(nil, dayTime) return models.NewMessageDAO().DeleteMessagesBeforeDay(nil, dayTime)
} }

View File

@@ -5,7 +5,6 @@ package tasks
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/iwind/TeaGo/Tea" "github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"time" "time"
@@ -14,29 +13,35 @@ import (
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewMonitorItemValueTask().Start() NewMonitorItemValueTask(1 * time.Hour).Start()
}) })
}) })
} }
// MonitorItemValueTask 节点监控数值任务 // MonitorItemValueTask 节点监控数值任务
type MonitorItemValueTask struct { type MonitorItemValueTask struct {
BaseTask
ticker *time.Ticker
} }
// NewMonitorItemValueTask 获取新对象 // NewMonitorItemValueTask 获取新对象
func NewMonitorItemValueTask() *MonitorItemValueTask { func NewMonitorItemValueTask(duration time.Duration) *MonitorItemValueTask {
return &MonitorItemValueTask{} var ticker = time.NewTicker(duration)
}
func (this *MonitorItemValueTask) Start() {
ticker := time.NewTicker(1 * time.Hour)
if Tea.IsTesting() { if Tea.IsTesting() {
ticker = time.NewTicker(1 * time.Minute) ticker = time.NewTicker(1 * time.Minute)
} }
for range ticker.C {
return &MonitorItemValueTask{
ticker: ticker,
}
}
func (this *MonitorItemValueTask) Start() {
for range this.ticker.C {
err := this.Loop() err := this.Loop()
if err != nil { if err != nil {
remotelogs.Error("MonitorItemValueTask", err.Error()) this.logErr("MonitorItemValueTask", err.Error())
} }
} }
} }

View File

@@ -1,16 +1,18 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved. // Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package tasks package tasks_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
"time"
) )
func TestMonitorItemValueTask_Loop(t *testing.T) { func TestMonitorItemValueTask_Loop(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
task := NewMonitorItemValueTask() var task = tasks.NewMonitorItemValueTask(1 * time.Minute)
err := task.Loop() err := task.Loop()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@@ -4,40 +4,40 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time" "time"
) )
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewNodeLogCleanerTask().Start() NewNodeLogCleanerTask(24 * time.Hour).Start()
}) })
}) })
} }
// NodeLogCleanerTask 清理节点日志的任务 // NodeLogCleanerTask 清理节点日志的任务
type NodeLogCleanerTask struct { type NodeLogCleanerTask struct {
duration time.Duration BaseTask
ticker *time.Ticker
} }
func NewNodeLogCleanerTask() *NodeLogCleanerTask { func NewNodeLogCleanerTask(duration time.Duration) *NodeLogCleanerTask {
return &NodeLogCleanerTask{ return &NodeLogCleanerTask{
duration: 24 * time.Hour, ticker: time.NewTicker(duration),
} }
} }
func (this *NodeLogCleanerTask) Start() { func (this *NodeLogCleanerTask) Start() {
ticker := time.NewTicker(this.duration) for range this.ticker.C {
for range ticker.C { err := this.Loop()
err := this.loop()
if err != nil { if err != nil {
logs.Println("[TASK]" + err.Error()) this.logErr("NodeLogCleanerTask", err.Error())
} }
} }
} }
func (this *NodeLogCleanerTask) loop() error { func (this *NodeLogCleanerTask) Loop() error {
// 删除 N天 以前的info日志 // 删除 N天 以前的info日志
err := models.SharedNodeLogDAO.DeleteExpiredLogsWithLevel(nil, "info", 3) err := models.SharedNodeLogDAO.DeleteExpiredLogsWithLevel(nil, "info", 3)
if err != nil { if err != nil {

View File

@@ -1,15 +1,17 @@
package tasks package tasks_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
"time"
) )
func TestNodeLogCleaner_loop(t *testing.T) { func TestNodeLogCleaner_loop(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
cleaner := &NodeLogCleanerTask{} var cleaner = tasks.NewNodeLogCleanerTask(24 * time.Hour)
err := cleaner.loop() err := cleaner.Loop()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -3,11 +3,8 @@ package tasks
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types" "github.com/iwind/TeaGo/types"
"strings" "strings"
"time" "time"
@@ -15,63 +12,51 @@ import (
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
var task = NewNodeMonitorTask(60)
var ticker = time.NewTicker(60 * time.Second)
goman.New(func() { goman.New(func() {
for range ticker.C { NewNodeMonitorTask(1 * time.Minute).Start()
err := task.loop()
if err != nil {
logs.Println("[TASK][NODE_MONITOR]" + err.Error())
}
}
}) })
}) })
} }
// NodeMonitorTask 边缘节点监控任务 // NodeMonitorTask 边缘节点监控任务
type NodeMonitorTask struct { type NodeMonitorTask struct {
intervalSeconds int BaseTask
ticker *time.Ticker
inactiveMap map[string]int // cluster@nodeId => count inactiveMap map[string]int // cluster@nodeId => count
notifiedMap map[int64]int64 // nodeId => timestamp notifiedMap map[int64]int64 // nodeId => timestamp
} }
func NewNodeMonitorTask(intervalSeconds int) *NodeMonitorTask { func NewNodeMonitorTask(duration time.Duration) *NodeMonitorTask {
return &NodeMonitorTask{ return &NodeMonitorTask{
intervalSeconds: intervalSeconds, ticker: time.NewTicker(duration),
inactiveMap: map[string]int{}, inactiveMap: map[string]int{},
notifiedMap: map[int64]int64{}, notifiedMap: map[int64]int64{},
} }
} }
func (this *NodeMonitorTask) Run() { func (this *NodeMonitorTask) Start() {
for range this.ticker.C {
err := this.Loop()
if err != nil {
this.logErr("NodeMonitorTask", err.Error())
}
}
} }
func (this *NodeMonitorTask) loop() error { func (this *NodeMonitorTask) Loop() error {
// 检查上次运行时间,防止重复运行 // 检查是否为主节点
settingKey := systemconfigs.SettingCodeNodeMonitor + "Loop" if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
timestamp := time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-int64(this.intervalSeconds))
if err != nil {
return err
}
if c > 0 {
return nil return nil
} }
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
clusters, err := models.SharedNodeClusterDAO.FindAllEnableClusters(nil) clusters, err := models.SharedNodeClusterDAO.FindAllEnableClusters(nil)
if err != nil { if err != nil {
return err return err
} }
for _, cluster := range clusters { for _, cluster := range clusters {
err := this.monitorCluster(cluster) err := this.MonitorCluster(cluster)
if err != nil { if err != nil {
return err return err
} }
@@ -80,8 +65,8 @@ func (this *NodeMonitorTask) loop() error {
return nil return nil
} }
func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error { func (this *NodeMonitorTask) MonitorCluster(cluster *models.NodeCluster) error {
clusterId := int64(cluster.Id) var clusterId = int64(cluster.Id)
// 检查离线节点 // 检查离线节点
inactiveNodes, err := models.SharedNodeDAO.FindAllInactiveNodesWithClusterId(nil, clusterId) inactiveNodes, err := models.SharedNodeDAO.FindAllInactiveNodesWithClusterId(nil, clusterId)

View File

@@ -1,16 +1,18 @@
package tasks package tasks_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
"time"
) )
func TestNodeMonitorTask_loop(t *testing.T) { func TestNodeMonitorTask_loop(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
var task = NewNodeMonitorTask(60) var task = tasks.NewNodeMonitorTask(60 * time.Second)
err := task.loop() err := task.Loop()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -19,9 +21,9 @@ func TestNodeMonitorTask_loop(t *testing.T) {
func TestNodeMonitorTask_Monitor(t *testing.T) { func TestNodeMonitorTask_Monitor(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
var task = NewNodeMonitorTask(60) var task = tasks.NewNodeMonitorTask(60 * time.Second)
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
err := task.monitorCluster(&models.NodeCluster{ err := task.MonitorCluster(&models.NodeCluster{
Id: 42, Id: 42,
}) })
if err != nil { if err != nil {

View File

@@ -5,49 +5,49 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time" "time"
) )
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewNodeTaskExtractor().Start() NewNodeTaskExtractor(10 * time.Second).Start()
}) })
}) })
} }
// NodeTaskExtractor 节点任务 // NodeTaskExtractor 节点任务
type NodeTaskExtractor struct { type NodeTaskExtractor struct {
BaseTask
ticker *time.Ticker
} }
func NewNodeTaskExtractor() *NodeTaskExtractor { func NewNodeTaskExtractor(duration time.Duration) *NodeTaskExtractor {
return &NodeTaskExtractor{} return &NodeTaskExtractor{
ticker: time.NewTicker(duration),
}
} }
func (this *NodeTaskExtractor) Start() { func (this *NodeTaskExtractor) Start() {
ticker := time.NewTicker(10 * time.Second) for range this.ticker.C {
for range ticker.C {
err := this.Loop() err := this.Loop()
if err != nil { if err != nil {
logs.Println("[TASK][NODE_TASK_EXTRACTOR]" + err.Error()) this.logErr("NodeTaskExtractor", err.Error())
} }
} }
} }
func (this *NodeTaskExtractor) Loop() error { func (this *NodeTaskExtractor) Loop() error {
ok, err := models.SharedSysLockerDAO.Lock(nil, "node_task_extractor", 10-1) // 假设执行时间为1秒 // 检查是否为主节点
if err != nil { if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
return err
}
if !ok {
return nil return nil
} }
// 这里不解锁是为了让任务N秒钟之内只运行一次 // 这里不解锁是为了让任务N秒钟之内只运行一次
for _, role := range []string{nodeconfigs.NodeRoleNode, nodeconfigs.NodeRoleDNS} { for _, role := range []string{nodeconfigs.NodeRoleNode, nodeconfigs.NodeRoleDNS} {
err = models.SharedNodeTaskDAO.ExtractAllClusterTasks(nil, role) err := models.SharedNodeTaskDAO.ExtractAllClusterTasks(nil, role)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -3,62 +3,47 @@ package tasks
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"time" "time"
) )
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
task := NewNSNodeMonitorTask(60)
ticker := time.NewTicker(60 * time.Second)
goman.New(func() { goman.New(func() {
for range ticker.C { NewNSNodeMonitorTask(1 * time.Minute).Start()
err := task.loop()
if err != nil {
logs.Println("[TASK][NS_NODE_MONITOR]" + err.Error())
}
}
}) })
}) })
} }
// NSNodeMonitorTask 边缘节点监控任务 // NSNodeMonitorTask 边缘节点监控任务
type NSNodeMonitorTask struct { type NSNodeMonitorTask struct {
intervalSeconds int BaseTask
ticker *time.Ticker
} }
func NewNSNodeMonitorTask(intervalSeconds int) *NSNodeMonitorTask { func NewNSNodeMonitorTask(duration time.Duration) *NSNodeMonitorTask {
return &NSNodeMonitorTask{ return &NSNodeMonitorTask{
intervalSeconds: intervalSeconds, ticker: time.NewTicker(duration),
} }
} }
func (this *NSNodeMonitorTask) Run() { func (this *NSNodeMonitorTask) Start() {
for range this.ticker.C {
err := this.Loop()
if err != nil {
this.logErr("NS_NODE_MONITOR", err.Error())
}
}
} }
func (this *NSNodeMonitorTask) loop() error { func (this *NSNodeMonitorTask) Loop() error {
// 检查上次运行时间,防止重复运行 // 检查是否为主节点
settingKey := systemconfigs.SettingCodeNSNodeMonitor + "Loop" if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
timestamp := time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-int64(this.intervalSeconds))
if err != nil {
return err
}
if c > 0 {
return nil return nil
} }
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
clusters, err := models.SharedNSClusterDAO.FindAllEnabledClusters(nil) clusters, err := models.SharedNSClusterDAO.FindAllEnabledClusters(nil)
if err != nil { if err != nil {
return err return err

View File

@@ -7,7 +7,6 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs" "github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"regexp" "regexp"
"strings" "strings"
@@ -16,27 +15,30 @@ import (
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
task := NewServerAccessLogCleaner()
goman.New(func() { goman.New(func() {
task.Start() NewServerAccessLogCleaner(12 * time.Hour).Start()
}) })
}) })
} }
// ServerAccessLogCleaner 服务访问日志自动清理 // ServerAccessLogCleaner 服务访问日志自动清理
type ServerAccessLogCleaner struct { type ServerAccessLogCleaner struct {
BaseTask
ticker *time.Ticker
} }
func NewServerAccessLogCleaner() *ServerAccessLogCleaner { func NewServerAccessLogCleaner(duration time.Duration) *ServerAccessLogCleaner {
return &ServerAccessLogCleaner{} return &ServerAccessLogCleaner{
ticker: time.NewTicker(duration),
}
} }
func (this *ServerAccessLogCleaner) Start() { func (this *ServerAccessLogCleaner) Start() {
ticker := time.NewTicker(12 * time.Hour) for range this.ticker.C {
for range ticker.C {
err := this.Loop() err := this.Loop()
if err != nil { if err != nil {
logs.Println("[TASK][ServerAccessLogCleaner]Error: " + err.Error()) this.logErr("[TASK][ServerAccessLogCleaner]", err.Error())
} }
} }
} }

View File

@@ -4,12 +4,13 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks" "github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
"time"
) )
func TestServerAccessLogCleaner_Loop(t *testing.T) { func TestServerAccessLogCleaner_Loop(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
task := tasks.NewServerAccessLogCleaner() var task = tasks.NewServerAccessLogCleaner(24 * time.Hour)
err := task.Loop() err := task.Loop()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@@ -4,9 +4,7 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/db/models/acme" "github.com/TeaOSLab/EdgeAPI/internal/db/models/acme"
"github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/maps"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"strconv" "strconv"
@@ -16,50 +14,41 @@ import (
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewSSLCertExpireCheckExecutor().Start() NewSSLCertExpireCheckExecutor(1 * time.Hour).Start()
}) })
}) })
} }
// SSLCertExpireCheckExecutor 证书检查任务 // SSLCertExpireCheckExecutor 证书检查任务
type SSLCertExpireCheckExecutor struct { type SSLCertExpireCheckExecutor struct {
BaseTask
ticker *time.Ticker
} }
func NewSSLCertExpireCheckExecutor() *SSLCertExpireCheckExecutor { func NewSSLCertExpireCheckExecutor(duration time.Duration) *SSLCertExpireCheckExecutor {
return &SSLCertExpireCheckExecutor{} return &SSLCertExpireCheckExecutor{
ticker: time.NewTicker(duration),
}
} }
// Start 启动任务 // Start 启动任务
func (this *SSLCertExpireCheckExecutor) Start() { func (this *SSLCertExpireCheckExecutor) Start() {
seconds := int64(3600) for range this.ticker.C {
ticker := time.NewTicker(time.Duration(seconds) * time.Second) err := this.Loop()
for range ticker.C {
err := this.loop(seconds)
if err != nil { if err != nil {
logs.Println("[ERROR][SSLCertExpireCheckExecutor]" + err.Error()) this.logErr("SSLCertExpireCheckExecutor", err.Error())
} }
} }
} }
// 单次执行 // Loop 单次执行
func (this *SSLCertExpireCheckExecutor) loop(seconds int64) error { func (this *SSLCertExpireCheckExecutor) Loop() error {
// 检查上次运行时间,防止重复运行 // 检查是否为主节点
settingKey := "sslCertExpiringCheckLoop" if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
timestamp := time.Now().Unix()
c, err := models.SharedSysSettingDAO.CompareInt64Setting(nil, settingKey, timestamp-seconds)
if err != nil {
return err
}
if c > 0 {
return nil return nil
} }
// 记录时间
err = models.SharedSysSettingDAO.UpdateSetting(nil, settingKey, []byte(numberutils.FormatInt64(timestamp)))
if err != nil {
return err
}
// 查找需要自动更新的证书 // 查找需要自动更新的证书
// 30, 14 ... 是到期的天数 // 30, 14 ... 是到期的天数
for _, days := range []int{30, 14, 7} { for _, days := range []int{30, 14, 7} {

View File

@@ -1,6 +1,7 @@
package tasks package tasks_test
import ( import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
timeutil "github.com/iwind/TeaGo/utils/time" timeutil "github.com/iwind/TeaGo/utils/time"
"testing" "testing"
@@ -16,8 +17,8 @@ func TestSSLCertExpireCheckExecutor_loop(t *testing.T) {
t.Log("3 days later: ", timeutil.FormatTime("Y-m-d", time.Now().Unix()+3*86400), time.Now().Unix()+3*86400) t.Log("3 days later: ", timeutil.FormatTime("Y-m-d", time.Now().Unix()+3*86400), time.Now().Unix()+3*86400)
t.Log("today: ", timeutil.FormatTime("Y-m-d", time.Now().Unix()), time.Now().Unix()) t.Log("today: ", timeutil.FormatTime("Y-m-d", time.Now().Unix()), time.Now().Unix())
executor := NewSSLCertExpireCheckExecutor() var task = tasks.NewSSLCertExpireCheckExecutor(1 * time.Hour)
err := executor.loop(0) err := task.Loop()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -23,41 +23,38 @@ import (
func init() { func init() {
dbs.OnReadyDone(func() { dbs.OnReadyDone(func() {
goman.New(func() { goman.New(func() {
NewSSLCertUpdateOCSPTask().Start() NewSSLCertUpdateOCSPTask(1 * time.Minute).Start()
}) })
}) })
} }
type SSLCertUpdateOCSPTask struct { type SSLCertUpdateOCSPTask struct {
BaseTask
ticker *time.Ticker ticker *time.Ticker
httpClient *http.Client httpClient *http.Client
} }
func NewSSLCertUpdateOCSPTask() *SSLCertUpdateOCSPTask { func NewSSLCertUpdateOCSPTask(duration time.Duration) *SSLCertUpdateOCSPTask {
return &SSLCertUpdateOCSPTask{ return &SSLCertUpdateOCSPTask{
ticker: time.NewTicker(1 * time.Minute), ticker: time.NewTicker(duration),
httpClient: utils.SharedHttpClient(5 * time.Second), httpClient: utils.SharedHttpClient(5 * time.Second),
} }
} }
func (this *SSLCertUpdateOCSPTask) Start() { func (this *SSLCertUpdateOCSPTask) Start() {
for range this.ticker.C { for range this.ticker.C {
err := this.Loop(true) err := this.Loop()
if err != nil { if err != nil {
remotelogs.Error("SSLCertUpdateOCSPTask", err.Error()) this.logErr("SSLCertUpdateOCSPTask", err.Error())
} }
} }
} }
func (this *SSLCertUpdateOCSPTask) Loop(checkLock bool) error { func (this *SSLCertUpdateOCSPTask) Loop() error {
if checkLock { // 检查是否为主节点
ok, err := models.SharedSysLockerDAO.Lock(nil, "ssl_cert_update_ocsp_task", 60-1) // 假设执行时间为1秒 if !models.SharedAPINodeDAO.CheckAPINodeIsPrimaryWithoutErr() {
if err != nil { return nil
return err
}
if !ok {
return nil
}
} }
var tx *dbs.Tx var tx *dbs.Tx

View File

@@ -6,13 +6,14 @@ import (
"github.com/TeaOSLab/EdgeAPI/internal/tasks" "github.com/TeaOSLab/EdgeAPI/internal/tasks"
"github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/dbs"
"testing" "testing"
"time"
) )
func TestSSLCertUpdateOCSPTask_Loop(t *testing.T) { func TestSSLCertUpdateOCSPTask_Loop(t *testing.T) {
dbs.NotifyReady() dbs.NotifyReady()
var task = tasks.NewSSLCertUpdateOCSPTask() var task = tasks.NewSSLCertUpdateOCSPTask(1 * time.Minute)
err := task.Loop(false) err := task.Loop()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -0,0 +1,12 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package tasks
import "github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
type BaseTask struct {
}
func (this *BaseTask) logErr(taskType string, errString string) {
remotelogs.Error("TASK", "run '"+taskType+"' failed: "+errString)
}

View File

@@ -0,0 +1,9 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package tasks
type TaskInterface interface {
Start() error
Loop() error
Stop() error
}