mirror of
https://gitee.com/dromara/mayfly-go
synced 2025-11-03 16:00:25 +08:00
fix: file文件缺失
This commit is contained in:
@@ -1,262 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const minTimerDelay = time.Millisecond * 1
|
||||
const maxTimerDelay = time.Nanosecond * math.MaxInt64
|
||||
|
||||
type DelayQueue[T Delayable] struct {
|
||||
enqueuedSignal chan struct{}
|
||||
dequeuedSignal chan struct{}
|
||||
transferChan chan T
|
||||
singleDequeue chan struct{}
|
||||
mutex sync.Mutex
|
||||
priorityQueue *PriorityQueue[T]
|
||||
zero T
|
||||
}
|
||||
|
||||
type Delayable interface {
|
||||
GetDeadline() time.Time
|
||||
GetKey() string
|
||||
}
|
||||
|
||||
var _ Delayable = (*wrapper[Job])(nil)
|
||||
|
||||
type wrapper[T Job] struct {
|
||||
key string
|
||||
deadline time.Time
|
||||
removed bool
|
||||
status JobStatus
|
||||
job T
|
||||
}
|
||||
|
||||
func newWrapper[T Job](job T) *wrapper[T] {
|
||||
return &wrapper[T]{
|
||||
key: job.GetKey(),
|
||||
job: job,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *wrapper[T]) GetDeadline() time.Time {
|
||||
return d.deadline
|
||||
}
|
||||
|
||||
func (d *wrapper[T]) GetKey() string {
|
||||
return d.key
|
||||
}
|
||||
|
||||
func NewDelayQueue[T Delayable](cap int) *DelayQueue[T] {
|
||||
singleDequeue := make(chan struct{}, 1)
|
||||
singleDequeue <- struct{}{}
|
||||
return &DelayQueue[T]{
|
||||
enqueuedSignal: make(chan struct{}),
|
||||
dequeuedSignal: make(chan struct{}),
|
||||
transferChan: make(chan T),
|
||||
singleDequeue: singleDequeue,
|
||||
priorityQueue: NewPriorityQueue[T](cap, func(src T, dst T) bool {
|
||||
return src.GetDeadline().Before(dst.GetDeadline())
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) TryDequeue() (T, bool) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if elm, ok := s.priorityQueue.Peek(0); ok {
|
||||
delay := elm.GetDeadline().Sub(time.Now())
|
||||
if delay < minTimerDelay {
|
||||
// 无需延迟,头部元素出队后直接返回
|
||||
_, _ = s.dequeue()
|
||||
return elm, true
|
||||
}
|
||||
}
|
||||
return s.zero, false
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) Dequeue(ctx context.Context) (T, bool) {
|
||||
// 出队锁:避免因重复获取队列头部同一元素降低性能
|
||||
select {
|
||||
case <-s.singleDequeue:
|
||||
defer func() {
|
||||
s.singleDequeue <- struct{}{}
|
||||
}()
|
||||
case <-ctx.Done():
|
||||
return s.zero, false
|
||||
}
|
||||
|
||||
for {
|
||||
// 全局锁:避免入队和出队信号的重置与激活出现并发问题
|
||||
s.mutex.Lock()
|
||||
if ctx.Err() != nil {
|
||||
s.mutex.Unlock()
|
||||
return s.zero, false
|
||||
}
|
||||
|
||||
// 接收直接转发的不需要延迟的新元素
|
||||
select {
|
||||
case elm := <-s.transferChan:
|
||||
s.mutex.Unlock()
|
||||
return elm, true
|
||||
default:
|
||||
}
|
||||
|
||||
// 延迟时间缺省值为 maxTimerDelay, 表示队列为空
|
||||
delay := maxTimerDelay
|
||||
if elm, ok := s.priorityQueue.Peek(0); ok {
|
||||
now := time.Now()
|
||||
delay = elm.GetDeadline().Sub(now)
|
||||
if delay < minTimerDelay {
|
||||
// 无需延迟,头部元素出队后直接返回
|
||||
_, _ = s.dequeue()
|
||||
s.mutex.Unlock()
|
||||
return elm, ok
|
||||
}
|
||||
}
|
||||
// 重置入队信号,避免历史信号干扰
|
||||
select {
|
||||
case <-s.enqueuedSignal:
|
||||
default:
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
|
||||
if delay == maxTimerDelay {
|
||||
// 队列为空, 等待新元素
|
||||
select {
|
||||
case elm := <-s.transferChan:
|
||||
return elm, true
|
||||
case <-s.enqueuedSignal:
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return s.zero, false
|
||||
}
|
||||
} else if delay >= minTimerDelay {
|
||||
// 等待时间到期或新元素加入
|
||||
timer := time.NewTimer(delay)
|
||||
select {
|
||||
case <-timer.C:
|
||||
continue
|
||||
case elm := <-s.transferChan:
|
||||
timer.Stop()
|
||||
return elm, true
|
||||
case <-s.enqueuedSignal:
|
||||
timer.Stop()
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return s.zero, false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) dequeue() (T, bool) {
|
||||
elm, ok := s.priorityQueue.Dequeue()
|
||||
if !ok {
|
||||
return s.zero, false
|
||||
}
|
||||
select {
|
||||
case s.dequeuedSignal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return elm, true
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) enqueue(val T) bool {
|
||||
if ok := s.priorityQueue.Enqueue(val); !ok {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case s.enqueuedSignal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) TryEnqueue(val T) bool {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if s.priorityQueue.IsFull() {
|
||||
return false
|
||||
}
|
||||
return s.enqueue(val)
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) Enqueue(ctx context.Context, val T) bool {
|
||||
for {
|
||||
// 全局锁:避免入队和出队信号的重置与激活出现并发问题
|
||||
s.mutex.Lock()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
s.mutex.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// 如果队列未满,入队后直接返回
|
||||
if !s.priorityQueue.IsFull() {
|
||||
s.enqueue(val)
|
||||
s.mutex.Unlock()
|
||||
return true
|
||||
}
|
||||
// 队列已满,重置出队信号,避免受到历史信号影响
|
||||
select {
|
||||
case <-s.dequeuedSignal:
|
||||
default:
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
|
||||
if delay := val.GetDeadline().Sub(time.Now()); delay >= minTimerDelay {
|
||||
// 新元素需要延迟,等待退出信号、出队信号和到期信号
|
||||
timer := time.NewTimer(delay)
|
||||
select {
|
||||
case <-timer.C:
|
||||
// 新元素不再需要延迟
|
||||
case <-s.dequeuedSignal:
|
||||
// 收到出队信号,从头开始尝试入队
|
||||
timer.Stop()
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
// 新元素不需要延迟,等待转发成功信号、出队信号和退出信号
|
||||
select {
|
||||
case s.transferChan <- val:
|
||||
// 新元素转发成功,直接返回(避免队列满且元素未到期导致新元素长时间无法入队)
|
||||
return true
|
||||
case <-s.dequeuedSignal:
|
||||
// 收到出队信号,从头开始尝试入队
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) Remove(_ context.Context, key string) (T, bool) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
return s.priorityQueue.Remove(s.index(key))
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) index(key string) int {
|
||||
for i := 0; i < s.priorityQueue.Len(); i++ {
|
||||
elm, ok := s.priorityQueue.Peek(i)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if key == elm.GetKey() {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
@@ -1,425 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var _ Delayable = &delayElement{}
|
||||
|
||||
type delayElement struct {
|
||||
id uint64
|
||||
value int
|
||||
deadline time.Time
|
||||
}
|
||||
|
||||
func (elm *delayElement) GetDeadline() time.Time {
|
||||
return elm.deadline
|
||||
}
|
||||
|
||||
func (elm *delayElement) GetId() uint64 {
|
||||
return elm.id
|
||||
}
|
||||
|
||||
func (elm *delayElement) GetKey() string {
|
||||
return strconv.FormatUint(elm.id, 16)
|
||||
}
|
||||
|
||||
type testDelayQueue = DelayQueue[*delayElement]
|
||||
|
||||
func newTestDelayQueue(cap int) *testDelayQueue {
|
||||
return NewDelayQueue[*delayElement](cap)
|
||||
}
|
||||
|
||||
func mustEnqueue(val int, delay int64) func(t *testing.T, queue *testDelayQueue) {
|
||||
return func(t *testing.T, queue *testDelayQueue) {
|
||||
require.True(t, queue.Enqueue(context.Background(),
|
||||
newTestElm(val, delay)))
|
||||
}
|
||||
}
|
||||
|
||||
func newTestElm(value int, delay int64) *delayElement {
|
||||
return &delayElement{
|
||||
id: elmId.Add(1),
|
||||
value: value,
|
||||
deadline: time.Now().Add(time.Millisecond * time.Duration(delay)),
|
||||
}
|
||||
}
|
||||
|
||||
var elmId atomic.Uint64
|
||||
|
||||
func TestDelayQueue_Enqueue(t *testing.T) {
|
||||
type testCase[R int, T Delayable] struct {
|
||||
name string
|
||||
queue *DelayQueue[T]
|
||||
before func(t *testing.T, queue *DelayQueue[T])
|
||||
while func(t *testing.T, queue *DelayQueue[T])
|
||||
after func(t *testing.T, queue *DelayQueue[T])
|
||||
value int
|
||||
delay int64
|
||||
timeout int64
|
||||
wantOk bool
|
||||
}
|
||||
tests := []testCase[int, *delayElement]{
|
||||
{
|
||||
name: "enqueue to empty queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
after: func(t *testing.T, queue *testDelayQueue) {
|
||||
val, ok := queue.priorityQueue.Dequeue()
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 1, val.value)
|
||||
},
|
||||
timeout: 10,
|
||||
value: 1,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "enqueue active element to full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: func(t *testing.T, queue *testDelayQueue) {
|
||||
mustEnqueue(1, 60)(t, queue)
|
||||
},
|
||||
timeout: 40,
|
||||
delay: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "enqueue inactive element to full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
timeout: 20,
|
||||
delay: 40,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "enqueue to full queue while dequeue valid element",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: func(t *testing.T, queue *testDelayQueue) {
|
||||
_, ok := queue.Dequeue(context.Background())
|
||||
require.True(t, ok)
|
||||
},
|
||||
timeout: 80,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "enqueue active element to full queue while dequeue invalid element",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: func(t *testing.T, queue *testDelayQueue) {
|
||||
elm, ok := queue.Dequeue(context.Background())
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 2, elm.value)
|
||||
},
|
||||
timeout: 40,
|
||||
value: 2,
|
||||
delay: 20,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "enqueue inactive element to full queue while dequeue invalid element",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: func(t *testing.T, queue *testDelayQueue) {
|
||||
_, ok := queue.Dequeue(context.Background())
|
||||
require.True(t, ok)
|
||||
},
|
||||
timeout: 20,
|
||||
delay: 40,
|
||||
wantOk: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(),
|
||||
time.Millisecond*time.Duration(tt.timeout))
|
||||
defer cancel()
|
||||
if tt.before != nil {
|
||||
tt.before(t, tt.queue)
|
||||
}
|
||||
if tt.while != nil {
|
||||
go tt.while(t, tt.queue)
|
||||
}
|
||||
ok := tt.queue.Enqueue(ctx, newTestElm(tt.value, tt.delay))
|
||||
require.Equal(t, tt.wantOk, ok)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelayQueue_Dequeue(t *testing.T) {
|
||||
type testCase[R int, T Delayable] struct {
|
||||
name string
|
||||
queue *DelayQueue[T]
|
||||
before func(t *testing.T, queue *DelayQueue[T])
|
||||
while func(t *testing.T, queue *DelayQueue[T])
|
||||
timeout int64
|
||||
wantVal int
|
||||
wantOk bool
|
||||
}
|
||||
tests := []testCase[int, *delayElement]{
|
||||
{
|
||||
name: "dequeue from empty queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
timeout: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "dequeue new active element from empty queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
while: mustEnqueue(1, 20),
|
||||
timeout: 4000,
|
||||
wantVal: 1,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "dequeue new inactive element from empty queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
while: mustEnqueue(1, 60),
|
||||
timeout: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "dequeue active element from full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
timeout: 80,
|
||||
wantVal: 1,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "dequeue inactive element from full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
timeout: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "dequeue new active element from full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: mustEnqueue(2, 40),
|
||||
timeout: 80,
|
||||
wantVal: 2,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "dequeue new inactive element from full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: mustEnqueue(2, 40),
|
||||
timeout: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(),
|
||||
time.Millisecond*time.Duration(tt.timeout))
|
||||
defer cancel()
|
||||
if tt.before != nil {
|
||||
tt.before(t, tt.queue)
|
||||
}
|
||||
if tt.while != nil {
|
||||
go tt.while(t, tt.queue)
|
||||
}
|
||||
got, ok := tt.queue.Dequeue(ctx)
|
||||
require.Equal(t, tt.wantOk, ok)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
require.Equal(t, tt.wantVal, got.value)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelayQueue(t *testing.T) {
|
||||
const delay = 1000
|
||||
const timeout = 1000
|
||||
const capacity = 100
|
||||
const count = 100
|
||||
var wg sync.WaitGroup
|
||||
var (
|
||||
enqueueSeq atomic.Int32
|
||||
dequeueSeq atomic.Int32
|
||||
checksum atomic.Int64
|
||||
)
|
||||
queue := newTestDelayQueue(capacity)
|
||||
procs := runtime.GOMAXPROCS(0)
|
||||
wg.Add(procs)
|
||||
for i := 0; i < procs; i++ {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*timeout)
|
||||
if i%2 == 0 {
|
||||
if seq := int(enqueueSeq.Add(1)); seq <= count {
|
||||
for ctx.Err() == nil {
|
||||
if ok := queue.Enqueue(ctx, newTestElm(seq, int64(rand.Intn(delay)))); ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if seq := int(dequeueSeq.Add(1)); seq > count {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
for ctx.Err() == nil {
|
||||
if elm, ok := queue.Dequeue(ctx); ok {
|
||||
require.Less(t, elm.GetDeadline().Sub(time.Now()), minTimerDelay)
|
||||
checksum.Add(int64(elm.value))
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
assert.Zero(t, queue.priorityQueue.Len())
|
||||
assert.Equal(t, int64((1+count)*count/2), checksum.Load())
|
||||
}
|
||||
|
||||
func BenchmarkDelayQueueV3(b *testing.B) {
|
||||
const delay = 0
|
||||
const capacity = 100
|
||||
|
||||
b.Run("enqueue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(b.N)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = queue.Enqueue(context.Background(), newTestElm(1, delay))
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("parallel to enqueue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(b.N)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_ = queue.Enqueue(context.Background(), newTestElm(1, delay))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
b.Run("dequeue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
require.True(b, queue.Enqueue(context.Background(), newTestElm(1, delay)))
|
||||
}
|
||||
time.Sleep(time.Millisecond * delay)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = queue.Dequeue(context.Background())
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("parallel to dequeue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
require.True(b, queue.Enqueue(context.Background(), newTestElm(1, delay)))
|
||||
}
|
||||
time.Sleep(time.Millisecond * delay)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_, _ = queue.Dequeue(context.Background())
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
b.Run("parallel to dequeue while enqueue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(capacity)
|
||||
go func() {
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = queue.Enqueue(context.Background(), newTestElm(i, delay))
|
||||
}
|
||||
}()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_, _ = queue.Dequeue(context.Background())
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
b.Run("parallel to enqueue while dequeue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(capacity)
|
||||
go func() {
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = queue.Dequeue(context.Background())
|
||||
}
|
||||
}()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_ = queue.Enqueue(context.Background(), newTestElm(1, delay))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
b.Run("parallel to enqueue and dequeue", func(b *testing.B) {
|
||||
var wg sync.WaitGroup
|
||||
var (
|
||||
enqueueSeq atomic.Int32
|
||||
dequeueSeq atomic.Int32
|
||||
)
|
||||
queue := newTestDelayQueue(capacity)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
procs := runtime.GOMAXPROCS(0)
|
||||
wg.Add(procs)
|
||||
for i := 0; i < procs; i++ {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
if i%2 == 0 {
|
||||
if seq := int(enqueueSeq.Add(1)); seq <= b.N {
|
||||
for {
|
||||
if ok := queue.Enqueue(context.Background(), newTestElm(seq, delay)); ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if seq := int(dequeueSeq.Add(1)); seq > b.N {
|
||||
return
|
||||
}
|
||||
for {
|
||||
if _, ok := queue.Dequeue(context.Background()); ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
@@ -1,136 +0,0 @@
|
||||
package runner
|
||||
|
||||
// PriorityQueue 是一个基于小顶堆的优先队列
|
||||
// 当capacity <= 0时,为无界队列,切片容量会动态扩缩容
|
||||
// 当capacity > 0 时,为有界队列,初始化后就固定容量,不会扩缩容
|
||||
type PriorityQueue[T any] struct {
|
||||
// 用于比较前一个元素是否小于后一个元素
|
||||
less Less[T]
|
||||
// 队列容量
|
||||
capacity int
|
||||
// 队列中的元素,为便于计算父子节点的index,0位置留空,根节点从1开始
|
||||
data []T
|
||||
|
||||
zero T
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Len() int {
|
||||
return len(p.data) - 1
|
||||
}
|
||||
|
||||
// Cap 无界队列返回0,有界队列返回创建队列时设置的值
|
||||
func (p *PriorityQueue[T]) Cap() int {
|
||||
return p.capacity
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) IsBoundless() bool {
|
||||
return p.capacity <= 0
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) IsFull() bool {
|
||||
return p.capacity > 0 && len(p.data)-1 == p.capacity
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) IsEmpty() bool {
|
||||
return len(p.data) < 2
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Peek(i int) (T, bool) {
|
||||
if p.IsEmpty() {
|
||||
return p.zero, false
|
||||
}
|
||||
if i >= p.Len() {
|
||||
return p.zero, false
|
||||
}
|
||||
return p.data[i+1], true
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Enqueue(t T) bool {
|
||||
if p.IsFull() {
|
||||
return false
|
||||
}
|
||||
|
||||
p.data = append(p.data, t)
|
||||
node, parent := len(p.data)-1, (len(p.data)-1)/2
|
||||
for parent > 0 && p.less(p.data[node], p.data[parent]) {
|
||||
p.data[parent], p.data[node] = p.data[node], p.data[parent]
|
||||
node = parent
|
||||
parent = parent / 2
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Dequeue() (T, bool) {
|
||||
if p.IsEmpty() {
|
||||
return p.zero, false
|
||||
}
|
||||
|
||||
pop := p.data[1]
|
||||
// 假定说我拿到了堆顶,就是理论上优先级最低的
|
||||
// pop 的优先级
|
||||
p.data[1] = p.data[len(p.data)-1]
|
||||
p.data = p.data[:len(p.data)-1]
|
||||
p.shrinkIfNecessary()
|
||||
p.heapify(p.data, len(p.data)-1, 1)
|
||||
return pop, true
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) shrinkIfNecessary() {
|
||||
if !p.IsBoundless() {
|
||||
return
|
||||
}
|
||||
if cap(p.data) > 1024 && len(p.data)*3 < cap(p.data)*2 {
|
||||
data := make([]T, len(p.data), cap(p.data)*5/6)
|
||||
copy(data, p.data)
|
||||
p.data = data
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) heapify(data []T, n, i int) {
|
||||
minPos := i
|
||||
for {
|
||||
if left := i * 2; left <= n && p.less(data[left], data[minPos]) {
|
||||
minPos = left
|
||||
}
|
||||
if right := i*2 + 1; right <= n && p.less(data[right], data[minPos]) {
|
||||
minPos = right
|
||||
}
|
||||
if minPos == i {
|
||||
break
|
||||
}
|
||||
data[i], data[minPos] = data[minPos], data[i]
|
||||
i = minPos
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Remove(i int) (T, bool) {
|
||||
if p.IsEmpty() || i >= p.Len() || i < 0 {
|
||||
return p.zero, false
|
||||
}
|
||||
|
||||
i += 1
|
||||
result := p.data[i]
|
||||
last := len(p.data) - 1
|
||||
p.data[i] = p.data[last]
|
||||
p.data = p.data[:last]
|
||||
p.shrinkIfNecessary()
|
||||
p.heapify(p.data, len(p.data)-1, i)
|
||||
return result, true
|
||||
}
|
||||
|
||||
// NewPriorityQueue 创建优先队列 capacity <= 0 时,为无界队列,否则有有界队列
|
||||
func NewPriorityQueue[T any](capacity int, less Less[T]) *PriorityQueue[T] {
|
||||
sliceCap := capacity + 1
|
||||
if capacity < 1 {
|
||||
capacity = 0
|
||||
sliceCap = 64
|
||||
}
|
||||
return &PriorityQueue[T]{
|
||||
capacity: capacity,
|
||||
data: make([]T, 1, sliceCap),
|
||||
less: less,
|
||||
}
|
||||
}
|
||||
|
||||
// Less 用于比较两个对象的大小 src < dst, 返回 true,src >= dst, 返回 false
|
||||
type Less[T any] func(src T, dst T) bool
|
||||
@@ -1,67 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestChangePriority(t *testing.T) {
|
||||
q := NewPriorityQueue[*priorityElement](100,
|
||||
func(src *priorityElement, dst *priorityElement) bool {
|
||||
return src.Priority < dst.Priority
|
||||
})
|
||||
e1 := &priorityElement{
|
||||
Data: 10,
|
||||
Priority: 200,
|
||||
}
|
||||
_ = q.Enqueue(e1)
|
||||
e2 := &priorityElement{
|
||||
Data: 10,
|
||||
Priority: 100,
|
||||
}
|
||||
_ = q.Enqueue(e2)
|
||||
//e1.Priority = 10
|
||||
val, _ := q.Dequeue()
|
||||
println(val)
|
||||
}
|
||||
|
||||
type priorityElement struct {
|
||||
Data any
|
||||
Priority int
|
||||
}
|
||||
|
||||
func TestPriorityQueue_Remove(t *testing.T) {
|
||||
q := NewPriorityQueue[*priorityElement](100,
|
||||
func(src *priorityElement, dst *priorityElement) bool {
|
||||
return src.Priority < dst.Priority
|
||||
})
|
||||
|
||||
for i := 8; i > 0; i-- {
|
||||
q.Enqueue(&priorityElement{Priority: i})
|
||||
}
|
||||
requirePriorities(t, q)
|
||||
|
||||
q.Remove(8)
|
||||
requirePriorities(t, q)
|
||||
q.Remove(7)
|
||||
requirePriorities(t, q)
|
||||
|
||||
q.Remove(2)
|
||||
requirePriorities(t, q)
|
||||
|
||||
q.Remove(1)
|
||||
requirePriorities(t, q)
|
||||
|
||||
q.Remove(0)
|
||||
requirePriorities(t, q)
|
||||
}
|
||||
|
||||
func requirePriorities(t *testing.T, q *PriorityQueue[*priorityElement]) {
|
||||
ps := make([]int, 0, q.Len())
|
||||
for _, val := range q.data[1:] {
|
||||
ps = append(ps, val.Priority)
|
||||
}
|
||||
for i := q.Len(); i >= 2; i-- {
|
||||
require.False(t, q.less(q.data[i], q.data[i/2]), ps)
|
||||
}
|
||||
}
|
||||
@@ -1,425 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/emirpasic/gods/maps/linkedhashmap"
|
||||
"mayfly-go/pkg/logx"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrJobNotFound = errors.New("任务未找到")
|
||||
ErrJobExist = errors.New("任务已存在")
|
||||
ErrJobFinished = errors.New("任务已完成")
|
||||
ErrJobDisabled = errors.New("任务已禁用")
|
||||
ErrJobExpired = errors.New("任务已过期")
|
||||
ErrJobRunning = errors.New("任务执行中")
|
||||
)
|
||||
|
||||
type JobKey = string
|
||||
type RunJobFunc[T Job] func(ctx context.Context, job T) error
|
||||
type NextJobFunc[T Job] func() (T, bool)
|
||||
type RunnableJobFunc[T Job] func(job T, nextRunning NextJobFunc[T]) (bool, error)
|
||||
type ScheduleJobFunc[T Job] func(job T) (deadline time.Time, err error)
|
||||
type UpdateJobFunc[T Job] func(ctx context.Context, job T) error
|
||||
|
||||
type JobStatus int
|
||||
|
||||
const (
|
||||
JobUnknown JobStatus = iota
|
||||
JobDelaying
|
||||
JobWaiting
|
||||
JobRunning
|
||||
JobSuccess
|
||||
JobFailed
|
||||
)
|
||||
|
||||
type Job interface {
|
||||
GetKey() JobKey
|
||||
Update(job Job)
|
||||
SetStatus(status JobStatus, err error)
|
||||
SetEnabled(enabled bool, desc string)
|
||||
}
|
||||
|
||||
type iterator[T Job] struct {
|
||||
index int
|
||||
data []*wrapper[T]
|
||||
zero T
|
||||
}
|
||||
|
||||
func (iter *iterator[T]) Begin() {
|
||||
iter.index = -1
|
||||
}
|
||||
|
||||
func (iter *iterator[T]) Next() (T, bool) {
|
||||
if iter.index >= len(iter.data)-1 {
|
||||
return iter.zero, false
|
||||
}
|
||||
iter.index++
|
||||
return iter.data[iter.index].job, true
|
||||
}
|
||||
|
||||
type array[T Job] struct {
|
||||
size int
|
||||
data []*wrapper[T]
|
||||
}
|
||||
|
||||
func newArray[T Job](size int) *array[T] {
|
||||
return &array[T]{
|
||||
size: size,
|
||||
data: make([]*wrapper[T], 0, size),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *array[T]) Iterator() *iterator[T] {
|
||||
return &iterator[T]{
|
||||
index: -1,
|
||||
data: a.data,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *array[T]) Full() bool {
|
||||
return len(a.data) >= a.size
|
||||
}
|
||||
|
||||
func (a *array[T]) Append(job *wrapper[T]) bool {
|
||||
if len(a.data) >= a.size {
|
||||
return false
|
||||
}
|
||||
a.data = append(a.data, job)
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *array[T]) Get(key JobKey) (*wrapper[T], bool) {
|
||||
for _, job := range a.data {
|
||||
if key == job.GetKey() {
|
||||
return job, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (a *array[T]) Remove(key JobKey) {
|
||||
length := len(a.data)
|
||||
for i, elm := range a.data {
|
||||
if key == elm.GetKey() {
|
||||
a.data[i], a.data[length-1] = a.data[length-1], nil
|
||||
a.data = a.data[:length-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Runner[T Job] struct {
|
||||
maxRunning int
|
||||
waiting *linkedhashmap.Map
|
||||
running *array[T]
|
||||
runJob RunJobFunc[T]
|
||||
runnableJob RunnableJobFunc[T]
|
||||
scheduleJob ScheduleJobFunc[T]
|
||||
updateJob UpdateJobFunc[T]
|
||||
mutex sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
context context.Context
|
||||
cancel context.CancelFunc
|
||||
zero T
|
||||
signal chan struct{}
|
||||
all map[JobKey]*wrapper[T]
|
||||
delayQueue *DelayQueue[*wrapper[T]]
|
||||
}
|
||||
|
||||
type Opt[T Job] func(runner *Runner[T])
|
||||
|
||||
func WithRunnableJob[T Job](runnableJob RunnableJobFunc[T]) Opt[T] {
|
||||
return func(runner *Runner[T]) {
|
||||
runner.runnableJob = runnableJob
|
||||
}
|
||||
}
|
||||
|
||||
func WithScheduleJob[T Job](scheduleJob ScheduleJobFunc[T]) Opt[T] {
|
||||
return func(runner *Runner[T]) {
|
||||
runner.scheduleJob = scheduleJob
|
||||
}
|
||||
}
|
||||
|
||||
func WithUpdateJob[T Job](updateJob UpdateJobFunc[T]) Opt[T] {
|
||||
return func(runner *Runner[T]) {
|
||||
runner.updateJob = updateJob
|
||||
}
|
||||
}
|
||||
|
||||
func NewRunner[T Job](maxRunning int, runJob RunJobFunc[T], opts ...Opt[T]) *Runner[T] {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
runner := &Runner[T]{
|
||||
maxRunning: maxRunning,
|
||||
all: make(map[string]*wrapper[T], maxRunning),
|
||||
waiting: linkedhashmap.New(),
|
||||
running: newArray[T](maxRunning),
|
||||
context: ctx,
|
||||
cancel: cancel,
|
||||
signal: make(chan struct{}, 1),
|
||||
delayQueue: NewDelayQueue[*wrapper[T]](0),
|
||||
}
|
||||
runner.runJob = runJob
|
||||
runner.runnableJob = func(job T, _ NextJobFunc[T]) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
runner.updateJob = func(ctx context.Context, job T) error {
|
||||
return nil
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(runner)
|
||||
}
|
||||
|
||||
runner.wg.Add(maxRunning + 1)
|
||||
for i := 0; i < maxRunning; i++ {
|
||||
go runner.run()
|
||||
}
|
||||
go func() {
|
||||
defer runner.wg.Done()
|
||||
for runner.context.Err() == nil {
|
||||
wrap, ok := runner.delayQueue.Dequeue(ctx)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
runner.mutex.Lock()
|
||||
if old, ok := runner.all[wrap.key]; !ok || wrap != old {
|
||||
runner.mutex.Unlock()
|
||||
continue
|
||||
}
|
||||
runner.waiting.Put(wrap.key, wrap)
|
||||
wrap.status = JobWaiting
|
||||
runner.trigger()
|
||||
runner.mutex.Unlock()
|
||||
}
|
||||
}()
|
||||
return runner
|
||||
}
|
||||
|
||||
func (r *Runner[T]) Close() {
|
||||
r.cancel()
|
||||
r.wg.Wait()
|
||||
}
|
||||
|
||||
func (r *Runner[T]) run() {
|
||||
defer r.wg.Done()
|
||||
|
||||
for r.context.Err() == nil {
|
||||
select {
|
||||
case <-r.signal:
|
||||
wrap, ok := r.pickRunnableJob()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
r.doRun(wrap)
|
||||
r.afterRun(wrap)
|
||||
case <-r.context.Done():
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner[T]) doRun(wrap *wrapper[T]) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logx.Error(fmt.Sprintf("failed to run job: %v", err))
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
}()
|
||||
wrap.job.SetStatus(JobRunning, nil)
|
||||
if err := r.updateJob(r.context, wrap.job); err != nil {
|
||||
err = fmt.Errorf("任务状态保存失败: %w", err)
|
||||
wrap.job.SetStatus(JobFailed, err)
|
||||
_ = r.updateJob(r.context, wrap.job)
|
||||
return
|
||||
}
|
||||
runErr := r.runJob(r.context, wrap.job)
|
||||
if runErr != nil {
|
||||
wrap.job.SetStatus(JobFailed, runErr)
|
||||
} else {
|
||||
wrap.job.SetStatus(JobSuccess, nil)
|
||||
}
|
||||
if err := r.updateJob(r.context, wrap.job); err != nil {
|
||||
if runErr != nil {
|
||||
err = fmt.Errorf("任务状态保存失败: %w, %w", err, runErr)
|
||||
} else {
|
||||
err = fmt.Errorf("任务状态保存失败: %w", err)
|
||||
}
|
||||
wrap.job.SetStatus(JobFailed, err)
|
||||
_ = r.updateJob(r.context, wrap.job)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner[T]) afterRun(wrap *wrapper[T]) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
r.running.Remove(wrap.key)
|
||||
delete(r.all, wrap.key)
|
||||
wrap.status = JobUnknown
|
||||
r.trigger()
|
||||
if wrap.removed {
|
||||
return
|
||||
}
|
||||
deadline, err := r.doScheduleJob(wrap.job, true)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_ = r.schedule(r.context, deadline, wrap.job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) doScheduleJob(job T, finished bool) (time.Time, error) {
|
||||
if r.scheduleJob == nil {
|
||||
if finished {
|
||||
return time.Time{}, ErrJobFinished
|
||||
}
|
||||
return time.Now(), nil
|
||||
}
|
||||
return r.scheduleJob(job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) pickRunnableJob() (*wrapper[T], bool) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
var disabled []JobKey
|
||||
iter := r.running.Iterator()
|
||||
var runnable *wrapper[T]
|
||||
ok := r.waiting.Any(func(key interface{}, value interface{}) bool {
|
||||
wrap := value.(*wrapper[T])
|
||||
iter.Begin()
|
||||
able, err := r.runnableJob(wrap.job, iter.Next)
|
||||
if err != nil {
|
||||
wrap.job.SetEnabled(false, err.Error())
|
||||
r.updateJob(r.context, wrap.job)
|
||||
disabled = append(disabled, key.(JobKey))
|
||||
}
|
||||
if able {
|
||||
if r.running.Full() {
|
||||
return false
|
||||
}
|
||||
r.waiting.Remove(key)
|
||||
r.running.Append(wrap)
|
||||
wrap.status = JobRunning
|
||||
if !r.running.Full() && !r.waiting.Empty() {
|
||||
r.trigger()
|
||||
}
|
||||
runnable = wrap
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
for _, key := range disabled {
|
||||
r.waiting.Remove(key)
|
||||
delete(r.all, key)
|
||||
}
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return runnable, true
|
||||
}
|
||||
|
||||
func (r *Runner[T]) schedule(ctx context.Context, deadline time.Time, job T) error {
|
||||
wrap := newWrapper(job)
|
||||
wrap.deadline = deadline
|
||||
if wrap.deadline.After(time.Now()) {
|
||||
r.delayQueue.Enqueue(ctx, wrap)
|
||||
wrap.status = JobDelaying
|
||||
} else {
|
||||
r.waiting.Put(wrap.key, wrap)
|
||||
wrap.status = JobWaiting
|
||||
r.trigger()
|
||||
}
|
||||
r.all[wrap.key] = wrap
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner[T]) Add(ctx context.Context, job T) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
if _, ok := r.all[job.GetKey()]; ok {
|
||||
return ErrJobExist
|
||||
}
|
||||
deadline, err := r.doScheduleJob(job, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.schedule(ctx, deadline, job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) Update(ctx context.Context, job T) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
wrap, ok := r.all[job.GetKey()]
|
||||
if !ok {
|
||||
return ErrJobNotFound
|
||||
}
|
||||
wrap.job.Update(job)
|
||||
switch wrap.status {
|
||||
case JobDelaying:
|
||||
r.delayQueue.Remove(ctx, wrap.key)
|
||||
case JobWaiting:
|
||||
r.waiting.Remove(wrap.key)
|
||||
case JobRunning:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
deadline, err := r.doScheduleJob(job, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.schedule(ctx, deadline, wrap.job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) StartNow(ctx context.Context, job T) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
if wrap, ok := r.all[job.GetKey()]; ok {
|
||||
switch wrap.status {
|
||||
case JobDelaying:
|
||||
r.delayQueue.Remove(ctx, wrap.key)
|
||||
delete(r.all, wrap.key)
|
||||
case JobWaiting, JobRunning:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
}
|
||||
return r.schedule(ctx, time.Now(), job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) trigger() {
|
||||
select {
|
||||
case r.signal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner[T]) Remove(ctx context.Context, key JobKey) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
wrap, ok := r.all[key]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
switch wrap.status {
|
||||
case JobDelaying:
|
||||
r.delayQueue.Remove(ctx, key)
|
||||
delete(r.all, key)
|
||||
case JobWaiting:
|
||||
r.waiting.Remove(key)
|
||||
delete(r.all, key)
|
||||
case JobRunning:
|
||||
// 统一标记为 removed, 待任务执行完成后再删除
|
||||
wrap.removed = true
|
||||
return ErrJobRunning
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,130 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"mayfly-go/pkg/utils/timex"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var _ Job = &testJob{}
|
||||
|
||||
func newTestJob(key string) *testJob {
|
||||
return &testJob{
|
||||
Key: key,
|
||||
}
|
||||
}
|
||||
|
||||
type testJob struct {
|
||||
Key JobKey
|
||||
status int
|
||||
}
|
||||
|
||||
func (t *testJob) Update(_ Job) {}
|
||||
|
||||
func (t *testJob) GetKey() JobKey {
|
||||
return t.Key
|
||||
}
|
||||
|
||||
func (t *testJob) SetStatus(status JobStatus, err error) {}
|
||||
|
||||
func (t *testJob) SetEnabled(enabled bool, desc string) {}
|
||||
|
||||
func TestRunner_Close(t *testing.T) {
|
||||
signal := make(chan struct{}, 1)
|
||||
waiting := sync.WaitGroup{}
|
||||
waiting.Add(1)
|
||||
runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error {
|
||||
waiting.Done()
|
||||
timex.SleepWithContext(ctx, time.Hour)
|
||||
signal <- struct{}{}
|
||||
return nil
|
||||
})
|
||||
go func() {
|
||||
job := &testJob{
|
||||
Key: "close",
|
||||
}
|
||||
_ = runner.Add(context.Background(), job)
|
||||
}()
|
||||
waiting.Wait()
|
||||
timer := time.NewTimer(time.Microsecond * 10)
|
||||
defer timer.Stop()
|
||||
runner.Close()
|
||||
select {
|
||||
case <-timer.C:
|
||||
require.FailNow(t, "runner 未能及时退出")
|
||||
case <-signal:
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunner_AddJob(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
job *testJob
|
||||
want error
|
||||
}
|
||||
testCases := []testCase{
|
||||
{
|
||||
name: "first job",
|
||||
job: newTestJob("single"),
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "second job",
|
||||
job: newTestJob("dual"),
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "repetitive job",
|
||||
job: newTestJob("dual"),
|
||||
want: ErrJobExist,
|
||||
},
|
||||
}
|
||||
runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error {
|
||||
timex.SleepWithContext(ctx, time.Hour)
|
||||
return nil
|
||||
})
|
||||
defer runner.Close()
|
||||
for _, tc := range testCases {
|
||||
err := runner.Add(context.Background(), tc.job)
|
||||
if tc.want != nil {
|
||||
require.ErrorIs(t, err, tc.want)
|
||||
continue
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJob_UpdateStatus(t *testing.T) {
|
||||
const d = time.Millisecond * 20
|
||||
const (
|
||||
unknown = iota
|
||||
running
|
||||
finished
|
||||
)
|
||||
runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error {
|
||||
job.status = running
|
||||
timex.SleepWithContext(ctx, d*2)
|
||||
job.status = finished
|
||||
return nil
|
||||
})
|
||||
first := newTestJob("first")
|
||||
second := newTestJob("second")
|
||||
_ = runner.Add(context.Background(), first)
|
||||
_ = runner.Add(context.Background(), second)
|
||||
|
||||
time.Sleep(d)
|
||||
assert.Equal(t, running, first.status)
|
||||
assert.Equal(t, unknown, second.status)
|
||||
|
||||
time.Sleep(d * 2)
|
||||
assert.Equal(t, finished, first.status)
|
||||
assert.Equal(t, running, second.status)
|
||||
|
||||
time.Sleep(d * 2)
|
||||
assert.Equal(t, finished, first.status)
|
||||
assert.Equal(t, finished, second.status)
|
||||
}
|
||||
Reference in New Issue
Block a user