mirror of
https://gitee.com/dromara/mayfly-go
synced 2025-11-02 15:30:25 +08:00
Compare commits
2 Commits
614a144f60
...
c86f2ad412
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c86f2ad412 | ||
|
|
82fd97e06a |
@@ -10,9 +10,9 @@
|
||||
"lint-fix": "eslint --fix --ext .js --ext .jsx --ext .vue src/"
|
||||
},
|
||||
"dependencies": {
|
||||
"@element-plus/icons-vue": "^2.3.1",
|
||||
"@logicflow/core": "^2.0.16",
|
||||
"@logicflow/extension": "^2.0.21",
|
||||
"@element-plus/icons-vue": "^2.3.2",
|
||||
"@logicflow/core": "^2.1.1",
|
||||
"@logicflow/extension": "^2.1.2",
|
||||
"@vueuse/core": "^13.6.0",
|
||||
"@xterm/addon-fit": "^0.10.0",
|
||||
"@xterm/addon-search": "^0.15.0",
|
||||
@@ -24,7 +24,7 @@
|
||||
"crypto-js": "^4.2.0",
|
||||
"dayjs": "^1.11.13",
|
||||
"echarts": "^6.0.0",
|
||||
"element-plus": "^2.10.5",
|
||||
"element-plus": "^2.10.7",
|
||||
"js-base64": "^3.7.7",
|
||||
"jsencrypt": "^3.3.2",
|
||||
"monaco-editor": "^0.52.2",
|
||||
@@ -59,7 +59,7 @@
|
||||
"eslint-plugin-vue": "^10.4.0",
|
||||
"postcss": "^8.5.6",
|
||||
"prettier": "^3.6.1",
|
||||
"sass": "^1.89.2",
|
||||
"sass": "^1.90.0",
|
||||
"tailwindcss": "^4.1.11",
|
||||
"typescript": "^5.9.2",
|
||||
"vite": "npm:rolldown-vite@latest",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
<template>
|
||||
<el-main class="layout-main !h-full">
|
||||
<el-scrollbar ref="layoutScrollbarRef" view-class="!h-full">
|
||||
<el-main class="layout-main h-full">
|
||||
<el-scrollbar ref="layoutScrollbarRef" view-class="h-full">
|
||||
<LayoutParentView />
|
||||
</el-scrollbar>
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
</template>
|
||||
|
||||
<script setup lang="ts" name="layoutMain">
|
||||
import { getCurrentInstance, watch, defineAsyncComponent } from 'vue';
|
||||
import { watch, defineAsyncComponent, useTemplateRef, nextTick, onMounted } from 'vue';
|
||||
import { useRoute } from 'vue-router';
|
||||
import { storeToRefs } from 'pinia';
|
||||
import { useThemeConfig } from '@/store/themeConfig';
|
||||
@@ -21,22 +21,33 @@ import { useThemeConfig } from '@/store/themeConfig';
|
||||
const LayoutParentView = defineAsyncComponent(() => import('@/layout/routerView/parent.vue'));
|
||||
const Footer = defineAsyncComponent(() => import('@/layout/footer/index.vue'));
|
||||
|
||||
const { proxy } = getCurrentInstance() as any;
|
||||
const layoutScrollbarRef = useTemplateRef('layoutScrollbarRef');
|
||||
const { themeConfig } = storeToRefs(useThemeConfig());
|
||||
const route = useRoute();
|
||||
|
||||
// 监听 themeConfig 配置文件的变化,更新菜单 el-scrollbar 的高度
|
||||
watch(themeConfig.value, (val) => {
|
||||
if (val.isFixedHeaderChange !== val.isFixedHeader) {
|
||||
if (!proxy.$refs.layoutScrollbarRef) return false;
|
||||
proxy.$refs.layoutScrollbarRef.update();
|
||||
if (!layoutScrollbarRef.value) {
|
||||
return;
|
||||
}
|
||||
layoutScrollbarRef.value.update();
|
||||
}
|
||||
});
|
||||
|
||||
// 监听路由的变化
|
||||
watch(
|
||||
() => route.path,
|
||||
() => {
|
||||
proxy.$refs.layoutScrollbarRef.wrapRef.scrollTop = 0;
|
||||
nextTick(() => {
|
||||
if (!layoutScrollbarRef.value) {
|
||||
return;
|
||||
}
|
||||
setTimeout(() => {
|
||||
layoutScrollbarRef.value.update();
|
||||
}, 500);
|
||||
layoutScrollbarRef.value.setScrollTop();
|
||||
});
|
||||
}
|
||||
);
|
||||
</script>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
<template>
|
||||
<el-container class="layout-container flex-center layout-backtop">
|
||||
<el-container class="layout-container layout-backtop !flex-col">
|
||||
<Header />
|
||||
<Main />
|
||||
</el-container>
|
||||
|
||||
@@ -34,13 +34,12 @@
|
||||
</template>
|
||||
|
||||
<script lang="ts" setup name="navMenuHorizontal">
|
||||
import { reactive, computed, onMounted, inject, defineAsyncComponent } from 'vue';
|
||||
import { reactive, computed, onMounted, inject } from 'vue';
|
||||
import { useRoute, onBeforeRouteUpdate } from 'vue-router';
|
||||
import SubItem from '@/layout/navMenu/subItem.vue';
|
||||
import { useRoutesList } from '@/store/routesList';
|
||||
import { useThemeConfig } from '@/store/themeConfig';
|
||||
|
||||
const SubItem = defineAsyncComponent(() => import('@/layout/navMenu/subItem.vue'));
|
||||
|
||||
// 定义父组件传过来的值
|
||||
const props = defineProps({
|
||||
// 菜单列表
|
||||
@@ -117,42 +116,29 @@ onBeforeRouteUpdate((to) => {
|
||||
overflow: hidden;
|
||||
margin-right: 30px;
|
||||
|
||||
.horizontal-menu {
|
||||
border: none !important;
|
||||
::v-deep(.el-scrollbar__bar.is-vertical) {
|
||||
display: none;
|
||||
}
|
||||
|
||||
::v-deep(a) {
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.el-menu.el-menu--horizontal {
|
||||
display: flex;
|
||||
height: 100%;
|
||||
width: 100%;
|
||||
box-sizing: border-box;
|
||||
|
||||
::v-deep(.el-menu-item) {
|
||||
height: 42px;
|
||||
line-height: 42px;
|
||||
padding: 0 15px !important;
|
||||
margin: 0 5px;
|
||||
border-radius: 6px;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
::v-deep(.el-sub-menu__title) {
|
||||
height: 42px;
|
||||
line-height: 42px;
|
||||
padding: 0 25px 0 15px !important; /* 右边留出更多空间给箭头图标 */
|
||||
margin: 0 5px;
|
||||
border-radius: 6px;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
::v-deep(.el-sub-menu__icon-arrow) {
|
||||
right: 5px !important;
|
||||
margin-top: -5px !important;
|
||||
}
|
||||
|
||||
::v-deep(.el-menu-item.is-active),
|
||||
::v-deep(.el-sub-menu.is-active .el-sub-menu__title) {
|
||||
color: #409eff;
|
||||
background-color: rgba(64, 158, 255, 0.1);
|
||||
}
|
||||
border-bottom: none !important;
|
||||
}
|
||||
}
|
||||
|
||||
// 菜单项基础样式
|
||||
.horizontal-menu :deep(.el-menu-item),
|
||||
.horizontal-menu :deep(.el-sub-menu__title) {
|
||||
margin: 0 5px !important;
|
||||
justify-content: center;
|
||||
max-width: 160px;
|
||||
min-width: 100px;
|
||||
}
|
||||
</style>
|
||||
|
||||
@@ -21,55 +21,46 @@ const routeModules: Record<string, any> = import.meta.glob(['../views/**/route.{
|
||||
|
||||
// 后端控制路由:执行路由数据初始化
|
||||
export async function initBackendRoutes() {
|
||||
let allModuleRoutes = {};
|
||||
for (const path in routeModules) {
|
||||
// 获取默认导出的路由
|
||||
const routes = routeModules[path]?.default;
|
||||
allModuleRoutes = { ...allModuleRoutes, ...routes };
|
||||
}
|
||||
// 合并所有模块路由
|
||||
const allModuleRoutes = Object.values(routeModules).reduce((acc: any, module: any) => {
|
||||
return { ...acc, ...module.default };
|
||||
}, {});
|
||||
|
||||
const token = getToken(); // 获取浏览器缓存 token 值
|
||||
const token = getToken();
|
||||
if (!token) {
|
||||
// 无 token 停止执行下一步
|
||||
return false;
|
||||
}
|
||||
useUserInfo().setUserInfo({});
|
||||
// 获取路由
|
||||
let menuRoute = await getBackEndControlRoutes();
|
||||
|
||||
const cacheList: Array<string> = [];
|
||||
useUserInfo().setUserInfo({});
|
||||
|
||||
try {
|
||||
// 获取路由和权限
|
||||
const menuAndPermission = await openApi.getPermissions();
|
||||
useUserInfo().userInfo.permissions = menuAndPermission.permissions;
|
||||
const menuRoute = menuAndPermission.menus;
|
||||
|
||||
const cacheList: string[] = [];
|
||||
|
||||
// 处理路由(component)
|
||||
const routes = backEndRouterConverter(allModuleRoutes, menuRoute, (router: any) => {
|
||||
// 可能为false时不存在isKeepAlive属性
|
||||
if (!router.meta.isKeepAlive) {
|
||||
router.meta.isKeepAlive = false;
|
||||
}
|
||||
// 确保 isKeepAlive 属性存在
|
||||
router.meta.isKeepAlive = router.meta.isKeepAlive ?? false;
|
||||
if (router.meta.isKeepAlive) {
|
||||
cacheList.push(router.name);
|
||||
cacheList.push(router.name as string);
|
||||
}
|
||||
});
|
||||
|
||||
// 添加路由
|
||||
routes.forEach((item: any) => {
|
||||
if (item.meta.isFull) {
|
||||
// 菜单为全屏展示 (示例:数据大屏页面等)
|
||||
router.addRoute(item as RouteRecordRaw);
|
||||
} else {
|
||||
// 要将嵌套路由添加到现有的路由中,可以将路由的 name 作为第一个参数传递给 router.addRoute(),这将有效地添加路由,就像通过 children 添加的一样
|
||||
router.addRoute(LAYOUT_ROUTE_NAME, item as RouteRecordRaw);
|
||||
}
|
||||
});
|
||||
|
||||
useKeepALiveNames().setCacheKeepAlive(cacheList);
|
||||
useRoutesList().setRoutesList(routes);
|
||||
}
|
||||
|
||||
// 后端控制路由,isRequestRoutes 为 true,则开启后端控制路由
|
||||
export async function getBackEndControlRoutes() {
|
||||
try {
|
||||
const menuAndPermission = await openApi.getPermissions();
|
||||
// 赋值权限码,用于控制按钮等
|
||||
useUserInfo().userInfo.permissions = menuAndPermission.permissions;
|
||||
return menuAndPermission.menus;
|
||||
} catch (e: any) {
|
||||
console.error('获取菜单权限信息失败', e);
|
||||
clearSession();
|
||||
@@ -97,57 +88,52 @@ type RouterConvCallbackFunc = (router: any) => void;
|
||||
* @param meta.linkType ==> 外链类型, 内嵌: 以iframe展示、外链: 新标签打开
|
||||
* @param meta.link ==> 外链地址
|
||||
* */
|
||||
export function backEndRouterConverter(allModuleRoutes: any, routes: any, callbackFunc: RouterConvCallbackFunc = null as any, parentPath: string = '/') {
|
||||
if (!routes) {
|
||||
return [];
|
||||
}
|
||||
export function backEndRouterConverter(allModuleRoutes: any, routes: any, callbackFunc?: RouterConvCallbackFunc, parentPath = '/'): any[] {
|
||||
if (!routes) return [];
|
||||
|
||||
return routes.map((item: any) => {
|
||||
if (!item.meta) return item;
|
||||
|
||||
const routeItems = [];
|
||||
for (let item of routes) {
|
||||
if (!item.meta) {
|
||||
return item;
|
||||
}
|
||||
// 将json字符串的meta转为对象
|
||||
item.meta = JSON.parse(item.meta);
|
||||
const meta = typeof item.meta === 'string' ? JSON.parse(item.meta) : item.meta;
|
||||
|
||||
// 处理路径
|
||||
let path = item.code;
|
||||
// 如果不是以 / 开头,则路径需要拼接父路径
|
||||
if (!path.startsWith('/')) {
|
||||
path = parentPath + '/' + path;
|
||||
path = `${parentPath}/${path}`.replace(/\/+/g, '/');
|
||||
}
|
||||
item.path = path;
|
||||
delete item['code'];
|
||||
|
||||
// route.meta.title == resource.name
|
||||
item.meta.title = item.name;
|
||||
delete item['name'];
|
||||
// 构建路由对象
|
||||
const routeItem: any = {
|
||||
path,
|
||||
name: meta.routeName,
|
||||
meta: {
|
||||
...meta,
|
||||
title: item.name,
|
||||
},
|
||||
};
|
||||
|
||||
// route.name == resource.meta.routeName
|
||||
const routerName = item.meta.routeName;
|
||||
item.name = routerName;
|
||||
// 如果是外链类型,name的路由名都是Link 或者 Iframes会导致路由名重复,无法添加多个外链
|
||||
if (item.meta.link) {
|
||||
if (item.meta.linkType == LinkTypeEnum.Link.value) {
|
||||
item.component = Link;
|
||||
// 处理外链
|
||||
if (meta.link) {
|
||||
routeItem.component = meta.linkType == LinkTypeEnum.Link.value ? Link : Iframe;
|
||||
} else {
|
||||
item.component = Iframe;
|
||||
}
|
||||
} else {
|
||||
// routerName == 模块下route.ts 字段key == 组件名
|
||||
item.component = allModuleRoutes[routerName];
|
||||
}
|
||||
delete item.meta['routeName'];
|
||||
|
||||
// route.redirect == resource.meta.redirect
|
||||
if (item.meta.redirect) {
|
||||
item.redirect = item.meta.redirect;
|
||||
delete item.meta['redirect'];
|
||||
}
|
||||
// 存在回调,则执行回调
|
||||
callbackFunc && callbackFunc(item);
|
||||
item.children && backEndRouterConverter(allModuleRoutes, item.children, callbackFunc, item.path);
|
||||
routeItems.push(item);
|
||||
// 使用模块路由组件
|
||||
routeItem.component = allModuleRoutes[meta.routeName];
|
||||
}
|
||||
|
||||
return routeItems;
|
||||
// 处理重定向
|
||||
if (meta.redirect) {
|
||||
routeItem.redirect = meta.redirect;
|
||||
}
|
||||
|
||||
// 处理子路由
|
||||
if (item.children) {
|
||||
routeItem.children = backEndRouterConverter(allModuleRoutes, item.children, callbackFunc, path);
|
||||
}
|
||||
|
||||
// 执行回调
|
||||
callbackFunc?.(routeItem);
|
||||
|
||||
return routeItem;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -131,35 +131,10 @@ $spacing: 8px;
|
||||
|
||||
// 横向菜单
|
||||
.el-menu--horizontal {
|
||||
background: var(--bg-topBar);
|
||||
|
||||
.el-menu-item,
|
||||
.el-sub-menu {
|
||||
height: $menuHeight;
|
||||
line-height: $menuHeight;
|
||||
color: var(--bg-topBarColor);
|
||||
border-radius: $radius;
|
||||
padding: 0 10px !important; // 减小内边距
|
||||
|
||||
.el-sub-menu__title {
|
||||
height: $menuHeight;
|
||||
line-height: $menuHeight;
|
||||
color: var(--bg-topBarColor);
|
||||
border-radius: $radius;
|
||||
padding: 0 10px !important; // 减小内边距
|
||||
}
|
||||
}
|
||||
|
||||
.el-menu-item.is-active,
|
||||
.el-sub-menu.is-active .el-sub-menu__title {
|
||||
color: #409eff;
|
||||
background-color: rgba(64, 158, 255, 0.1);
|
||||
}
|
||||
|
||||
.el-menu-item:hover,
|
||||
.el-sub-menu:not(.is-active):hover .el-sub-menu__title {
|
||||
background-color: rgba(64, 158, 255, 0.05);
|
||||
transform: translateY(-1px);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -171,33 +146,15 @@ $spacing: 8px;
|
||||
|
||||
.el-menu-item,
|
||||
.el-sub-menu__title {
|
||||
height: $menuHeight;
|
||||
line-height: $menuHeight;
|
||||
color: var(--bg-topBarColor);
|
||||
border-radius: $radius;
|
||||
transition: all 0.2s ease;
|
||||
padding: 0 10px !important; // 减小内边距
|
||||
border-bottom: none !important;
|
||||
}
|
||||
|
||||
.el-menu-item:not(.is-active):hover,
|
||||
.el-sub-menu:not(.is-active):hover .el-sub-menu__title {
|
||||
color: var(--bg-topBarColor);
|
||||
background-color: rgba(0, 0, 0, 0.03);
|
||||
}
|
||||
|
||||
.el-menu-item.is-active,
|
||||
.el-sub-menu.is-active .el-sub-menu__title {
|
||||
background-color: rgba(64, 158, 255, 0.1);
|
||||
color: #409eff;
|
||||
font-weight: 500;
|
||||
border-bottom: none !important;
|
||||
}
|
||||
|
||||
// 为水平菜单的子菜单项正确处理箭头图标位置
|
||||
.el-sub-menu {
|
||||
.el-sub-menu__title {
|
||||
padding-right: 20px !important; // 调整箭头图标空间
|
||||
padding-right: 22px !important; // 调整箭头图标空间
|
||||
border-bottom: none !important;
|
||||
}
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ import { useI18nCreateTitle, useI18nDeleteConfirm, useI18nDeleteSuccessMsg, useI
|
||||
import { tmplApi } from '../api';
|
||||
import { TmplStatusEnum, TmplTypeEnum, ChannelTypeEnum } from '../enums';
|
||||
import TmplEdit from './TmplEdit.vue';
|
||||
import EnumValue from '../../../common/Enum';
|
||||
import EnumValue from '@/common/Enum';
|
||||
import AccountSelectFormItem from '@/views/system/account/components/AccountSelectFormItem.vue';
|
||||
|
||||
const perms = {
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
<template>
|
||||
<el-card class="h-full flex" body-class="!p-1 flex flex-col w-full">
|
||||
<el-input v-model="filterText" :placeholder="$t('tag.tagFilterPlaceholder')" clearable size="small" class="!mb-1 w-full" />
|
||||
<el-card class="h-full flex tag-tree-card" body-class="!p-0 flex flex-col w-full">
|
||||
<div class="tag-tree-header">
|
||||
<el-input v-model="filterText" :placeholder="$t('tag.tagFilterPlaceholder')" clearable size="small" class="tag-tree-search w-full">
|
||||
<template #prefix>
|
||||
<SvgIcon class="tag-tree-search-icon" name="search" />
|
||||
</template>
|
||||
</el-input>
|
||||
</div>
|
||||
<el-scrollbar>
|
||||
<el-tree
|
||||
class="min-w-full inline-block"
|
||||
@@ -30,7 +36,7 @@
|
||||
|
||||
<slot v-else :node="node" :data="data" name="prefix"></slot>
|
||||
|
||||
<span class="ml-0.5" :title="data.labelRemark">
|
||||
<span class="ml-1" :title="data.labelRemark">
|
||||
<slot name="label" :data="data" v-if="!data.disabled"> {{ $t(data.label) }}</slot>
|
||||
<!-- 禁用状态 -->
|
||||
<slot name="disabledLabel" :data="data" v-else>
|
||||
@@ -40,7 +46,7 @@
|
||||
</slot>
|
||||
</span>
|
||||
|
||||
<span class="absolute right-2.5 mt-0.5 text-[10px] text-gray-400">
|
||||
<span class="ml-auto pr-1.5 text-[10px] text-gray-400">
|
||||
<slot :node="node" :data="data" name="suffix"></slot>
|
||||
</span>
|
||||
</div>
|
||||
@@ -59,6 +65,7 @@ import TagInfo from './TagInfo.vue';
|
||||
import { Contextmenu } from '@/components/contextmenu';
|
||||
import { tagApi } from '../tag/api';
|
||||
import { isPrefixSubsequence } from '@/common/utils/string';
|
||||
import SvgIcon from '@/components/svgIcon/index.vue';
|
||||
|
||||
const props = defineProps({
|
||||
resourceType: {
|
||||
@@ -248,4 +255,22 @@ defineExpose({
|
||||
});
|
||||
</script>
|
||||
|
||||
<style lang="scss" scoped></style>
|
||||
<style lang="scss" scoped>
|
||||
.tag-tree-card {
|
||||
:deep(.el-card__body) {
|
||||
padding: 0;
|
||||
}
|
||||
}
|
||||
|
||||
.tag-tree-header {
|
||||
padding: 4px 6px;
|
||||
border-bottom: 1px solid var(--el-border-color-light);
|
||||
}
|
||||
|
||||
.tag-tree-search {
|
||||
:deep(.el-input__wrapper) {
|
||||
border-radius: 14px;
|
||||
height: 24px;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
|
||||
@@ -33,7 +33,7 @@
|
||||
</div>
|
||||
|
||||
<!-- 字段名列 -->
|
||||
<div v-else @contextmenu="headerContextmenuClick($event, column)" style="position: relative">
|
||||
<div v-else style="position: relative" @mouseenter="showColumnAction(column)" @mouseleave="hideColumnAction">
|
||||
<!-- 字段列的数据类型 -->
|
||||
<div class="column-type">
|
||||
<span v-if="column.dataTypeSubscript === 'icon-clock'">
|
||||
@@ -65,9 +65,56 @@
|
||||
|
||||
<!-- 字段列右部分内容 -->
|
||||
<div class="column-right">
|
||||
<span v-if="column.title == nowSortColumn?.columnName">
|
||||
<SvgIcon color="var(--el-color-primary)" :name="nowSortColumn?.order == 'asc' ? 'top' : 'bottom'"></SvgIcon>
|
||||
<el-dropdown
|
||||
@command="handleColumnCommand(column, $event)"
|
||||
@visibleChange="onColumnActionVisibleChange(column, $event)"
|
||||
trigger="click"
|
||||
v-if="column.key !== rowNoColumn.key"
|
||||
size="small"
|
||||
>
|
||||
<span class="column-actions-trigger">
|
||||
<!-- 排序箭头图标 -->
|
||||
<SvgIcon
|
||||
v-if="
|
||||
column.title == nowSortColumn?.columnName &&
|
||||
!showColumnActions[column.key] &&
|
||||
!columnActionVisible[column.key]
|
||||
"
|
||||
:color="'var(--el-color-primary)'"
|
||||
:name="nowSortColumn?.order == 'asc' ? 'top' : 'bottom'"
|
||||
:size="14"
|
||||
/>
|
||||
<!-- 更多操作图标 -->
|
||||
<SvgIcon
|
||||
v-if="columnActionVisible[column.key] || showColumnActions[column.key]"
|
||||
name="MoreFilled"
|
||||
:size="14"
|
||||
:color="'var(--el-color-primary)'"
|
||||
class="column-more-icon"
|
||||
:class="{ 'column-more-icon-visible': columnActionVisible[column.key] || showColumnActions[column.key] }"
|
||||
/>
|
||||
</span>
|
||||
<template #dropdown>
|
||||
<el-dropdown-menu>
|
||||
<el-dropdown-item command="sort-asc">
|
||||
<SvgIcon name="top" class="mr-1" />
|
||||
{{ $t('db.asc') }}
|
||||
</el-dropdown-item>
|
||||
<el-dropdown-item command="sort-desc">
|
||||
<SvgIcon name="bottom" class="mr-1" />
|
||||
{{ $t('db.desc') }}
|
||||
</el-dropdown-item>
|
||||
<el-dropdown-item v-if="!column.fixed" command="fix">
|
||||
<SvgIcon name="Paperclip" class="mr-1" />
|
||||
{{ $t('db.fixed') }}
|
||||
</el-dropdown-item>
|
||||
<el-dropdown-item v-else command="unfix">
|
||||
<SvgIcon name="Minus" class="mr-1" />
|
||||
{{ $t('db.cancelFiexd') }}
|
||||
</el-dropdown-item>
|
||||
</el-dropdown-menu>
|
||||
</template>
|
||||
</el-dropdown>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -214,43 +261,9 @@ const props = defineProps({
|
||||
const contextmenuRef = ref();
|
||||
const tableRef = ref();
|
||||
|
||||
/** 表头 menu items **/
|
||||
|
||||
const cmHeaderAsc = new ContextmenuItem('asc', 'db.asc')
|
||||
.withIcon('top')
|
||||
.withOnClick((data: any) => {
|
||||
onTableSortChange({ columnName: data.dataKey, order: 'asc' });
|
||||
})
|
||||
.withHideFunc(() => !props.showColumnTip);
|
||||
|
||||
const cmHeaderDesc = new ContextmenuItem('desc', 'db.desc')
|
||||
.withIcon('bottom')
|
||||
.withOnClick((data: any) => {
|
||||
onTableSortChange({ columnName: data.dataKey, order: 'desc' });
|
||||
})
|
||||
.withHideFunc(() => !props.showColumnTip);
|
||||
|
||||
const cmHeaderFixed = new ContextmenuItem('fixed', 'db.fixed')
|
||||
.withIcon('Paperclip')
|
||||
.withOnClick((data: any) => {
|
||||
state.columns.forEach((column: any) => {
|
||||
if (column.dataKey == data.dataKey) {
|
||||
column.fixed = true;
|
||||
}
|
||||
});
|
||||
})
|
||||
.withHideFunc((data: any) => data.fixed);
|
||||
|
||||
const cmHeaderCancelFixed = new ContextmenuItem('cancelFixed', 'db.cancelFiexd')
|
||||
.withIcon('Minus')
|
||||
.withOnClick((data: any) => {
|
||||
state.columns.forEach((column: any) => {
|
||||
if (column.dataKey == data.dataKey) {
|
||||
column.fixed = false;
|
||||
}
|
||||
});
|
||||
})
|
||||
.withHideFunc((data: any) => !data.fixed);
|
||||
// 用于控制列操作按钮的显示
|
||||
const showColumnActions = ref({} as any);
|
||||
const columnActionVisible = ref({} as any);
|
||||
|
||||
/** 表数据 contextmenu items **/
|
||||
|
||||
@@ -508,6 +521,55 @@ const cancelLoading = async () => {
|
||||
endLoading();
|
||||
};
|
||||
|
||||
/**
|
||||
* 显示列操作按钮
|
||||
*/
|
||||
const showColumnAction = (column: any) => {
|
||||
showColumnActions.value[column.key] = true;
|
||||
};
|
||||
|
||||
/**
|
||||
* 隐藏列操作按钮
|
||||
*/
|
||||
const hideColumnAction = () => {
|
||||
showColumnActions.value = {};
|
||||
};
|
||||
|
||||
/**
|
||||
* 处理列操作命令
|
||||
*/
|
||||
const handleColumnCommand = (column: any, command: string) => {
|
||||
switch (command) {
|
||||
case 'sort-asc':
|
||||
onTableSortChange({ columnName: column.dataKey, order: 'asc' });
|
||||
break;
|
||||
case 'sort-desc':
|
||||
onTableSortChange({ columnName: column.dataKey, order: 'desc' });
|
||||
break;
|
||||
case 'fix':
|
||||
state.columns.forEach((col: any) => {
|
||||
if (col.dataKey == column.dataKey) {
|
||||
col.fixed = true;
|
||||
}
|
||||
});
|
||||
break;
|
||||
case 'unfix':
|
||||
state.columns.forEach((col: any) => {
|
||||
if (col.dataKey == column.dataKey) {
|
||||
col.fixed = false;
|
||||
}
|
||||
});
|
||||
break;
|
||||
}
|
||||
// 点击了取消固定等操作后,可能更多的icon还是显示在列上,所以需要重新置为空对象。暂时不懂是组件bug还是啥
|
||||
columnActionVisible.value = {};
|
||||
};
|
||||
|
||||
const onColumnActionVisibleChange = (column: any, visible: boolean) => {
|
||||
columnActionVisible.value = {}; // 只显示一个列的更多icon
|
||||
columnActionVisible.value[column.key] = visible;
|
||||
};
|
||||
|
||||
/**
|
||||
* 当前单元格是否允许编辑
|
||||
* @param rowIndex ri
|
||||
@@ -570,16 +632,6 @@ const rowEventHandlers = {
|
||||
},
|
||||
};
|
||||
|
||||
const headerContextmenuClick = (event: any, data: any) => {
|
||||
event.preventDefault(); // 阻止默认的右击菜单行为
|
||||
|
||||
const { clientX, clientY } = event;
|
||||
state.contextmenu.dropdown.x = clientX;
|
||||
state.contextmenu.dropdown.y = clientY;
|
||||
state.contextmenu.items = [cmHeaderAsc, cmHeaderDesc, cmHeaderFixed, cmHeaderCancelFixed];
|
||||
contextmenuRef.value.openContextmenu(data);
|
||||
};
|
||||
|
||||
const dataContextmenuClick = (event: any, rowIndex: number, column: any, data: any) => {
|
||||
event.preventDefault(); // 阻止默认的右击菜单行为
|
||||
|
||||
@@ -851,6 +903,31 @@ defineExpose({
|
||||
top: 2px;
|
||||
right: 0;
|
||||
padding: 2px;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
.column-actions-trigger {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
width: 16px;
|
||||
height: 16px;
|
||||
border-radius: 50%;
|
||||
cursor: pointer;
|
||||
|
||||
&:hover {
|
||||
background-color: var(--el-fill-color-light);
|
||||
}
|
||||
}
|
||||
|
||||
.column-more-icon {
|
||||
opacity: 0;
|
||||
transition: opacity 0.2s;
|
||||
}
|
||||
|
||||
.column-more-icon-visible {
|
||||
opacity: 1 !important;
|
||||
}
|
||||
}
|
||||
</style>
|
||||
|
||||
@@ -497,8 +497,8 @@ export class DbInst {
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取列名称的长度 加上排序图标长度、abc为字段类型简称占位符、排序图标等
|
||||
const columnWidth: number = getTextWidth(prop + 'abc') + 10;
|
||||
// 获取列名称的长度 加上排序图标长度、abc为字段类型简称占位符、更多/排序图标等
|
||||
const columnWidth: number = getTextWidth(prop + 'abc') + 25;
|
||||
// prop为该列的字段名(传字符串);tableData为该表格的数据源(传变量);
|
||||
if (!tableData || !tableData.length || tableData.length === 0 || tableData === undefined) {
|
||||
return columnWidth;
|
||||
|
||||
@@ -43,7 +43,7 @@
|
||||
|
||||
<el-form ref="teamForm" :model="addTeamDialog.form" :rules="teamFormRules" label-width="auto">
|
||||
<el-form-item prop="name" :label="$t('common.name')" required>
|
||||
<el-input :disabled="addTeamDialog.form.id" v-model="addTeamDialog.form.name" auto-complete="off"></el-input>
|
||||
<el-input :disabled="addTeamDialog.form.id > 0" v-model="addTeamDialog.form.name" auto-complete="off"></el-input>
|
||||
</el-form-item>
|
||||
|
||||
<el-form-item prop="validityDate" :label="$t('team.validity')" required>
|
||||
@@ -98,10 +98,8 @@
|
||||
<AccountSelectFormItem v-model="showMemDialog.memForm.accountIds" multiple focus />
|
||||
</el-form>
|
||||
<template #footer>
|
||||
<div class="dialog-footer">
|
||||
<el-button @click="onCancelAddMember()">{{ $t('common.cancel') }}</el-button>
|
||||
<el-button @click="onAddMember" type="primary">{{ $t('common.confirm') }}</el-button>
|
||||
</div>
|
||||
</template>
|
||||
</el-dialog>
|
||||
</el-dialog>
|
||||
@@ -238,8 +236,8 @@ const onSaveTeam = async () => {
|
||||
|
||||
const onCancelSaveTeam = () => {
|
||||
state.addTeamDialog.visible = false;
|
||||
teamForm.value.resetFields();
|
||||
setTimeout(() => {
|
||||
teamForm.value?.resetFields();
|
||||
state.addTeamDialog.form = {} as any;
|
||||
}, 500);
|
||||
};
|
||||
|
||||
1
server/.gitignore
vendored
1
server/.gitignore
vendored
@@ -11,4 +11,3 @@ mayfly_rsa.pub
|
||||
/db/mariadb/
|
||||
|
||||
*.sqlite
|
||||
file
|
||||
@@ -1,12 +1,11 @@
|
||||
module mayfly-go
|
||||
|
||||
go 1.24
|
||||
go 1.25
|
||||
|
||||
require (
|
||||
gitee.com/chunanyong/dm v1.8.20
|
||||
gitee.com/liuzongyang/libpq v1.10.11
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/emirpasic/gods v1.18.1
|
||||
github.com/gin-gonic/gin v1.10.1
|
||||
github.com/glebarez/sqlite v1.11.0
|
||||
github.com/go-gormigrate/gormigrate/v2 v2.1.4
|
||||
@@ -15,7 +14,7 @@ require (
|
||||
github.com/go-playground/universal-translator v0.18.1
|
||||
github.com/go-playground/validator/v10 v10.27.0
|
||||
github.com/go-sql-driver/mysql v1.9.3
|
||||
github.com/golang-jwt/jwt/v5 v5.2.3
|
||||
github.com/golang-jwt/jwt/v5 v5.3.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/lionsoul2014/ip2region/binding/golang v0.0.0-20250630080345-f9402614f6ba
|
||||
@@ -24,7 +23,7 @@ require (
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/pkg/sftp v1.13.9
|
||||
github.com/pquerna/otp v1.5.0
|
||||
github.com/redis/go-redis/v9 v9.11.0
|
||||
github.com/redis/go-redis/v9 v9.12.1
|
||||
github.com/robfig/cron/v3 v3.0.1 // 定时任务
|
||||
github.com/sijms/go-ora/v2 v2.9.0
|
||||
github.com/spf13/cast v1.9.2
|
||||
@@ -32,7 +31,7 @@ require (
|
||||
github.com/tidwall/gjson v1.18.0
|
||||
github.com/veops/go-ansiterm v0.0.5
|
||||
go.mongodb.org/mongo-driver/v2 v2.2.2 // mongo
|
||||
golang.org/x/crypto v0.40.0 // ssh
|
||||
golang.org/x/crypto v0.41.0 // ssh
|
||||
golang.org/x/oauth2 v0.30.0
|
||||
golang.org/x/sync v0.16.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
@@ -92,8 +91,8 @@ require (
|
||||
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 // indirect
|
||||
golang.org/x/image v0.29.0 // indirect
|
||||
golang.org/x/net v0.42.0 // indirect
|
||||
golang.org/x/sys v0.34.0 // indirect
|
||||
golang.org/x/text v0.27.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/text v0.28.0 // indirect
|
||||
google.golang.org/protobuf v1.36.6 // indirect
|
||||
modernc.org/libc v1.66.4 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
|
||||
@@ -347,6 +347,25 @@ func (v *MysqlVisitor) VisitAtomTableItem(ctx *mysqlparser.AtomTableItemContext)
|
||||
return tableSourceItem
|
||||
}
|
||||
|
||||
func (v *MysqlVisitor) VisitSubqueryTableItem(ctx *mysqlparser.SubqueryTableItemContext) interface{} {
|
||||
sti := new(sqlstmt.SubqueryTableItem)
|
||||
sti.Node = sqlstmt.NewNode(ctx.GetParser(), ctx)
|
||||
|
||||
// 解析子查询
|
||||
if ss := ctx.SelectStatement(); ss != nil {
|
||||
sti.SubQuery = ss.Accept(v).(sqlstmt.ISelectStmt)
|
||||
}
|
||||
|
||||
// 获取别名
|
||||
if alias := ctx.GetAlias(); alias != nil {
|
||||
sti.Alias = alias.GetText()
|
||||
} else if uid := ctx.Uid(); uid != nil {
|
||||
sti.Alias = uid.GetText()
|
||||
}
|
||||
|
||||
return sti
|
||||
}
|
||||
|
||||
func (v *MysqlVisitor) VisitInnerJoin(ctx *mysqlparser.InnerJoinContext) interface{} {
|
||||
ij := new(sqlstmt.InnerJoin)
|
||||
ij.Node = sqlstmt.NewNode(ctx.GetParser(), ctx)
|
||||
|
||||
@@ -139,6 +139,14 @@ type (
|
||||
TableName *TableName // 表名
|
||||
Alias string // 别名
|
||||
}
|
||||
|
||||
// SubqueryTableItem 表示子查询表项,如 (SELECT * FROM table1) AS alias
|
||||
SubqueryTableItem struct {
|
||||
TableSourceItem
|
||||
|
||||
SubQuery ISelectStmt
|
||||
Alias string
|
||||
}
|
||||
)
|
||||
|
||||
func (*TableSource) isTableSource() {}
|
||||
|
||||
15
server/internal/file/infra/persistence/file.go
Normal file
15
server/internal/file/infra/persistence/file.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package persistence
|
||||
|
||||
import (
|
||||
"mayfly-go/internal/file/domain/entity"
|
||||
"mayfly-go/internal/file/domain/repository"
|
||||
"mayfly-go/pkg/base"
|
||||
)
|
||||
|
||||
type fileRepoImpl struct {
|
||||
base.RepoImpl[*entity.File]
|
||||
}
|
||||
|
||||
func newFileRepo() repository.File {
|
||||
return &fileRepoImpl{}
|
||||
}
|
||||
9
server/internal/file/infra/persistence/persistence.go
Normal file
9
server/internal/file/infra/persistence/persistence.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package persistence
|
||||
|
||||
import (
|
||||
"mayfly-go/pkg/ioc"
|
||||
)
|
||||
|
||||
func InitIoc() {
|
||||
ioc.Register(newFileRepo(), ioc.WithComponentName("FileRepo"))
|
||||
}
|
||||
@@ -379,7 +379,7 @@ func (m *MachineFile) UploadFolder(rc *req.Ctx) {
|
||||
|
||||
isSuccess := true
|
||||
for _, chunk := range chunks {
|
||||
go func(files []FolderFile, wg *sync.WaitGroup) {
|
||||
wg.Go(func() {
|
||||
defer func() {
|
||||
// 协程执行完成后调用Done方法
|
||||
wg.Done()
|
||||
@@ -397,7 +397,7 @@ func (m *MachineFile) UploadFolder(rc *req.Ctx) {
|
||||
}
|
||||
}()
|
||||
|
||||
for _, file := range files {
|
||||
for _, file := range chunk {
|
||||
fileHeader := file.Fileheader
|
||||
dir := file.Dir
|
||||
file, _ := fileHeader.Open()
|
||||
@@ -410,7 +410,7 @@ func (m *MachineFile) UploadFolder(rc *req.Ctx) {
|
||||
defer createfile.Close()
|
||||
io.Copy(createfile, file)
|
||||
}
|
||||
}(chunk, &wg)
|
||||
})
|
||||
}
|
||||
|
||||
// 等待所有协程执行完成
|
||||
|
||||
@@ -1,262 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const minTimerDelay = time.Millisecond * 1
|
||||
const maxTimerDelay = time.Nanosecond * math.MaxInt64
|
||||
|
||||
type DelayQueue[T Delayable] struct {
|
||||
enqueuedSignal chan struct{}
|
||||
dequeuedSignal chan struct{}
|
||||
transferChan chan T
|
||||
singleDequeue chan struct{}
|
||||
mutex sync.Mutex
|
||||
priorityQueue *PriorityQueue[T]
|
||||
zero T
|
||||
}
|
||||
|
||||
type Delayable interface {
|
||||
GetDeadline() time.Time
|
||||
GetKey() string
|
||||
}
|
||||
|
||||
var _ Delayable = (*wrapper[Job])(nil)
|
||||
|
||||
type wrapper[T Job] struct {
|
||||
key string
|
||||
deadline time.Time
|
||||
removed bool
|
||||
status JobStatus
|
||||
job T
|
||||
}
|
||||
|
||||
func newWrapper[T Job](job T) *wrapper[T] {
|
||||
return &wrapper[T]{
|
||||
key: job.GetKey(),
|
||||
job: job,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *wrapper[T]) GetDeadline() time.Time {
|
||||
return d.deadline
|
||||
}
|
||||
|
||||
func (d *wrapper[T]) GetKey() string {
|
||||
return d.key
|
||||
}
|
||||
|
||||
func NewDelayQueue[T Delayable](cap int) *DelayQueue[T] {
|
||||
singleDequeue := make(chan struct{}, 1)
|
||||
singleDequeue <- struct{}{}
|
||||
return &DelayQueue[T]{
|
||||
enqueuedSignal: make(chan struct{}),
|
||||
dequeuedSignal: make(chan struct{}),
|
||||
transferChan: make(chan T),
|
||||
singleDequeue: singleDequeue,
|
||||
priorityQueue: NewPriorityQueue[T](cap, func(src T, dst T) bool {
|
||||
return src.GetDeadline().Before(dst.GetDeadline())
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) TryDequeue() (T, bool) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if elm, ok := s.priorityQueue.Peek(0); ok {
|
||||
delay := elm.GetDeadline().Sub(time.Now())
|
||||
if delay < minTimerDelay {
|
||||
// 无需延迟,头部元素出队后直接返回
|
||||
_, _ = s.dequeue()
|
||||
return elm, true
|
||||
}
|
||||
}
|
||||
return s.zero, false
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) Dequeue(ctx context.Context) (T, bool) {
|
||||
// 出队锁:避免因重复获取队列头部同一元素降低性能
|
||||
select {
|
||||
case <-s.singleDequeue:
|
||||
defer func() {
|
||||
s.singleDequeue <- struct{}{}
|
||||
}()
|
||||
case <-ctx.Done():
|
||||
return s.zero, false
|
||||
}
|
||||
|
||||
for {
|
||||
// 全局锁:避免入队和出队信号的重置与激活出现并发问题
|
||||
s.mutex.Lock()
|
||||
if ctx.Err() != nil {
|
||||
s.mutex.Unlock()
|
||||
return s.zero, false
|
||||
}
|
||||
|
||||
// 接收直接转发的不需要延迟的新元素
|
||||
select {
|
||||
case elm := <-s.transferChan:
|
||||
s.mutex.Unlock()
|
||||
return elm, true
|
||||
default:
|
||||
}
|
||||
|
||||
// 延迟时间缺省值为 maxTimerDelay, 表示队列为空
|
||||
delay := maxTimerDelay
|
||||
if elm, ok := s.priorityQueue.Peek(0); ok {
|
||||
now := time.Now()
|
||||
delay = elm.GetDeadline().Sub(now)
|
||||
if delay < minTimerDelay {
|
||||
// 无需延迟,头部元素出队后直接返回
|
||||
_, _ = s.dequeue()
|
||||
s.mutex.Unlock()
|
||||
return elm, ok
|
||||
}
|
||||
}
|
||||
// 重置入队信号,避免历史信号干扰
|
||||
select {
|
||||
case <-s.enqueuedSignal:
|
||||
default:
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
|
||||
if delay == maxTimerDelay {
|
||||
// 队列为空, 等待新元素
|
||||
select {
|
||||
case elm := <-s.transferChan:
|
||||
return elm, true
|
||||
case <-s.enqueuedSignal:
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return s.zero, false
|
||||
}
|
||||
} else if delay >= minTimerDelay {
|
||||
// 等待时间到期或新元素加入
|
||||
timer := time.NewTimer(delay)
|
||||
select {
|
||||
case <-timer.C:
|
||||
continue
|
||||
case elm := <-s.transferChan:
|
||||
timer.Stop()
|
||||
return elm, true
|
||||
case <-s.enqueuedSignal:
|
||||
timer.Stop()
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return s.zero, false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) dequeue() (T, bool) {
|
||||
elm, ok := s.priorityQueue.Dequeue()
|
||||
if !ok {
|
||||
return s.zero, false
|
||||
}
|
||||
select {
|
||||
case s.dequeuedSignal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return elm, true
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) enqueue(val T) bool {
|
||||
if ok := s.priorityQueue.Enqueue(val); !ok {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case s.enqueuedSignal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) TryEnqueue(val T) bool {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if s.priorityQueue.IsFull() {
|
||||
return false
|
||||
}
|
||||
return s.enqueue(val)
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) Enqueue(ctx context.Context, val T) bool {
|
||||
for {
|
||||
// 全局锁:避免入队和出队信号的重置与激活出现并发问题
|
||||
s.mutex.Lock()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
s.mutex.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// 如果队列未满,入队后直接返回
|
||||
if !s.priorityQueue.IsFull() {
|
||||
s.enqueue(val)
|
||||
s.mutex.Unlock()
|
||||
return true
|
||||
}
|
||||
// 队列已满,重置出队信号,避免受到历史信号影响
|
||||
select {
|
||||
case <-s.dequeuedSignal:
|
||||
default:
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
|
||||
if delay := val.GetDeadline().Sub(time.Now()); delay >= minTimerDelay {
|
||||
// 新元素需要延迟,等待退出信号、出队信号和到期信号
|
||||
timer := time.NewTimer(delay)
|
||||
select {
|
||||
case <-timer.C:
|
||||
// 新元素不再需要延迟
|
||||
case <-s.dequeuedSignal:
|
||||
// 收到出队信号,从头开始尝试入队
|
||||
timer.Stop()
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
// 新元素不需要延迟,等待转发成功信号、出队信号和退出信号
|
||||
select {
|
||||
case s.transferChan <- val:
|
||||
// 新元素转发成功,直接返回(避免队列满且元素未到期导致新元素长时间无法入队)
|
||||
return true
|
||||
case <-s.dequeuedSignal:
|
||||
// 收到出队信号,从头开始尝试入队
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) Remove(_ context.Context, key string) (T, bool) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
return s.priorityQueue.Remove(s.index(key))
|
||||
}
|
||||
|
||||
func (s *DelayQueue[T]) index(key string) int {
|
||||
for i := 0; i < s.priorityQueue.Len(); i++ {
|
||||
elm, ok := s.priorityQueue.Peek(i)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if key == elm.GetKey() {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
@@ -1,425 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var _ Delayable = &delayElement{}
|
||||
|
||||
type delayElement struct {
|
||||
id uint64
|
||||
value int
|
||||
deadline time.Time
|
||||
}
|
||||
|
||||
func (elm *delayElement) GetDeadline() time.Time {
|
||||
return elm.deadline
|
||||
}
|
||||
|
||||
func (elm *delayElement) GetId() uint64 {
|
||||
return elm.id
|
||||
}
|
||||
|
||||
func (elm *delayElement) GetKey() string {
|
||||
return strconv.FormatUint(elm.id, 16)
|
||||
}
|
||||
|
||||
type testDelayQueue = DelayQueue[*delayElement]
|
||||
|
||||
func newTestDelayQueue(cap int) *testDelayQueue {
|
||||
return NewDelayQueue[*delayElement](cap)
|
||||
}
|
||||
|
||||
func mustEnqueue(val int, delay int64) func(t *testing.T, queue *testDelayQueue) {
|
||||
return func(t *testing.T, queue *testDelayQueue) {
|
||||
require.True(t, queue.Enqueue(context.Background(),
|
||||
newTestElm(val, delay)))
|
||||
}
|
||||
}
|
||||
|
||||
func newTestElm(value int, delay int64) *delayElement {
|
||||
return &delayElement{
|
||||
id: elmId.Add(1),
|
||||
value: value,
|
||||
deadline: time.Now().Add(time.Millisecond * time.Duration(delay)),
|
||||
}
|
||||
}
|
||||
|
||||
var elmId atomic.Uint64
|
||||
|
||||
func TestDelayQueue_Enqueue(t *testing.T) {
|
||||
type testCase[R int, T Delayable] struct {
|
||||
name string
|
||||
queue *DelayQueue[T]
|
||||
before func(t *testing.T, queue *DelayQueue[T])
|
||||
while func(t *testing.T, queue *DelayQueue[T])
|
||||
after func(t *testing.T, queue *DelayQueue[T])
|
||||
value int
|
||||
delay int64
|
||||
timeout int64
|
||||
wantOk bool
|
||||
}
|
||||
tests := []testCase[int, *delayElement]{
|
||||
{
|
||||
name: "enqueue to empty queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
after: func(t *testing.T, queue *testDelayQueue) {
|
||||
val, ok := queue.priorityQueue.Dequeue()
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 1, val.value)
|
||||
},
|
||||
timeout: 10,
|
||||
value: 1,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "enqueue active element to full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: func(t *testing.T, queue *testDelayQueue) {
|
||||
mustEnqueue(1, 60)(t, queue)
|
||||
},
|
||||
timeout: 40,
|
||||
delay: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "enqueue inactive element to full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
timeout: 20,
|
||||
delay: 40,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "enqueue to full queue while dequeue valid element",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: func(t *testing.T, queue *testDelayQueue) {
|
||||
_, ok := queue.Dequeue(context.Background())
|
||||
require.True(t, ok)
|
||||
},
|
||||
timeout: 80,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "enqueue active element to full queue while dequeue invalid element",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: func(t *testing.T, queue *testDelayQueue) {
|
||||
elm, ok := queue.Dequeue(context.Background())
|
||||
require.True(t, ok)
|
||||
require.Equal(t, 2, elm.value)
|
||||
},
|
||||
timeout: 40,
|
||||
value: 2,
|
||||
delay: 20,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "enqueue inactive element to full queue while dequeue invalid element",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: func(t *testing.T, queue *testDelayQueue) {
|
||||
_, ok := queue.Dequeue(context.Background())
|
||||
require.True(t, ok)
|
||||
},
|
||||
timeout: 20,
|
||||
delay: 40,
|
||||
wantOk: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(),
|
||||
time.Millisecond*time.Duration(tt.timeout))
|
||||
defer cancel()
|
||||
if tt.before != nil {
|
||||
tt.before(t, tt.queue)
|
||||
}
|
||||
if tt.while != nil {
|
||||
go tt.while(t, tt.queue)
|
||||
}
|
||||
ok := tt.queue.Enqueue(ctx, newTestElm(tt.value, tt.delay))
|
||||
require.Equal(t, tt.wantOk, ok)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelayQueue_Dequeue(t *testing.T) {
|
||||
type testCase[R int, T Delayable] struct {
|
||||
name string
|
||||
queue *DelayQueue[T]
|
||||
before func(t *testing.T, queue *DelayQueue[T])
|
||||
while func(t *testing.T, queue *DelayQueue[T])
|
||||
timeout int64
|
||||
wantVal int
|
||||
wantOk bool
|
||||
}
|
||||
tests := []testCase[int, *delayElement]{
|
||||
{
|
||||
name: "dequeue from empty queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
timeout: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "dequeue new active element from empty queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
while: mustEnqueue(1, 20),
|
||||
timeout: 4000,
|
||||
wantVal: 1,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "dequeue new inactive element from empty queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
while: mustEnqueue(1, 60),
|
||||
timeout: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "dequeue active element from full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
timeout: 80,
|
||||
wantVal: 1,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "dequeue inactive element from full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
timeout: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
{
|
||||
name: "dequeue new active element from full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: mustEnqueue(2, 40),
|
||||
timeout: 80,
|
||||
wantVal: 2,
|
||||
wantOk: true,
|
||||
},
|
||||
{
|
||||
name: "dequeue new inactive element from full queue",
|
||||
queue: newTestDelayQueue(1),
|
||||
before: mustEnqueue(1, 60),
|
||||
while: mustEnqueue(2, 40),
|
||||
timeout: 20,
|
||||
wantOk: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(),
|
||||
time.Millisecond*time.Duration(tt.timeout))
|
||||
defer cancel()
|
||||
if tt.before != nil {
|
||||
tt.before(t, tt.queue)
|
||||
}
|
||||
if tt.while != nil {
|
||||
go tt.while(t, tt.queue)
|
||||
}
|
||||
got, ok := tt.queue.Dequeue(ctx)
|
||||
require.Equal(t, tt.wantOk, ok)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
require.Equal(t, tt.wantVal, got.value)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelayQueue(t *testing.T) {
|
||||
const delay = 1000
|
||||
const timeout = 1000
|
||||
const capacity = 100
|
||||
const count = 100
|
||||
var wg sync.WaitGroup
|
||||
var (
|
||||
enqueueSeq atomic.Int32
|
||||
dequeueSeq atomic.Int32
|
||||
checksum atomic.Int64
|
||||
)
|
||||
queue := newTestDelayQueue(capacity)
|
||||
procs := runtime.GOMAXPROCS(0)
|
||||
wg.Add(procs)
|
||||
for i := 0; i < procs; i++ {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*timeout)
|
||||
if i%2 == 0 {
|
||||
if seq := int(enqueueSeq.Add(1)); seq <= count {
|
||||
for ctx.Err() == nil {
|
||||
if ok := queue.Enqueue(ctx, newTestElm(seq, int64(rand.Intn(delay)))); ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if seq := int(dequeueSeq.Add(1)); seq > count {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
for ctx.Err() == nil {
|
||||
if elm, ok := queue.Dequeue(ctx); ok {
|
||||
require.Less(t, elm.GetDeadline().Sub(time.Now()), minTimerDelay)
|
||||
checksum.Add(int64(elm.value))
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
assert.Zero(t, queue.priorityQueue.Len())
|
||||
assert.Equal(t, int64((1+count)*count/2), checksum.Load())
|
||||
}
|
||||
|
||||
func BenchmarkDelayQueueV3(b *testing.B) {
|
||||
const delay = 0
|
||||
const capacity = 100
|
||||
|
||||
b.Run("enqueue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(b.N)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = queue.Enqueue(context.Background(), newTestElm(1, delay))
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("parallel to enqueue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(b.N)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_ = queue.Enqueue(context.Background(), newTestElm(1, delay))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
b.Run("dequeue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
require.True(b, queue.Enqueue(context.Background(), newTestElm(1, delay)))
|
||||
}
|
||||
time.Sleep(time.Millisecond * delay)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = queue.Dequeue(context.Background())
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("parallel to dequeue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
require.True(b, queue.Enqueue(context.Background(), newTestElm(1, delay)))
|
||||
}
|
||||
time.Sleep(time.Millisecond * delay)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_, _ = queue.Dequeue(context.Background())
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
b.Run("parallel to dequeue while enqueue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(capacity)
|
||||
go func() {
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = queue.Enqueue(context.Background(), newTestElm(i, delay))
|
||||
}
|
||||
}()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_, _ = queue.Dequeue(context.Background())
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
b.Run("parallel to enqueue while dequeue", func(b *testing.B) {
|
||||
queue := newTestDelayQueue(capacity)
|
||||
go func() {
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, _ = queue.Dequeue(context.Background())
|
||||
}
|
||||
}()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
_ = queue.Enqueue(context.Background(), newTestElm(1, delay))
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
b.Run("parallel to enqueue and dequeue", func(b *testing.B) {
|
||||
var wg sync.WaitGroup
|
||||
var (
|
||||
enqueueSeq atomic.Int32
|
||||
dequeueSeq atomic.Int32
|
||||
)
|
||||
queue := newTestDelayQueue(capacity)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
procs := runtime.GOMAXPROCS(0)
|
||||
wg.Add(procs)
|
||||
for i := 0; i < procs; i++ {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
if i%2 == 0 {
|
||||
if seq := int(enqueueSeq.Add(1)); seq <= b.N {
|
||||
for {
|
||||
if ok := queue.Enqueue(context.Background(), newTestElm(seq, delay)); ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if seq := int(dequeueSeq.Add(1)); seq > b.N {
|
||||
return
|
||||
}
|
||||
for {
|
||||
if _, ok := queue.Dequeue(context.Background()); ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
@@ -1,136 +0,0 @@
|
||||
package runner
|
||||
|
||||
// PriorityQueue 是一个基于小顶堆的优先队列
|
||||
// 当capacity <= 0时,为无界队列,切片容量会动态扩缩容
|
||||
// 当capacity > 0 时,为有界队列,初始化后就固定容量,不会扩缩容
|
||||
type PriorityQueue[T any] struct {
|
||||
// 用于比较前一个元素是否小于后一个元素
|
||||
less Less[T]
|
||||
// 队列容量
|
||||
capacity int
|
||||
// 队列中的元素,为便于计算父子节点的index,0位置留空,根节点从1开始
|
||||
data []T
|
||||
|
||||
zero T
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Len() int {
|
||||
return len(p.data) - 1
|
||||
}
|
||||
|
||||
// Cap 无界队列返回0,有界队列返回创建队列时设置的值
|
||||
func (p *PriorityQueue[T]) Cap() int {
|
||||
return p.capacity
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) IsBoundless() bool {
|
||||
return p.capacity <= 0
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) IsFull() bool {
|
||||
return p.capacity > 0 && len(p.data)-1 == p.capacity
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) IsEmpty() bool {
|
||||
return len(p.data) < 2
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Peek(i int) (T, bool) {
|
||||
if p.IsEmpty() {
|
||||
return p.zero, false
|
||||
}
|
||||
if i >= p.Len() {
|
||||
return p.zero, false
|
||||
}
|
||||
return p.data[i+1], true
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Enqueue(t T) bool {
|
||||
if p.IsFull() {
|
||||
return false
|
||||
}
|
||||
|
||||
p.data = append(p.data, t)
|
||||
node, parent := len(p.data)-1, (len(p.data)-1)/2
|
||||
for parent > 0 && p.less(p.data[node], p.data[parent]) {
|
||||
p.data[parent], p.data[node] = p.data[node], p.data[parent]
|
||||
node = parent
|
||||
parent = parent / 2
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Dequeue() (T, bool) {
|
||||
if p.IsEmpty() {
|
||||
return p.zero, false
|
||||
}
|
||||
|
||||
pop := p.data[1]
|
||||
// 假定说我拿到了堆顶,就是理论上优先级最低的
|
||||
// pop 的优先级
|
||||
p.data[1] = p.data[len(p.data)-1]
|
||||
p.data = p.data[:len(p.data)-1]
|
||||
p.shrinkIfNecessary()
|
||||
p.heapify(p.data, len(p.data)-1, 1)
|
||||
return pop, true
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) shrinkIfNecessary() {
|
||||
if !p.IsBoundless() {
|
||||
return
|
||||
}
|
||||
if cap(p.data) > 1024 && len(p.data)*3 < cap(p.data)*2 {
|
||||
data := make([]T, len(p.data), cap(p.data)*5/6)
|
||||
copy(data, p.data)
|
||||
p.data = data
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) heapify(data []T, n, i int) {
|
||||
minPos := i
|
||||
for {
|
||||
if left := i * 2; left <= n && p.less(data[left], data[minPos]) {
|
||||
minPos = left
|
||||
}
|
||||
if right := i*2 + 1; right <= n && p.less(data[right], data[minPos]) {
|
||||
minPos = right
|
||||
}
|
||||
if minPos == i {
|
||||
break
|
||||
}
|
||||
data[i], data[minPos] = data[minPos], data[i]
|
||||
i = minPos
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PriorityQueue[T]) Remove(i int) (T, bool) {
|
||||
if p.IsEmpty() || i >= p.Len() || i < 0 {
|
||||
return p.zero, false
|
||||
}
|
||||
|
||||
i += 1
|
||||
result := p.data[i]
|
||||
last := len(p.data) - 1
|
||||
p.data[i] = p.data[last]
|
||||
p.data = p.data[:last]
|
||||
p.shrinkIfNecessary()
|
||||
p.heapify(p.data, len(p.data)-1, i)
|
||||
return result, true
|
||||
}
|
||||
|
||||
// NewPriorityQueue 创建优先队列 capacity <= 0 时,为无界队列,否则有有界队列
|
||||
func NewPriorityQueue[T any](capacity int, less Less[T]) *PriorityQueue[T] {
|
||||
sliceCap := capacity + 1
|
||||
if capacity < 1 {
|
||||
capacity = 0
|
||||
sliceCap = 64
|
||||
}
|
||||
return &PriorityQueue[T]{
|
||||
capacity: capacity,
|
||||
data: make([]T, 1, sliceCap),
|
||||
less: less,
|
||||
}
|
||||
}
|
||||
|
||||
// Less 用于比较两个对象的大小 src < dst, 返回 true,src >= dst, 返回 false
|
||||
type Less[T any] func(src T, dst T) bool
|
||||
@@ -1,67 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestChangePriority(t *testing.T) {
|
||||
q := NewPriorityQueue[*priorityElement](100,
|
||||
func(src *priorityElement, dst *priorityElement) bool {
|
||||
return src.Priority < dst.Priority
|
||||
})
|
||||
e1 := &priorityElement{
|
||||
Data: 10,
|
||||
Priority: 200,
|
||||
}
|
||||
_ = q.Enqueue(e1)
|
||||
e2 := &priorityElement{
|
||||
Data: 10,
|
||||
Priority: 100,
|
||||
}
|
||||
_ = q.Enqueue(e2)
|
||||
//e1.Priority = 10
|
||||
val, _ := q.Dequeue()
|
||||
println(val)
|
||||
}
|
||||
|
||||
type priorityElement struct {
|
||||
Data any
|
||||
Priority int
|
||||
}
|
||||
|
||||
func TestPriorityQueue_Remove(t *testing.T) {
|
||||
q := NewPriorityQueue[*priorityElement](100,
|
||||
func(src *priorityElement, dst *priorityElement) bool {
|
||||
return src.Priority < dst.Priority
|
||||
})
|
||||
|
||||
for i := 8; i > 0; i-- {
|
||||
q.Enqueue(&priorityElement{Priority: i})
|
||||
}
|
||||
requirePriorities(t, q)
|
||||
|
||||
q.Remove(8)
|
||||
requirePriorities(t, q)
|
||||
q.Remove(7)
|
||||
requirePriorities(t, q)
|
||||
|
||||
q.Remove(2)
|
||||
requirePriorities(t, q)
|
||||
|
||||
q.Remove(1)
|
||||
requirePriorities(t, q)
|
||||
|
||||
q.Remove(0)
|
||||
requirePriorities(t, q)
|
||||
}
|
||||
|
||||
func requirePriorities(t *testing.T, q *PriorityQueue[*priorityElement]) {
|
||||
ps := make([]int, 0, q.Len())
|
||||
for _, val := range q.data[1:] {
|
||||
ps = append(ps, val.Priority)
|
||||
}
|
||||
for i := q.Len(); i >= 2; i-- {
|
||||
require.False(t, q.less(q.data[i], q.data[i/2]), ps)
|
||||
}
|
||||
}
|
||||
@@ -1,425 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/emirpasic/gods/maps/linkedhashmap"
|
||||
"mayfly-go/pkg/logx"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrJobNotFound = errors.New("任务未找到")
|
||||
ErrJobExist = errors.New("任务已存在")
|
||||
ErrJobFinished = errors.New("任务已完成")
|
||||
ErrJobDisabled = errors.New("任务已禁用")
|
||||
ErrJobExpired = errors.New("任务已过期")
|
||||
ErrJobRunning = errors.New("任务执行中")
|
||||
)
|
||||
|
||||
type JobKey = string
|
||||
type RunJobFunc[T Job] func(ctx context.Context, job T) error
|
||||
type NextJobFunc[T Job] func() (T, bool)
|
||||
type RunnableJobFunc[T Job] func(job T, nextRunning NextJobFunc[T]) (bool, error)
|
||||
type ScheduleJobFunc[T Job] func(job T) (deadline time.Time, err error)
|
||||
type UpdateJobFunc[T Job] func(ctx context.Context, job T) error
|
||||
|
||||
type JobStatus int
|
||||
|
||||
const (
|
||||
JobUnknown JobStatus = iota
|
||||
JobDelaying
|
||||
JobWaiting
|
||||
JobRunning
|
||||
JobSuccess
|
||||
JobFailed
|
||||
)
|
||||
|
||||
type Job interface {
|
||||
GetKey() JobKey
|
||||
Update(job Job)
|
||||
SetStatus(status JobStatus, err error)
|
||||
SetEnabled(enabled bool, desc string)
|
||||
}
|
||||
|
||||
type iterator[T Job] struct {
|
||||
index int
|
||||
data []*wrapper[T]
|
||||
zero T
|
||||
}
|
||||
|
||||
func (iter *iterator[T]) Begin() {
|
||||
iter.index = -1
|
||||
}
|
||||
|
||||
func (iter *iterator[T]) Next() (T, bool) {
|
||||
if iter.index >= len(iter.data)-1 {
|
||||
return iter.zero, false
|
||||
}
|
||||
iter.index++
|
||||
return iter.data[iter.index].job, true
|
||||
}
|
||||
|
||||
type array[T Job] struct {
|
||||
size int
|
||||
data []*wrapper[T]
|
||||
}
|
||||
|
||||
func newArray[T Job](size int) *array[T] {
|
||||
return &array[T]{
|
||||
size: size,
|
||||
data: make([]*wrapper[T], 0, size),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *array[T]) Iterator() *iterator[T] {
|
||||
return &iterator[T]{
|
||||
index: -1,
|
||||
data: a.data,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *array[T]) Full() bool {
|
||||
return len(a.data) >= a.size
|
||||
}
|
||||
|
||||
func (a *array[T]) Append(job *wrapper[T]) bool {
|
||||
if len(a.data) >= a.size {
|
||||
return false
|
||||
}
|
||||
a.data = append(a.data, job)
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *array[T]) Get(key JobKey) (*wrapper[T], bool) {
|
||||
for _, job := range a.data {
|
||||
if key == job.GetKey() {
|
||||
return job, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (a *array[T]) Remove(key JobKey) {
|
||||
length := len(a.data)
|
||||
for i, elm := range a.data {
|
||||
if key == elm.GetKey() {
|
||||
a.data[i], a.data[length-1] = a.data[length-1], nil
|
||||
a.data = a.data[:length-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Runner[T Job] struct {
|
||||
maxRunning int
|
||||
waiting *linkedhashmap.Map
|
||||
running *array[T]
|
||||
runJob RunJobFunc[T]
|
||||
runnableJob RunnableJobFunc[T]
|
||||
scheduleJob ScheduleJobFunc[T]
|
||||
updateJob UpdateJobFunc[T]
|
||||
mutex sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
context context.Context
|
||||
cancel context.CancelFunc
|
||||
zero T
|
||||
signal chan struct{}
|
||||
all map[JobKey]*wrapper[T]
|
||||
delayQueue *DelayQueue[*wrapper[T]]
|
||||
}
|
||||
|
||||
type Opt[T Job] func(runner *Runner[T])
|
||||
|
||||
func WithRunnableJob[T Job](runnableJob RunnableJobFunc[T]) Opt[T] {
|
||||
return func(runner *Runner[T]) {
|
||||
runner.runnableJob = runnableJob
|
||||
}
|
||||
}
|
||||
|
||||
func WithScheduleJob[T Job](scheduleJob ScheduleJobFunc[T]) Opt[T] {
|
||||
return func(runner *Runner[T]) {
|
||||
runner.scheduleJob = scheduleJob
|
||||
}
|
||||
}
|
||||
|
||||
func WithUpdateJob[T Job](updateJob UpdateJobFunc[T]) Opt[T] {
|
||||
return func(runner *Runner[T]) {
|
||||
runner.updateJob = updateJob
|
||||
}
|
||||
}
|
||||
|
||||
func NewRunner[T Job](maxRunning int, runJob RunJobFunc[T], opts ...Opt[T]) *Runner[T] {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
runner := &Runner[T]{
|
||||
maxRunning: maxRunning,
|
||||
all: make(map[string]*wrapper[T], maxRunning),
|
||||
waiting: linkedhashmap.New(),
|
||||
running: newArray[T](maxRunning),
|
||||
context: ctx,
|
||||
cancel: cancel,
|
||||
signal: make(chan struct{}, 1),
|
||||
delayQueue: NewDelayQueue[*wrapper[T]](0),
|
||||
}
|
||||
runner.runJob = runJob
|
||||
runner.runnableJob = func(job T, _ NextJobFunc[T]) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
runner.updateJob = func(ctx context.Context, job T) error {
|
||||
return nil
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(runner)
|
||||
}
|
||||
|
||||
runner.wg.Add(maxRunning + 1)
|
||||
for i := 0; i < maxRunning; i++ {
|
||||
go runner.run()
|
||||
}
|
||||
go func() {
|
||||
defer runner.wg.Done()
|
||||
for runner.context.Err() == nil {
|
||||
wrap, ok := runner.delayQueue.Dequeue(ctx)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
runner.mutex.Lock()
|
||||
if old, ok := runner.all[wrap.key]; !ok || wrap != old {
|
||||
runner.mutex.Unlock()
|
||||
continue
|
||||
}
|
||||
runner.waiting.Put(wrap.key, wrap)
|
||||
wrap.status = JobWaiting
|
||||
runner.trigger()
|
||||
runner.mutex.Unlock()
|
||||
}
|
||||
}()
|
||||
return runner
|
||||
}
|
||||
|
||||
func (r *Runner[T]) Close() {
|
||||
r.cancel()
|
||||
r.wg.Wait()
|
||||
}
|
||||
|
||||
func (r *Runner[T]) run() {
|
||||
defer r.wg.Done()
|
||||
|
||||
for r.context.Err() == nil {
|
||||
select {
|
||||
case <-r.signal:
|
||||
wrap, ok := r.pickRunnableJob()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
r.doRun(wrap)
|
||||
r.afterRun(wrap)
|
||||
case <-r.context.Done():
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner[T]) doRun(wrap *wrapper[T]) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logx.Error(fmt.Sprintf("failed to run job: %v", err))
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
}()
|
||||
wrap.job.SetStatus(JobRunning, nil)
|
||||
if err := r.updateJob(r.context, wrap.job); err != nil {
|
||||
err = fmt.Errorf("任务状态保存失败: %w", err)
|
||||
wrap.job.SetStatus(JobFailed, err)
|
||||
_ = r.updateJob(r.context, wrap.job)
|
||||
return
|
||||
}
|
||||
runErr := r.runJob(r.context, wrap.job)
|
||||
if runErr != nil {
|
||||
wrap.job.SetStatus(JobFailed, runErr)
|
||||
} else {
|
||||
wrap.job.SetStatus(JobSuccess, nil)
|
||||
}
|
||||
if err := r.updateJob(r.context, wrap.job); err != nil {
|
||||
if runErr != nil {
|
||||
err = fmt.Errorf("任务状态保存失败: %w, %w", err, runErr)
|
||||
} else {
|
||||
err = fmt.Errorf("任务状态保存失败: %w", err)
|
||||
}
|
||||
wrap.job.SetStatus(JobFailed, err)
|
||||
_ = r.updateJob(r.context, wrap.job)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner[T]) afterRun(wrap *wrapper[T]) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
r.running.Remove(wrap.key)
|
||||
delete(r.all, wrap.key)
|
||||
wrap.status = JobUnknown
|
||||
r.trigger()
|
||||
if wrap.removed {
|
||||
return
|
||||
}
|
||||
deadline, err := r.doScheduleJob(wrap.job, true)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_ = r.schedule(r.context, deadline, wrap.job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) doScheduleJob(job T, finished bool) (time.Time, error) {
|
||||
if r.scheduleJob == nil {
|
||||
if finished {
|
||||
return time.Time{}, ErrJobFinished
|
||||
}
|
||||
return time.Now(), nil
|
||||
}
|
||||
return r.scheduleJob(job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) pickRunnableJob() (*wrapper[T], bool) {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
var disabled []JobKey
|
||||
iter := r.running.Iterator()
|
||||
var runnable *wrapper[T]
|
||||
ok := r.waiting.Any(func(key interface{}, value interface{}) bool {
|
||||
wrap := value.(*wrapper[T])
|
||||
iter.Begin()
|
||||
able, err := r.runnableJob(wrap.job, iter.Next)
|
||||
if err != nil {
|
||||
wrap.job.SetEnabled(false, err.Error())
|
||||
r.updateJob(r.context, wrap.job)
|
||||
disabled = append(disabled, key.(JobKey))
|
||||
}
|
||||
if able {
|
||||
if r.running.Full() {
|
||||
return false
|
||||
}
|
||||
r.waiting.Remove(key)
|
||||
r.running.Append(wrap)
|
||||
wrap.status = JobRunning
|
||||
if !r.running.Full() && !r.waiting.Empty() {
|
||||
r.trigger()
|
||||
}
|
||||
runnable = wrap
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
for _, key := range disabled {
|
||||
r.waiting.Remove(key)
|
||||
delete(r.all, key)
|
||||
}
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return runnable, true
|
||||
}
|
||||
|
||||
func (r *Runner[T]) schedule(ctx context.Context, deadline time.Time, job T) error {
|
||||
wrap := newWrapper(job)
|
||||
wrap.deadline = deadline
|
||||
if wrap.deadline.After(time.Now()) {
|
||||
r.delayQueue.Enqueue(ctx, wrap)
|
||||
wrap.status = JobDelaying
|
||||
} else {
|
||||
r.waiting.Put(wrap.key, wrap)
|
||||
wrap.status = JobWaiting
|
||||
r.trigger()
|
||||
}
|
||||
r.all[wrap.key] = wrap
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runner[T]) Add(ctx context.Context, job T) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
if _, ok := r.all[job.GetKey()]; ok {
|
||||
return ErrJobExist
|
||||
}
|
||||
deadline, err := r.doScheduleJob(job, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.schedule(ctx, deadline, job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) Update(ctx context.Context, job T) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
wrap, ok := r.all[job.GetKey()]
|
||||
if !ok {
|
||||
return ErrJobNotFound
|
||||
}
|
||||
wrap.job.Update(job)
|
||||
switch wrap.status {
|
||||
case JobDelaying:
|
||||
r.delayQueue.Remove(ctx, wrap.key)
|
||||
case JobWaiting:
|
||||
r.waiting.Remove(wrap.key)
|
||||
case JobRunning:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
deadline, err := r.doScheduleJob(job, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.schedule(ctx, deadline, wrap.job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) StartNow(ctx context.Context, job T) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
if wrap, ok := r.all[job.GetKey()]; ok {
|
||||
switch wrap.status {
|
||||
case JobDelaying:
|
||||
r.delayQueue.Remove(ctx, wrap.key)
|
||||
delete(r.all, wrap.key)
|
||||
case JobWaiting, JobRunning:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
}
|
||||
return r.schedule(ctx, time.Now(), job)
|
||||
}
|
||||
|
||||
func (r *Runner[T]) trigger() {
|
||||
select {
|
||||
case r.signal <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner[T]) Remove(ctx context.Context, key JobKey) error {
|
||||
r.mutex.Lock()
|
||||
defer r.mutex.Unlock()
|
||||
|
||||
wrap, ok := r.all[key]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
switch wrap.status {
|
||||
case JobDelaying:
|
||||
r.delayQueue.Remove(ctx, key)
|
||||
delete(r.all, key)
|
||||
case JobWaiting:
|
||||
r.waiting.Remove(key)
|
||||
delete(r.all, key)
|
||||
case JobRunning:
|
||||
// 统一标记为 removed, 待任务执行完成后再删除
|
||||
wrap.removed = true
|
||||
return ErrJobRunning
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,130 +0,0 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"mayfly-go/pkg/utils/timex"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var _ Job = &testJob{}
|
||||
|
||||
func newTestJob(key string) *testJob {
|
||||
return &testJob{
|
||||
Key: key,
|
||||
}
|
||||
}
|
||||
|
||||
type testJob struct {
|
||||
Key JobKey
|
||||
status int
|
||||
}
|
||||
|
||||
func (t *testJob) Update(_ Job) {}
|
||||
|
||||
func (t *testJob) GetKey() JobKey {
|
||||
return t.Key
|
||||
}
|
||||
|
||||
func (t *testJob) SetStatus(status JobStatus, err error) {}
|
||||
|
||||
func (t *testJob) SetEnabled(enabled bool, desc string) {}
|
||||
|
||||
func TestRunner_Close(t *testing.T) {
|
||||
signal := make(chan struct{}, 1)
|
||||
waiting := sync.WaitGroup{}
|
||||
waiting.Add(1)
|
||||
runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error {
|
||||
waiting.Done()
|
||||
timex.SleepWithContext(ctx, time.Hour)
|
||||
signal <- struct{}{}
|
||||
return nil
|
||||
})
|
||||
go func() {
|
||||
job := &testJob{
|
||||
Key: "close",
|
||||
}
|
||||
_ = runner.Add(context.Background(), job)
|
||||
}()
|
||||
waiting.Wait()
|
||||
timer := time.NewTimer(time.Microsecond * 10)
|
||||
defer timer.Stop()
|
||||
runner.Close()
|
||||
select {
|
||||
case <-timer.C:
|
||||
require.FailNow(t, "runner 未能及时退出")
|
||||
case <-signal:
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunner_AddJob(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
job *testJob
|
||||
want error
|
||||
}
|
||||
testCases := []testCase{
|
||||
{
|
||||
name: "first job",
|
||||
job: newTestJob("single"),
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "second job",
|
||||
job: newTestJob("dual"),
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "repetitive job",
|
||||
job: newTestJob("dual"),
|
||||
want: ErrJobExist,
|
||||
},
|
||||
}
|
||||
runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error {
|
||||
timex.SleepWithContext(ctx, time.Hour)
|
||||
return nil
|
||||
})
|
||||
defer runner.Close()
|
||||
for _, tc := range testCases {
|
||||
err := runner.Add(context.Background(), tc.job)
|
||||
if tc.want != nil {
|
||||
require.ErrorIs(t, err, tc.want)
|
||||
continue
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJob_UpdateStatus(t *testing.T) {
|
||||
const d = time.Millisecond * 20
|
||||
const (
|
||||
unknown = iota
|
||||
running
|
||||
finished
|
||||
)
|
||||
runner := NewRunner[*testJob](1, func(ctx context.Context, job *testJob) error {
|
||||
job.status = running
|
||||
timex.SleepWithContext(ctx, d*2)
|
||||
job.status = finished
|
||||
return nil
|
||||
})
|
||||
first := newTestJob("first")
|
||||
second := newTestJob("second")
|
||||
_ = runner.Add(context.Background(), first)
|
||||
_ = runner.Add(context.Background(), second)
|
||||
|
||||
time.Sleep(d)
|
||||
assert.Equal(t, running, first.status)
|
||||
assert.Equal(t, unknown, second.status)
|
||||
|
||||
time.Sleep(d * 2)
|
||||
assert.Equal(t, finished, first.status)
|
||||
assert.Equal(t, running, second.status)
|
||||
|
||||
time.Sleep(d * 2)
|
||||
assert.Equal(t, finished, first.status)
|
||||
assert.Equal(t, finished, second.status)
|
||||
}
|
||||
Reference in New Issue
Block a user