Compare commits

...

7 Commits

Author SHA1 Message Date
meilin.huang
403d1c45e5 refactor: sshtunnel等 2026-02-01 13:35:23 +08:00
meilin.huang
400db0402a refactor: 包优化&其他问题修复 2026-01-25 14:16:16 +08:00
fanzhouqi
f0ae178183 !143 fix:mysql查询时,如果出现列名一样,会覆盖数据,github issues #124
* fix:mysql查询时,如果出现列名一样,会覆盖数据,github issues #124
2026-01-23 09:13:14 +00:00
meilin.huang
4641e448d2 fix: 数据库迁移、同步保存时定时任务未清除问题 2026-01-21 12:22:54 +08:00
meilin.huang
f0de65b7ce refactor: 协程启动优化、tagviews调整 2026-01-20 19:45:46 +08:00
meilin.huang
0472c5101f feat: 数据库迁移至文件支持zip格式、go启动协程时panic优化、菜单资源优化 2026-01-18 20:40:11 +08:00
meilin.huang
185cd6f82b fix: postgres导出调整等 2026-01-14 20:38:06 +08:00
166 changed files with 2240 additions and 1573 deletions

View File

@@ -57,7 +57,7 @@ function build() {
execFileName="${execFileName}.exe"
fi
go mod tidy
CGO_ENABLE=0 GOOS=${os} GOARCH=${arch} go build -ldflags=-w -o ${execFileName} main.go
CGO_ENABLE=0 GOOS=${os} GOARCH=${arch} go build -trimpath -ldflags=-w -o ${execFileName} main.go
if [ -d ${toFolder} ] ; then
echo_green "The desired folder already exists. Clear the folder"

View File

@@ -24,7 +24,7 @@
"crypto-js": "^4.2.0",
"dayjs": "^1.11.19",
"echarts": "^6.0.0",
"element-plus": "^2.13.0",
"element-plus": "^2.13.2",
"js-base64": "^3.7.8",
"jsencrypt": "^3.5.4",
"monaco-editor": "^0.55.1",

View File

@@ -13,9 +13,6 @@ export function getBaseApiUrl() {
const config = {
baseApiUrl: `${(window as any).globalConfig.BaseApiUrl || location.protocol + '//' + getBaseApiUrl()}/api`,
baseWsUrl: `${(window as any).globalConfig.BaseWsUrl || `${location.protocol == 'https:' ? 'wss:' : 'ws:'}//${getBaseApiUrl()}`}/api`,
// 系统版本
version: 'v1.10.5',
};
export default config;

View File

@@ -225,10 +225,12 @@ const initMonacoEditorIns = () => {
let options = Object.assign(defaultOptions, props.options as any);
monacoEditorIns = monaco.editor.create(monacoTextareaRef.value, options);
// 监听内容改变,双向绑定
monacoEditorIns.onDidChangeModelContent(() => {
modelValue.value = monacoEditorIns.getModel()?.getValue();
});
if (!options.readOnly) {
// 监听内容改变,双向绑定
monacoEditorIns.onDidChangeModelContent(() => {
modelValue.value = monacoEditorIns.getModel()?.getValue();
});
}
};
const changeLanguage = (value: any) => {

View File

@@ -34,7 +34,7 @@ const websocketUrl = ref(props.wsUrl);
const { data } = useWebSocket(websocketUrl);
const editorRef: any = useTemplateRef('editorRef');
const editorRef = useTemplateRef<InstanceType<typeof MonacoEditor>>('editorRef');
const modelValue = defineModel<string>('modelValue', {
type: String,
@@ -56,9 +56,9 @@ const reload = (wsUrl: string) => {
};
const revealLastLine = () => {
const editor = editorRef.value.getEditor();
const lineCount = editor?.getModel().getLineCount();
editor.revealLine(lineCount);
const editor = editorRef.value?.getEditor();
const lineCount = editor?.getModel()?.getLineCount();
editor?.revealLine(lineCount || 0);
};
defineExpose({

View File

@@ -169,6 +169,7 @@ export default {
transfer2Db: 'Transfer to DB',
transfer2File: 'Transfer to File',
fileSaveDays: 'File retention days',
fileType: 'File Type',
transferStrategy: 'Transfer Strategy',
day: 'Day',
transferFull: 'Full',

View File

@@ -47,6 +47,7 @@ export default {
machineSecurityCmdSvae: 'Cmd Config-Save',
machineSecurityCmdDelete: 'Cmd Config-Delete',
db: 'Database',
dbms: 'DBMS',
dbDataOp: 'Data Operation',
dbDataOpBase: 'DB-Base Permission',
@@ -78,6 +79,8 @@ export default {
dbTransferFileRun: 'Transfer File-Run',
redis: 'Redis',
redisSave: 'Save Redis',
redisDel: 'Delete Redis',
redisDataOp: 'Redis - Data Operation',
redisDataOpBase: 'Redis - Base Permission',
redisDataOpSave: 'Redis - Save Data',
@@ -86,13 +89,18 @@ export default {
redisManageBase: 'Redis - Base Permission',
mongo: 'Mongo',
mongoSave: 'Save Mongo',
mongoDel: 'Delete Mongo',
mongoDataOp: 'Mongo - Data Operation',
mongoDataOpBase: 'Mongo - Base Permission',
mongoDataOpSave: 'Mongo - Save Data',
mongoDataOpDelete: 'Mongo - Delete Data',
mongoManage: 'Mongo Manage',
mongoManageBase: 'Mongo - Base Permission',
containerManageBase: 'Container Manage - Base Permission',
container: 'Container',
containerSave: 'Save Container',
containerDel: 'Delete Container',
flow: 'Flow',
myTask: 'My Task',

View File

@@ -165,6 +165,7 @@ export default {
transfer2Db: '迁移到数据库',
transfer2File: '迁移到文件',
fileSaveDays: '文件保留天数',
fileType: '文件类型',
transferStrategy: '迁移策略',
day: '天',
transferFull: '全量',

View File

@@ -47,6 +47,7 @@ export default {
machineSecurityCmdSvae: '机器-命令配置-保存',
machineSecurityCmdDelete: '机器-命令配置-删除',
db: '数据库',
dbms: 'DBMS',
dbDataOp: 'DB-数据操作',
dbDataOpBase: 'DB-数据操作-基本权限',
@@ -78,6 +79,8 @@ export default {
dbTransferFileRun: '迁移文件-执行',
redis: 'Redis',
redisSave: 'Redis-保存',
redisDel: 'Redis-删除',
redisDataOp: 'Redis-数据操作',
redisDataOpBase: 'Redis-数据操作-基本权限',
redisDataOpSave: 'Redis-数据操作-数据保存',
@@ -86,6 +89,8 @@ export default {
redisManageBase: 'Redis-管理-基本权限',
mongo: 'Mongo',
mongoSave: 'Mongo-保存',
mongoDel: 'Mongo-删除',
mongoDataOp: '数据操作',
mongoDataOpBase: 'Mongo-数据操作-基本权限',
mongoDataOpSave: 'Mongo-数据操作-数据保存',
@@ -93,7 +98,9 @@ export default {
mongoManage: 'Mongo管理',
mongoManageBase: 'Mongo-管理-基本权限',
containerManageBase: '容器-管理-基本权限',
container: '容器',
containerSave: '容器-保存',
containerDel: '容器-删除',
flow: '工单流程',
myTask: '我的任务',

View File

@@ -4,7 +4,7 @@
<span class="logo-title">
{{ `${themeConfig.globalTitle}` }}
<sub
><span style="font-size: 10px; color: goldenrod">{{ ` ${config.version}` }}</span></sub
><span style="font-size: 10px; color: goldenrod">{{ ` ${themeConfig.version}` }}</span></sub
>
</span>
</div>
@@ -17,7 +17,6 @@
import { computed } from 'vue';
import { storeToRefs } from 'pinia';
import { useThemeConfig } from '@/store/themeConfig';
import config from '@/common/config';
const { themeConfig } = storeToRefs(useThemeConfig());

View File

@@ -308,17 +308,7 @@
<!-- 其它设置 -->
<el-divider content-position="left">{{ $t('layout.config.otherSetting') }}</el-divider>
<div class="layout-breadcrumb-seting-bar-flex !mt-3.5">
<div class="layout-breadcrumb-seting-bar-flex-label">{{ $t('layout.config.tagsStyle') }}</div>
<div class="layout-breadcrumb-seting-bar-flex-value">
<el-select v-model="themeConfig.tagsStyle" placeholder="请选择" size="small" style="width: 90px">
<el-option label="风格1" value="tags-style-one"></el-option>
<el-option label="风格2" value="tags-style-two"></el-option>
<el-option label="风格3" value="tags-style-three"></el-option>
</el-select>
</div>
</div>
<div class="layout-breadcrumb-seting-bar-flex !mt-3.5">
<div class="layout-breadcrumb-seting-bar-flex mt-3.5!">
<div class="layout-breadcrumb-seting-bar-flex-label">{{ $t('layout.config.animation') }}</div>
<div class="layout-breadcrumb-seting-bar-flex-value">
<el-select v-model="themeConfig.animation" size="small" style="width: 90px">
@@ -328,7 +318,7 @@
</el-select>
</div>
</div>
<div class="layout-breadcrumb-seting-bar-flex !mt-3.5 !mb-5.5">
<div class="layout-breadcrumb-seting-bar-flex mt-3.5! mb-5.5!">
<div class="layout-breadcrumb-seting-bar-flex-label">
{{ $t('layout.config.columnsAsideStyle') }}
</div>

View File

@@ -1,7 +1,7 @@
<template>
<div class="layout-navbars-tagsview" :class="{ 'layout-navbars-tagsview-shadow': themeConfig.layout === 'classic' }">
<el-scrollbar ref="scrollbarRef" @wheel.prevent="onHandleScroll">
<ul class="layout-navbars-tagsview-ul" :class="setTagsStyle" ref="tagsUlRef">
<ul class="layout-navbars-tagsview-ul" ref="tagsUlRef">
<li
v-for="(v, k) in tagsViews"
:key="k"
@@ -18,26 +18,20 @@
>
<SvgIcon :name="v.icon" class="layout-navbars-tagsview-ul-li-iconfont" v-if="themeConfig.isTagsviewIcon" />
<span>{{ $t(v.title) }}</span>
<template v-if="isActive(v)">
<SvgIcon
name="RefreshRight"
class="!text-[14px] ml-1 layout-navbars-tagsview-ul-li-refresh"
class="text-[14px]! ml-1 layout-navbars-tagsview-ul-li-icon layout-navbars-tagsview-ul-li-refresh"
@click.stop="refreshCurrentTagsView($route.fullPath)"
/>
<SvgIcon
name="Close"
class="!text-[14px] layout-navbars-tagsview-ul-li-icon layout-icon-active"
class="text-[14px]! layout-navbars-tagsview-ul-li-icon layout-navbars-tagsview-ul-li-close layout-icon-active"
v-if="!v.isAffix"
@click.stop="closeCurrentTagsView(themeConfig.isShareTagsView ? v.path : v.path)"
/>
</template>
<SvgIcon
name="Close"
class="!text-[14px] layout-navbars-tagsview-ul-li-icon layout-icon-three"
v-if="!v.isAffix"
@click.stop="closeCurrentTagsView(themeConfig.isShareTagsView ? v.path : v.path)"
/>
</li>
</ul>
</el-scrollbar>
@@ -46,7 +40,7 @@
</template>
<script lang="ts" setup name="layoutTagsView">
import { reactive, onMounted, computed, ref, nextTick, onBeforeUpdate, getCurrentInstance, watch } from 'vue';
import { reactive, onMounted, ref, nextTick, onBeforeUpdate, getCurrentInstance, watch } from 'vue';
import { useRoute, useRouter, onBeforeRouteUpdate } from 'vue-router';
import screenfull from 'screenfull';
import { storeToRefs } from 'pinia';
@@ -105,11 +99,6 @@ const state = reactive({
},
});
// 动态设置 tagsView 风格样式
const setTagsStyle = computed(() => {
return themeConfig.value.tagsStyle;
});
// 存储 tagsViewList 到浏览器临时缓存中,页面刷新时,保留记录
const addBrowserSetSession = (tagsViewList: Array<object>) => {
setTagViews(tagsViewList);
@@ -403,163 +392,120 @@ onBeforeRouteUpdate((to) => {
});
</script>
<style scoped lang="scss">
<style scoped lang="css">
.layout-navbars-tagsview {
background-color: var(--bg-main-color);
border-bottom: 1px solid var(--el-border-color-light, #ebeef5);
position: relative;
z-index: 4;
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.05);
}
:deep(.el-scrollbar__wrap) {
overflow-x: auto !important;
}
.layout-navbars-tagsview :deep(.el-scrollbar__wrap) {
overflow-x: auto !important;
}
&-ul {
list-style: none;
margin: 0;
padding: 0;
height: 34px;
display: flex;
align-items: center;
color: var(--el-text-color-regular);
font-size: 12px;
white-space: nowrap;
padding: 0 15px;
.layout-navbars-tagsview-ul {
list-style: none;
margin: 0;
padding: 0;
height: 38px;
display: flex;
align-items: center;
color: var(--el-text-color-regular);
font-size: 13px;
white-space: nowrap;
padding: 0 15px;
}
&-li {
height: 26px;
line-height: 26px;
display: flex;
align-items: center;
border: 1px solid var(--el-border-color-lighter);
padding: 0 15px;
margin-right: 5px;
border-radius: 2px;
position: relative;
z-index: 0;
cursor: pointer;
justify-content: space-between;
.layout-navbars-tagsview-ul-li {
height: 30px;
line-height: 30px;
display: flex;
align-items: center;
border-radius: 6px;
padding: 0 12px;
margin-right: 6px;
position: relative;
z-index: 0;
cursor: pointer;
justify-content: space-between;
transition: all 0.3s ease;
border: 1px solid var(--el-border-color, #dcdfe6);
box-sizing: border-box;
background-color: var(--el-bg-color, #fafafa);
color: var(--el-text-color-regular, #606266);
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.05);
}
&:hover {
background-color: var(--el-color-primary-light-9);
color: var(--el-color-primary);
border-color: var(--el-color-primary-light-5);
}
.layout-navbars-tagsview-ul-li:not(.is-active):hover {
background-color: var(--el-fill-color-blank, #f5f7fa);
color: var(--el-text-color-primary, #303133);
border-color: var(--el-color-primary-light-7, #c6e2ff);
transform: translateY(-1px);
}
&-iconfont {
position: relative;
left: -5px;
font-size: 12px;
}
.layout-navbars-tagsview-ul-li-iconfont {
position: relative;
left: -3px;
font-size: 12px;
margin-right: 4px;
}
&-icon {
border-radius: 100%;
position: relative;
height: 14px;
width: 14px;
text-align: center;
line-height: 14px;
right: -5px;
.layout-navbars-tagsview-ul-li-icon {
border-radius: 4px;
position: relative;
height: 18px;
width: 18px;
text-align: center;
line-height: 18px;
right: -3px;
margin-left: 4px;
transition: all 0.25s ease;
color: var(--el-text-color-secondary, #909399);
display: flex;
align-items: center;
justify-content: center;
}
&:hover {
color: var(--el-color-white);
background-color: var(--el-color-primary-light-3);
}
}
.layout-navbars-tagsview-ul-li-icon:hover {
background-color: var(--el-color-info-light-7);
border-radius: 4px;
}
.layout-icon-active {
display: block;
}
.layout-icon-active {
display: flex;
align-items: center;
justify-content: center;
}
.layout-icon-three {
display: none;
}
}
.layout-navbars-tagsview-ul .is-active {
color: var(--el-color-primary, #409eff);
background: var(--el-color-primary-light-9, #ecf5ff);
border-color: var(--el-color-primary-light-5, #409eff);
box-shadow: 0 2px 4px rgba(64, 158, 255, 0.2);
}
.is-active {
color: var(--el-color-white);
background: var(--el-color-primary);
border-color: var(--el-color-primary);
transition: border-color 3s ease;
}
}
.layout-navbars-tagsview-ul .is-active .layout-navbars-tagsview-ul-li-icon {
color: var(--el-color-primary, #409eff);
}
// 风格2
.tags-style-two {
.layout-navbars-tagsview-ul-li {
margin-right: 0 !important;
border: none !important;
position: relative;
border-radius: 3px !important;
.layout-navbars-tagsview-ul .is-active .layout-navbars-tagsview-ul-li-icon:hover {
background-color: var(--el-color-primary);
color: var(--el-color-white);
transform: scale(1.1);
}
.layout-icon-active {
display: none;
}
.layout-navbars-tagsview-ul .is-active .layout-navbars-tagsview-ul-li-close:hover {
background-color: var(--el-color-danger);
color: var(--el-color-white);
border-radius: 4px;
}
.layout-icon-three {
display: block;
}
&:hover {
background: none !important;
}
}
.is-active {
background: none !important;
color: var(--el-color-primary) !important;
}
}
// 风格3
.tags-style-three {
align-items: flex-end;
.tgs-style-three-svg {
-webkit-mask-image:
url(''),
url(''),
url("data:image/svg+xml,<svg xmlns='http://www.w3.org/2000/svg'><rect rx='8' width='100%' height='100%' fill='%23F8EAE7'/></svg>");
-webkit-mask-size:
18px 30px,
20px 30px,
calc(100% - 30px) calc(100% + 17px);
-webkit-mask-position:
right bottom,
left bottom,
center top;
-webkit-mask-repeat: no-repeat;
}
.layout-navbars-tagsview-ul-li {
padding: 0 5px;
border-width: 15px 27px 15px;
border-style: solid;
border-color: transparent;
margin: 0 -15px;
.layout-icon-active {
display: none;
}
.layout-icon-three {
display: block;
}
&:hover {
@extend .tgs-style-three-svg;
background: var(--tagsview3-active-background-color);
color: unset;
}
}
.is-active {
@extend .tgs-style-three-svg;
background: var(--tagsview3-active-background-color) !important;
color: var(--el-color-primary) !important;
z-index: 1;
}
}
.layout-navbars-tagsview-ul .is-active .layout-navbars-tagsview-ul-li-refresh:hover {
background-color: var(--el-color-primary);
color: var(--el-color-white);
border-radius: 4px;
}
.layout-navbars-tagsview-shadow {

View File

@@ -98,8 +98,6 @@ export const useThemeConfig = defineStore('themeConfig', {
/* 其它设置
------------------------------- */
// 默认 Tagsview 风格,可选 1、 tags-style-one 2、 tags-style-two 3、 tags-style-three
tagsStyle: 'tags-style-three',
// 默认主页面切换动画,可选 1、 slide-right 2、 slide-left 3、 opacitys
animation: 'slide-right',
// 默认分栏高亮风格,可选 1、 圆角 columns-round 2、 卡片 columns-card
@@ -137,6 +135,7 @@ export const useThemeConfig = defineStore('themeConfig', {
appSlogan: 'common.appSlogan',
// 网站logo icon, base64编码内容
logoIcon: logoIcon,
version: 'latest',
// 默认初始语言,可选值"<zh-cn|en|zh-tw>",默认 zh-cn
globalI18n: 'zh-cn',
// 默认全局组件大小,可选值"<|large|default|small>",默认 ''
@@ -155,12 +154,15 @@ export const useThemeConfig = defineStore('themeConfig', {
if (tc) {
this.themeConfig = tc;
document.documentElement.style.cssText = getLocal('themeConfigStyle');
} else {
getServerConf().then((res) => {
this.themeConfig.globalI18n = res.i18n;
});
}
getServerConf().then((res) => {
this.themeConfig.globalI18n = res.i18n;
this.themeConfig.version = res.version;
});
this.themeConfig.defaultListPageSize = calculatePageSizeByScreenHeight();
// 根据后台系统配置初始化
getSysStyleConfig().then((res) => {
if (res?.title) {
@@ -215,3 +217,34 @@ export const useThemeConfig = defineStore('themeConfig', {
},
},
});
// 计算每页显示数量的方法
const calculatePageSizeByScreenHeight = (): number => {
const windowHeight = window.innerHeight || document.documentElement.clientHeight;
// 计算页面其他部分的高度(这是一个大概的估算)
// 包括顶部导航、面包屑、搜索区域、分页控件等
const headerHeight = 60; // 页面顶部导航高度
const subHeaderHeight = 50; // 子页面头部或其他内容高度
const searchFormHeight = 100; // 搜索表单高度,如果显示的话
const tableHeaderHeight = 44; // 表格头部高度
const paginationHeight = 40; // 分页控件高度
const paddingMarginHeight = 30; // 额外的内外边距
// 计算可用于表格内容的高度
const availableContentHeight =
windowHeight - headerHeight - subHeaderHeight - searchFormHeight - tableHeaderHeight - paginationHeight - paddingMarginHeight;
// 根据表格尺寸确定行高
const rowHeight = 40;
// 计算理论上的行数
const calculatedRows = Math.floor(availableContentHeight / rowHeight);
// 设置限制范围
const minPageSize = 10;
const maxPageSize = 30;
// 确保返回值在合理范围内,且至少有基本的行数
return Math.max(minPageSize, Math.min(maxPageSize, calculatedRows));
};

View File

@@ -40,7 +40,6 @@ declare interface ThemeConfigState {
isInvert: boolean;
isWatermark: boolean;
watermarkText: Array<string>;
tagsStyle: string;
animation: string;
columnsAsideStyle: string;
layout: string;
@@ -49,6 +48,7 @@ declare interface ThemeConfigState {
globalViceTitle: string;
appSlogan: string;
logoIcon: string;
version: string;
globalI18n: string;
globalComponentSize: string;
terminalTheme: string;

View File

@@ -39,7 +39,7 @@
</div>
</div>
<el-splitter style="height: calc(100vh - 215px)" layout="vertical" @resize-end="onResizeTableHeight">
<el-splitter style="height: calc(100vh - 220px)" layout="vertical" @resize-end="onResizeTableHeight">
<el-splitter-panel :size="state.editorSize" max="80%">
<MonacoEditor ref="monacoEditorRef" class="mt-1" v-model="state.sql" language="sql" height="100%" :id="'MonacoTextarea-' + getKey()" />
</el-splitter-panel>
@@ -289,7 +289,7 @@ const onResizeTableHeight = (index: number, sizes: number[]) => {
editorHeight = plitpaneHeight / 2;
}
let tableDataHeight = plitpaneHeight - editorHeight - 43;
let tableDataHeight = plitpaneHeight - editorHeight - 47;
state.editorSize = editorHeight;
state.tableDataHeight = tableDataHeight + 'px';
@@ -332,6 +332,7 @@ const onRunSql = async (newTab = false) => {
* 执行多条SQL并合并结果
*/
const runMultipleSqls = async (sqls: string[], newTab: boolean) => {
state.execResTabs = [];
// 分类SQL语句
const nonQuerySqls: string[] = []; // 影响行数类SQL (UPDATE, INSERT, DELETE等)
const querySqls: string[] = []; // 查询类SQL (SELECT等)
@@ -486,6 +487,7 @@ const runSql = async (sql: string, remark = '', newTab = false) => {
state.execResTabs[i].tableColumn = colAndData.columns.map((x: any) => {
return {
columnName: x.name,
key: x.key,
columnType: x.type,
show: true,
};

View File

@@ -77,9 +77,7 @@
<!-- 排序箭头图标 -->
<SvgIcon
v-if="
column.title == nowSortColumn?.columnName &&
!showColumnActions[column.key] &&
!columnActionVisible[column.key]
column.key == nowSortColumn?.key && !showColumnActions[column.key] && !columnActionVisible[column.key]
"
:color="'var(--el-color-primary)'"
:name="nowSortColumn?.order == 'asc' ? 'top' : 'bottom'"
@@ -135,7 +133,7 @@
<div v-else @dblclick="onEnterEditMode(rowData, column, rowIndex, columnIndex)">
<div v-if="canEdit(rowIndex, columnIndex)">
<ColumnFormItem
v-model="rowData[column.dataKey!]"
v-model="rowData[column.key!]"
:data-type="column.dataType"
@blur="onExitEditMode(rowData, column, rowIndex)"
:column-name="column.columnName"
@@ -143,11 +141,11 @@
/>
</div>
<div v-else :class="isUpdated(rowIndex, column.dataKey) ? 'update_field_active ml-0.5 mr-0.5' : 'ml-0.5 mr-0.5'">
<span v-if="rowData[column.dataKey!] === null" style="color: var(--el-color-info-light-5)"> NULL </span>
<div v-else :class="isUpdated(rowIndex, column.key) ? 'update_field_active ml-0.5 mr-0.5' : 'ml-0.5 mr-0.5'">
<span v-if="rowData[column.key!] === null" style="color: var(--el-color-info-light-5)"> NULL </span>
<span v-else :title="rowData[column.dataKey!]" class="el-text el-text--small is-truncated">
{{ rowData[column.dataKey!] }}
<span v-else :title="rowData[column.key!]" class="el-text el-text--small is-truncated">
{{ rowData[column.key!] }}
</span>
</div>
</div>
@@ -275,7 +273,7 @@ const columnActionVisible = ref({} as any);
const cmDataCopyCell = new ContextmenuItem('copyValue', 'common.copy')
.withIcon('CopyDocument')
.withOnClick(async (data: any) => {
await copyToClipboard(data.rowData[data.column.dataKey]);
await copyToClipboard(data.rowData[data.column.key]);
})
.withHideFunc(() => {
// 选中多条则隐藏该复制按钮
@@ -409,7 +407,6 @@ const dbConfig = useStorage('dbConfig', DbThemeConfig);
const rowNoColumn = {
title: 'No.',
key: 'tableDataRowNo',
dataKey: 'tableDataRowNo',
width: 45,
fixed: true,
align: 'center',
@@ -515,8 +512,6 @@ const setTableColumns = (columns: any) => {
x.remark = `${x.columnType} ${x.columnComment ? ' | ' + x.columnComment : ''}`;
return {
...x,
key: columnName,
dataKey: columnName,
width: DbInst.flexColumnWidth(columnName, state.datas),
title: columnName,
align: x.dataType == DataType.Number ? 'right' : 'left',
@@ -565,21 +560,21 @@ const hideColumnAction = () => {
const handleColumnCommand = (column: any, command: string) => {
switch (command) {
case 'sort-asc':
onTableSortChange({ columnName: column.dataKey, order: 'asc' });
onTableSortChange({ key: column.key, order: 'asc' });
break;
case 'sort-desc':
onTableSortChange({ columnName: column.dataKey, order: 'desc' });
onTableSortChange({ key: column.key, order: 'desc' });
break;
case 'fix':
state.columns.forEach((col: any) => {
if (col.dataKey == column.dataKey) {
if (col.key == column.key) {
col.fixed = true;
}
});
break;
case 'unfix':
state.columns.forEach((col: any) => {
if (col.dataKey == column.dataKey) {
if (col.key == column.key) {
col.fixed = false;
}
});
@@ -718,7 +713,7 @@ const onGenerateJson = async () => {
let obj: any = {};
for (let column of state.columns) {
if (column.show) {
obj[column.title] = selectionData[column.dataKey];
obj[column.title] = selectionData[column.key];
}
}
jsonObj.push(obj);
@@ -775,7 +770,7 @@ const onEnterEditMode = (rowData: any, column: any, rowIndex = 0, columnIndex =
nowUpdateCell.value = {
rowIndex: rowIndex,
colIndex: columnIndex,
oldValue: rowData[column.dataKey],
oldValue: rowData[column.key],
dataType: column.dataType,
};
};
@@ -785,7 +780,7 @@ const onExitEditMode = (rowData: any, column: any, rowIndex = 0) => {
return;
}
const oldValue = nowUpdateCell.value.oldValue;
const newValue = rowData[column.dataKey];
const newValue = rowData[column.key];
// 未改变单元格值
if (oldValue == newValue) {
@@ -800,7 +795,7 @@ const onExitEditMode = (rowData: any, column: any, rowIndex = 0) => {
cellUpdateMap.value.set(rowIndex, updatedRow);
}
const columnName = column.dataKey;
const columnName = column.key;
let cellData = updatedRow.columnsMap.get(columnName);
if (cellData) {
// 多次修改情况,可能又修改回原值,则移除该修改单元格

View File

@@ -152,7 +152,7 @@
<el-text
id="copyValue"
style="color: var(--el-color-info-light-3)"
class="is-truncated !text-[12px] mt-1"
class="is-truncated text-[12px]! mt-1"
@click="copyToClipboard(sql)"
:title="sql"
>{{ sql }}</el-text
@@ -392,6 +392,7 @@ const selectData = async () => {
const columns = await getNowDbInst().loadColumns(props.dbName, props.tableName);
columns.forEach((x: any) => {
x.show = true;
x.key = x.columnName;
});
state.columns = columns;
}
@@ -592,7 +593,7 @@ const onSelectByCondition = async () => {
*/
const onTableSortChange = async (sort: any) => {
const sortType = sort.order == 'desc' ? 'DESC' : 'ASC';
state.orderBy = `ORDER BY ${state.dbDialect.quoteIdentifier(sort.columnName)} ${sortType}`;
state.orderBy = `ORDER BY ${state.dbDialect.quoteIdentifier(sort.key)} ${sortType}`;
await onRefresh();
};

View File

@@ -59,8 +59,8 @@
</el-form-item>
<el-form-item v-if="form.mode === 2">
<el-row class="w-full!">
<el-col :span="12">
<el-row :gutter="10">
<el-col :span="10">
<el-form-item prop="targetFileDbType" :label="$t('db.dbFileType')" :required="form.mode === 2">
<el-select v-model="form.targetFileDbType" clearable filterable>
<el-option
@@ -79,7 +79,13 @@
</el-form-item>
</el-col>
<el-col :span="12">
<el-col :span="6">
<el-form-item :label="$t('db.fileType')">
<el-select v-model="form.extra.fileType" :options="fileTypeOptions"> </el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item :label="$t('db.fileSaveDays')">
<el-input-number v-model="form.fileSaveDays" :min="-1" :max="1000">
<template #suffix>
@@ -138,10 +144,8 @@
</el-form>
<template #footer>
<div>
<el-button @click="cancel()">{{ $t('common.cancel') }}</el-button>
<el-button type="primary" :loading="saveBtnLoading" @click="btnOk">{{ $t('common.confirm') }}</el-button>
</div>
<el-button @click="cancel()">{{ $t('common.cancel') }}</el-button>
<el-button type="primary" :loading="saveBtnLoading" @click="btnOk">{{ $t('common.confirm') }}</el-button>
</template>
</el-drawer>
</div>
@@ -187,6 +191,11 @@ const rules = {
cron: [Rules.requiredSelect('cron')],
};
const fileTypeOptions = [
{ label: '.sql', value: 'sql' },
{ label: '.zip', value: 'zip' },
];
const dbForm: any = ref(null);
type FormData = {
@@ -215,6 +224,7 @@ type FormData = {
deleteTable?: 1 | 2;
checkedKeys: string;
runningState: 1 | 2;
extra: { fileType: string };
};
const basicFormData = {
@@ -226,6 +236,7 @@ const basicFormData = {
deleteTable: 1,
checkedKeys: '',
runningState: 1,
extra: { fileType: fileTypeOptions[0].value },
} as FormData;
const srcTableList = ref<{ tableName: string; tableComment: string }[]>([]);
@@ -264,6 +275,7 @@ watch(dialogVisible, async (newValue: boolean) => {
if (!newValue) {
return;
}
const propsData = props.data as any;
if (!propsData?.id) {
let d = {} as FormData;
@@ -275,8 +287,8 @@ watch(dialogVisible, async (newValue: boolean) => {
return;
}
state.form = deepClone(props.data) as FormData;
let { srcDbId, targetDbId } = state.form;
const form = deepClone(props.data) as FormData;
let { srcDbId, targetDbId } = form;
// 初始化src数据源
if (srcDbId) {
@@ -301,11 +313,14 @@ watch(dialogVisible, async (newValue: boolean) => {
}
// 初始化勾选迁移表
srcTreeRef.value.setCheckedKeys(state.form.checkedKeys.split(','));
srcTreeRef.value.setCheckedKeys(form.checkedKeys.split(','));
// 初始化默认值
state.form.cronAble = state.form.cronAble || 0;
state.form.mode = state.form.mode || 1;
form.cronAble = form.cronAble || -1;
form.mode = form.mode || 1;
form.extra = form.extra || { fileType: fileTypeOptions[0].value };
state.form = form;
});
watch(

View File

@@ -12,8 +12,10 @@
lazy
>
<template #tableHeader>
<el-button type="primary" icon="plus" @click="editContainerConf(false)" plain>{{ $t('common.create') }}</el-button>
<el-button type="danger" icon="delete" :disabled="selectionData.length < 1" @click="deleteConf" plain>{{ $t('common.delete') }}</el-button>
<el-button v-auth="'container:save'" type="primary" icon="plus" @click="editContainerConf(false)" plain>{{ $t('common.create') }}</el-button>
<el-button v-auth="'container:del'" type="danger" icon="delete" :disabled="selectionData.length < 1" @click="deleteConf" plain>
{{ $t('common.delete') }}
</el-button>
</template>
<template #tagPath="{ data }">
@@ -22,7 +24,7 @@
<template #action="{ data }">
<el-button @click="showDetail(data)" link>{{ $t('common.detail') }}</el-button>
<el-button type="primary" link @click="editContainerConf(data)">{{ $t('common.edit') }}</el-button>
<el-button v-auth="'container:save'" type="primary" link @click="editContainerConf(data)">{{ $t('common.edit') }}</el-button>
</template>
</page-table>

View File

@@ -70,5 +70,5 @@ export function getMachineTerminalSocketUrl(authCertName: any) {
}
export function getMachineRdpSocketUrl(authCertName: any) {
return `/machines/rdp/${authCertName}`;
return `/api/machines/rdp/${authCertName}`;
}

View File

@@ -85,12 +85,18 @@ const NodeTypeAuthCert = new NodeType(12)
(await node.ctx?.addResourceComponent(MachineOpComp)).openTerminal(node.params);
})
.withContextMenuItems([
new ContextmenuItem('term', 'machine.openTerminal').withIcon('Monitor').withOnClick(async (node: TagTreeNode) => {
(await node.ctx?.addResourceComponent(MachineOpComp))?.openTerminal(node.params);
}),
new ContextmenuItem('term-ex', 'machine.newTabOpenTerminal').withIcon('Monitor').withOnClick(async (node: TagTreeNode) => {
(await node.ctx?.addResourceComponent(MachineOpComp))?.openTerminal(node.params, true);
}),
new ContextmenuItem('term', 'machine.openTerminal')
.withIcon('Monitor')
.withPermission('machine:terminal')
.withOnClick(async (node: TagTreeNode) => {
(await node.ctx?.addResourceComponent(MachineOpComp))?.openTerminal(node.params);
}),
new ContextmenuItem('term-ex', 'machine.newTabOpenTerminal')
.withIcon('Monitor')
.withPermission('machine:terminal')
.withOnClick(async (node: TagTreeNode) => {
(await node.ctx?.addResourceComponent(MachineOpComp))?.openTerminal(node.params, true);
}),
new ContextmenuItem('files', 'machine.fileManage').withIcon('FolderOpened').withOnClick(async (node: any) => {
(await node.ctx?.addResourceComponent(MachineOpComp)).showFileManage(node.params);
}),

View File

@@ -12,8 +12,10 @@
lazy
>
<template #tableHeader>
<el-button type="primary" icon="plus" @click="editMongo(false)" plain>{{ $t('common.create') }}</el-button>
<el-button type="danger" icon="delete" :disabled="selectionData.length < 1" @click="deleteMongo" plain>{{ $t('common.delete') }}</el-button>
<el-button v-auth="'mongo:save'" type="primary" icon="plus" @click="editMongo(false)" plain>{{ $t('common.create') }}</el-button>
<el-button v-auth="'mongo:del'" type="danger" icon="delete" :disabled="selectionData.length < 1" @click="deleteMongo" plain>
{{ $t('common.delete') }}
</el-button>
</template>
<template #tagPath="{ data }">
@@ -25,7 +27,7 @@
<el-button @click="showUsers(data.id)" link type="success">cmd</el-button>
<el-button @click="editMongo(data)" link type="primary">{{ $t('common.edit') }}</el-button>
<el-button v-auth="'mongo:save'" @click="editMongo(data)" link type="primary">{{ $t('common.edit') }}</el-button>
</template>
</page-table>

View File

@@ -12,8 +12,10 @@
lazy
>
<template #tableHeader>
<el-button type="primary" icon="plus" @click="editRedis(false)" plain>{{ $t('common.create') }}</el-button>
<el-button type="danger" icon="delete" :disabled="selectionData.length < 1" @click="deleteRedis" plain>{{ $t('common.delete') }}</el-button>
<el-button v-auth="'redis:save'" type="primary" icon="plus" @click="editRedis(false)" plain>{{ $t('common.create') }}</el-button>
<el-button v-auth="'redis:del'" type="danger" icon="delete" :disabled="selectionData.length < 1" @click="deleteRedis" plain>
{{ $t('common.delete') }}
</el-button>
</template>
<template #tagPath="{ data }">
@@ -27,7 +29,7 @@
<el-button @click="onShowClusterInfo(data)" v-if="data.mode === 'cluster'" type="primary" link>{{ $t('redis.clusterInfo') }}</el-button>
<el-button @click="showDetail(data)" link>{{ $t('common.detail') }}</el-button>
<el-button type="primary" link @click="editRedis(data)">{{ $t('common.edit') }}</el-button>
<el-button v-auth="'redis:save'" type="primary" link @click="editRedis(data)">{{ $t('common.edit') }}</el-button>
</template>
</page-table>

View File

@@ -32,7 +32,7 @@
<template #dropdown>
<el-dropdown-menu>
<template v-for="item in contextMenuItems" :key="item.clickId">
<el-dropdown-item v-if="!item.isHide(props.data)" :command="item">
<el-dropdown-item v-if="!item.isHide(props.data) && hasPerm(item.permission)" :command="item">
<SvgIcon v-if="item.icon" :name="item.icon" class="mr-1" />{{ $t(item.txt) }}
</el-dropdown-item>
</template>
@@ -54,6 +54,7 @@ import SvgIcon from '@/components/svgIcon/index.vue';
import { ContextmenuItem } from '@/components/contextmenu';
import { ResourceOpCtx, TagTreeNode } from '@/views/ops/component/tag';
import { ResourceOpCtxKey } from '@/views/ops/resource/resource';
import { hasPerm } from '@/components/auth/auth';
const resourceOpCtx: ResourceOpCtx | undefined = inject(ResourceOpCtxKey, undefined);

View File

@@ -2,7 +2,7 @@
<div class="h-full">
<el-splitter @resize="onResizeOpPanel">
<el-splitter-panel size="24%" max="40%">
<el-card class="h-full flex" body-class="bg-(--el-bg-color) !p-0 flex flex-col w-full">
<el-card class="h-full flex" body-class="!p-0 flex flex-col w-full">
<div class="tag-tree-header flex justify-between items-center">
<el-input v-model="filterText" :placeholder="$t('tag.tagFilterPlaceholder')" clearable size="small" class="tag-tree-search w-full">
<template #prefix>

View File

@@ -375,6 +375,7 @@ const allowDrop = (draggingNode: any, dropNode: any, type: any) => {
// 只有权限节点可移动至菜单节点下 或者移动菜单
return (
(draggingNode.data.type == permissionTypeValue && dropNode.data.type == menuTypeValue) ||
(draggingNode.data.type == permissionTypeValue && dropNode.data.type == permissionTypeValue) ||
(draggingNode.data.type == menuTypeValue && dropNode.data.type == menuTypeValue)
);
}

View File

@@ -18,20 +18,19 @@ jwt:
expire-time: 720
# refreshToken过期时间单位分钟
refresh-token-expire-time: 4320
# 资源密码aes加密key
aes:
key: 1111111111111111
# 若存在mysql配置优先使用mysql
mysql:
host: mysql:3306
# 数据库配置dialect支持mysql、sqlite
db:
dialect: mysql
address: mysql:3306
name: mayfly-go
username: root
password: 111049
db-name: mayfly-go
config: charset=utf8&loc=Local&parseTime=true
max-idle-conns: 5
sqlite:
path: ./mayfly-go.sqlite
max-idle-conns: 5
# db:
# dialect: sqlite
# address: ./mayfly-go.db
# max-idle-conns: 5
# 若同时部署多台机器则需要配置redis信息用于缓存权限码、验证码、公私钥等
# redis:
# host: localhost
@@ -55,3 +54,6 @@ log:
# max-age: 60
# # 是否使用 gzip 压缩方式压缩轮转后的日志文件
# compress: true
# 资源密码aes加密key
aes:
key: 1111111111111111

View File

@@ -18,17 +18,17 @@ require (
github.com/go-playground/universal-translator v0.18.1
github.com/go-playground/validator/v10 v10.30.1
github.com/go-sql-driver/mysql v1.9.3
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/lionsoul2014/ip2region/binding/golang v0.0.0-20250930013652-2d71241a3bb9
github.com/microsoft/go-mssqldb v1.9.3
github.com/microsoft/go-mssqldb v1.9.6
github.com/mojocn/base64Captcha v1.3.8 //
github.com/opencontainers/image-spec v1.1.1
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.13.10
github.com/pquerna/otp v1.5.0
github.com/redis/go-redis/v9 v9.17.2
github.com/redis/go-redis/v9 v9.17.3
github.com/robfig/cron/v3 v3.0.1 //
github.com/sijms/go-ora/v2 v2.9.0
github.com/spf13/cast v1.10.0
@@ -36,7 +36,7 @@ require (
github.com/tidwall/gjson v1.18.0
github.com/veops/go-ansiterm v0.0.5
go.mongodb.org/mongo-driver/v2 v2.3.0 // mongo
golang.org/x/crypto v0.46.0 // ssh
golang.org/x/crypto v0.47.0 // ssh
golang.org/x/oauth2 v0.34.0
golang.org/x/sync v0.19.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
@@ -109,6 +109,7 @@ require (
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.55.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/slongfield/pyfmt v0.0.0-20220222012616-ea85ff4c361f // indirect
github.com/tidwall/match v1.2.0 // indirect
@@ -132,11 +133,11 @@ require (
golang.org/x/arch v0.21.0 // indirect
golang.org/x/exp v0.0.0-20251002181428-27f1f14c8bb9 // indirect
golang.org/x/image v0.31.0 // indirect
golang.org/x/mod v0.30.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/text v0.32.0 // indirect
golang.org/x/tools v0.39.0 // indirect
golang.org/x/mod v0.31.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
golang.org/x/tools v0.40.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
modernc.org/libc v1.66.10 // indirect
modernc.org/mathutil v1.7.1 // indirect

View File

@@ -6,6 +6,7 @@ import (
"io"
"mayfly-go/internal/ai/config"
aimodel "mayfly-go/internal/ai/model"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"github.com/cloudwego/eino/components/tool"
@@ -17,11 +18,7 @@ import (
// GetAiAgent 获取AI Agent
func GetAiAgent(ctx context.Context, aiConfig *config.AIModelConfig, tools ...tool.BaseTool) (*react.Agent, error) {
aiModel := aimodel.GetAIModelByConfig(aiConfig)
if aiModel == nil {
return nil, errors.New("no supported AI model found")
}
toolableChatModel, err := aiModel.GetChatModel(ctx, aiConfig)
toolableChatModel, err := aimodel.GetChatModel(ctx, aiConfig)
if err != nil {
return nil, err
}
@@ -64,7 +61,7 @@ func NewAiAgent(ctx context.Context, toolTypes ...ToolType) (*AiAgent, error) {
}
// Chat 聊天,返回消息流通道
func (aiAgent *AiAgent) Chat(ctx context.Context, sysPrompt string, question string) (chan *schema.Message, chan error) {
func (aiAgent *AiAgent) Chat(ctx context.Context, sysPrompt string, question string) (<-chan *schema.Message, <-chan error) {
ch := make(chan *schema.Message, 512)
errCh := make(chan error, 1)
@@ -77,6 +74,10 @@ func (aiAgent *AiAgent) Chat(ctx context.Context, sysPrompt string, question str
go func() {
defer close(ch)
defer close(errCh)
defer gox.Recover(func(err error) {
errCh <- err
})
sr, err := aiAgent.Stream(ctx, []*schema.Message{
{
Role: schema.System,

View File

@@ -2,12 +2,12 @@ package init
import (
"mayfly-go/internal/ai/api"
"mayfly-go/initialize"
"mayfly-go/pkg/starter"
)
func init() {
// 注册AI模块的IoC组件
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
api.InitIoc()
})
}

View File

@@ -2,17 +2,28 @@ package model
import (
"context"
"errors"
"fmt"
"mayfly-go/internal/ai/config"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/utils/collx"
"github.com/cloudwego/eino/components/model"
)
// 定义响应格式常量
const (
ResponseFormatJSON = "json_object"
ResponseFormatText = "text"
)
func init() {
RegisterAIModel(new(Openai))
}
var (
aiModelMap = map[string]AIModel{}
chatModels = collx.SM[string,model.ToolCallingChatModel]{}
)
type AIModel interface {
@@ -20,8 +31,8 @@ type AIModel interface {
// SupportModel 支持的模型
SupportModel() string
// GetChatModel 获取聊天模型
GetChatModel(ctx context.Context, aiConfig *config.AIModelConfig) (model.ToolCallingChatModel, error)
// NewChatModel 创建chat模型
NewChatModel(ctx context.Context, aiConfig *config.AIModelConfig) (model.ToolCallingChatModel, error)
}
// RegisterAIModel 注册AI模型
@@ -38,3 +49,41 @@ func GetAIModel(name string) AIModel {
func GetAIModelByConfig(aiConfig *config.AIModelConfig) AIModel {
return GetAIModel(aiConfig.ModelType)
}
// GetChatModel 获取Chat模型
func GetChatModel(ctx context.Context, aiConfig *config.AIModelConfig) (model.ToolCallingChatModel, error) {
aiModel := GetAIModelByConfig(aiConfig)
if aiModel == nil {
return nil, errors.New("no supported AI model found")
}
cacheKey := generateCacheKey(aiConfig)
if chatModel, ok := chatModels.Load(cacheKey); ok {
logx.Debugf("ai model [%s/%s] - get chat model from cache", aiConfig.ModelType, aiConfig.Model)
return chatModel, nil
}
// 删除已存在的缓存
chatModels.Clear()
chatModel, err := aiModel.NewChatModel(ctx, aiConfig)
if err != nil {
return nil, err
}
logx.Debugf("ai model [%s/%s] - new chat model", aiConfig.ModelType,aiConfig.Model)
chatModels.Store(cacheKey, chatModel)
return chatModel, nil
}
// generateCacheKey 生成基于 aiConfig 关键字段的缓存键
func generateCacheKey(config *config.AIModelConfig) string {
return fmt.Sprintf("%s_%s_%s_%s_%d_%f",
config.ModelType,
config.Model,
config.BaseUrl,
config.ApiKey,
config.MaxTokens,
config.Temperature,
)
}

View File

@@ -16,7 +16,7 @@ func (o *Openai) SupportModel() string {
return "openai"
}
func (o *Openai) GetChatModel(ctx context.Context, aiConfig *config.AIModelConfig) (model.ToolCallingChatModel, error) {
func (o *Openai) NewChatModel(ctx context.Context, aiConfig *config.AIModelConfig) (model.ToolCallingChatModel, error) {
return openai.NewChatModel(ctx, &openai.ChatModelConfig{
BaseURL: aiConfig.BaseUrl,
Model: aiConfig.Model,
@@ -24,5 +24,8 @@ func (o *Openai) GetChatModel(ctx context.Context, aiConfig *config.AIModelConfi
Timeout: time.Duration(aiConfig.TimeOut) * time.Second,
MaxTokens: &aiConfig.MaxTokens,
Temperature: &aiConfig.Temperature,
ResponseFormat: &openai.ChatCompletionResponseFormat{
Type: openai.ChatCompletionResponseFormatTypeJSONObject,
},
})
}

View File

@@ -13,6 +13,7 @@ import (
"mayfly-go/pkg/biz"
"mayfly-go/pkg/cache"
"mayfly-go/pkg/global"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/req"
"mayfly-go/pkg/utils/collx"
"mayfly-go/pkg/utils/netx"
@@ -57,7 +58,9 @@ func LastLoginCheck(ctx context.Context, account *sysentity.Account, accountLogi
res["refresh_token"] = refreshToken
// 不进行otp二次校验则直接返回accessToken
// 保存登录消息
go saveLogin(ctx, account, loginIp)
gox.Go(func() {
saveLogin(ctx, account, loginIp)
})
}
// 赋值otp状态

View File

@@ -5,5 +5,5 @@ import (
)
func InitIoc() {
ioc.Register(new(oauth2AppImpl), ioc.WithComponentName("Oauth2App"))
ioc.Register(new(oauth2AppImpl))
}

View File

@@ -5,5 +5,5 @@ import (
)
func InitIoc() {
ioc.Register(newAuthAccountRepo(), ioc.WithComponentName("Oauth2AccountRepo"))
ioc.Register(newAuthAccountRepo())
}

View File

@@ -1,14 +1,14 @@
package init
import (
"mayfly-go/initialize"
"mayfly-go/internal/auth/api"
"mayfly-go/internal/auth/application"
"mayfly-go/internal/auth/infra/persistence"
"mayfly-go/pkg/starter"
)
func init() {
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
persistence.InitIoc()
application.InitIoc()
api.InitIoc()

View File

@@ -1,12 +1,12 @@
package init
import (
"mayfly-go/initialize"
"mayfly-go/internal/common/api"
"mayfly-go/pkg/starter"
)
func init() {
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
api.InitIoc()
})
}

View File

@@ -11,6 +11,7 @@ import (
fileapp "mayfly-go/internal/file/application"
tagapp "mayfly-go/internal/tag/application"
"mayfly-go/pkg/biz"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/model"
"mayfly-go/pkg/req"
"strings"
@@ -150,12 +151,13 @@ func (d *DbTransferTask) FileRun(rc *req.Ctx) {
filename, reader, err := d.fileApp.GetReader(context.TODO(), tFile.FileKey)
biz.ErrIsNil(err)
go func() {
biz.ErrIsNil(d.dbSqlExecApp.ExecReader(rc.MetaCtx, &dto.SqlReaderExec{
gox.GoCtx(rc.MetaCtx, func(ctx context.Context) {
biz.ErrIsNil(d.dbSqlExecApp.ExecReader(ctx, &dto.SqlReaderExec{
Reader: reader,
Filename: filename,
DbConn: targetDbConn,
ClientId: fm.ClientId,
}))
}()
})
}

View File

@@ -4,7 +4,6 @@ type DataSyncTaskForm struct {
Id uint64 `json:"id"`
TaskName string `binding:"required" json:"taskName"`
TaskCron string `binding:"required" json:"taskCron"`
TaskKey string `json:"taskKey"`
Status int `binding:"required" json:"status"`
SrcDbId int64 `binding:"required" json:"srcDbId"`

View File

@@ -1,6 +1,10 @@
package form
import "mayfly-go/pkg/model"
type DbTransferTaskForm struct {
model.ExtraData
Id uint64 `json:"id"`
TaskName string `binding:"required" json:"taskName"` // 任务名称
@@ -28,14 +32,17 @@ type DbTransferTaskForm struct {
TargetInstName string `json:"targetInstName"` // 目标库实例名
TargetTagPath string `json:"targetTagPath"` // 目标库tagPath
}
type DbTransferTaskStatusForm struct {
Id uint64 `binding:"required" json:"taskId" form:"taskId"`
Status int8 `json:"status" form:"status"`
}
type DbTransferFileForm struct {
Id uint64 `json:"id"`
FileName string `json:"fileName" form:"fileName"`
}
type DbTransferFileRunForm struct {
Id uint64 `json:"id"` // 文件ID
TargetDbId uint64 `json:"targetDbId" form:"targetDbId"` // 需要执行sql的数据库id

View File

@@ -1,10 +1,13 @@
package vo
import (
"mayfly-go/pkg/model"
"time"
)
type DbTransferTaskListVO struct {
model.ExtraData
Id uint64 `json:"id"`
CreateTime *time.Time `json:"createTime"`
Creator string `json:"creator"`

View File

@@ -6,13 +6,13 @@ import (
)
func InitIoc() {
ioc.Register(new(instanceAppImpl), ioc.WithComponentName("DbInstanceApp"))
ioc.Register(new(dbAppImpl), ioc.WithComponentName("DbApp"))
ioc.Register(new(dbSqlExecAppImpl), ioc.WithComponentName("DbSqlExecApp"))
ioc.Register(new(dbSqlAppImpl), ioc.WithComponentName("DbSqlApp"))
ioc.Register(new(dataSyncAppImpl), ioc.WithComponentName("DbDataSyncTaskApp"))
ioc.Register(new(dbTransferAppImpl), ioc.WithComponentName("DbTransferTaskApp"))
ioc.Register(new(dbTransferFileAppImpl), ioc.WithComponentName("DbTransferFileApp"))
ioc.Register(new(instanceAppImpl))
ioc.Register(new(dbAppImpl))
ioc.Register(new(dbSqlExecAppImpl))
ioc.Register(new(dbSqlAppImpl))
ioc.Register(new(dataSyncAppImpl))
ioc.Register(new(dbTransferAppImpl))
ioc.Register(new(dbTransferFileAppImpl))
}
func Init() {

View File

@@ -36,10 +36,9 @@ type Db interface {
// 删除数据库信息
Delete(ctx context.Context, id uint64) error
// 获取数据库连接实例
// @param id 数据库id
//
// @param dbName 数据库名
// GetDbConn 获取数据库连接实例
// - dbId: 数据库id
// - dbName: 数据库名
GetDbConn(ctx context.Context, dbId uint64, dbName string) (*dbi.DbConn, error)
// 根据数据库实例id获取连接随机返回该instanceId下已连接的conn若不存在则是使用该instanceId关联的db进行连接并返回。
@@ -228,7 +227,7 @@ func (d *dbAppImpl) DumpDb(ctx context.Context, reqParam *dto.DumpDb) error {
}
writer := writerx.NewStringWriter(reqParam.Writer)
defer writer.Close()
dbId := reqParam.DbId
dbName := reqParam.DbName
tables := reqParam.Tables
@@ -291,14 +290,16 @@ func (d *dbAppImpl) DumpDb(ctx context.Context, reqParam *dto.DumpDb) error {
// 按表名排序
sort.Strings(tables)
quoteSchema := srcDialect.Quoter().Quote(dbConn.Info.CurrentSchema())
dumpHelper := targetDialect.GetDumpHelper()
targetDumpHelper := targetDialect.GetDumpHelper()
targetSqlGenerator := targetDialect.GetSQLGenerator()
targetDialectQuote := targetDialect.Quoter().Quote
// targetDialectQuote := targetDialect.Quoter().Quote
srcDialectQuote := srcDialect.Quoter().Quote
// 遍历获取每个表的信息
for _, tableName := range tables {
log(fmt.Sprintf("get table [%s] information...", tableName))
quoteTableName := targetDialectQuote(tableName)
// targetQuoteTableName := targetDialectQuote(tableName)
srcQuoteTableName := srcDialectQuote(tableName)
// 查询表信息,主要是为了查询表注释
tbs, err := srcMeta.GetTables(tableName)
@@ -332,22 +333,22 @@ func (d *dbAppImpl) DumpDb(ctx context.Context, reqParam *dto.DumpDb) error {
log(fmt.Sprintf("generate table [%s] DML...", tableName))
writer.WriteString(fmt.Sprintf("\n-- ----------------------------\n-- Data: %s \n-- ----------------------------\n", tableName))
dumpHelper.BeforeInsert(writer, quoteTableName)
targetDumpHelper.BeforeInsert(writer, tableName)
dataCount := 0
rows := make([][]any, 0)
_, err = dbConn.WalkTableRows(ctx, quoteTableName, func(row map[string]any, _ []*dbi.QueryColumn) error {
_, err = dbConn.WalkTableRows(ctx, srcQuoteTableName, func(row map[string]any, _ []*dbi.QueryColumn) error {
rowValues := make([]any, len(columns))
for i, col := range columns {
rowValues[i] = row[col.ColumnName]
}
rows = append(rows, rowValues)
dataCount++
if dataCount%500 != 0 {
if dataCount%100 != 0 {
return nil
}
beforeInsert := dumpHelper.BeforeInsertSql(quoteSchema, quoteTableName)
beforeInsert := targetDumpHelper.BeforeInsertSql(quoteSchema, tableName)
if beforeInsert != "" {
writer.WriteString(beforeInsert)
}
@@ -365,7 +366,7 @@ func (d *dbAppImpl) DumpDb(ctx context.Context, reqParam *dto.DumpDb) error {
}
if len(rows) > 0 {
beforeInsert := dumpHelper.BeforeInsertSql(quoteSchema, quoteTableName)
beforeInsert := targetDumpHelper.BeforeInsertSql(quoteSchema, tableName)
if beforeInsert != "" {
writer.WriteString(beforeInsert)
}
@@ -375,7 +376,7 @@ func (d *dbAppImpl) DumpDb(ctx context.Context, reqParam *dto.DumpDb) error {
}
}
dumpHelper.AfterInsert(writer, tableName, columns)
targetDumpHelper.AfterInsert(writer, tableName, columns)
progress(tableName, dbi.StmtTypeInsert, dataCount, true)
}

View File

@@ -13,6 +13,7 @@ import (
"mayfly-go/pkg/cache"
"mayfly-go/pkg/contextx"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/i18n"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
@@ -70,7 +71,13 @@ func (app *dataSyncAppImpl) Save(ctx context.Context, taskEntity *entity.DataSyn
taskEntity.TaskKey = uuid.New().String()
err = app.Insert(ctx, taskEntity)
} else {
taskEntity.TaskKey = ""
if taskEntity.TaskKey == "" {
task, err := app.GetById(taskEntity.Id)
if err != nil {
return errorx.NewBiz("db sync task not found")
}
taskEntity.TaskKey = task.TaskKey
}
err = app.UpdateById(ctx, taskEntity)
}
if err != nil {
@@ -126,7 +133,13 @@ func (app *dataSyncAppImpl) Run(ctx context.Context, id uint64) error {
CreateTime: &now,
Status: entity.DataSyncTaskStateFail, // 默认失败
}
defer app.endRunning(task, syncLog)
defer gox.Recover(func(err error) {
syncLog.ErrText = i18n.T(imsg.DataSyncFailMsg, "msg", err.Error())
logx.ErrorContext(ctx, syncLog.ErrText)
syncLog.Status = entity.DataSyncTaskStateFail
})
// 通过占位符格式化sql
updSql := ""

View File

@@ -1,6 +1,7 @@
package application
import (
"archive/zip"
"cmp"
"context"
"fmt"
@@ -17,6 +18,7 @@ import (
"mayfly-go/pkg/cache"
"mayfly-go/pkg/contextx"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/scheduler"
@@ -75,11 +77,19 @@ func (app *dbTransferAppImpl) Save(ctx context.Context, taskEntity *entity.DbTra
taskEntity.TaskKey = uuid.New().String()
err = app.Insert(ctx, taskEntity)
} else {
if taskEntity.TaskKey == "" {
task, err := app.GetById(taskEntity.Id)
if err != nil {
return errorx.NewBiz("db transfer task not found")
}
taskEntity.TaskKey = task.TaskKey
}
err = app.UpdateById(ctx, taskEntity)
}
if err != nil {
return err
}
app.addCronJob(ctx, taskEntity)
return nil
}
@@ -131,7 +141,6 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64) (uint64, e
}
logId, _ := app.CreateLog(ctx, taskId)
start := time.Now()
// 修改状态与关联日志id
task.LogId = logId
task.RunningState = entity.DbTransferTaskRunStateRunning
@@ -142,7 +151,7 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64) (uint64, e
// 标记该任务开始执行
app.MarkRunning(taskId)
go func() {
gox.Go(func() {
// 获取源库连接、目标库连接判断连接可用性否则记录日志xx连接不可用
// 获取源库表信息
srcConn, err := app.dbApp.GetDbConn(ctx, uint64(task.SrcDbId), task.SrcDbName)
@@ -171,19 +180,21 @@ func (app *dbTransferAppImpl) Run(ctx context.Context, taskId uint64) (uint64, e
// 迁移到文件或数据库
switch task.Mode {
case entity.DbTransferTaskModeFile:
app.transfer2File(ctx, taskId, logId, task, srcConn, start, tables)
app.transfer2File(ctx, logId, task, tables)
case entity.DbTransferTaskModeDb:
app.transfer2Db(ctx, taskId, logId, task, srcConn, start, tables)
app.transfer2Db(ctx, logId, task, tables)
default:
app.EndTransfer(ctx, logId, taskId, "error in transfer mode, only migrating to files or databases is currently supported", err, nil)
return
}
}()
})
return logId, nil
}
func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, start time.Time, tables []dbi.Table) {
func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, logId uint64, task *entity.DbTransferTask, tables []dbi.Table) {
startTime := time.Now()
taskId := task.Id
defer app.MarkStop(taskId)
defer app.logApp.Flush(logId, true)
@@ -203,13 +214,18 @@ func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, lo
for _, tables := range tableGroups {
errGroup.Go(func() error {
defer gox.Recover()
if !app.IsRunning(taskId) {
return errorx.NewBiz("transfer stopped")
}
currentDumpTable := tables[0]
pr, pw := io.Pipe()
go func() {
defer pr.Close()
gox.Go(func () {
defer pw.Close()
err := app.dbApp.DumpDb(ctx, &dto.DumpDb{
LogId: logId,
DbId: uint64(task.SrcDbId),
@@ -240,20 +256,16 @@ func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, lo
},
})
if err != nil {
app.Log(ctx, logId, fmt.Sprintf("db dump failed: %s", err.Error()))
pr.CloseWithError(err)
return
}
}()
})
if err != nil {
pw.CloseWithError(err)
app.EndTransfer(ctx, logId, taskId, "transfer table failed", err, nil)
return err
}
tx, _ := targetConn.Begin()
err = sqlparser.SQLSplit(pr, ';', func(stmt string) error {
if _, err := targetConn.TxExec(tx, stmt); err != nil {
app.EndTransfer(ctx, logId, taskId, fmt.Sprintf("执行sql出错: %s", stmt), err, nil)
app.Log(ctx, logId, fmt.Sprintf("sql exec failed: %s", stmt), err, nil)
pw.CloseWithError(err)
return err
}
@@ -275,10 +287,11 @@ func (app *dbTransferAppImpl) transfer2Db(ctx context.Context, taskId uint64, lo
app.EndTransfer(ctx, logId, taskId, "transfer table failed", err, nil)
return
}
app.EndTransfer(ctx, logId, taskId, fmt.Sprintf("execute transfer task [taskId = %d] complete, time: %v", taskId, time.Since(start)), nil, nil)
app.EndTransfer(ctx, logId, taskId, fmt.Sprintf("execute transfer task [taskId = %d] complete, time: %v", taskId, time.Since(startTime)), nil, nil)
}
func (app *dbTransferAppImpl) transfer2File(ctx context.Context, taskId uint64, logId uint64, task *entity.DbTransferTask, srcConn *dbi.DbConn, start time.Time, tables []dbi.Table) {
func (app *dbTransferAppImpl) transfer2File(ctx context.Context, logId uint64, task *entity.DbTransferTask, tables []dbi.Table) {
taskId := task.Id
// 1、新增迁移文件数据
nowTime := time.Now()
tFile := &entity.DbTransferFile{
@@ -290,8 +303,9 @@ func (app *dbTransferAppImpl) transfer2File(ctx context.Context, taskId uint64,
}
_ = app.transferFileApp.Save(ctx, tFile)
filename := fmt.Sprintf("dtf_%s.sql", timex.TimeNo())
fileKey, writer, saveFileFunc, err := app.fileApp.NewWriter(ctx, "", filename)
fileType := cmp.Or(task.GetExtraString("fileType"), "sql")
filename := fmt.Sprintf("dtf_%s.%s", timex.TimeNo(), fileType)
fileKey, writer, closeFunc, err := app.fileApp.NewWriter(ctx, "", filename)
if err != nil {
app.EndTransfer(ctx, logId, taskId, "create file error", err, nil)
return
@@ -305,11 +319,30 @@ func (app *dbTransferAppImpl) transfer2File(ctx context.Context, taskId uint64,
go func() {
var err error
defer saveFileFunc(&err)
defer closeFunc(&err)
defer app.MarkStop(taskId)
defer app.logApp.Flush(logId, true)
defer gox.Recover(func(e error) {
err = e
app.EndTransfer(ctx, logId, taskId, "transfer to file panic", e, nil)
tFile.Status = entity.DbTransferFileStatusFail
app.transferFileApp.UpdateById(ctx, tFile)
})
ctx = context.Background()
if fileType == "zip" {
zipWriter := zip.NewWriter(writer)
defer zipWriter.Close()
// 创建SQL文件在ZIP内的条目
writer, err = zipWriter.Create(strings.Replace(filename, "zip", "sql", 1))
if err != nil {
app.EndTransfer(ctx, logId, taskId, "create zip file error", err, nil)
return
}
}
err = app.dbApp.DumpDb(ctx, &dto.DumpDb{
LogId: logId,
DbId: uint64(task.SrcDbId),
@@ -333,7 +366,7 @@ func (app *dbTransferAppImpl) transfer2File(ctx context.Context, taskId uint64,
tFile.Status = entity.DbTransferFileStatusSuccess
tFile.FileKey = fileKey
_ = app.transferFileApp.UpdateById(ctx, tFile)
app.transferFileApp.UpdateById(ctx, tFile)
}()
}
@@ -358,6 +391,7 @@ func (app *dbTransferAppImpl) Stop(ctx context.Context, taskId uint64) error {
func (d *dbTransferAppImpl) TimerDeleteTransferFile() {
logx.Debug("start deleting transfer files periodically...")
scheduler.AddFun("@every 100m", func() {
defer gox.Recover()
dts, err := d.ListByCond(model.NewCond().Eq("mode", entity.DbTransferTaskModeFile).Ge("file_save_days", 1))
if err != nil {
logx.Errorf("the task to periodically get database transfer to file failed: %s", err.Error())
@@ -385,12 +419,6 @@ func (app *dbTransferAppImpl) addCronJob(ctx context.Context, taskEntity *entity
// 根据状态添加新的任务
if taskEntity.Status == entity.DbTransferTaskStatusEnable && taskEntity.CronAble == entity.DbTransferTaskCronAbleEnable {
if key == "" {
taskEntity.TaskKey = uuid.New().String()
key = taskEntity.TaskKey
_ = app.UpdateById(ctx, taskEntity)
}
taskId := taskEntity.Id
if err := scheduler.AddFunByKey(key, taskEntity.Cron, func() {
logx.Infof("start the transfer task: %d", taskId)

View File

@@ -22,7 +22,7 @@ type DumpDb struct {
LogId uint64
Writer io.WriteCloser
Writer io.Writer
TargetDbType dbi.DbType
Log func(msg string)

View File

@@ -43,6 +43,7 @@ var DefaultDbDataType = NewDbDataType("string", DTString).WithCT(CTVarchar)
type Column struct {
TableName string `json:"tableName"` // 表名
ColumnName string `json:"columnName"` // 列名
ColumnType string `json:"columnType"` // 完整列类型带有数据类型以及长度、精度等。如varchar(2000)decimal(20,2)
DataType string `json:"dataType"` // 数据类型
ColumnComment string `json:"columnComment"` // 列备注
IsPrimaryKey bool `json:"isPrimaryKey"` // 是否为主键
@@ -57,6 +58,10 @@ type Column struct {
// GetColumnType 获取完整的列类型拼接数据类型与长度等。如varchar(2000)decimal(20,2)
func (c *Column) GetColumnType() string {
if c.ColumnType != "" {
return c.ColumnType
}
if c.CharMaxLength > 0 {
return fmt.Sprintf("%s(%d)", c.DataType, c.CharMaxLength)
}
@@ -178,6 +183,8 @@ func SQLValueBool(val any) string {
return fmt.Sprintf("%v", cast.ToBool(val))
}
// SQLValueString 转换为SQL字符串值处理特殊字符与单引号
// 适用于普通字符串类型,会将双引号等特殊字符进行转义
func SQLValueString(val any) string {
if val == nil {
return NULL
@@ -197,6 +204,26 @@ func SQLValueString(val any) string {
return fmt.Sprintf("'%s'", quoted)
}
// SQLValuePreserveSpecialChars 转换为SQL字符串值保留特殊字符如双引号、换行符等
// 仅转义单引号为两个单引号SQL标准转义方式
func SQLValuePreserveSpecialChars(val any) string {
if val == nil {
return NULL
}
strVal, ok := val.(string)
if !ok {
return fmt.Sprintf("%v", val)
}
// 直接处理 SQL 特殊字符,保留双引号和其他字符
// 只转义单引号为两个单引号SQL 标准转义方式)
escapedStr := strings.ReplaceAll(strVal, "'", "''")
// 返回 SQL 字符串,保持原始的双引号、换行符等
return fmt.Sprintf("'%s'", escapedStr)
}
var (
DTBit = &DataType{
Name: "bit",
@@ -266,6 +293,13 @@ var (
SQLValue: SQLValueString,
}
// DTStringPreserveSpecial 用于需要保留双引号换行符等特殊字符的字符串类型
DTStringPreserveSpecial = &DataType{
Name: "string",
Valuer: ValuerString,
SQLValue: SQLValuePreserveSpecialChars,
}
DTDate = &DataType{
Name: "date",
Valuer: ValuerDate,

View File

@@ -5,7 +5,9 @@ import (
"database/sql"
"errors"
"fmt"
"strconv"
"mayfly-go/internal/machine/mcm"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
)
@@ -26,15 +28,12 @@ type DbConn struct {
// 关闭连接
func (d *DbConn) Close() error {
if d.db != nil {
defer mcm.CloseSshTunnel(d.Info)
if err := d.db.Close(); err != nil {
logx.Errorf("关闭数据库实例[%s]连接失败: %v", d.Id, err)
return err
}
logx.Debugf("dbm - conn close success, connId: %s", d.Id)
// TODO 关闭实例隧道会影响其他正在使用的连接,所以暂时不关闭
// if d.Info.useSshTunnel {
// mcm.CloseSshTunnelMachine(uint64(d.Info.SshTunnelMachineId), fmt.Sprintf("db:%d", d.Info.Id))
// }
d.db = nil
}
@@ -42,14 +41,10 @@ func (d *DbConn) Close() error {
}
func (d *DbConn) Ping() error {
if d.db == nil {
return fmt.Errorf("db is nil")
}
stats := d.db.Stats()
logx.Debugf("[%s] db stats -> open: %d, idle: %d, inUse: %d, maxOpen: %d", d.Info.Name, stats.OpenConnections, stats.Idle, stats.InUse, stats.MaxOpenConnections)
if stats.OpenConnections == 0 {
logx.Info("db stats: no open connections")
logx.Infof("[%s]-[%s] db stats: no open connections", d.Info.Name, d.Info.Database)
}
return d.db.Ping()
@@ -58,6 +53,7 @@ func (d *DbConn) Ping() error {
// 执行数据库查询返回的列信息
type QueryColumn struct {
Name string `json:"name"` // 列名
Key string `json:"key"` // 列唯一标识
Type string `json:"type"` // 数据类型
DbDataType *DbDataType `json:"-"`
@@ -67,6 +63,7 @@ type QueryColumn struct {
func NewQueryColumn(colName string, columnType *DbDataType) *QueryColumn {
return &QueryColumn{
Name: colName,
Key: colName,
Type: columnType.DataType.Name,
DbDataType: columnType,
valuer: columnType.DataType.Valuer(),
@@ -77,6 +74,7 @@ func (qc *QueryColumn) getValuePtr() any {
return qc.valuer.NewValuePtr()
}
// value 获取列值
func (qc *QueryColumn) value() any {
return qc.valuer.Value()
}
@@ -244,7 +242,12 @@ func (d *DbConn) walkQueryRows(ctx context.Context, selectSql string, walkFn Wal
rowData := make(map[string]any, lenCols)
// 把values中的数据复制到row中
for i := range scans {
rowData[cols[i].Name] = cols[i].value()
colname := cols[i].Name
if _, e := rowData[colname]; e {
colname = colname + strconv.Itoa(i)
cols[i].Key = colname
}
rowData[colname] = cols[i].value()
}
if err = walkFn(rowData, cols); err != nil {
logx.ErrorfContext(ctx, "[%s] cursor traversal query result set error, exit traversal: %s", selectSql, err.Error())

View File

@@ -43,11 +43,24 @@ type DbInfo struct {
CodePath []string
SshTunnelMachineId int
useSshTunnel bool // 是否使用系统自己实现的ssh隧道连接,而非库自带的
RemoteAddr string `json:"-"` // ssh隧道远程地址格式 ip:port
Meta Meta
}
var _ (mcm.SshTunnelAble) = (*DbInfo)(nil)
func (di *DbInfo) GetSshTunnelMachineId() int64 {
return int64(di.SshTunnelMachineId)
}
func (di *DbInfo) GetRemoteAddr() string {
if di.RemoteAddr != "" {
return di.RemoteAddr
}
return fmt.Sprintf("%s:%d", di.Host, di.Port)
}
// 获取记录日志的描述
func (di *DbInfo) GetLogDesc() string {
return fmt.Sprintf("DB[id=%d, tag=%s, name=%s, ip=%s:%d, database=%s]", di.Id, di.CodePath, di.Name, di.Host, di.Port, di.Database)
@@ -68,6 +81,9 @@ func (di *DbInfo) Conn(ctx context.Context, meta Meta) (*DbConn, error) {
di.Database = database
}
if err := di.IfUseSshTunnelChangeIpPort(ctx); err != nil {
return nil, err
}
conn, err := meta.GetSqlDb(ctx, di)
if err != nil {
logx.Errorf("db connection failed: %s:%d/%s, err:%s", di.Host, di.Port, database, err.Error())
@@ -99,17 +115,17 @@ func (di *DbInfo) Conn(ctx context.Context, meta Meta) (*DbConn, error) {
func (di *DbInfo) IfUseSshTunnelChangeIpPort(ctx context.Context) error {
// 开启ssh隧道
if di.SshTunnelMachineId > 0 {
sshTunnelMachine, err := GetSshTunnel(ctx, di.SshTunnelMachineId)
di.RemoteAddr = di.GetRemoteAddr()
sshTunnelMachine, err := machineapp.GetMachineApp().GetSshTunnelMachine(ctx, int(di.SshTunnelMachineId))
if err != nil {
return err
}
exposedIp, exposedPort, err := sshTunnelMachine.OpenSshTunnel(fmt.Sprintf("db:%d", di.Id), di.Host, di.Port)
exposedIp, exposedPort, err := sshTunnelMachine.OpenSshTunnel(di)
if err != nil {
return err
}
di.Host = exposedIp
di.Port = exposedPort
di.useSshTunnel = true
}
return nil
}

View File

@@ -131,6 +131,9 @@ func ConvToTargetDbColumn(srcDbType DbType, targetDbType DbType, targetDialect D
return nil
}
// 需要转换至异构数据库时需要将该字段清空否则如mysql可以查出该值其他数据库可能不行会导致Column.GetColumnType错误。
column.ColumnType = ""
srcMap := commonTypeConverters[srcDbType]
if srcMap == nil {
return fmt.Errorf("src database type [%s] not suport transfer", srcDbType)

View File

@@ -9,7 +9,6 @@ import (
_ "mayfly-go/internal/db/dbm/oracle"
_ "mayfly-go/internal/db/dbm/postgres"
_ "mayfly-go/internal/db/dbm/sqlite"
"mayfly-go/internal/machine/mcm"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/pool"
)
@@ -18,22 +17,6 @@ var (
poolGroup = pool.NewPoolGroup[*dbi.DbConn]()
)
func init() {
mcm.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
items := poolGroup.AllPool()
for _, v := range items {
conn, err := v.Get(context.Background(), pool.WithGetNoUpdateLastActive(), pool.WithGetNoNewConn())
if err != nil {
continue // 获取连接失败,跳过
}
if conn.Info.SshTunnelMachineId == machineId {
return true
}
}
return false
})
}
// GetDbConn 从连接池中获取连接信息
func GetDbConn(ctx context.Context, dbId uint64, database string, getDbInfo func() (*dbi.DbInfo, error)) (*dbi.DbConn, error) {
connId := dbi.GetDbConnId(dbId, database)

View File

@@ -3,11 +3,10 @@ package dm
import (
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/utils/stringx"
"strings"
"time"
_ "gitee.com/chunanyong/dm"
)
type DMDialect struct {
@@ -38,7 +37,7 @@ func (dd *DMDialect) CopyTable(copy *dbi.DbCopyTable) error {
// 复制数据
if copy.CopyData {
go func() {
gox.Go(func() {
// 设置允许填充自增列之后,显示指定列名可以插入自增列\
identityInsert := fmt.Sprintf("set identity_insert \"%s\" on", newTableName)
// 获取列名
@@ -50,8 +49,7 @@ func (dd *DMDialect) CopyTable(copy *dbi.DbCopyTable) error {
columnStr := strings.Join(columnArr, ",")
// 插入新数据并显示指定列
_, _ = dd.dc.Exec(fmt.Sprintf("%s insert into \"%s\" (%s) select %s from \"%s\"", identityInsert, newTableName, columnStr, columnStr, tableName))
}()
})
}
return err
}

View File

@@ -8,6 +8,8 @@ import (
"mayfly-go/pkg/utils/collx"
"net/url"
"strings"
_ "gitee.com/chunanyong/dm"
)
func init() {
@@ -36,11 +38,6 @@ func (dm *Meta) GetSqlDb(ctx context.Context, d *dbi.DbInfo) (*sql.DB, error) {
dbParam += "&" + d.Params
}
err := d.IfUseSshTunnelChangeIpPort(ctx)
if err != nil {
return nil, err
}
dsn := fmt.Sprintf("dm://%s:%s@%s:%d%s", d.Username, url.PathEscape(d.Password), d.Host, d.Port, dbParam)
return sql.Open(driverName, dsn)
}

View File

@@ -3,6 +3,7 @@ package mssql
import (
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"strings"
"time"
@@ -42,7 +43,7 @@ func (md *MssqlDialect) CopyTable(copy *dbi.DbCopyTable) error {
}
// 复制数据
if copy.CopyData {
go func() {
gox.Go(func() {
// 查询所有的列
columns, err := msMetadata.GetColumns(copy.TableName)
if err != nil {
@@ -71,7 +72,7 @@ func (md *MssqlDialect) CopyTable(copy *dbi.DbCopyTable) error {
if err != nil {
logx.Warnf("复制表[%s]数据失败: %s", copy.TableName, err.Error())
}
}()
})
}
return err

View File

@@ -25,10 +25,6 @@ type Meta struct {
}
func (mm *Meta) GetSqlDb(ctx context.Context, d *dbi.DbInfo) (*sql.DB, error) {
err := d.IfUseSshTunnelChangeIpPort(ctx)
if err != nil {
return nil, err
}
query := url.Values{}
// The application name (default is go-mssqldb)
query.Add("app name", "mayfly")

View File

@@ -5,6 +5,7 @@ import (
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/internal/db/dbm/sqlparser"
"mayfly-go/internal/db/dbm/sqlparser/mysql"
"mayfly-go/pkg/gox"
"time"
)
@@ -42,9 +43,9 @@ func (md *MysqlDialect) CopyTable(copy *dbi.DbCopyTable) error {
// 复制数据
if copy.CopyData {
go func() {
gox.Go(func() {
_, _ = md.dc.Exec(fmt.Sprintf("insert into %s select * from %s", newTableName, tableName))
}()
})
}
return err
}

View File

@@ -6,9 +6,8 @@ import (
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/pkg/utils/collx"
"net"
"github.com/go-sql-driver/mysql"
_ "github.com/go-sql-driver/mysql"
)
func init() {
@@ -26,23 +25,14 @@ type Meta struct {
}
func (mm *Meta) GetSqlDb(ctx context.Context, d *dbi.DbInfo) (*sql.DB, error) {
// SSH Conect
if d.SshTunnelMachineId > 0 {
sshTunnelMachine, err := dbi.GetSshTunnel(ctx, d.SshTunnelMachineId)
if err != nil {
return nil, err
}
mysql.RegisterDialContext(d.Network, func(ctx context.Context, addr string) (net.Conn, error) {
return sshTunnelMachine.GetDialConn("tcp", addr)
})
}
d.Network = "tcp"
// 设置dataSourceName -> 更多参数参考https://github.com/go-sql-driver/mysql#dsn-data-source-name
dsn := fmt.Sprintf("%s:%s@%s(%s:%d)/%s?parseTime=true&timeout=8s", d.Username, d.Password, d.Network, d.Host, d.Port, d.Database)
if d.Params != "" {
dsn = fmt.Sprintf("%s&%s", dsn, d.Params)
}
const driverName = "mysql"
return sql.Open(driverName, dsn)
return sql.Open("mysql", dsn)
}
func (mm *Meta) GetDialect(conn *dbi.DbConn) dbi.Dialect {

View File

@@ -97,10 +97,10 @@ func (md *MysqlMetadata) GetColumns(tableNames ...string) ([]dbi.Column, error)
columns := make([]dbi.Column, 0)
for _, re := range res {
column := dbi.Column{
TableName: cast.ToString(re["tableName"]),
ColumnName: cast.ToString(re["columnName"]),
ColumnType: cast.ToString(re["columnType"]),
DataType: cast.ToString(re["dataType"]),
ColumnComment: cast.ToString(re["columnComment"]),
Nullable: cast.ToString(re["nullable"]) == "YES",

View File

@@ -25,11 +25,6 @@ type Meta struct {
}
func (om *Meta) GetSqlDb(ctx context.Context, d *dbi.DbInfo) (*sql.DB, error) {
err := d.IfUseSshTunnelChangeIpPort(ctx)
if err != nil {
return nil, err
}
// 参数参考 https://github.com/sijms/go-ora?tab=readme-ov-file#other-connection-options
urlOptions := make(map[string]string)

View File

@@ -18,12 +18,13 @@ var (
Money = dbi.NewDbDataType("money", dbi.DTString).WithCT(dbi.CTVarchar)
Char = dbi.NewDbDataType("char", dbi.DTString).WithCT(dbi.CTChar)
Nchar = dbi.NewDbDataType("nchar", dbi.DTString).WithCT(dbi.CTVarchar)
Varchar = dbi.NewDbDataType("varchar", dbi.DTString).WithCT(dbi.CTVarchar)
Text = dbi.NewDbDataType("text", dbi.DTString).WithCT(dbi.CTText).WithFixColumn(dbi.ClearCharMaxLength)
Json = dbi.NewDbDataType("json", dbi.DTString).WithCT(dbi.CTJSON).WithFixColumn(dbi.ClearCharMaxLength)
Bytea = dbi.NewDbDataType("bytea", dbi.DTString).WithCT(dbi.CTBinary)
Char = dbi.NewDbDataType("char", dbi.DTStringPreserveSpecial).WithCT(dbi.CTChar)
Nchar = dbi.NewDbDataType("nchar", dbi.DTStringPreserveSpecial).WithCT(dbi.CTVarchar)
Varchar = dbi.NewDbDataType("varchar", dbi.DTStringPreserveSpecial).WithCT(dbi.CTVarchar)
Text = dbi.NewDbDataType("text", dbi.DTStringPreserveSpecial).WithCT(dbi.CTText).WithFixColumn(dbi.ClearCharMaxLength)
Json = dbi.NewDbDataType("json", dbi.DTStringPreserveSpecial).WithCT(dbi.CTJSON).WithFixColumn(dbi.ClearCharMaxLength)
Jsonb = dbi.NewDbDataType("jsonb", dbi.DTStringPreserveSpecial).WithCT(dbi.CTJSON).WithFixColumn(dbi.ClearCharMaxLength)
Bytea = dbi.NewDbDataType("bytea", dbi.DTStringPreserveSpecial).WithCT(dbi.CTBinary)
Date = dbi.NewDbDataType("date", dbi.DTDate).WithCT(dbi.CTDate).WithFixColumn(dbi.ClearCharMaxLength)
Time = dbi.NewDbDataType("time", dbi.DTTime).WithCT(dbi.CTTime).WithFixColumn(dbi.ClearCharMaxLength)

View File

@@ -3,6 +3,7 @@ package postgres
import (
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/pkg/gox"
"time"
"github.com/spf13/cast"
@@ -26,9 +27,9 @@ func (pd *PgsqlDialect) CopyTable(copy *dbi.DbCopyTable) error {
// 复制数据
if copy.CopyData {
go func() {
gox.Go(func() {
_, _ = pd.dc.Exec(fmt.Sprintf("insert into %s select * from %s", newTableName, tableName))
}()
})
}
// 查询旧表的自增字段名 重新设置新表的序列序列器

View File

@@ -31,7 +31,7 @@ func (dh *DumpHelper) AfterInsert(writer io.Writer, tableName string, columns []
// 设置自增序列当前值
for _, column := range columns {
if column.AutoIncrement {
seq := fmt.Sprintf("SELECT setval('%s_%s_seq', (SELECT max(%s) FROM %s));\n", tableName, column.ColumnName, column.ColumnName, tableName)
seq := fmt.Sprintf("SELECT setval('%s_%s_seq', (SELECT max(%s) FROM \"%s\"));\n", tableName, column.ColumnName, column.ColumnName, tableName)
writer.Write([]byte(seq))
}
}

View File

@@ -3,16 +3,12 @@ package postgres
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/pkg/utils/collx"
"mayfly-go/pkg/utils/netx"
"net"
"strings"
"time"
pq "gitee.com/liuzongyang/libpq"
_ "gitee.com/liuzongyang/libpq"
)
func init() {
@@ -39,17 +35,6 @@ type Meta struct {
}
func (pm *Meta) GetSqlDb(ctx context.Context, d *dbi.DbInfo) (*sql.DB, error) {
driverName := "postgres"
// SSH Conect
if d.SshTunnelMachineId > 0 {
// 如果使用了隧道,则使用`postgres:ssh:隧道机器id`注册名
driverName = fmt.Sprintf("postgres:ssh:%d", d.SshTunnelMachineId)
if !collx.ArrayContains(sql.Drivers(), driverName) {
sql.Register(driverName, &PqSqlDialer{sshTunnelMachineId: d.SshTunnelMachineId})
}
sql.Drivers()
}
db := d.Database
var dbParam string
existSchema := false
@@ -86,7 +71,7 @@ func (pm *Meta) GetSqlDb(ctx context.Context, d *dbi.DbInfo) (*sql.DB, error) {
dsn = fmt.Sprintf("%s %s", dsn, pm.Param)
}
return sql.Open(driverName, dsn)
return sql.Open("postgres", dsn)
}
func (pm *Meta) GetDialect(conn *dbi.DbConn) dbi.Dialect {
@@ -101,7 +86,7 @@ func (pm *Meta) GetDbDataTypes() []*dbi.DbDataType {
return collx.AsArray(
Bool, Int2, Int4, Int8, Numeric, Decimal, Smallserial, Serial, Bigserial, Largeserial,
Money,
Char, Nchar, Varchar, Text, Json,
Char, Nchar, Varchar, Text, Json, Jsonb,
Date, Time, Timestamp,
Bytea,
)
@@ -110,30 +95,3 @@ func (pm *Meta) GetDbDataTypes() []*dbi.DbDataType {
func (pm *Meta) GetCommonTypeConverter() dbi.CommonTypeConverter {
return &commonTypeConverter{}
}
// pgsql dialer
type PqSqlDialer struct {
sshTunnelMachineId int
}
func (pd *PqSqlDialer) Open(name string) (driver.Conn, error) {
return pq.DialOpen(pd, name)
}
func (pd *PqSqlDialer) Dial(network, address string) (net.Conn, error) {
// todo context.Background可能存在问题
sshTunnel, err := dbi.GetSshTunnel(context.Background(), pd.sshTunnelMachineId)
if err != nil {
return nil, err
}
if sshConn, err := sshTunnel.GetDialConn("tcp", address); err == nil {
// 将ssh conn包装否则会返回错误: ssh: tcpChan: deadline not supported
return &netx.WrapSshConn{Conn: sshConn}, nil
} else {
return nil, err
}
}
func (pd *PqSqlDialer) DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
return pd.Dial(network, address)
}

View File

@@ -3,6 +3,7 @@ package sqlite
import (
"fmt"
"mayfly-go/internal/db/dbm/dbi"
"mayfly-go/pkg/gox"
"strings"
"time"
)
@@ -36,10 +37,10 @@ func (sd *SqliteDialect) CopyTable(copy *dbi.DbCopyTable) error {
// 使用异步线程插入数据
if copy.CopyData {
go func() {
gox.Go(func() {
// 执行插入语句
_, _ = sd.dc.Exec(fmt.Sprintf("INSERT INTO \"%s\" SELECT * FROM \"%s\"", newTableName, tableName))
}()
})
}
return err

View File

@@ -6,6 +6,7 @@ import (
type DbTransferTask struct {
model.Model
model.ExtraData
TaskName string `json:"taskName" gorm:"size:255;not null;"` // 任务名称
TaskKey string `json:"taskKey" gorm:"size:100;not null;"` // 定时任务唯一uuid key

View File

@@ -5,12 +5,12 @@ import (
)
func InitIoc() {
ioc.Register(NewInstanceRepo(), ioc.WithComponentName("DbInstanceRepo"))
ioc.Register(newDbRepo(), ioc.WithComponentName("DbRepo"))
ioc.Register(newDbSqlRepo(), ioc.WithComponentName("DbSqlRepo"))
ioc.Register(newDbSqlExecRepo(), ioc.WithComponentName("DbSqlExecRepo"))
ioc.Register(newDataSyncTaskRepo(), ioc.WithComponentName("DbDataSyncTaskRepo"))
ioc.Register(newDataSyncLogRepo(), ioc.WithComponentName("DbDataSyncLogRepo"))
ioc.Register(newDbTransferTaskRepo(), ioc.WithComponentName("DbTransferTaskRepo"))
ioc.Register(newDbTransferFileRepo(), ioc.WithComponentName("DbTransferFileRepo"))
ioc.Register(NewInstanceRepo())
ioc.Register(newDbRepo())
ioc.Register(newDbSqlRepo())
ioc.Register(newDbSqlExecRepo())
ioc.Register(newDataSyncTaskRepo())
ioc.Register(newDataSyncLogRepo())
ioc.Register(newDbTransferTaskRepo())
ioc.Register(newDbTransferFileRepo())
}

View File

@@ -1,22 +1,22 @@
package init
import (
"mayfly-go/initialize"
"mayfly-go/internal/db/ai/tools"
"mayfly-go/internal/db/api"
"mayfly-go/internal/db/application"
"mayfly-go/internal/db/infra/persistence"
"mayfly-go/pkg/starter"
)
func init() {
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
persistence.InitIoc()
application.InitIoc()
api.InitIoc()
})
initialize.AddInitFunc(application.Init)
initialize.AddTerminateFunc(Terminate)
starter.AddInitFunc(application.Init)
starter.AddTerminateFunc(Terminate)
// 注册AI数据库工具
tools.Init()
}

View File

@@ -9,6 +9,7 @@ import (
"mayfly-go/internal/docker/imsg"
"mayfly-go/pkg/biz"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/req"
"mayfly-go/pkg/utils/anyx"
@@ -97,9 +98,9 @@ func (d *Container) GetContainersStats(rc *req.Ctx) {
var mu sync.Mutex
allStats := make([]vo.ContainerStats, 0)
for _, c := range cs {
go func(item container.Summary) {
gox.Go(func() {
defer wg.Done()
if item.State != "running" {
if c.State != "running" {
return
}
@@ -125,7 +126,7 @@ func (d *Container) GetContainersStats(rc *req.Ctx) {
mu.Lock()
allStats = append(allStats, cs)
mu.Unlock()
}(c)
})
}
wg.Wait()
@@ -222,7 +223,7 @@ func (d *Container) ContainerLogs(rc *req.Ctx) {
biz.ErrIsNil(err)
defer logs.Close()
go func() {
gox.Go(func() {
for {
select {
case <-ctx.Done():
@@ -236,7 +237,7 @@ func (d *Container) ContainerLogs(rc *req.Ctx) {
}
}
}
}()
})
buf := make([]byte, 1024)
for {

View File

@@ -5,7 +5,7 @@ import (
)
func InitIoc() {
ioc.Register(new(containerAppImpl), ioc.WithComponentName("ContainerApp"))
ioc.Register(new(containerAppImpl))
}
func GetContainerApp() Container {

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"mayfly-go/internal/machine/mcm"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/pool"
@@ -149,7 +150,7 @@ func (c Client) ContainerAttach(containerID string, wsConn *websocket.Conn, rows
wsConn.WriteMessage(websocket.TextMessage, []byte("\033[2J\033[3J\033[1;1H")) // 清屏
// 转发容器输出到前端
go func() {
gox.Go(func() {
buf := make([]byte, 1024)
for {
select {
@@ -169,7 +170,7 @@ func (c Client) ContainerAttach(containerID string, wsConn *websocket.Conn, rows
wsConn.WriteMessage(websocket.TextMessage, buf[:n])
}
}
}()
})
for {
select {

View File

@@ -5,5 +5,5 @@ import (
)
func InitIoc() {
ioc.Register(newContainerRepo(), ioc.WithComponentName("ContainerRepo"))
ioc.Register(newContainerRepo())
}

View File

@@ -1,14 +1,14 @@
package init
import (
"mayfly-go/initialize"
"mayfly-go/internal/docker/api"
"mayfly-go/internal/docker/application"
"mayfly-go/internal/docker/infra/persistence"
"mayfly-go/pkg/starter"
)
func init() {
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
persistence.InitIoc()
application.InitIoc()
api.InitIoc()

View File

@@ -6,7 +6,7 @@ import (
)
func InitIoc() {
ioc.Register(new(instanceAppImpl), ioc.WithComponentName("EsInstanceApp"))
ioc.Register(new(instanceAppImpl))
}
func Init() {

View File

@@ -8,7 +8,6 @@ import (
"mayfly-go/internal/es/domain/repository"
"mayfly-go/internal/es/esm/esi"
"mayfly-go/internal/es/imsg"
"mayfly-go/internal/machine/mcm"
"mayfly-go/internal/pkg/consts"
tagapp "mayfly-go/internal/tag/application"
tagdto "mayfly-go/internal/tag/application/dto"
@@ -41,21 +40,6 @@ var _ Instance = &instanceAppImpl{}
var poolGroup = pool.NewPoolGroup[*esi.EsConn]()
func init() {
mcm.AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
items := poolGroup.AllPool()
for _, v := range items {
conn, err := v.Get(context.Background(), pool.WithGetNoUpdateLastActive(), pool.WithGetNoNewConn())
if err != nil {
continue // 获取连接失败,跳过
}
if conn.Info.SshTunnelMachineId == machineId {
return true
}
}
return false
})
}
type instanceAppImpl struct {
base.AppImpl[*entity.EsInstance, repository.EsInstance]

View File

@@ -20,10 +20,7 @@ type EsConn struct {
/******************* pool.Conn impl *******************/
func (d *EsConn) Close() error {
// 如果是使用了ssh隧道转发则需要手动将其关闭
if d.Info.useSshTunnel {
mcm.CloseSshTunnelMachine(uint64(d.Info.SshTunnelMachineId), fmt.Sprintf("es:%d", d.Id))
}
mcm.CloseSshTunnel(d.Info)
return nil
}

View File

@@ -35,13 +35,26 @@ type EsInfo struct {
CodePath []string
SshTunnelMachineId int
useSshTunnel bool // 是否使用系统自己实现的ssh隧道连接,而非库自带的
RemoteAddr string `json:"-"` // ssh隧道远程地址格式 ip:port
OriginUrl string // 原始url
baseUrl string // 发起http请求的基本url
authorization string // 发起http请求携带的认证信息
}
var _ (mcm.SshTunnelAble) = (*EsInfo)(nil)
func (di *EsInfo) GetSshTunnelMachineId() int64 {
return int64(di.SshTunnelMachineId)
}
func (di *EsInfo) GetRemoteAddr() string {
if di.RemoteAddr != "" {
return di.RemoteAddr
}
return fmt.Sprintf("%s:%d", di.Host, di.Port)
}
// 获取记录日志的描述
func (di *EsInfo) GetLogDesc() string {
return fmt.Sprintf("ES[id=%d, tag=%s, name=%s, ip=%s:%d]", di.InstanceId, di.CodePath, di.Name, di.Host, di.Port)
@@ -132,17 +145,17 @@ func (di *EsInfo) IfUseSshTunnelChangeIpPort(ctx context.Context) error {
// 开启ssh隧道
if di.SshTunnelMachineId > 0 {
di.RemoteAddr = di.GetRemoteAddr()
stm, err := GetSshTunnel(ctx, di.SshTunnelMachineId)
if err != nil {
return err
}
exposedIp, exposedPort, err := stm.OpenSshTunnel(fmt.Sprintf("es:%d", di.InstanceId), di.Host, di.Port)
exposedIp, exposedPort, err := stm.OpenSshTunnel(di)
if err != nil {
return err
}
di.Host = exposedIp
di.Port = exposedPort
di.useSshTunnel = true
di.baseUrl = fmt.Sprintf("%s://%s:%d", di.Protocol, exposedIp, exposedPort)
} else {
di.baseUrl = fmt.Sprintf("%s://%s:%d", di.Protocol, di.Host, di.Port)

View File

@@ -5,5 +5,5 @@ import (
)
func InitIoc() {
ioc.Register(NewInstanceRepo(), ioc.WithComponentName("EsInstanceRepo"))
ioc.Register(NewInstanceRepo())
}

View File

@@ -1,18 +1,18 @@
package init
import (
"mayfly-go/initialize"
"mayfly-go/internal/es/api"
"mayfly-go/internal/es/application"
"mayfly-go/internal/es/infra/persistence"
"mayfly-go/pkg/starter"
)
func init() {
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
persistence.InitIoc()
application.InitIoc()
api.InitIoc()
})
initialize.AddInitFunc(application.Init)
starter.AddInitFunc(application.Init)
}

View File

@@ -5,5 +5,5 @@ import (
)
func InitIoc() {
ioc.Register(new(fileAppImpl), ioc.WithComponentName("FileApp"))
ioc.Register(new(fileAppImpl))
}

View File

@@ -24,33 +24,42 @@ type File interface {
// Upload 上传文件
//
// @param fileKey 文件key若存在则使用存在的文件key否则生成新的文件key。
// 参数:
// - fileKey: 文件key若不为空则使用该文件key否则生成新的文件key
// - filename: 文件名,带文件后缀
// - r: 文件内容读取流
//
// @param filename 文件名,带文件后缀
// 返回值:
// - fileKey: 文件key
// - error: 错误信息
//
// @return fileKey 文件key
// 注意此方法会在defer中自动调用saveFunc无论成功或失败都会正确处理文件保存或清理工作
Upload(ctx context.Context, fileKey string, filename string, r io.Reader) (string, error)
// NewWriter 创建文件writer
//
// @param canEmptyFileKey 文件key若不为空则使用该文件key否则生成新的文件key。
// 参数:
// - canEmptyFileKey: 文件key若不为空则使用该文件key否则生成新的文件key
// - filename: 文件名,带文件后缀
//
// @param filename 文件名,带文件后缀
//
// @return fileKey 文件key
//
// @return writer 文件writer
//
// @return saveFunc(*error) 保存文件信息的回调函数 (必须要defer中调用才会入库保存该文件信息),若*error不为nil则表示业务逻辑处理失败不需要保存文件信息并将创建的文件删除
NewWriter(ctx context.Context, canEmptyFileKey string, filename string) (fileKey string, writer *writerx.CountingWriteCloser, saveFunc func(*error) error, err error)
// 返回值:
// - fileKey: 文件key
// - writer: 文件writer实现了计数功能的io.Write
// - closeFunc: 关闭回调。用于保存文件信息关闭writer等操作
// 必须在defer中调用才会入库保存该文件信息
// 若传入的错误参数不为nil则不会保存文件信息并会删除已创建的文件
// - err: 错误信息
NewWriter(ctx context.Context, canEmptyFileKey string, filename string) (fileKey string, writer io.Writer, closeFunc func(*error) error, err error)
// GetReader 获取文件reader
// GetReader 获取文件读取器
//
// @return filename 文件名
// 参数:
// - fileKey: 文件唯一标识key
//
// @return reader 文件reader
//
// @return err 错误
// 返回值:
// - filename: 文件名(带后缀)
// - reader: 文件读取流,调用方需负责关闭
// - err: 错误信息
GetReader(ctx context.Context, fileKey string) (string, io.ReadCloser, error)
// Remove 删除文件
@@ -63,11 +72,11 @@ type fileAppImpl struct {
func (f *fileAppImpl) Upload(ctx context.Context, fileKey string, filename string, r io.Reader) (string, error) {
var err error
fileKey, writer, saveFileFunc, err := f.NewWriter(ctx, fileKey, filename)
fileKey, writer, closeFunc, err := f.NewWriter(ctx, fileKey, filename)
if err != nil {
return fileKey, err
}
defer saveFileFunc(&err)
defer closeFunc(&err)
if _, err = io.Copy(writer, r); err != nil {
return fileKey, err
@@ -75,7 +84,7 @@ func (f *fileAppImpl) Upload(ctx context.Context, fileKey string, filename strin
return fileKey, nil
}
func (f *fileAppImpl) NewWriter(ctx context.Context, canEmptyFileKey string, filename string) (fileKey string, writer *writerx.CountingWriteCloser, saveFunc func(*error) error, err error) {
func (f *fileAppImpl) NewWriter(ctx context.Context, canEmptyFileKey string, filename string) (fileKey string, writer io.Writer, closeFunc func(*error) error, err error) {
isNewFile := true
file := &entity.File{}
@@ -95,18 +104,20 @@ func (f *fileAppImpl) NewWriter(ctx context.Context, canEmptyFileKey string, fil
f.remove(ctx, file)
}
// 生新的文件名
// 生新的文件名
newFilename := canEmptyFileKey + filepath.Ext(filename)
filepath, w, err := f.newWriter(newFilename)
fp, w, err := f.newWriter(newFilename)
if err != nil {
return "", nil, nil, err
}
file.Path = filepath
file.Path = fp
fileKey = canEmptyFileKey
writer = writerx.NewCountingWriteCloser(w)
countWriter := writerx.NewCountingWriteCloser(w)
// 创建回调函数
saveFunc = func(e *error) error {
closeFunc = func(e *error) error {
countWriter.Close()
if e != nil {
err := *e
if err != nil {
@@ -116,14 +127,14 @@ func (f *fileAppImpl) NewWriter(ctx context.Context, canEmptyFileKey string, fil
return err
}
}
// 获取已写入的字节数
file.Size = writer.BytesWritten()
writer.Close()
file.Size = countWriter.BytesWritten()
// 保存文件信息
return f.Save(ctx, file)
}
return fileKey, writer, saveFunc, nil
return fileKey, countWriter, closeFunc, nil
}
func (f *fileAppImpl) GetReader(ctx context.Context, fileKey string) (string, io.ReadCloser, error) {
@@ -163,6 +174,7 @@ func (f *fileAppImpl) newWriter(filename string) (string, io.WriteCloser, error)
if err != nil {
return "", nil, err
}
return filePath, out, nil
}

View File

@@ -5,5 +5,5 @@ import (
)
func InitIoc() {
ioc.Register(newFileRepo(), ioc.WithComponentName("FileRepo"))
ioc.Register(newFileRepo())
}

View File

@@ -1,14 +1,14 @@
package init
import (
"mayfly-go/initialize"
"mayfly-go/internal/file/api"
"mayfly-go/internal/file/application"
"mayfly-go/internal/file/infra/persistence"
"mayfly-go/pkg/starter"
)
func init() {
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
persistence.InitIoc()
application.InitIoc()
api.InitIoc()

View File

@@ -5,12 +5,12 @@ import (
)
func InitIoc() {
ioc.Register(new(procdefAppImpl), ioc.WithComponentName("ProcdefApp"))
ioc.Register(new(procinstAppImpl), ioc.WithComponentName("ProcinstApp"))
ioc.Register(new(executionAppImpl), ioc.WithComponentName("ExecutionApp"))
ioc.Register(new(procdefAppImpl))
ioc.Register(new(procinstAppImpl))
ioc.Register(new(executionAppImpl))
ioc.Register(new(procinstTaskAppImpl), ioc.WithComponentName("ProcinstTaskApp"))
ioc.Register(new(hisProcinstOpAppImpl), ioc.WithComponentName("HisProcinstOpApp"))
ioc.Register(new(procinstTaskAppImpl))
ioc.Register(new(hisProcinstOpAppImpl))
}
func Init() {

View File

@@ -7,6 +7,7 @@ import (
"mayfly-go/pkg/base"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/eventbus"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/utils/collx"
@@ -183,11 +184,11 @@ func (e *executionAppImpl) executeNode(ctx *ExecutionCtx) error {
// 执行节点逻辑
if node.IsAsync() {
go func() {
gox.Go(func() {
if err := node.Execute(ctx); err != nil {
logx.Errorf("async execute node error: %v, procinst_id: %d, node_key: %s", err, ctx.Procinst.Id, flowNode.Key)
}
}()
})
return nil
}

View File

@@ -6,12 +6,12 @@ import (
)
func InitIoc() {
ioc.Register(newProcdefRepo(), ioc.WithComponentName("ProcdefRepo"))
ioc.Register(newProcinstRepo(), ioc.WithComponentName("ProcinstRepo"))
ioc.Register(newExectionRepo(), ioc.WithComponentName("ExectionRepo"))
ioc.Register(newProcinstTaskRepo(), ioc.WithComponentName("ProcinstTaskRepo"))
ioc.Register(newProcinstTaskCandidateRepo(), ioc.WithComponentName("ProcinstTaskCandidateRepo"))
ioc.Register(newHisProcinstOpRepo(), ioc.WithComponentName("HisProcinstTaskRepo"))
ioc.Register(newProcdefRepo())
ioc.Register(newProcinstRepo())
ioc.Register(newExectionRepo())
ioc.Register(newProcinstTaskRepo())
ioc.Register(newProcinstTaskCandidateRepo())
ioc.Register(newHisProcinstOpRepo())
}
func GetProcinstTaskCandidateRepo() repository.ProcinstTaskCandidate {

View File

@@ -1,18 +1,18 @@
package init
import (
"mayfly-go/initialize"
"mayfly-go/internal/flow/api"
"mayfly-go/internal/flow/application"
"mayfly-go/internal/flow/infra/persistence"
"mayfly-go/pkg/starter"
)
func init() {
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
persistence.InitIoc()
application.InitIoc()
api.InitIoc()
})
initialize.AddInitFunc(application.Init)
starter.AddInitFunc(application.Init)
}

View File

@@ -16,6 +16,7 @@ import (
tagentity "mayfly-go/internal/tag/domain/entity"
"mayfly-go/pkg/biz"
"mayfly-go/pkg/global"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/req"
@@ -378,7 +379,9 @@ func (m *Machine) WsGuacamole(rc *req.Ctx) {
defer tunnel.ReleaseWriter()
defer tunnel.ReleaseReader()
go guac.WsToGuacd(wsConn, tunnel, writer)
gox.Go(func() {
guac.WsToGuacd(wsConn, tunnel, writer)
})
guac.GuacdToWs(wsConn, tunnel, reader)
//OnConnect

View File

@@ -6,12 +6,12 @@ import (
)
func InitIoc() {
ioc.Register(new(machineAppImpl), ioc.WithComponentName("MachineApp"))
ioc.Register(new(machineFileAppImpl), ioc.WithComponentName("MachineFileApp"))
ioc.Register(new(machineScriptAppImpl), ioc.WithComponentName("MachineScriptApp"))
ioc.Register(new(machineCronJobAppImpl), ioc.WithComponentName("MachineCronJobApp"))
ioc.Register(new(machineTermOpAppImpl), ioc.WithComponentName("MachineTermOpApp"))
ioc.Register(new(machineCmdConfAppImpl), ioc.WithComponentName("MachineCmdConfApp"))
ioc.Register(new(machineAppImpl))
ioc.Register(new(machineFileAppImpl))
ioc.Register(new(machineScriptAppImpl))
ioc.Register(new(machineCronJobAppImpl))
ioc.Register(new(machineTermOpAppImpl))
ioc.Register(new(machineCmdConfAppImpl))
}
func Init() {

View File

@@ -14,6 +14,7 @@ import (
tagentity "mayfly-go/internal/tag/domain/entity"
"mayfly-go/pkg/base"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/scheduler"
@@ -255,14 +256,11 @@ func (m *machineAppImpl) GetSshTunnelMachine(ctx context.Context, machineId int)
func (m *machineAppImpl) TimerUpdateStats() {
logx.Debug("start collecting and caching machine state information periodically...")
scheduler.AddFun("@every 2m", func() {
defer gox.Recover()
machineIds, _ := m.ListByCond(model.NewModelCond(&entity.Machine{Status: entity.MachineStatusEnable, Protocol: entity.MachineProtocolSsh}).Columns("id"))
for _, ma := range machineIds {
go func(mid uint64) {
defer func() {
if err := recover(); err != nil {
logx.ErrorTrace(fmt.Sprintf("failed to get machine [id=%d] status information on time", mid), err.(error))
}
}()
gox.Go(func() {
mid := ma.Id
logx.Debugf("time to get machine [id=%d] status information start", mid)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
@@ -273,7 +271,9 @@ func (m *machineAppImpl) TimerUpdateStats() {
}
cache.SaveMachineStats(mid, cli.GetAllStats())
logx.Debugf("time to get the machine [id=%d] status information end", mid)
}(ma.Id)
}, func(err error) {
logx.ErrorTrace(fmt.Sprintf("failed to get machine [id=%d] status information on time", ma.Id), err)
})
}
})
}

View File

@@ -9,6 +9,7 @@ import (
tagentity "mayfly-go/internal/tag/domain/entity"
"mayfly-go/pkg/base"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/rediscli"
@@ -135,7 +136,9 @@ func (m *machineCronJobAppImpl) RunCronJob(key string) {
})), "id")
for _, machine := range machines {
go m.runCronJob0(machine.Id, cronJob)
gox.Go(func() {
m.runCronJob0(machine.Id, cronJob)
})
}
}
@@ -149,6 +152,7 @@ func (m *machineCronJobAppImpl) addCronJob(mcj *entity.MachineCronJob) {
}
if err := scheduler.AddFunByKey(key, mcj.Cron, func() {
defer gox.Recover()
m.RunCronJob(key)
}); err != nil {
logx.ErrorTrace("add machine cron job failed", err)

View File

@@ -88,10 +88,6 @@ type machineFileAppImpl struct {
var _ MachineFile = (*machineFileAppImpl)(nil)
// 注入MachineFileRepo
func (m *machineFileAppImpl) InjectMachineFileRepo(repo repository.MachineFile) {
m.Repo = repo
}
// 分页获取机器文件配置信息列表
func (m *machineFileAppImpl) GetPageList(condition *entity.MachineFile, pageParam model.PageParam, orderBy ...string) (*model.PageResult[*entity.MachineFile], error) {

View File

@@ -12,6 +12,7 @@ import (
"mayfly-go/pkg/base"
"mayfly-go/pkg/contextx"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/scheduler"
@@ -61,11 +62,11 @@ func (m *machineTermOpAppImpl) TermConn(ctx context.Context, cli *mcm.Cli, wsCon
termOpRecord.MachineId = cli.Info.Id
termOpRecord.Username = cli.Info.Username
fileKey, wc, saveFileFunc, err := m.fileApp.NewWriter(ctx, "", fmt.Sprintf("mto_%d_%s.cast", termOpRecord.MachineId, timex.TimeNo()))
fileKey, wc, closeFunc, err := m.fileApp.NewWriter(ctx, "", fmt.Sprintf("mto_%d_%s.cast", termOpRecord.MachineId, timex.TimeNo()))
if err != nil {
return errorx.NewBizf("failed to create a terminal playback log file: %v", err)
}
defer saveFileFunc(&err)
defer closeFunc(&err)
termOpRecord.FileKey = fileKey
recorder = mcm.NewRecorder(wc)
@@ -117,6 +118,7 @@ func (m *machineTermOpAppImpl) GetPageList(condition *entity.MachineTermOp, page
func (m *machineTermOpAppImpl) TimerDeleteTermOp() {
logx.Debug("start deleting machine terminal playback records every hour...")
scheduler.AddFun("@every 60m", func() {
defer gox.Recover()
startDate := time.Now().AddDate(0, 0, -config.GetMachine().TermOpSaveDays)
cond := &entity.MachineTermOpQuery{
StartCreateTime: &startDate,

View File

@@ -1,6 +1,7 @@
package guac
import (
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"sync"
"time"
@@ -72,7 +73,9 @@ func NewTunnelMap() *TunnelMap {
tunnelMap: make(map[string]*LastAccessedTunnel),
tunnelTimeout: TunnelTimeout,
}
go tunnelMap.tunnelTimeoutTask()
gox.Go(func() {
tunnelMap.tunnelTimeoutTask()
})
return tunnelMap
}
@@ -116,7 +119,6 @@ func (m *TunnelMap) tunnelTimeoutTaskRun() {
}
}
m.Unlock()
return
}
// Get returns the Tunnel having the given UUID, wrapped within a LastAccessedTunnel.

View File

@@ -5,11 +5,11 @@ import (
)
func InitIoc() {
ioc.Register(newMachineRepo(), ioc.WithComponentName("MachineRepo"))
ioc.Register(newMachineFileRepo(), ioc.WithComponentName("MachineFileRepo"))
ioc.Register(newMachineScriptRepo(), ioc.WithComponentName("MachineScriptRepo"))
ioc.Register(newMachineCronJobRepo(), ioc.WithComponentName("MachineCronJobRepo"))
ioc.Register(newMachineCronJobExecRepo(), ioc.WithComponentName("MachineCronJobExecRepo"))
ioc.Register(newMachineTermOpRepoImpl(), ioc.WithComponentName("MachineTermOpRepo"))
ioc.Register(newMachineCmdConfRepo(), ioc.WithComponentName("MachineCmdConfRepo"))
ioc.Register(newMachineRepo())
ioc.Register(newMachineFileRepo())
ioc.Register(newMachineScriptRepo())
ioc.Register(newMachineCronJobRepo())
ioc.Register(newMachineCronJobExecRepo())
ioc.Register(newMachineTermOpRepoImpl())
ioc.Register(newMachineCmdConfRepo())
}

View File

@@ -1,18 +1,18 @@
package init
import (
"mayfly-go/initialize"
"mayfly-go/internal/machine/api"
"mayfly-go/internal/machine/application"
"mayfly-go/internal/machine/infra/persistence"
"mayfly-go/pkg/starter"
)
func init() {
initialize.AddInitIocFunc(func() {
starter.AddInitIocFunc(func() {
persistence.InitIoc()
application.InitIoc()
api.InitIoc()
})
initialize.AddInitFunc(application.Init)
starter.AddInitFunc(application.Init)
}

View File

@@ -21,7 +21,11 @@ type Cli struct {
/******************* pool.Conn impl *******************/
func (c *Cli) Ping() error {
_, _, err := c.sshClient.SendRequest("ping", true, nil)
_, _, err := c.sshClient.SendRequest(
"keepalive@openssh.com",
true,
nil,
)
return err
}
@@ -38,17 +42,7 @@ func (c *Cli) Close() error {
c.sftpClient = nil
}
var sshTunnelMachineId uint64
if m.SshTunnelMachine != nil {
sshTunnelMachineId = m.SshTunnelMachine.Id
}
if m.TempSshMachineId != 0 {
sshTunnelMachineId = m.TempSshMachineId
}
if sshTunnelMachineId != 0 {
logx.Debugf("close machine ssh tunnel -> machineId=%d, sshTunnelMachineId=%d", m.Id, sshTunnelMachineId)
CloseSshTunnelMachine(sshTunnelMachineId, m.GetTunnelId())
}
CloseSshTunnel(m)
return nil
}
@@ -79,8 +73,7 @@ func (c *Cli) GetSession() (*ssh.Session, error) {
}
session, err := c.sshClient.NewSession()
if err != nil {
logx.Errorf("failed to retrieve the machine client session: %s", err.Error())
return nil, errorx.NewBiz("the acquisition session failed, please try again later...")
return nil, errorx.NewBizf("the acquisition session failed: %s, please try again later...", err.Error())
}
return session, nil
}

View File

@@ -10,27 +10,6 @@ var (
poolGroup = pool.NewPoolGroup[*Cli]()
)
func init() {
AddCheckSshTunnelMachineUseFunc(func(machineId int) bool {
// 遍历所有redis连接实例若存在redis实例使用该ssh隧道机器则返回true表示还在使用中...
items := poolGroup.AllPool()
for _, v := range items {
if v.Stats().TotalConns == 0 {
continue // 连接池中没有连接,跳过
}
cli, err := v.Get(context.Background())
if err != nil {
continue // 获取连接失败,跳过
}
sshTunnelMachine := cli.Info.SshTunnelMachine
if sshTunnelMachine != nil && sshTunnelMachine.Id == uint64(machineId) {
return true
}
}
return false
})
}
// 从缓存中获取客户端信息,不存在则回调获取机器信息函数,并新建。
// @param 机器的授权凭证名
func GetMachineCli(ctx context.Context, authCertName string, getMachine func(string) (*MachineInfo, error)) (*Cli, error) {

View File

@@ -37,16 +37,26 @@ type MachineInfo struct {
SshTunnelMachine *MachineInfo `json:"-"` // ssh隧道机器
TempSshMachineId uint64 `json:"-"` // ssh隧道机器id用于记录隧道机器id连接出错后关闭隧道
RemoteAddr string `json:"-"` // ssh隧道远程地址格式 ip:port
EnableRecorder int8 `json:"-"` // 是否启用终端回放记录
CodePath []string `json:"codePath"`
}
func (mi *MachineInfo) UseSshTunnel() bool {
return mi.SshTunnelMachine != nil
var _ (SshTunnelAble) = (*MachineInfo)(nil)
func (mi *MachineInfo) GetSshTunnelMachineId() int64 {
return int64(mi.TempSshMachineId)
}
func (mi *MachineInfo) GetTunnelId() string {
return fmt.Sprintf("machine:%d", mi.Id)
func (mi *MachineInfo) GetRemoteAddr() string {
if mi.RemoteAddr != "" {
return mi.RemoteAddr
}
return fmt.Sprintf("%s:%d", mi.Ip, mi.Port)
}
func (mi *MachineInfo) UseSshTunnel() bool {
return mi.SshTunnelMachine != nil
}
// 连接
@@ -63,7 +73,7 @@ func (mi *MachineInfo) Conn(ctx context.Context) (*Cli, error) {
sshClient, err := GetSshClient(mi, nil)
if err != nil {
if mi.UseSshTunnel() {
CloseSshTunnelMachine(mi.TempSshMachineId, mi.GetTunnelId())
CloseSshTunnel(mi)
}
return nil, err
}
@@ -77,6 +87,7 @@ func (mi *MachineInfo) IfUseSshTunnelChangeIpPort(ctx context.Context, out bool)
return nil
}
mi.RemoteAddr = mi.GetRemoteAddr()
originId := mi.Id
if originId == 0 {
// 随机设置一个id如果使用了隧道则用于临时保存隧道
@@ -90,7 +101,7 @@ func (mi *MachineInfo) IfUseSshTunnelChangeIpPort(ctx context.Context, out bool)
if err != nil {
return err
}
exposeIp, exposePort, err := sshTunnelMachine.OpenSshTunnel(mi.GetTunnelId(), mi.Ip, mi.Port)
exposeIp, exposePort, err := sshTunnelMachine.OpenSshTunnel(mi)
if err != nil {
return err
}

View File

@@ -5,36 +5,87 @@ import (
"errors"
"fmt"
"io"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/pool"
"mayfly-go/pkg/utils/collx"
"mayfly-go/pkg/utils/netx"
"net"
"sync"
"sync/atomic"
"time"
"golang.org/x/crypto/ssh"
)
// type SshTunnelAble interface {
// GetSshTunnelMachineId() int
// }
type SshTunnelAble interface {
// 获取ssh隧道机器id
GetSshTunnelMachineId() int64
// 获取ssh隧道的远程地址
GetRemoteAddr() string
}
var (
// 所有检测ssh隧道机器是否被使用的函数
checkSshTunnelMachineHasUseFuncs []CheckSshTunnelMachineHasUseFunc
tunnelPoolGroup = pool.NewPoolGroup[*SshTunnelMachine]()
)
// 检查ssh隧道机器是否有被使用
type CheckSshTunnelMachineHasUseFunc func(int) bool
// GetSshTunnelMachine 获取ssh隧道机器方便统一管理充当ssh隧道的机器避免创建多个ssh client
func GetSshTunnelMachine(ctx context.Context, machineId int, getMachine func(uint64) (*MachineInfo, error)) (*SshTunnelMachine, error) {
pool, err := tunnelPoolGroup.GetCachePool(fmt.Sprintf("machine-tunnel-%d", machineId), func() (*SshTunnelMachine, error) {
mi, err := getMachine(uint64(machineId))
if err != nil {
return nil, err
}
if mi == nil {
return nil, errors.New("error get machine info")
}
sshClient, err := GetSshClient(mi, nil)
if err != nil {
return nil, err
}
stm := &SshTunnelMachine{SshClient: sshClient, machineId: machineId, tunnels: map[string]*Tunnel{}, mi: mi}
logx.Infof("ssh tunnel machine - connect to machine for the first time - [%d][%s:%d]", machineId, mi.Ip, mi.Port)
// 添加ssh隧道机器检测是否使用函数
func AddCheckSshTunnelMachineUseFunc(checkFunc CheckSshTunnelMachineHasUseFunc) {
if checkSshTunnelMachineHasUseFuncs == nil {
checkSshTunnelMachineHasUseFuncs = make([]CheckSshTunnelMachineHasUseFunc, 0)
return stm, err
}, pool.WithIdleTimeout[*SshTunnelMachine](0), pool.WithHealthCheckInterval[*SshTunnelMachine](1*time.Minute))
if err != nil {
return nil, err
}
// 从连接池中获取一个可用的连接
return pool.Get(ctx)
}
// CloseSshTunnel 关闭ssh隧道
func CloseSshTunnel(sshTunnelAble SshTunnelAble) {
machineId := sshTunnelAble.GetSshTunnelMachineId()
remoteAddr := sshTunnelAble.GetRemoteAddr()
if machineId <= 0 || remoteAddr == "" {
return
}
sshTunnelMachinePool, ok := tunnelPoolGroup.Get(fmt.Sprintf("machine-tunnel-%d", machineId))
if !ok {
return
}
sshTunnelMachine, err := sshTunnelMachinePool.Get(context.Background())
if err != nil {
return
}
sshTunnelMachine.mutex.Lock()
defer sshTunnelMachine.mutex.Unlock()
tunnelId := buildTunnelKey(int(machineId), remoteAddr)
t := sshTunnelMachine.tunnels[tunnelId]
if t != nil {
t.Release()
if t.Closed.Load() {
logx.Infof("ssh tunnel machine - delete tunnel: %s", tunnelId)
delete(sshTunnelMachine.tunnels, tunnelId)
}
}
checkSshTunnelMachineHasUseFuncs = append(checkSshTunnelMachineHasUseFuncs, checkFunc)
}
// ssh隧道机器
@@ -49,10 +100,15 @@ type SshTunnelMachine struct {
/******************* pool.Conn impl *******************/
func (stm *SshTunnelMachine) Ping() error {
_, _, err := stm.SshClient.Conn.SendRequest("ping", true, nil)
return err
_, _, err := stm.SshClient.SendRequest(
"keepalive@openssh.com",
true,
nil,
)
return err
}
// Close 关闭ssh隧道机器及其所有隧道
func (stm *SshTunnelMachine) Close() error {
stm.mutex.Lock()
defer stm.mutex.Unlock()
@@ -75,163 +131,180 @@ func (stm *SshTunnelMachine) Close() error {
return nil
}
func (stm *SshTunnelMachine) OpenSshTunnel(id string, ip string, port int) (exposedIp string, exposedPort int, err error) {
// OpenSshTunnel 打开ssh隧道返回暴露的ip和端口
func (stm *SshTunnelMachine) OpenSshTunnel(sshTunnelAble SshTunnelAble) (exposedIp string, exposedPort int, err error) {
stm.mutex.Lock()
defer stm.mutex.Unlock()
tunnel := stm.tunnels[id]
// 已存在该id隧道则直接返回
remoteAddr := sshTunnelAble.GetRemoteAddr()
tunnelKey := buildTunnelKey(stm.machineId, remoteAddr)
tunnel := stm.tunnels[tunnelKey]
// 已存在该隧道,则直接返回
if tunnel != nil {
// FIXME 后期改成池化连接定时60秒检查连接可用性
return tunnel.localHost, tunnel.localPort, nil
tunnel.refCount.Add(1)
logx.Debugf("ssh tunnel [%s] exist, refCount: %v, localConns: %d, localAddr: %s:%d", tunnelKey, tunnel.refCount.Load(), tunnel.localConns.Len(), tunnel.LocalHost, tunnel.LocalPort)
return tunnel.LocalHost, tunnel.LocalPort, nil
}
localPort, err := netx.GetAvailablePort()
tunnel, err = NewTunnel(tunnelKey, stm.SshClient, remoteAddr)
if err != nil {
return "", 0, err
}
stm.tunnels[tunnelKey] = tunnel
return tunnel.LocalHost, tunnel.LocalPort, nil
}
// GetDialConn 获取通过ssh隧道连接远程地址的连接
func (stm *SshTunnelMachine) GetDialConn(network string, addr string) (net.Conn, error) {
return stm.SshClient.Dial(network, addr)
}
type Tunnel struct {
Id string // 唯一标识
LocalHost string // 本地监听地址
LocalPort int // 本地端口
RemoteAddr string // 远程连接地址
refCount atomic.Int64 // 引用计数
Closed atomic.Bool // 是否已关闭
localListener net.Listener
localConns collx.SM[net.Conn, any] // net.Conn -> struct{}
}
// 创建一个隧道
func NewTunnel(id string, sshClient *ssh.Client, remoteAddr string) (*Tunnel, error) {
localPort, err := netx.GetAvailablePort()
if err != nil {
return nil, err
}
localHost := "127.0.0.1"
localAddr := fmt.Sprintf("%s:%d", localHost, localPort)
listener, err := net.Listen("tcp", localAddr)
if err != nil {
return "", 0, err
}
tunnel = &Tunnel{
id: id,
machineId: stm.machineId,
localHost: localHost,
localPort: localPort,
remoteHost: ip,
remotePort: port,
listener: listener,
}
go tunnel.Open(stm.SshClient)
stm.tunnels[tunnel.id] = tunnel
return localHost, localPort, nil
}
func (stm *SshTunnelMachine) GetDialConn(network string, addr string) (net.Conn, error) {
stm.mutex.Lock()
defer stm.mutex.Unlock()
return stm.SshClient.Dial(network, addr)
}
// 获取ssh隧道机器方便统一管理充当ssh隧道的机器避免创建多个ssh client
func GetSshTunnelMachine(ctx context.Context, machineId int, getMachine func(uint64) (*MachineInfo, error)) (*SshTunnelMachine, error) {
pool, err := tunnelPoolGroup.GetCachePool(fmt.Sprintf("machine-tunnel-%d", machineId), func() (*SshTunnelMachine, error) {
mi, err := getMachine(uint64(machineId))
if err != nil {
return nil, err
}
if mi == nil {
return nil, errors.New("error get machine info")
}
sshClient, err := GetSshClient(mi, nil)
if err != nil {
return nil, err
}
stm := &SshTunnelMachine{SshClient: sshClient, machineId: machineId, tunnels: map[string]*Tunnel{}, mi: mi}
logx.Infof("connect to the ssh tunnel machine for the first time[%d][%s:%d]", machineId, mi.Ip, mi.Port)
return stm, err
}, pool.WithIdleTimeout[*SshTunnelMachine](50*time.Minute), pool.WithOnConnClose(func(conn *SshTunnelMachine) error {
mid := int(conn.mi.Id)
logx.Debugf("periodically check if the ssh tunnel machine [%d] is still in use...", mid)
for _, checkUseFunc := range checkSshTunnelMachineHasUseFuncs {
// 如果一个在使用则返回不关闭,不继续后续检查
if checkUseFunc(mid) {
return fmt.Errorf("ssh tunnel machine [%s] is still in use", conn.mi.Name)
}
}
return nil
}))
localListener, err := net.Listen("tcp", localAddr)
if err != nil {
return nil, err
}
// 从连接池中获取一个可用的连接
return pool.Get(ctx)
tunnel := &Tunnel{
Id: id,
LocalHost: localHost,
LocalPort: localPort,
RemoteAddr: remoteAddr,
localListener: localListener,
}
tunnel.refCount.Store(1)
gox.Go(func() {
tunnel.Start(sshClient)
})
gox.Go(tunnel.startJanitor)
logx.Infof("ssh tunnel [%s] new -> localAddr: %s", tunnel.Id, localAddr)
return tunnel, nil
}
// 关闭ssh隧道机器的指定隧道
func CloseSshTunnelMachine(machineId uint64, tunnelId string) {
sshTunnelMachinePool, ok := tunnelPoolGroup.Get(fmt.Sprintf("machine-tunnel-%d", machineId))
if !ok {
return
}
sshTunnelMachine, err := sshTunnelMachinePool.Get(context.Background())
if err != nil {
return
}
t := sshTunnelMachine.tunnels[tunnelId]
if t != nil {
t.Close()
delete(sshTunnelMachine.tunnels, tunnelId)
}
}
type Tunnel struct {
id string // 唯一标识
machineId int // 隧道机器id
localHost string // 本地监听地址
localPort int // 本地端口
remoteHost string // 远程连接地址
remotePort int // 远程端口
listener net.Listener
localConnections []net.Conn
remoteConnections []net.Conn
}
func (r *Tunnel) Open(sshClient *ssh.Client) {
localAddr := fmt.Sprintf("%s:%d", r.localHost, r.localPort)
// Start 启动隧道
func (t *Tunnel) Start(sshClient *ssh.Client) {
localAddr := fmt.Sprintf("%s:%d", t.LocalHost, t.LocalPort)
for {
logx.Debugf("隧道 %v 等待客户端访问 %v", r.id, localAddr)
localConn, err := r.listener.Accept()
localConn, err := t.localListener.Accept()
if err != nil {
logx.Debugf("隧道 %v 接受连接失败 %v, 退出循环", r.id, err.Error())
logx.Debug("-------------------------------------------------")
if t.Closed.Load() {
return
}
logx.Errorf("ssh tunnel [%s] - localListner accept error: %v", t.Id, err)
continue
}
t.localConns.Store(localConn, struct{}{})
logx.Debugf("ssh tunnel [%s] - add local conn %v", t.Id, localConn.RemoteAddr().String())
gox.Go(func() {
defer func() {
localConn.Close()
t.localConns.Delete(localConn)
logx.Debugf("ssh tunnel [%s] - localConn close, localConns: %d", t.Id, t.localConns.Len())
}()
logx.Debugf("ssh tunnel [%s] - waiting for client access %v", t.Id, localAddr)
logx.Debugf("ssh tunnel [%s] - connecting to remote address %v ...", t.Id, t.RemoteAddr)
remote, err := sshClient.Dial("tcp", t.RemoteAddr)
if err != nil {
return
}
defer remote.Close()
// 使用 channel 同步双向 copy
done := make(chan struct{}, 2)
// 本地 -> 远程
go func() {
io.Copy(remote, localConn)
done <- struct{}{}
}()
// 远程 -> 本地
go func() {
io.Copy(localConn, remote)
done <- struct{}{}
}()
// 等待任意一端结束
<-done
})
}
}
// Release 释放隧道引用计数
func (t *Tunnel) Release() {
t.refCount.Add(-1)
logx.Debugf("ssh tunnel [%s] release, refCount: %v, localConns: %d", t.Id, t.refCount.Load(), t.localConns.Len())
if t.shouldClose() {
t.Close()
}
}
// Close 关闭隧道
func (t *Tunnel) Close() {
if t.Closed.Swap(true) {
return
}
logx.Infof("ssh tunnel [%s] - closed", t.Id)
_ = t.localListener.Close()
t.localConns.Range(func(conn net.Conn, _ any) bool {
conn.Close()
return true
})
}
// startJanitor 定时检查隧道是否需要关闭
func (t *Tunnel) startJanitor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
if t.Closed.Load() {
return
}
r.localConnections = append(r.localConnections, localConn)
logx.Debugf("隧道 %v 新增本地连接 %v", r.id, localConn.RemoteAddr().String())
remoteAddr := fmt.Sprintf("%s:%d", r.remoteHost, r.remotePort)
logx.Debugf("隧道 %v 连接远程地址 %v ...", r.id, remoteAddr)
remoteConn, err := sshClient.Dial("tcp", remoteAddr)
if err != nil {
logx.Debugf("隧道 %v 连接远程地址 %v, 退出循环", r.id, err.Error())
logx.Debug("-------------------------------------------------")
if t.shouldClose() {
t.Close()
return
}
r.remoteConnections = append(r.remoteConnections, remoteConn)
logx.Debugf("隧道 %v 连接远程主机成功", r.id)
go r.copyConn(localConn, remoteConn)
go r.copyConn(remoteConn, localConn)
logx.Debugf("隧道 %v 开始转发数据 [%v]->[%v]", r.id, localAddr, remoteAddr)
logx.Debug("~~~~~~~~~~~~~~~~~~~~分割线~~~~~~~~~~~~~~~~~~~~~~~~")
}
}
func (r *Tunnel) Close() {
for i := range r.localConnections {
_ = r.localConnections[i].Close()
// shouldClose 检查是否需要关闭
func (t *Tunnel) shouldClose() bool {
if t.refCount.Load() > 0 {
return false
}
r.localConnections = nil
for i := range r.remoteConnections {
_ = r.remoteConnections[i].Close()
}
r.remoteConnections = nil
_ = r.listener.Close()
logx.Debugf("隧道 %s 监听器关闭", r.id)
return true
}
func (r *Tunnel) copyConn(writer, reader net.Conn) {
_, _ = io.Copy(writer, reader)
func buildTunnelKey(machineId int, remoteAddr string) string {
return fmt.Sprintf("%d/%s", machineId, remoteAddr)
}

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/gox"
"mayfly-go/pkg/logx"
"github.com/spf13/cast"
@@ -95,8 +96,12 @@ func NewTerminalSession(param *CreateTerminalSessionParam) (*TerminalSession, er
}
func (r TerminalSession) Start() {
go r.readFromTerminal()
go r.writeToWebsocket()
gox.Go(func() {
r.readFromTerminal()
})
gox.Go(func() {
r.writeToWebsocket()
})
r.receiveWsMsg()
}

Some files were not shown because too many files have changed in this diff Show More