diff --git a/frontend/src/router/dynamicRouter.ts b/frontend/src/router/dynamicRouter.ts index 55b8070e..c009479f 100644 --- a/frontend/src/router/dynamicRouter.ts +++ b/frontend/src/router/dynamicRouter.ts @@ -21,55 +21,46 @@ const routeModules: Record = import.meta.glob(['../views/**/route.{ // 后端控制路由:执行路由数据初始化 export async function initBackendRoutes() { - let allModuleRoutes = {}; - for (const path in routeModules) { - // 获取默认导出的路由 - const routes = routeModules[path]?.default; - allModuleRoutes = { ...allModuleRoutes, ...routes }; - } + // 合并所有模块路由 + const allModuleRoutes = Object.values(routeModules).reduce((acc: any, module: any) => { + return { ...acc, ...module.default }; + }, {}); - const token = getToken(); // 获取浏览器缓存 token 值 + const token = getToken(); if (!token) { - // 无 token 停止执行下一步 return false; } + useUserInfo().setUserInfo({}); - // 获取路由 - let menuRoute = await getBackEndControlRoutes(); - const cacheList: Array = []; - // 处理路由(component) - const routes = backEndRouterConverter(allModuleRoutes, menuRoute, (router: any) => { - // 可能为false时不存在isKeepAlive属性 - if (!router.meta.isKeepAlive) { - router.meta.isKeepAlive = false; - } - if (router.meta.isKeepAlive) { - cacheList.push(router.name); - } - }); - - routes.forEach((item: any) => { - if (item.meta.isFull) { - // 菜单为全屏展示 (示例:数据大屏页面等) - router.addRoute(item as RouteRecordRaw); - } else { - // 要将嵌套路由添加到现有的路由中,可以将路由的 name 作为第一个参数传递给 router.addRoute(),这将有效地添加路由,就像通过 children 添加的一样 - router.addRoute(LAYOUT_ROUTE_NAME, item as RouteRecordRaw); - } - }); - - useKeepALiveNames().setCacheKeepAlive(cacheList); - useRoutesList().setRoutesList(routes); -} - -// 后端控制路由,isRequestRoutes 为 true,则开启后端控制路由 -export async function getBackEndControlRoutes() { try { + // 获取路由和权限 const menuAndPermission = await openApi.getPermissions(); - // 赋值权限码,用于控制按钮等 useUserInfo().userInfo.permissions = menuAndPermission.permissions; - return menuAndPermission.menus; + const menuRoute = menuAndPermission.menus; + + const cacheList: string[] = []; + + // 处理路由(component) + const routes = backEndRouterConverter(allModuleRoutes, menuRoute, (router: any) => { + // 确保 isKeepAlive 属性存在 + router.meta.isKeepAlive = router.meta.isKeepAlive ?? false; + if (router.meta.isKeepAlive) { + cacheList.push(router.name as string); + } + }); + + // 添加路由 + routes.forEach((item: any) => { + if (item.meta.isFull) { + router.addRoute(item as RouteRecordRaw); + } else { + router.addRoute(LAYOUT_ROUTE_NAME, item as RouteRecordRaw); + } + }); + + useKeepALiveNames().setCacheKeepAlive(cacheList); + useRoutesList().setRoutesList(routes); } catch (e: any) { console.error('获取菜单权限信息失败', e); clearSession(); @@ -97,57 +88,52 @@ type RouterConvCallbackFunc = (router: any) => void; * @param meta.linkType ==> 外链类型, 内嵌: 以iframe展示、外链: 新标签打开 * @param meta.link ==> 外链地址 * */ -export function backEndRouterConverter(allModuleRoutes: any, routes: any, callbackFunc: RouterConvCallbackFunc = null as any, parentPath: string = '/') { - if (!routes) { - return []; - } +export function backEndRouterConverter(allModuleRoutes: any, routes: any, callbackFunc?: RouterConvCallbackFunc, parentPath = '/'): any[] { + if (!routes) return []; + + return routes.map((item: any) => { + if (!item.meta) return item; - const routeItems = []; - for (let item of routes) { - if (!item.meta) { - return item; - } // 将json字符串的meta转为对象 - item.meta = JSON.parse(item.meta); + const meta = typeof item.meta === 'string' ? JSON.parse(item.meta) : item.meta; + // 处理路径 let path = item.code; - // 如果不是以 / 开头,则路径需要拼接父路径 if (!path.startsWith('/')) { - path = parentPath + '/' + path; + path = `${parentPath}/${path}`.replace(/\/+/g, '/'); } - item.path = path; - delete item['code']; - // route.meta.title == resource.name - item.meta.title = item.name; - delete item['name']; + // 构建路由对象 + const routeItem: any = { + path, + name: meta.routeName, + meta: { + ...meta, + title: item.name, + }, + }; - // route.name == resource.meta.routeName - const routerName = item.meta.routeName; - item.name = routerName; - // 如果是外链类型,name的路由名都是Link 或者 Iframes会导致路由名重复,无法添加多个外链 - if (item.meta.link) { - if (item.meta.linkType == LinkTypeEnum.Link.value) { - item.component = Link; - } else { - item.component = Iframe; - } + // 处理外链 + if (meta.link) { + routeItem.component = meta.linkType == LinkTypeEnum.Link.value ? Link : Iframe; } else { - // routerName == 模块下route.ts 字段key == 组件名 - item.component = allModuleRoutes[routerName]; + // 使用模块路由组件 + routeItem.component = allModuleRoutes[meta.routeName]; } - delete item.meta['routeName']; - // route.redirect == resource.meta.redirect - if (item.meta.redirect) { - item.redirect = item.meta.redirect; - delete item.meta['redirect']; + // 处理重定向 + if (meta.redirect) { + routeItem.redirect = meta.redirect; } - // 存在回调,则执行回调 - callbackFunc && callbackFunc(item); - item.children && backEndRouterConverter(allModuleRoutes, item.children, callbackFunc, item.path); - routeItems.push(item); - } - return routeItems; + // 处理子路由 + if (item.children) { + routeItem.children = backEndRouterConverter(allModuleRoutes, item.children, callbackFunc, path); + } + + // 执行回调 + callbackFunc?.(routeItem); + + return routeItem; + }); } diff --git a/frontend/src/views/ops/tag/TeamList.vue b/frontend/src/views/ops/tag/TeamList.vue index b3807625..ff61745d 100755 --- a/frontend/src/views/ops/tag/TeamList.vue +++ b/frontend/src/views/ops/tag/TeamList.vue @@ -43,7 +43,7 @@ - + @@ -98,10 +98,8 @@ @@ -238,8 +236,8 @@ const onSaveTeam = async () => { const onCancelSaveTeam = () => { state.addTeamDialog.visible = false; - teamForm.value.resetFields(); setTimeout(() => { + teamForm.value?.resetFields(); state.addTeamDialog.form = {} as any; }, 500); }; diff --git a/server/.gitignore b/server/.gitignore index b63be3c4..f50b0177 100644 --- a/server/.gitignore +++ b/server/.gitignore @@ -11,4 +11,3 @@ mayfly_rsa.pub /db/mariadb/ *.sqlite -file \ No newline at end of file diff --git a/server/go.mod b/server/go.mod index fc0aa66c..87b70c6b 100644 --- a/server/go.mod +++ b/server/go.mod @@ -6,7 +6,6 @@ require ( gitee.com/chunanyong/dm v1.8.20 gitee.com/liuzongyang/libpq v1.10.11 github.com/antlr4-go/antlr/v4 v4.13.1 - github.com/emirpasic/gods v1.18.1 github.com/gin-gonic/gin v1.10.1 github.com/glebarez/sqlite v1.11.0 github.com/go-gormigrate/gormigrate/v2 v2.1.4 @@ -15,7 +14,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 github.com/go-playground/validator/v10 v10.27.0 github.com/go-sql-driver/mysql v1.9.3 - github.com/golang-jwt/jwt/v5 v5.2.3 + github.com/golang-jwt/jwt/v5 v5.3.0 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/lionsoul2014/ip2region/binding/golang v0.0.0-20250630080345-f9402614f6ba diff --git a/server/internal/db/dbm/sqlparser/mysql/visitor.go b/server/internal/db/dbm/sqlparser/mysql/visitor.go index 3e26c19b..8b423b5a 100644 --- a/server/internal/db/dbm/sqlparser/mysql/visitor.go +++ b/server/internal/db/dbm/sqlparser/mysql/visitor.go @@ -347,6 +347,25 @@ func (v *MysqlVisitor) VisitAtomTableItem(ctx *mysqlparser.AtomTableItemContext) return tableSourceItem } +func (v *MysqlVisitor) VisitSubqueryTableItem(ctx *mysqlparser.SubqueryTableItemContext) interface{} { + sti := new(sqlstmt.SubqueryTableItem) + sti.Node = sqlstmt.NewNode(ctx.GetParser(), ctx) + + // 解析子查询 + if ss := ctx.SelectStatement(); ss != nil { + sti.SubQuery = ss.Accept(v).(sqlstmt.ISelectStmt) + } + + // 获取别名 + if alias := ctx.GetAlias(); alias != nil { + sti.Alias = alias.GetText() + } else if uid := ctx.Uid(); uid != nil { + sti.Alias = uid.GetText() + } + + return sti +} + func (v *MysqlVisitor) VisitInnerJoin(ctx *mysqlparser.InnerJoinContext) interface{} { ij := new(sqlstmt.InnerJoin) ij.Node = sqlstmt.NewNode(ctx.GetParser(), ctx) diff --git a/server/internal/db/dbm/sqlparser/sqlstmt/common.go b/server/internal/db/dbm/sqlparser/sqlstmt/common.go index 223bd3c1..8907b0f1 100644 --- a/server/internal/db/dbm/sqlparser/sqlstmt/common.go +++ b/server/internal/db/dbm/sqlparser/sqlstmt/common.go @@ -139,6 +139,14 @@ type ( TableName *TableName // 表名 Alias string // 别名 } + + // SubqueryTableItem 表示子查询表项,如 (SELECT * FROM table1) AS alias + SubqueryTableItem struct { + TableSourceItem + + SubQuery ISelectStmt + Alias string + } ) func (*TableSource) isTableSource() {} diff --git a/server/internal/file/infra/persistence/file.go b/server/internal/file/infra/persistence/file.go new file mode 100644 index 00000000..a43d7e83 --- /dev/null +++ b/server/internal/file/infra/persistence/file.go @@ -0,0 +1,15 @@ +package persistence + +import ( + "mayfly-go/internal/file/domain/entity" + "mayfly-go/internal/file/domain/repository" + "mayfly-go/pkg/base" +) + +type fileRepoImpl struct { + base.RepoImpl[*entity.File] +} + +func newFileRepo() repository.File { + return &fileRepoImpl{} +} diff --git a/server/internal/file/infra/persistence/persistence.go b/server/internal/file/infra/persistence/persistence.go new file mode 100644 index 00000000..db0a9d59 --- /dev/null +++ b/server/internal/file/infra/persistence/persistence.go @@ -0,0 +1,9 @@ +package persistence + +import ( + "mayfly-go/pkg/ioc" +) + +func InitIoc() { + ioc.Register(newFileRepo(), ioc.WithComponentName("FileRepo")) +} diff --git a/server/pkg/runner/delay_queue.go b/server/pkg/runner/delay_queue.go deleted file mode 100644 index fc7e008a..00000000 --- a/server/pkg/runner/delay_queue.go +++ /dev/null @@ -1,262 +0,0 @@ -package runner - -import ( - "context" - "math" - "sync" - "time" -) - -const minTimerDelay = time.Millisecond * 1 -const maxTimerDelay = time.Nanosecond * math.MaxInt64 - -type DelayQueue[T Delayable] struct { - enqueuedSignal chan struct{} - dequeuedSignal chan struct{} - transferChan chan T - singleDequeue chan struct{} - mutex sync.Mutex - priorityQueue *PriorityQueue[T] - zero T -} - -type Delayable interface { - GetDeadline() time.Time - GetKey() string -} - -var _ Delayable = (*wrapper[Job])(nil) - -type wrapper[T Job] struct { - key string - deadline time.Time - removed bool - status JobStatus - job T -} - -func newWrapper[T Job](job T) *wrapper[T] { - return &wrapper[T]{ - key: job.GetKey(), - job: job, - } -} - -func (d *wrapper[T]) GetDeadline() time.Time { - return d.deadline -} - -func (d *wrapper[T]) GetKey() string { - return d.key -} - -func NewDelayQueue[T Delayable](cap int) *DelayQueue[T] { - singleDequeue := make(chan struct{}, 1) - singleDequeue <- struct{}{} - return &DelayQueue[T]{ - enqueuedSignal: make(chan struct{}), - dequeuedSignal: make(chan struct{}), - transferChan: make(chan T), - singleDequeue: singleDequeue, - priorityQueue: NewPriorityQueue[T](cap, func(src T, dst T) bool { - return src.GetDeadline().Before(dst.GetDeadline()) - }), - } -} - -func (s *DelayQueue[T]) TryDequeue() (T, bool) { - s.mutex.Lock() - defer s.mutex.Unlock() - - if elm, ok := s.priorityQueue.Peek(0); ok { - delay := elm.GetDeadline().Sub(time.Now()) - if delay < minTimerDelay { - // 无需延迟,头部元素出队后直接返回 - _, _ = s.dequeue() - return elm, true - } - } - return s.zero, false -} - -func (s *DelayQueue[T]) Dequeue(ctx context.Context) (T, bool) { - // 出队锁:避免因重复获取队列头部同一元素降低性能 - select { - case <-s.singleDequeue: - defer func() { - s.singleDequeue <- struct{}{} - }() - case <-ctx.Done(): - return s.zero, false - } - - for { - // 全局锁:避免入队和出队信号的重置与激活出现并发问题 - s.mutex.Lock() - if ctx.Err() != nil { - s.mutex.Unlock() - return s.zero, false - } - - // 接收直接转发的不需要延迟的新元素 - select { - case elm := <-s.transferChan: - s.mutex.Unlock() - return elm, true - default: - } - - // 延迟时间缺省值为 maxTimerDelay, 表示队列为空 - delay := maxTimerDelay - if elm, ok := s.priorityQueue.Peek(0); ok { - now := time.Now() - delay = elm.GetDeadline().Sub(now) - if delay < minTimerDelay { - // 无需延迟,头部元素出队后直接返回 - _, _ = s.dequeue() - s.mutex.Unlock() - return elm, ok - } - } - // 重置入队信号,避免历史信号干扰 - select { - case <-s.enqueuedSignal: - default: - } - s.mutex.Unlock() - - if delay == maxTimerDelay { - // 队列为空, 等待新元素 - select { - case elm := <-s.transferChan: - return elm, true - case <-s.enqueuedSignal: - continue - case <-ctx.Done(): - return s.zero, false - } - } else if delay >= minTimerDelay { - // 等待时间到期或新元素加入 - timer := time.NewTimer(delay) - select { - case <-timer.C: - continue - case elm := <-s.transferChan: - timer.Stop() - return elm, true - case <-s.enqueuedSignal: - timer.Stop() - continue - case <-ctx.Done(): - timer.Stop() - return s.zero, false - } - } - } -} - -func (s *DelayQueue[T]) dequeue() (T, bool) { - elm, ok := s.priorityQueue.Dequeue() - if !ok { - return s.zero, false - } - select { - case s.dequeuedSignal <- struct{}{}: - default: - } - return elm, true -} - -func (s *DelayQueue[T]) enqueue(val T) bool { - if ok := s.priorityQueue.Enqueue(val); !ok { - return false - } - select { - case s.enqueuedSignal <- struct{}{}: - default: - } - return true -} - -func (s *DelayQueue[T]) TryEnqueue(val T) bool { - s.mutex.Lock() - defer s.mutex.Unlock() - - if s.priorityQueue.IsFull() { - return false - } - return s.enqueue(val) -} - -func (s *DelayQueue[T]) Enqueue(ctx context.Context, val T) bool { - for { - // 全局锁:避免入队和出队信号的重置与激活出现并发问题 - s.mutex.Lock() - - if ctx.Err() != nil { - s.mutex.Unlock() - return false - } - - // 如果队列未满,入队后直接返回 - if !s.priorityQueue.IsFull() { - s.enqueue(val) - s.mutex.Unlock() - return true - } - // 队列已满,重置出队信号,避免受到历史信号影响 - select { - case <-s.dequeuedSignal: - default: - } - s.mutex.Unlock() - - if delay := val.GetDeadline().Sub(time.Now()); delay >= minTimerDelay { - // 新元素需要延迟,等待退出信号、出队信号和到期信号 - timer := time.NewTimer(delay) - select { - case <-timer.C: - // 新元素不再需要延迟 - case <-s.dequeuedSignal: - // 收到出队信号,从头开始尝试入队 - timer.Stop() - continue - case <-ctx.Done(): - timer.Stop() - return false - } - } else { - // 新元素不需要延迟,等待转发成功信号、出队信号和退出信号 - select { - case s.transferChan <- val: - // 新元素转发成功,直接返回(避免队列满且元素未到期导致新元素长时间无法入队) - return true - case <-s.dequeuedSignal: - // 收到出队信号,从头开始尝试入队 - continue - case <-ctx.Done(): - return false - } - } - } -} - -func (s *DelayQueue[T]) Remove(_ context.Context, key string) (T, bool) { - s.mutex.Lock() - defer s.mutex.Unlock() - - return s.priorityQueue.Remove(s.index(key)) -} - -func (s *DelayQueue[T]) index(key string) int { - for i := 0; i < s.priorityQueue.Len(); i++ { - elm, ok := s.priorityQueue.Peek(i) - if !ok { - continue - } - if key == elm.GetKey() { - return i - } - } - return -1 -} diff --git a/server/pkg/runner/delay_queue_test.go b/server/pkg/runner/delay_queue_test.go deleted file mode 100644 index bceffdee..00000000 --- a/server/pkg/runner/delay_queue_test.go +++ /dev/null @@ -1,425 +0,0 @@ -package runner - -import ( - "context" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "math/rand" - "runtime" - "strconv" - "sync" - "sync/atomic" - "testing" - "time" -) - -var _ Delayable = &delayElement{} - -type delayElement struct { - id uint64 - value int - deadline time.Time -} - -func (elm *delayElement) GetDeadline() time.Time { - return elm.deadline -} - -func (elm *delayElement) GetId() uint64 { - return elm.id -} - -func (elm *delayElement) GetKey() string { - return strconv.FormatUint(elm.id, 16) -} - -type testDelayQueue = DelayQueue[*delayElement] - -func newTestDelayQueue(cap int) *testDelayQueue { - return NewDelayQueue[*delayElement](cap) -} - -func mustEnqueue(val int, delay int64) func(t *testing.T, queue *testDelayQueue) { - return func(t *testing.T, queue *testDelayQueue) { - require.True(t, queue.Enqueue(context.Background(), - newTestElm(val, delay))) - } -} - -func newTestElm(value int, delay int64) *delayElement { - return &delayElement{ - id: elmId.Add(1), - value: value, - deadline: time.Now().Add(time.Millisecond * time.Duration(delay)), - } -} - -var elmId atomic.Uint64 - -func TestDelayQueue_Enqueue(t *testing.T) { - type testCase[R int, T Delayable] struct { - name string - queue *DelayQueue[T] - before func(t *testing.T, queue *DelayQueue[T]) - while func(t *testing.T, queue *DelayQueue[T]) - after func(t *testing.T, queue *DelayQueue[T]) - value int - delay int64 - timeout int64 - wantOk bool - } - tests := []testCase[int, *delayElement]{ - { - name: "enqueue to empty queue", - queue: newTestDelayQueue(1), - after: func(t *testing.T, queue *testDelayQueue) { - val, ok := queue.priorityQueue.Dequeue() - require.True(t, ok) - require.Equal(t, 1, val.value) - }, - timeout: 10, - value: 1, - wantOk: true, - }, - { - name: "enqueue active element to full queue", - queue: newTestDelayQueue(1), - before: func(t *testing.T, queue *testDelayQueue) { - mustEnqueue(1, 60)(t, queue) - }, - timeout: 40, - delay: 20, - wantOk: false, - }, - { - name: "enqueue inactive element to full queue", - queue: newTestDelayQueue(1), - before: mustEnqueue(1, 60), - timeout: 20, - delay: 40, - wantOk: false, - }, - { - name: "enqueue to full queue while dequeue valid element", - queue: newTestDelayQueue(1), - before: mustEnqueue(1, 60), - while: func(t *testing.T, queue *testDelayQueue) { - _, ok := queue.Dequeue(context.Background()) - require.True(t, ok) - }, - timeout: 80, - wantOk: true, - }, - { - name: "enqueue active element to full queue while dequeue invalid element", - queue: newTestDelayQueue(1), - before: mustEnqueue(1, 60), - while: func(t *testing.T, queue *testDelayQueue) { - elm, ok := queue.Dequeue(context.Background()) - require.True(t, ok) - require.Equal(t, 2, elm.value) - }, - timeout: 40, - value: 2, - delay: 20, - wantOk: true, - }, - { - name: "enqueue inactive element to full queue while dequeue invalid element", - queue: newTestDelayQueue(1), - before: mustEnqueue(1, 60), - while: func(t *testing.T, queue *testDelayQueue) { - _, ok := queue.Dequeue(context.Background()) - require.True(t, ok) - }, - timeout: 20, - delay: 40, - wantOk: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), - time.Millisecond*time.Duration(tt.timeout)) - defer cancel() - if tt.before != nil { - tt.before(t, tt.queue) - } - if tt.while != nil { - go tt.while(t, tt.queue) - } - ok := tt.queue.Enqueue(ctx, newTestElm(tt.value, tt.delay)) - require.Equal(t, tt.wantOk, ok) - }) - } -} - -func TestDelayQueue_Dequeue(t *testing.T) { - type testCase[R int, T Delayable] struct { - name string - queue *DelayQueue[T] - before func(t *testing.T, queue *DelayQueue[T]) - while func(t *testing.T, queue *DelayQueue[T]) - timeout int64 - wantVal int - wantOk bool - } - tests := []testCase[int, *delayElement]{ - { - name: "dequeue from empty queue", - queue: newTestDelayQueue(1), - timeout: 20, - wantOk: false, - }, - { - name: "dequeue new active element from empty queue", - queue: newTestDelayQueue(1), - while: mustEnqueue(1, 20), - timeout: 4000, - wantVal: 1, - wantOk: true, - }, - { - name: "dequeue new inactive element from empty queue", - queue: newTestDelayQueue(1), - while: mustEnqueue(1, 60), - timeout: 20, - wantOk: false, - }, - { - name: "dequeue active element from full queue", - queue: newTestDelayQueue(1), - before: mustEnqueue(1, 60), - timeout: 80, - wantVal: 1, - wantOk: true, - }, - { - name: "dequeue inactive element from full queue", - queue: newTestDelayQueue(1), - before: mustEnqueue(1, 60), - timeout: 20, - wantOk: false, - }, - { - name: "dequeue new active element from full queue", - queue: newTestDelayQueue(1), - before: mustEnqueue(1, 60), - while: mustEnqueue(2, 40), - timeout: 80, - wantVal: 2, - wantOk: true, - }, - { - name: "dequeue new inactive element from full queue", - queue: newTestDelayQueue(1), - before: mustEnqueue(1, 60), - while: mustEnqueue(2, 40), - timeout: 20, - wantOk: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), - time.Millisecond*time.Duration(tt.timeout)) - defer cancel() - if tt.before != nil { - tt.before(t, tt.queue) - } - if tt.while != nil { - go tt.while(t, tt.queue) - } - got, ok := tt.queue.Dequeue(ctx) - require.Equal(t, tt.wantOk, ok) - if !ok { - return - } - require.Equal(t, tt.wantVal, got.value) - }) - } -} - -func TestDelayQueue(t *testing.T) { - const delay = 1000 - const timeout = 1000 - const capacity = 100 - const count = 100 - var wg sync.WaitGroup - var ( - enqueueSeq atomic.Int32 - dequeueSeq atomic.Int32 - checksum atomic.Int64 - ) - queue := newTestDelayQueue(capacity) - procs := runtime.GOMAXPROCS(0) - wg.Add(procs) - for i := 0; i < procs; i++ { - go func(i int) { - defer wg.Done() - for { - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*timeout) - if i%2 == 0 { - if seq := int(enqueueSeq.Add(1)); seq <= count { - for ctx.Err() == nil { - if ok := queue.Enqueue(ctx, newTestElm(seq, int64(rand.Intn(delay)))); ok { - break - } - } - } else { - cancel() - return - } - } else { - if seq := int(dequeueSeq.Add(1)); seq > count { - cancel() - return - } - for ctx.Err() == nil { - if elm, ok := queue.Dequeue(ctx); ok { - require.Less(t, elm.GetDeadline().Sub(time.Now()), minTimerDelay) - checksum.Add(int64(elm.value)) - break - } - } - } - cancel() - } - }(i) - } - wg.Wait() - assert.Zero(t, queue.priorityQueue.Len()) - assert.Equal(t, int64((1+count)*count/2), checksum.Load()) -} - -func BenchmarkDelayQueueV3(b *testing.B) { - const delay = 0 - const capacity = 100 - - b.Run("enqueue", func(b *testing.B) { - queue := newTestDelayQueue(b.N) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - _ = queue.Enqueue(context.Background(), newTestElm(1, delay)) - } - }) - - b.Run("parallel to enqueue", func(b *testing.B) { - queue := newTestDelayQueue(b.N) - b.ReportAllocs() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _ = queue.Enqueue(context.Background(), newTestElm(1, delay)) - } - }) - }) - - b.Run("dequeue", func(b *testing.B) { - queue := newTestDelayQueue(b.N) - for i := 0; i < b.N; i++ { - require.True(b, queue.Enqueue(context.Background(), newTestElm(1, delay))) - } - time.Sleep(time.Millisecond * delay) - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - _, _ = queue.Dequeue(context.Background()) - } - }) - - b.Run("parallel to dequeue", func(b *testing.B) { - queue := newTestDelayQueue(b.N) - for i := 0; i < b.N; i++ { - require.True(b, queue.Enqueue(context.Background(), newTestElm(1, delay))) - } - time.Sleep(time.Millisecond * delay) - b.ReportAllocs() - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, _ = queue.Dequeue(context.Background()) - } - }) - }) - - b.Run("parallel to dequeue while enqueue", func(b *testing.B) { - queue := newTestDelayQueue(capacity) - go func() { - for i := 0; i < b.N; i++ { - _ = queue.Enqueue(context.Background(), newTestElm(i, delay)) - } - }() - b.ReportAllocs() - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _, _ = queue.Dequeue(context.Background()) - } - }) - }) - - b.Run("parallel to enqueue while dequeue", func(b *testing.B) { - queue := newTestDelayQueue(capacity) - go func() { - for i := 0; i < b.N; i++ { - _, _ = queue.Dequeue(context.Background()) - } - }() - b.ReportAllocs() - b.ResetTimer() - - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - _ = queue.Enqueue(context.Background(), newTestElm(1, delay)) - } - }) - }) - - b.Run("parallel to enqueue and dequeue", func(b *testing.B) { - var wg sync.WaitGroup - var ( - enqueueSeq atomic.Int32 - dequeueSeq atomic.Int32 - ) - queue := newTestDelayQueue(capacity) - b.ReportAllocs() - b.ResetTimer() - procs := runtime.GOMAXPROCS(0) - wg.Add(procs) - for i := 0; i < procs; i++ { - go func(i int) { - defer wg.Done() - for { - if i%2 == 0 { - if seq := int(enqueueSeq.Add(1)); seq <= b.N { - for { - if ok := queue.Enqueue(context.Background(), newTestElm(seq, delay)); ok { - break - } - } - } else { - return - } - } else { - if seq := int(dequeueSeq.Add(1)); seq > b.N { - return - } - for { - if _, ok := queue.Dequeue(context.Background()); ok { - break - } - } - } - } - }(i) - } - wg.Wait() - }) -} diff --git a/server/pkg/runner/priority_queue.go b/server/pkg/runner/priority_queue.go deleted file mode 100644 index 68a3a1e7..00000000 --- a/server/pkg/runner/priority_queue.go +++ /dev/null @@ -1,136 +0,0 @@ -package runner - -// PriorityQueue 是一个基于小顶堆的优先队列 -// 当capacity <= 0时,为无界队列,切片容量会动态扩缩容 -// 当capacity > 0 时,为有界队列,初始化后就固定容量,不会扩缩容 -type PriorityQueue[T any] struct { - // 用于比较前一个元素是否小于后一个元素 - less Less[T] - // 队列容量 - capacity int - // 队列中的元素,为便于计算父子节点的index,0位置留空,根节点从1开始 - data []T - - zero T -} - -func (p *PriorityQueue[T]) Len() int { - return len(p.data) - 1 -} - -// Cap 无界队列返回0,有界队列返回创建队列时设置的值 -func (p *PriorityQueue[T]) Cap() int { - return p.capacity -} - -func (p *PriorityQueue[T]) IsBoundless() bool { - return p.capacity <= 0 -} - -func (p *PriorityQueue[T]) IsFull() bool { - return p.capacity > 0 && len(p.data)-1 == p.capacity -} - -func (p *PriorityQueue[T]) IsEmpty() bool { - return len(p.data) < 2 -} - -func (p *PriorityQueue[T]) Peek(i int) (T, bool) { - if p.IsEmpty() { - return p.zero, false - } - if i >= p.Len() { - return p.zero, false - } - return p.data[i+1], true -} - -func (p *PriorityQueue[T]) Enqueue(t T) bool { - if p.IsFull() { - return false - } - - p.data = append(p.data, t) - node, parent := len(p.data)-1, (len(p.data)-1)/2 - for parent > 0 && p.less(p.data[node], p.data[parent]) { - p.data[parent], p.data[node] = p.data[node], p.data[parent] - node = parent - parent = parent / 2 - } - return true -} - -func (p *PriorityQueue[T]) Dequeue() (T, bool) { - if p.IsEmpty() { - return p.zero, false - } - - pop := p.data[1] - // 假定说我拿到了堆顶,就是理论上优先级最低的 - // pop 的优先级 - p.data[1] = p.data[len(p.data)-1] - p.data = p.data[:len(p.data)-1] - p.shrinkIfNecessary() - p.heapify(p.data, len(p.data)-1, 1) - return pop, true -} - -func (p *PriorityQueue[T]) shrinkIfNecessary() { - if !p.IsBoundless() { - return - } - if cap(p.data) > 1024 && len(p.data)*3 < cap(p.data)*2 { - data := make([]T, len(p.data), cap(p.data)*5/6) - copy(data, p.data) - p.data = data - } -} - -func (p *PriorityQueue[T]) heapify(data []T, n, i int) { - minPos := i - for { - if left := i * 2; left <= n && p.less(data[left], data[minPos]) { - minPos = left - } - if right := i*2 + 1; right <= n && p.less(data[right], data[minPos]) { - minPos = right - } - if minPos == i { - break - } - data[i], data[minPos] = data[minPos], data[i] - i = minPos - } -} - -func (p *PriorityQueue[T]) Remove(i int) (T, bool) { - if p.IsEmpty() || i >= p.Len() || i < 0 { - return p.zero, false - } - - i += 1 - result := p.data[i] - last := len(p.data) - 1 - p.data[i] = p.data[last] - p.data = p.data[:last] - p.shrinkIfNecessary() - p.heapify(p.data, len(p.data)-1, i) - return result, true -} - -// NewPriorityQueue 创建优先队列 capacity <= 0 时,为无界队列,否则有有界队列 -func NewPriorityQueue[T any](capacity int, less Less[T]) *PriorityQueue[T] { - sliceCap := capacity + 1 - if capacity < 1 { - capacity = 0 - sliceCap = 64 - } - return &PriorityQueue[T]{ - capacity: capacity, - data: make([]T, 1, sliceCap), - less: less, - } -} - -// Less 用于比较两个对象的大小 src < dst, 返回 true,src >= dst, 返回 false -type Less[T any] func(src T, dst T) bool diff --git a/server/pkg/runner/priority_queue_test.go b/server/pkg/runner/priority_queue_test.go deleted file mode 100644 index 3e4c23c4..00000000 --- a/server/pkg/runner/priority_queue_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package runner - -import ( - "github.com/stretchr/testify/require" - "testing" -) - -func TestChangePriority(t *testing.T) { - q := NewPriorityQueue[*priorityElement](100, - func(src *priorityElement, dst *priorityElement) bool { - return src.Priority < dst.Priority - }) - e1 := &priorityElement{ - Data: 10, - Priority: 200, - } - _ = q.Enqueue(e1) - e2 := &priorityElement{ - Data: 10, - Priority: 100, - } - _ = q.Enqueue(e2) - //e1.Priority = 10 - val, _ := q.Dequeue() - println(val) -} - -type priorityElement struct { - Data any - Priority int -} - -func TestPriorityQueue_Remove(t *testing.T) { - q := NewPriorityQueue[*priorityElement](100, - func(src *priorityElement, dst *priorityElement) bool { - return src.Priority < dst.Priority - }) - - for i := 8; i > 0; i-- { - q.Enqueue(&priorityElement{Priority: i}) - } - requirePriorities(t, q) - - q.Remove(8) - requirePriorities(t, q) - q.Remove(7) - requirePriorities(t, q) - - q.Remove(2) - requirePriorities(t, q) - - q.Remove(1) - requirePriorities(t, q) - - q.Remove(0) - requirePriorities(t, q) -} - -func requirePriorities(t *testing.T, q *PriorityQueue[*priorityElement]) { - ps := make([]int, 0, q.Len()) - for _, val := range q.data[1:] { - ps = append(ps, val.Priority) - } - for i := q.Len(); i >= 2; i-- { - require.False(t, q.less(q.data[i], q.data[i/2]), ps) - } -} diff --git a/server/pkg/runner/runner.go b/server/pkg/runner/runner.go deleted file mode 100644 index d7eb7e48..00000000 --- a/server/pkg/runner/runner.go +++ /dev/null @@ -1,425 +0,0 @@ -package runner - -import ( - "context" - "errors" - "fmt" - "github.com/emirpasic/gods/maps/linkedhashmap" - "mayfly-go/pkg/logx" - "sync" - "time" -) - -var ( - ErrJobNotFound = errors.New("任务未找到") - ErrJobExist = errors.New("任务已存在") - ErrJobFinished = errors.New("任务已完成") - ErrJobDisabled = errors.New("任务已禁用") - ErrJobExpired = errors.New("任务已过期") - ErrJobRunning = errors.New("任务执行中") -) - -type JobKey = string -type RunJobFunc[T Job] func(ctx context.Context, job T) error -type NextJobFunc[T Job] func() (T, bool) -type RunnableJobFunc[T Job] func(job T, nextRunning NextJobFunc[T]) (bool, error) -type ScheduleJobFunc[T Job] func(job T) (deadline time.Time, err error) -type UpdateJobFunc[T Job] func(ctx context.Context, job T) error - -type JobStatus int - -const ( - JobUnknown JobStatus = iota - JobDelaying - JobWaiting - JobRunning - JobSuccess - JobFailed -) - -type Job interface { - GetKey() JobKey - Update(job Job) - SetStatus(status JobStatus, err error) - SetEnabled(enabled bool, desc string) -} - -type iterator[T Job] struct { - index int - data []*wrapper[T] - zero T -} - -func (iter *iterator[T]) Begin() { - iter.index = -1 -} - -func (iter *iterator[T]) Next() (T, bool) { - if iter.index >= len(iter.data)-1 { - return iter.zero, false - } - iter.index++ - return iter.data[iter.index].job, true -} - -type array[T Job] struct { - size int - data []*wrapper[T] -} - -func newArray[T Job](size int) *array[T] { - return &array[T]{ - size: size, - data: make([]*wrapper[T], 0, size), - } -} - -func (a *array[T]) Iterator() *iterator[T] { - return &iterator[T]{ - index: -1, - data: a.data, - } -} - -func (a *array[T]) Full() bool { - return len(a.data) >= a.size -} - -func (a *array[T]) Append(job *wrapper[T]) bool { - if len(a.data) >= a.size { - return false - } - a.data = append(a.data, job) - return true -} - -func (a *array[T]) Get(key JobKey) (*wrapper[T], bool) { - for _, job := range a.data { - if key == job.GetKey() { - return job, true - } - } - return nil, false -} - -func (a *array[T]) Remove(key JobKey) { - length := len(a.data) - for i, elm := range a.data { - if key == elm.GetKey() { - a.data[i], a.data[length-1] = a.data[length-1], nil - a.data = a.data[:length-1] - return - } - } -} - -type Runner[T Job] struct { - maxRunning int - waiting *linkedhashmap.Map - running *array[T] - runJob RunJobFunc[T] - runnableJob RunnableJobFunc[T] - scheduleJob ScheduleJobFunc[T] - updateJob UpdateJobFunc[T] - mutex sync.Mutex - wg sync.WaitGroup - context context.Context - cancel context.CancelFunc - zero T - signal chan struct{} - all map[JobKey]*wrapper[T] - delayQueue *DelayQueue[*wrapper[T]] -} - -type Opt[T Job] func(runner *Runner[T]) - -func WithRunnableJob[T Job](runnableJob RunnableJobFunc[T]) Opt[T] { - return func(runner *Runner[T]) { - runner.runnableJob = runnableJob - } -} - -func WithScheduleJob[T Job](scheduleJob ScheduleJobFunc[T]) Opt[T] { - return func(runner *Runner[T]) { - runner.scheduleJob = scheduleJob - } -} - -func WithUpdateJob[T Job](updateJob UpdateJobFunc[T]) Opt[T] { - return func(runner *Runner[T]) { - runner.updateJob = updateJob - } -} - -func NewRunner[T Job](maxRunning int, runJob RunJobFunc[T], opts ...Opt[T]) *Runner[T] { - ctx, cancel := context.WithCancel(context.Background()) - runner := &Runner[T]{ - maxRunning: maxRunning, - all: make(map[string]*wrapper[T], maxRunning), - waiting: linkedhashmap.New(), - running: newArray[T](maxRunning), - context: ctx, - cancel: cancel, - signal: make(chan struct{}, 1), - delayQueue: NewDelayQueue[*wrapper[T]](0), - } - runner.runJob = runJob - runner.runnableJob = func(job T, _ NextJobFunc[T]) (bool, error) { - return true, nil - } - runner.updateJob = func(ctx context.Context, job T) error { - return nil - } - for _, opt := range opts { - opt(runner) - } - - runner.wg.Add(maxRunning + 1) - for i := 0; i < maxRunning; i++ { - go runner.run() - } - go func() { - defer runner.wg.Done() - for runner.context.Err() == nil { - wrap, ok := runner.delayQueue.Dequeue(ctx) - if !ok { - continue - } - runner.mutex.Lock() - if old, ok := runner.all[wrap.key]; !ok || wrap != old { - runner.mutex.Unlock() - continue - } - runner.waiting.Put(wrap.key, wrap) - wrap.status = JobWaiting - runner.trigger() - runner.mutex.Unlock() - } - }() - return runner -} - -func (r *Runner[T]) Close() { - r.cancel() - r.wg.Wait() -} - -func (r *Runner[T]) run() { - defer r.wg.Done() - - for r.context.Err() == nil { - select { - case <-r.signal: - wrap, ok := r.pickRunnableJob() - if !ok { - continue - } - r.doRun(wrap) - r.afterRun(wrap) - case <-r.context.Done(): - } - } -} - -func (r *Runner[T]) doRun(wrap *wrapper[T]) { - defer func() { - if err := recover(); err != nil { - logx.Error(fmt.Sprintf("failed to run job: %v", err)) - time.Sleep(time.Millisecond * 10) - } - }() - wrap.job.SetStatus(JobRunning, nil) - if err := r.updateJob(r.context, wrap.job); err != nil { - err = fmt.Errorf("任务状态保存失败: %w", err) - wrap.job.SetStatus(JobFailed, err) - _ = r.updateJob(r.context, wrap.job) - return - } - runErr := r.runJob(r.context, wrap.job) - if runErr != nil { - wrap.job.SetStatus(JobFailed, runErr) - } else { - wrap.job.SetStatus(JobSuccess, nil) - } - if err := r.updateJob(r.context, wrap.job); err != nil { - if runErr != nil { - err = fmt.Errorf("任务状态保存失败: %w, %w", err, runErr) - } else { - err = fmt.Errorf("任务状态保存失败: %w", err) - } - wrap.job.SetStatus(JobFailed, err) - _ = r.updateJob(r.context, wrap.job) - return - } -} - -func (r *Runner[T]) afterRun(wrap *wrapper[T]) { - r.mutex.Lock() - defer r.mutex.Unlock() - - r.running.Remove(wrap.key) - delete(r.all, wrap.key) - wrap.status = JobUnknown - r.trigger() - if wrap.removed { - return - } - deadline, err := r.doScheduleJob(wrap.job, true) - if err != nil { - return - } - _ = r.schedule(r.context, deadline, wrap.job) -} - -func (r *Runner[T]) doScheduleJob(job T, finished bool) (time.Time, error) { - if r.scheduleJob == nil { - if finished { - return time.Time{}, ErrJobFinished - } - return time.Now(), nil - } - return r.scheduleJob(job) -} - -func (r *Runner[T]) pickRunnableJob() (*wrapper[T], bool) { - r.mutex.Lock() - defer r.mutex.Unlock() - - var disabled []JobKey - iter := r.running.Iterator() - var runnable *wrapper[T] - ok := r.waiting.Any(func(key interface{}, value interface{}) bool { - wrap := value.(*wrapper[T]) - iter.Begin() - able, err := r.runnableJob(wrap.job, iter.Next) - if err != nil { - wrap.job.SetEnabled(false, err.Error()) - r.updateJob(r.context, wrap.job) - disabled = append(disabled, key.(JobKey)) - } - if able { - if r.running.Full() { - return false - } - r.waiting.Remove(key) - r.running.Append(wrap) - wrap.status = JobRunning - if !r.running.Full() && !r.waiting.Empty() { - r.trigger() - } - runnable = wrap - return true - } - return false - }) - for _, key := range disabled { - r.waiting.Remove(key) - delete(r.all, key) - } - if !ok { - return nil, false - } - return runnable, true -} - -func (r *Runner[T]) schedule(ctx context.Context, deadline time.Time, job T) error { - wrap := newWrapper(job) - wrap.deadline = deadline - if wrap.deadline.After(time.Now()) { - r.delayQueue.Enqueue(ctx, wrap) - wrap.status = JobDelaying - } else { - r.waiting.Put(wrap.key, wrap) - wrap.status = JobWaiting - r.trigger() - } - r.all[wrap.key] = wrap - return nil -} - -func (r *Runner[T]) Add(ctx context.Context, job T) error { - r.mutex.Lock() - defer r.mutex.Unlock() - - if _, ok := r.all[job.GetKey()]; ok { - return ErrJobExist - } - deadline, err := r.doScheduleJob(job, false) - if err != nil { - return err - } - return r.schedule(ctx, deadline, job) -} - -func (r *Runner[T]) Update(ctx context.Context, job T) error { - r.mutex.Lock() - defer r.mutex.Unlock() - - wrap, ok := r.all[job.GetKey()] - if !ok { - return ErrJobNotFound - } - wrap.job.Update(job) - switch wrap.status { - case JobDelaying: - r.delayQueue.Remove(ctx, wrap.key) - case JobWaiting: - r.waiting.Remove(wrap.key) - case JobRunning: - return nil - default: - } - deadline, err := r.doScheduleJob(job, false) - if err != nil { - return err - } - return r.schedule(ctx, deadline, wrap.job) -} - -func (r *Runner[T]) StartNow(ctx context.Context, job T) error { - r.mutex.Lock() - defer r.mutex.Unlock() - - if wrap, ok := r.all[job.GetKey()]; ok { - switch wrap.status { - case JobDelaying: - r.delayQueue.Remove(ctx, wrap.key) - delete(r.all, wrap.key) - case JobWaiting, JobRunning: - return nil - default: - } - } - return r.schedule(ctx, time.Now(), job) -} - -func (r *Runner[T]) trigger() { - select { - case r.signal <- struct{}{}: - default: - } -} - -func (r *Runner[T]) Remove(ctx context.Context, key JobKey) error { - r.mutex.Lock() - defer r.mutex.Unlock() - - wrap, ok := r.all[key] - if !ok { - return nil - } - switch wrap.status { - case JobDelaying: - r.delayQueue.Remove(ctx, key) - delete(r.all, key) - case JobWaiting: - r.waiting.Remove(key) - delete(r.all, key) - case JobRunning: - // 统一标记为 removed, 待任务执行完成后再删除 - wrap.removed = true - return ErrJobRunning - default: - } - return nil -} diff --git a/server/pkg/runner/runner_test.go b/server/pkg/runner/runner_test.go deleted file mode 100644 index 367dfd13..00000000 --- a/server/pkg/runner/runner_test.go +++ /dev/null @@ -1,130 +0,0 @@ -package runner - -import ( - "context" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "mayfly-go/pkg/utils/timex" - "sync" - "testing" - "time" -) - -var _ Job = &testJob{} - -func newTestJob(key string) *testJob { - return &testJob{ - Key: key, - } -} - -type testJob struct { - Key JobKey - status int -} - -func (t *testJob) Update(_ Job) {} - -func (t *testJob) GetKey() JobKey { - return t.Key -} - -func (t *testJob) SetStatus(status JobStatus, err error) {} - -func (t *testJob) SetEnabled(enabled bool, desc string) {} - -func TestRunner_Close(t *testing.T) { - signal := make(chan struct{}, 1) - waiting := sync.WaitGroup{} - waiting.Add(1) - runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error { - waiting.Done() - timex.SleepWithContext(ctx, time.Hour) - signal <- struct{}{} - return nil - }) - go func() { - job := &testJob{ - Key: "close", - } - _ = runner.Add(context.Background(), job) - }() - waiting.Wait() - timer := time.NewTimer(time.Microsecond * 10) - defer timer.Stop() - runner.Close() - select { - case <-timer.C: - require.FailNow(t, "runner 未能及时退出") - case <-signal: - } -} - -func TestRunner_AddJob(t *testing.T) { - type testCase struct { - name string - job *testJob - want error - } - testCases := []testCase{ - { - name: "first job", - job: newTestJob("single"), - want: nil, - }, - { - name: "second job", - job: newTestJob("dual"), - want: nil, - }, - { - name: "repetitive job", - job: newTestJob("dual"), - want: ErrJobExist, - }, - } - runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error { - timex.SleepWithContext(ctx, time.Hour) - return nil - }) - defer runner.Close() - for _, tc := range testCases { - err := runner.Add(context.Background(), tc.job) - if tc.want != nil { - require.ErrorIs(t, err, tc.want) - continue - } - require.NoError(t, err) - } -} - -func TestJob_UpdateStatus(t *testing.T) { - const d = time.Millisecond * 20 - const ( - unknown = iota - running - finished - ) - runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error { - job.status = running - timex.SleepWithContext(ctx, d*2) - job.status = finished - return nil - }) - first := newTestJob("first") - second := newTestJob("second") - _ = runner.Add(context.Background(), first) - _ = runner.Add(context.Background(), second) - - time.Sleep(d) - assert.Equal(t, running, first.status) - assert.Equal(t, unknown, second.status) - - time.Sleep(d * 2) - assert.Equal(t, finished, first.status) - assert.Equal(t, running, second.status) - - time.Sleep(d * 2) - assert.Equal(t, finished, first.status) - assert.Equal(t, finished, second.status) -}