!79 feat: 支持自定义数据定时同步

* fix: 达梦数据权限问题
* feat: 支持自定义数据定时同步
This commit is contained in:
zongyangleo
2024-01-05 05:31:32 +00:00
committed by Coder慌
parent 7a7a7020b4
commit 85910bf440
40 changed files with 2059 additions and 68 deletions

View File

@@ -29,7 +29,9 @@ const isFixedHeader = computed(() => {
watch(
() => route.path,
() => {
proxy.$refs.layoutScrollbarRef.wrapRef.scrollTop = 0;
try {
proxy.$refs.layoutScrollbarRef.wrapRef.scrollTop = 0;
} catch (e) {}
}
);
</script>

View File

@@ -596,7 +596,7 @@ const openDrawer = () => {
themeConfig.value.isDrawer = true;
nextTick(() => {
// 初始化复制功能,防止点击两次才可以复制
onCopyConfigClick(copyConfigBtnRef.value.$el);
onCopyConfigClick(copyConfigBtnRef.value?.$el);
});
};
// 触发 store 布局配置更新

View File

@@ -282,7 +282,9 @@ const onContextmenu = (v: any, e: any) => {
const onTagsClick = (v: any, k: number) => {
state.routePath = decodeURI(v.path);
state.tagsRefsIndex = k;
router.push(v);
try {
router.push(v);
} catch (e) {}
};
// 更新滚动条显示
const updateScrollbar = () => {

View File

@@ -0,0 +1,437 @@
<template>
<div class="sync-task-edit">
<el-dialog
:title="title"
v-model="dialogVisible"
:before-close="cancel"
:close-on-click-modal="false"
:close-on-press-escape="false"
:destroy-on-close="true"
width="700px"
>
<el-form :model="form" ref="dbForm" :rules="rules" label-width="auto">
<el-tabs v-model="tabActiveName">
<el-tab-pane label="基本信息" name="basic">
<el-form-item prop="taskName" label="任务名" required>
<el-input v-model.trim="form.taskName" placeholder="请输入数据库别名" auto-complete="off" />
</el-form-item>
<el-form-item prop="taskCron" label="cron" required>
<el-input v-model="form.taskCron" placeholder="只支持5位表达式,不支持秒级.如 0/2 * * * * 表示每两分钟执行" auto-complete="off" />
</el-form-item>
<el-form-item prop="pageSize" label="分页大小" required>
<el-input-number v-model.trim="form.pageSize" placeholder="同步数据时查询的每页数据大小" auto-complete="off" size="small" />
</el-form-item>
<el-form-item prop="updField" label="更新字段" required>
<el-input v-model.trim="form.updField" placeholder="查询数据源的时候会带上这个字段当前最大值" auto-complete="off" />
</el-form-item>
<el-form-item prop="updFieldVal" label="更新值">
<el-input v-model.trim="form.updFieldVal" placeholder="更新字段当前最大值" auto-complete="off" />
</el-form-item>
<el-form-item prop="status" label="状态" required>
<el-switch v-model="form.status" inline-prompt active-text="启用" inactive-text="禁用" :active-value="1" :inactive-value="-1" />
</el-form-item>
</el-tab-pane>
<el-tab-pane label="源数据库配置" name="srcDb">
<el-form-item prop="srcDbId" label="数据源" required>
<db-select-tree
v-model:db-id="form.srcDbId"
v-model:db-name="form.srcDbName"
v-model:tag-path="form.srcTagPath"
@select-db="onSelectSrcDb"
/>
</el-form-item>
<el-form-item prop="dataSql" label="数据sql" required>
<monaco-editor height="300px" class="task-sql" language="sql" v-model="form.dataSql" />
</el-form-item>
</el-tab-pane>
<el-tab-pane label="目标数据库配置" name="targetDb">
<el-form-item prop="targetDbId" label="数据源" required>
<db-select-tree
v-model:db-id="form.targetDbId"
v-model:db-name="form.targetDbName"
v-model:tag-path="form.targetTagPath"
@select-db="onSelectTargetDb"
/>
</el-form-item>
<el-form-item prop="targetTableName" label="目标表" required>
<el-select v-model="form.targetTableName" filterable placeholder="请选择目标数据库表">
<el-option
v-for="item in state.targetTableList"
:key="item.tableName"
:label="item.tableName + (item.tableComment && '-' + item.tableComment)"
:value="item.tableName"
/>
</el-select>
</el-form-item>
</el-tab-pane>
<el-tab-pane label="字段映射" name="field">
<el-form-item prop="fieldMap" label="字段映射" required>
<el-table :data="form.fieldMap" style="width: 100%" :max-height="400" size="small">
<el-table-column prop="src" label="源字段" />
<el-table-column prop="target" label="目标字段">
<template #default="scope">
<el-select v-model="scope.row.target">
<el-option
v-for="item in state.targetColumnList"
:key="item.columnName"
:label="item.columnName + ` ${item.columnType}` + (item.columnComment && ' - ' + item.columnComment)"
:value="item.columnName"
/>
</el-select>
</template>
</el-table-column>
</el-table>
</el-form-item>
</el-tab-pane>
<el-tab-pane label="sql预览" name="sqlPreview">
<el-form-item prop="fieldMap" label="查询sql">
<el-input type="textarea" v-model="state.previewDataSql" readonly :input-style="{ height: '190px' }" />
</el-form-item>
<el-form-item prop="fieldMap" label="插入sql">
<el-input type="textarea" v-model="state.previewInsertSql" readonly :input-style="{ height: '190px' }" />
</el-form-item>
</el-tab-pane>
</el-tabs>
</el-form>
<template #footer>
<div class="dialog-footer">
<el-button @click="cancel()"> </el-button>
<el-button type="primary" :loading="saveBtnLoading" @click="btnOk"> </el-button>
</div>
</template>
</el-dialog>
</div>
</template>
<script lang="ts" setup>
import { reactive, ref, toRefs, watch } from 'vue';
import { dbApi } from './api';
import { ElMessage } from 'element-plus';
import DbSelectTree from '@/views/ops/db/component/DbSelectTree.vue';
import MonacoEditor from '@/components/monaco/MonacoEditor.vue';
import { DbInst, registerDbCompletionItemProvider } from '@/views/ops/db/db';
import { getDbDialect } from '@/views/ops/db/dialect';
const props = defineProps({
visible: {
type: Boolean,
},
data: {
type: [Boolean, Object],
},
title: {
type: String,
},
});
//定义事件
const emit = defineEmits(['update:visible', 'cancel', 'val-change']);
const rules = {
taskName: [
{
required: true,
message: '请输入任务名',
trigger: ['change', 'blur'],
},
],
taskCron: [
{
required: true,
message: '请输入任务cron表达式',
trigger: ['change', 'blur'],
},
],
};
const dbForm: any = ref(null);
type FormData = {
id?: number;
taskName?: string;
taskCron?: string;
srcDbId?: number;
srcDbName?: string;
srcTagPath?: string;
targetDbId?: number;
targetDbName?: string;
targetTagPath?: string;
targetTableName?: string;
dataSql?: string;
pageSize?: number;
updField?: string;
updFieldVal?: string;
idRule?: 1 | 2;
pkField?: string;
fieldMap?: { src: string; target: string }[];
status?: 1 | 2;
};
const basicFormData = {
srcDbId: -1,
targetDbId: -1,
dataSql: 'select * from',
pageSize: 100,
updField: 'id',
updFieldVal: '0',
idRule: 2,
fieldMap: [{ src: 'a', target: 'b' }],
status: 1,
} as FormData;
const state = reactive({
dialogVisible: false,
tabActiveName: 'basic',
form: basicFormData,
submitForm: {} as any,
srcTableFields: [] as string[],
targetTableList: [] as { tableName: string; tableComment: string }[],
targetColumnList: [] as any[],
srcDbInst: {} as DbInst,
targetDbInst: {} as DbInst,
previewRes: {} as any,
previewDataSql: '',
previewInsertSql: '',
});
const onSelectSrcDb = async (params: any) => {
// 初始化数据源
params.databases = params.dbs; // 数据源里需要这个值
state.srcDbInst = DbInst.getOrNewInst(params);
registerDbCompletionItemProvider(params.id, params.db, params.dbs, params.type);
};
const onSelectTargetDb = async (params: any) => {
state.targetDbInst = DbInst.getOrNewInst(params);
await loadDbTables(params.id, params.db);
};
const loadDbTables = async (dbId: number, db: string) => {
// 加载db下的表
let data = await dbApi.tableInfos.request({ id: dbId, db });
state.targetTableList = data;
if (data && data.length > 0) {
let names = data.map((a: any) => a.tableName);
if (!names.includes(state.form.targetTableName)) {
state.form.targetTableName = data[0].tableName;
}
}
};
const { dialogVisible, tabActiveName, form, submitForm } = toRefs(state);
const { isFetching: saveBtnLoading, execute: saveExec } = dbApi.saveDatasyncTask.useApi(submitForm);
watch(props, async (newValue: any) => {
state.dialogVisible = newValue.visible;
if (!state.dialogVisible) {
return;
}
state.tabActiveName = 'basic';
if (newValue.data?.id) {
let data = await dbApi.getDatasyncTask.request({ taskId: newValue.data?.id });
state.form = data;
try {
state.form.fieldMap = JSON.parse(data.fieldMap);
} catch (e) {
state.form.fieldMap = [];
}
let { srcDbId, srcTagPath, srcDbName, targetTagPath, targetDbId } = state.form;
// 初始化src数据源
if (srcTagPath && srcDbId) {
// 通过tagPath查询实例列表
const dbInfoRes = await dbApi.dbs.request({ tagPath: srcTagPath });
dbInfoRes.list.forEach((a: any) => {
if (a.id === srcDbId) {
// 初始化实例
a.databases = a.database?.split(' ').sort() || [];
state.srcDbInst = DbInst.getOrNewInst(a);
}
});
}
// 初始化target数据源
if (targetTagPath && targetDbId) {
// 通过tagPath查询实例列表
const dbInfoRes = await dbApi.dbs.request({ tagPath: targetTagPath });
dbInfoRes.list.forEach((a: any) => {
if (a.id === targetDbId) {
// 初始化实例
a.databases = a.database?.split(' ').sort() || [];
state.targetDbInst = DbInst.getOrNewInst(a);
}
});
}
// 注册sql代码提示
if (srcDbId && srcDbName) {
registerDbCompletionItemProvider(srcDbId, srcDbName, state.srcDbInst.databases, state.srcDbInst.type);
}
} else {
state.form = basicFormData;
}
});
watch(tabActiveName, async (newValue: string) => {
switch (newValue) {
case 'field':
await handleGetSrcFields();
await handleGetTargetFields();
break;
case 'targetDb':
await handleGetSrcFields();
await handleGetTargetFields();
if (state.form.targetDbId && state.form.targetDbName) {
await loadDbTables(state.form.targetDbId, state.form.targetDbName);
}
break;
case 'sqlPreview':
let srcDbDialect = getDbDialect(state.srcDbInst.type);
let targetDbDialect = getDbDialect(state.targetDbInst.type);
let updField = srcDbDialect.wrapName(state.form.updField!);
state.previewDataSql = `SELECT * FROM (\n ${state.form.dataSql?.trim() || '请输入数据sql'} \n ) t \n where ${updField} > '${
state.form.updFieldVal || ''
}' \n ${srcDbDialect.getPageSql(1, state.form.pageSize || 100)}`;
// 检查字段映射中是否存在重复的目标字段
let fields = new Set();
state.form.fieldMap?.map((a) => {
if (a.target) {
fields.add(a.target);
}
});
if (fields.size < (state.form.fieldMap?.length || 0)) {
ElMessage.warning('字段映射中存在重复的目标字段,请检查');
return;
}
let fieldArr = state.form.fieldMap?.map((a: any) => targetDbDialect.wrapName(a.target)) || [];
let placeholder = '?'.repeat(fieldArr.length).split('').join(',');
state.previewInsertSql = ` insert into ${targetDbDialect.wrapName(state.form.targetTableName!)}(${fieldArr.join(',')}) values (${placeholder});`;
break;
default:
break;
}
});
const handleGetSrcFields = async () => {
// 执行sql获取字段信息
if (!state.form.dataSql || !state.form.dataSql.trim()) {
ElMessage.warning('请输入数据源sql');
return;
}
// 判断sql是否是查询语句
if (!/^select/i.test(state.form.dataSql!)) {
let msg = 'sql语句错误请输入查询语句';
ElMessage.warning(msg);
return;
}
// 判断是否有多条sql
if (/;/i.test(state.form.dataSql!)) {
let msg = 'sql语句错误请输入单条查询语句';
ElMessage.warning(msg);
return;
}
// 执行sql
const res = await dbApi.sqlExec.request({
id: state.form.srcDbId,
db: state.form.srcDbName,
sql: state.form.dataSql.trim() + ' limit 1',
});
if (!res.columns) {
ElMessage.warning('没有查询到字段请检查sql');
return;
}
let filedMap = {};
if (state.form.fieldMap && state.form.fieldMap.length > 0) {
state.form.fieldMap.forEach((a: any) => {
filedMap[a.src] = a.target;
});
}
// 如果主键字段名为空或不存在于字段列表中,则取第一个字段
if (!state.form.pkField || !res.columns.find((a: any) => a.src === state.form.pkField)) {
state.form.pkField = res.columns[0].name;
}
state.srcTableFields = res.columns.map((a: any) => a.name);
// 如果主键规则是参照数据源字段映射不显示数据源的id
if (state.form.idRule === 2) {
res.columns = res.columns.filter((a: any) => a.name !== state.form.pkField);
}
state.form.fieldMap = res.columns.map((a: any) => ({ src: a.name, target: filedMap[a.name] || '' }));
state.previewRes = res;
};
const handleGetTargetFields = async () => {
// 查询目标表下的字段信息
if (state.form.targetDbName && state.form.targetTableName) {
let columns = await state.targetDbInst.loadColumns(state.form.targetDbName, state.form.targetTableName);
if (columns && Array.isArray(columns)) {
state.targetColumnList = columns;
// 过滤目标字段,不存在的字段值设置为空
let names = columns.map((a) => a.columnName);
state.form.fieldMap?.forEach((a) => {
if (a.target && !names.includes(a.target)) {
a.target = '';
}
// 优先设置字段名和src一样的值
if (names.includes(a.src)) {
a.target = a.src;
}
});
}
}
};
const getReqForm = async () => {
return { ...state.form };
};
const btnOk = async () => {
dbForm.value.validate(async (valid: boolean) => {
if (!valid) {
ElMessage.error('请正确填写信息');
return false;
}
// 正则表达式检测corn表达式正确性
// 处理一些数字类型
state.submitForm = await getReqForm();
state.submitForm.fieldMap = JSON.stringify(state.form.fieldMap);
await saveExec();
ElMessage.success('保存成功');
emit('val-change', state.form);
cancel();
});
};
const cancel = () => {
emit('update:visible', false);
emit('cancel');
};
</script>
<style lang="scss">
.sync-task-edit {
.el-select {
width: 400px;
}
.task-sql {
width: 100%;
}
}
</style>

View File

@@ -0,0 +1,195 @@
<template>
<div class="db-list">
<page-table
ref="pageTableRef"
:page-api="dbApi.datasyncTasks"
:searchItems="searchItems"
v-model:query-form="query"
:show-selection="true"
v-model:selection-data="state.selectionData"
:columns="columns"
>
<template #tableHeader>
<el-button v-auth="perms.save" type="primary" icon="plus" @click="edit(false)">添加</el-button>
<el-button v-auth="perms.del" :disabled="selectionData.length < 1" @click="del()" type="danger" icon="delete">删除</el-button>
</template>
<template #recentState="{ data }">
<el-tag v-if="data.recentState == 1" class="ml-2" type="success">成功</el-tag>
<el-tag v-else-if="data.recentState == 2" class="ml-2" type="success">失败</el-tag>
<el-tag v-else-if="data.recentState == 0" class="ml-2" type="">未执行</el-tag>
</template>
<template #status="{ data }">
<span v-if="actionBtns[perms.status]">
<el-switch
v-model="data.status"
@click="updStatus(data.id, data.status)"
inline-prompt
active-text="启用"
inactive-text="禁用"
:active-value="1"
:inactive-value="-1"
/>
</span>
<span v-else>
<el-tag v-if="data.status == 1" class="ml-2" type="success">启用</el-tag>
<el-tag v-else class="ml-2" type="danger">禁用</el-tag>
</span>
</template>
<template #action="{ data }">
<!-- 删除启停用编辑 -->
<el-button v-if="actionBtns[perms.save]" @click="edit(data)" type="primary" link>编辑</el-button>
<el-button v-if="data.status === 1 && data.runningState !== 1" @click="run(data.id)" type="success" link>执行</el-button>
<el-button v-if="data.runningState === 1" @click="stop(data.id)" type="danger" link>停止</el-button>
<el-button v-if="actionBtns[perms.log]" type="primary" link @click="log(data)">日志</el-button>
</template>
</page-table>
<data-sync-task-edit @val-change="search" :title="editDialog.title" v-model:visible="editDialog.visible" v-model:data="editDialog.data" />
<data-sync-task-log v-model:visible="logsDialog.visible" v-model:taskId="logsDialog.taskId" :running="state.logsDialog.running" />
</div>
</template>
<script lang="ts" setup>
import { defineAsyncComponent, onMounted, reactive, ref, Ref, toRefs } from 'vue';
import { ElMessage, ElMessageBox } from 'element-plus';
import { dbApi } from './api';
import PageTable from '@/components/pagetable/PageTable.vue';
import { TableColumn } from '@/components/pagetable';
import { hasPerms } from '@/components/auth/auth';
import { SearchItem } from '@/components/SearchForm';
const DataSyncTaskEdit = defineAsyncComponent(() => import('./SyncTaskEdit.vue'));
const DataSyncTaskLog = defineAsyncComponent(() => import('./SyncTaskLog.vue'));
const perms = {
save: 'db:sync:save',
del: 'db:sync:del',
status: 'db:sync:status',
log: 'db:sync:log',
};
const searchItems = [SearchItem.input('name', '名称')];
// 任务名、修改人、修改时间、最近一次任务执行状态、状态(停用启用)、操作
const columns = ref([
TableColumn.new('taskName', '任务名'),
TableColumn.new('recentState', '最近任务状态').alignCenter().isSlot(),
TableColumn.new('status', '状态').alignCenter().isSlot(),
TableColumn.new('modifier', '修改人').alignCenter(),
TableColumn.new('updateTime', '修改时间').alignCenter().isTime(),
]);
// 该用户拥有的的操作列按钮权限
const actionBtns = hasPerms([perms.save, perms.del, perms.status, perms.log]);
const actionWidth = ((actionBtns[perms.save] ? 1 : 0) + (actionBtns[perms.log] ? 1 : 0)) * 55 + 55;
const actionColumn = TableColumn.new('action', '操作').isSlot().setMinWidth(actionWidth).fixedRight().alignCenter();
const pageTableRef: Ref<any> = ref(null);
const state = reactive({
row: {},
dbId: 0,
db: '',
/**
* 选中的数据
*/
selectionData: [],
/**
* 查询条件
*/
query: {
name: null,
pageNum: 1,
pageSize: 0,
},
editDialog: {
visible: false,
data: null as any,
title: '新增数据同步任务',
},
logsDialog: {
taskId: 0,
visible: false,
data: null as any,
running: false,
},
});
const { selectionData, query, editDialog, logsDialog } = toRefs(state);
onMounted(async () => {
if (Object.keys(actionBtns).length > 0) {
columns.value.push(actionColumn);
}
});
const search = () => {
pageTableRef.value.search();
};
const edit = async (data: any) => {
if (!data) {
state.editDialog.data = null;
state.editDialog.title = '新增数据同步任务';
} else {
state.editDialog.data = data;
state.editDialog.title = '修改数据同步任务';
}
state.editDialog.visible = true;
};
const run = async (id: any) => {
await ElMessageBox.confirm(`确定执行?`, '提示', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning',
});
await dbApi.runDatasyncTask.request({ taskId: id });
ElMessage.success(`执行成功`);
setTimeout(search, 1000);
};
const stop = async (id: any) => {
await ElMessageBox.confirm(`确定停止?`, '提示', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning',
});
await dbApi.stopDatasyncTask.request({ taskId: id });
ElMessage.success(`停止成功`);
search();
};
const log = async (data: any) => {
state.logsDialog.taskId = data.id;
state.logsDialog.visible = true;
state.logsDialog.running = data.state === 1;
};
const updStatus = async (id: any, status: 1 | -1) => {
try {
await dbApi.updateDatasyncTaskStatus.request({ taskId: id, status });
ElMessage.success(`${status === 1 ? '启用' : '禁用'}成功`);
search();
} catch (err) {
//
}
};
const del = async () => {
try {
await ElMessageBox.confirm(`确定删除数据同步任务【${state.selectionData.map((x: any) => x.taskName).join(', ')}】?`, '提示', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning',
});
await dbApi.deleteDatasyncTask.request({ taskId: state.selectionData.map((x: any) => x.id).join(',') });
ElMessage.success('删除成功');
search();
} catch (err) {
//
}
};
</script>
<style lang="scss"></style>

View File

@@ -0,0 +1,123 @@
<template>
<div class="sync-task-logs">
<el-dialog v-model="dialogVisible" :before-close="cancel" :destroy-on-close="false" width="1120px">
<template #header>
<span class="mr10">任务执行日志</span>
<el-switch v-model="realTime" @change="watchPolling" inline-prompt active-text="实时" inactive-text="非实时" />
<span v-if="realTime" v-loading="true"></span>
</template>
<page-table ref="logTableRef" :page-api="dbApi.datasyncLogs" v-model:query-form="query" :tool-button="false" :columns="columns" size="small">
<template #status="{ data }">
<el-tag v-if="data.status == 1" class="ml-2" type="success">成功</el-tag>
<el-tag v-else-if="data.status == -1" class="ml-2" type="danger">失败</el-tag>
</template>
</page-table>
</el-dialog>
</div>
</template>
<script lang="ts" setup>
import { reactive, Ref, ref, toRefs, watch } from 'vue';
import { dbApi } from '@/views/ops/db/api';
import PageTable from '@/components/pagetable/PageTable.vue';
import { TableColumn } from '@/components/pagetable';
const props = defineProps({
visible: {
type: Boolean,
},
taskId: {
type: Number,
},
running: {
type: Boolean,
default: false,
},
});
const columns = ref([
// 状态:1.成功 -1.失败
TableColumn.new('status', '状态').alignCenter().isSlot(),
TableColumn.new('createTime', '时间').alignCenter().isTime(),
TableColumn.new('errText', '日志'),
TableColumn.new('dataSqlFull', 'SQL').alignCenter(),
TableColumn.new('resNum', '数据条数'),
]);
watch(props, async (newValue: any) => {
state.dialogVisible = newValue.visible;
if (!state.dialogVisible) {
state.polling = false;
watchPolling(false);
return;
}
state.query.taskId = props.taskId!;
state.realTime = props.running;
watchPolling(props.running);
});
const startPolling = () => {
if (!state.polling) {
state.polling = true;
state.pollingIndex = setInterval(search, 1000);
}
};
const stopPolling = () => {
if (state.polling) {
state.polling = false;
clearInterval(state.pollingIndex);
}
};
const watchPolling = (polling: boolean) => {
if (polling) {
startPolling();
} else {
stopPolling();
}
};
watch(
() => props.taskId,
async (newValue: any) => {
state.query.taskId = newValue!;
search();
}
);
const logTableRef: Ref<any> = ref(null);
const search = () => {
try {
logTableRef.value.search();
} catch (e) {
/* empty */
}
};
const emit = defineEmits(['update:visible', 'cancel', 'val-change']);
//定义事件
const cancel = () => {
emit('update:visible', false);
emit('cancel');
watchPolling(false);
};
const state = reactive({
dialogVisible: false,
polling: false,
pollingIndex: 0 as any,
realTime: props.running,
/**
* 查询条件
*/
query: {
taskId: 0,
name: null,
pageNum: 1,
pageSize: 0,
},
});
const { dialogVisible, query, realTime } = toRefs(state);
</script>

View File

@@ -58,4 +58,20 @@ export const dbApi = {
enableDbRestore: Api.newPut('/dbs/{dbId}/restores/{restoreId}/enable'),
disableDbRestore: Api.newPut('/dbs/{dbId}/restores/{restoreId}/disable'),
saveDbRestore: Api.newPut('/dbs/{dbId}/restores/{id}'),
// 数据同步相关
datasyncTasks: Api.newGet('/datasync/tasks'),
saveDatasyncTask: Api.newPost('/datasync/tasks/save').withBeforeHandler((param: any) => {
// sql编码处理
if (param.dataSql) {
param.dataSql = Base64.encode(param.dataSql);
}
return param;
}),
getDatasyncTask: Api.newGet('/datasync/tasks/{taskId}'),
deleteDatasyncTask: Api.newDelete('/datasync/tasks/{taskId}/del'),
updateDatasyncTaskStatus: Api.newPost('/datasync/tasks/{taskId}/status'),
runDatasyncTask: Api.newPost('/datasync/tasks/{taskId}/run'),
stopDatasyncTask: Api.newPost('/datasync/tasks/{taskId}/stop'),
datasyncLogs: Api.newGet('/datasync/tasks/{taskId}/logs'),
};

View File

@@ -0,0 +1,159 @@
<template>
<div class="db-select-tree">
<div style="color: gray">{{ (tagPath || '') + ' - ' + (dbName || '请选择数据源schema') }}</div>
<tag-tree :resource-type="TagResourceTypeEnum.Db.value" :tag-path-node-type="NodeTypeTagPath" ref="tagTreeRef">
<template #prefix="{ data }">
<SvgIcon v-if="data.type.value == SqlExecNodeType.DbInst" :name="getDbDialect(data.params.type).getInfo().icon" :size="18" />
<SvgIcon v-if="data.icon" :name="data.icon.name" :color="data.icon.color" />
</template>
</tag-tree>
</div>
</template>
<script setup lang="ts">
import { TagResourceTypeEnum } from '@/common/commonEnum';
import TagTree from '@/views/ops/component/TagTree.vue';
import { NodeType, TagTreeNode } from '@/views/ops/component/tag';
import { dbApi } from '@/views/ops/db/api';
import { sleep } from '@/common/utils/loading';
import SvgIcon from '@/components/svgIcon/index.vue';
import { DbType, getDbDialect } from '@/views/ops/db/dialect';
defineProps({
dbId: {
type: Number,
},
dbName: {
type: String,
},
tagPath: {
type: String,
},
});
const emits = defineEmits(['update:dbName', 'update:tagPath', 'update:dbId', 'selectDb']);
/**
* 树节点类型
*/
class SqlExecNodeType {
static DbInst = 1;
static Db = 2;
static TableMenu = 3;
static SqlMenu = 4;
static Table = 5;
static Sql = 6;
static PgSchemaMenu = 7;
static PgSchema = 8;
}
const DbIcon = {
name: 'Coin',
color: '#67c23a',
};
// pgsql schema icon
const SchemaIcon = {
name: 'List',
color: '#67c23a',
};
const NodeTypeTagPath = new NodeType(TagTreeNode.TagPath).withLoadNodesFunc(async (parentNode: TagTreeNode) => {
const dbInfoRes = await dbApi.dbs.request({ tagPath: parentNode.key });
const dbInfos = dbInfoRes.list;
if (!dbInfos) {
return [];
}
// 防止过快加载会出现一闪而过,对眼睛不好
await sleep(100);
return dbInfos?.map((x: any) => {
x.tagPath = parentNode.key;
return new TagTreeNode(`${parentNode.key}.${x.id}`, x.name, NodeTypeDbInst).withParams(x);
});
});
/** mysql类型的数据库没有schema层 */
const mysqlType = (type: string) => {
return type === DbType.mysql;
};
// 数据库实例节点类型
const NodeTypeDbInst = new NodeType(SqlExecNodeType.DbInst).withLoadNodesFunc((parentNode: TagTreeNode) => {
const params = parentNode.params;
const dbs = params.database.split(' ')?.sort();
let fn: NodeType;
if (mysqlType(params.type)) {
fn = MysqlNodeTypes;
} else {
fn = PgNodeTypes;
}
return dbs.map((x: any) => {
let tagTreeNode = new TagTreeNode(`${parentNode.key}.${x}`, x, fn)
.withParams({
tagPath: params.tagPath,
id: params.id,
instanceId: params.instanceId,
name: params.name,
type: params.type,
host: `${params.host}:${params.port}`,
dbs: dbs,
db: x,
})
.withIcon(DbIcon);
if (mysqlType(params.type)) {
tagTreeNode.isLeaf = true;
}
return tagTreeNode;
});
});
const nodeClickChangeDb = (nodeData: TagTreeNode) => {
const params = nodeData.params;
// postgres
emits('update:dbName', params.db);
emits('update:dbId', params.id);
emits('update:tagPath', params.tagPath);
emits('selectDb', params);
return true;
};
// 数据库节点
const PgNodeTypes = new NodeType(SqlExecNodeType.Db).withLoadNodesFunc(async (parentNode: TagTreeNode) => {
// pg类数据库会多一层schema
const params = parentNode.params;
const { id, db } = params;
const schemaNames = await dbApi.pgSchemas.request({ id, db });
return schemaNames.map((sn: any) => {
// 将db变更为 db/schema;
const nParams = { ...params };
nParams.schema = sn;
nParams.db = nParams.db + '/' + sn;
nParams.dbs = schemaNames;
let tagTreeNode = new TagTreeNode(`${params.id}.${params.db}.schema.${sn}`, sn, NodeTypePostgresSchema).withParams(nParams).withIcon(SchemaIcon);
tagTreeNode.isLeaf = true;
return tagTreeNode;
});
});
const MysqlNodeTypes = new NodeType(SqlExecNodeType.Db).withNodeClickFunc(nodeClickChangeDb);
// postgres schema模式
const NodeTypePostgresSchema = new NodeType(SqlExecNodeType.PgSchema).withNodeClickFunc(nodeClickChangeDb);
</script>
<style lang="scss">
.db-select-tree {
.tag-tree {
height: auto !important;
overflow-x: hidden;
width: 560px;
.el-tree {
height: 200px;
overflow-y: auto;
overflow-x: hidden;
}
}
}
</style>

View File

@@ -357,7 +357,6 @@ const openEditTable = async (row: any) => {
const onSubmitSql = async (row: { tableName: string }) => {
await openEditTable(row);
state.tableCreateDialog.visible = false;
state.tables = await dbApi.tableInfos.request({ id: props.dbId, db: props.db });
};
</script>

View File

@@ -139,6 +139,7 @@ export class DbInst {
/**
* 获取表的所有列信息
* @param dbName 数据库名
* @param table 表名
*/
async loadColumns(dbName: string, table: string) {
@@ -370,7 +371,7 @@ export class DbInst {
* @returns
*/
static isNumber(columnType: string) {
return columnType.match(/int|double|float|nubmer|decimal|byte|bit/gi);
return columnType.match(/int|double|float|number|decimal|byte|bit/gi);
}
/**
@@ -617,7 +618,7 @@ export function registerDbCompletionItemProvider(dbId: number, db: string, dbs:
description: 'schema',
},
kind: monaco.languages.CompletionItemKind.Folder,
insertText: a,
insertText: dbDialect.wrapName(a),
range,
});
});
@@ -678,7 +679,7 @@ export function registerDbCompletionItemProvider(dbId: number, db: string, dbs:
},
kind: monaco.languages.CompletionItemKind.File,
detail: tableComment,
insertText: tableName + ' ',
insertText: dbDialect.wrapName(tableName) + ' ',
range,
sortText: 300 + index + '',
});

View File

@@ -1,16 +1,17 @@
import { DbInst } from '../db';
import {
DbDialect,
sqlColumnType,
DialectInfo,
RowDefinition,
IndexDefinition,
EditorCompletionItem,
commonCustomKeywords,
EditorCompletion,
DataType,
DbDialect,
DialectInfo,
EditorCompletion,
EditorCompletionItem,
IndexDefinition,
RowDefinition,
sqlColumnType,
} from './index';
import { language as sqlLanguage } from 'monaco-editor/esm/vs/basic-languages/sql/sql.js';
export { DMDialect, DM_TYPE_LIST };
// 参考文档:https://eco.dameng.com/document/dm/zh-cn/sql-dev/dmpl-sql-datatype.html#%E5%AD%97%E7%AC%A6%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B
@@ -374,9 +375,11 @@ class DMDialect implements DbDialect {
}
getDefaultSelectSql(table: string, condition: string, orderBy: string, pageNum: number, limit: number) {
return `SELECT * FROM ${this.wrapName(table)} ${condition ? 'WHERE ' + condition : ''} ${orderBy ? orderBy : ''} OFFSET ${
(pageNum - 1) * limit
} LIMIT ${limit};`;
return `SELECT * FROM "${table}" ${condition ? 'WHERE ' + condition : ''} ${orderBy ? orderBy : ''} ${this.getPageSql(pageNum, limit)};`;
}
getPageSql(pageNum: number, limit: number) {
return ` OFFSET ${(pageNum - 1) * limit} LIMIT ${limit};`;
}
getDefaultRows(): RowDefinition[] {
@@ -442,7 +445,7 @@ class DMDialect implements DbDialect {
}
wrapName = (name: string) => {
return name;
return `"${name}"`;
};
matchType(text: string, arr: string[]): boolean {
@@ -497,7 +500,7 @@ class DMDialect implements DbDialect {
// 默认值
let defVal = this.getDefaultValueSql(cl);
let incr = cl.auto_increment ? 'IDENTITY' : '';
return ` ${cl.name} ${cl.type}${length} ${incr} ${cl.notNull ? 'NOT NULL' : ''} ${defVal} `;
return ` "${cl.name}" ${cl.type}${length} ${incr} ${cl.notNull ? 'NOT NULL' : ''} ${defVal} `;
}
getCreateTableSql(data: any): string {
@@ -515,18 +518,18 @@ class DMDialect implements DbDialect {
}
// 列注释
if (item.remark) {
columCommentSql += ` comment on column ${data.tableName}.${item.name} is '${item.remark}'; `;
columCommentSql += ` comment on column "${data.tableName}"."${item.name}" is '${item.remark}'; `;
}
});
// 建表
createSql = `CREATE TABLE ${data.tableName}
createSql = `CREATE TABLE "${data.tableName}"
(
${fields.join(',')}
${pks ? `, PRIMARY KEY (${pks.join(',')})` : ''}
);`;
// 表注释
if (data.tableComment) {
tableCommentSql = ` comment on table ${data.tableName} is '${data.tableComment}'; `;
tableCommentSql = ` comment on table "${data.tableName}" is '${data.tableComment}'; `;
}
return createSql + tableCommentSql + columCommentSql;
@@ -547,7 +550,7 @@ class DMDialect implements DbDialect {
let sql: string[] = [];
if (changeData.add.length > 0) {
changeData.add.forEach((a) => {
sql.push(`ALTER TABLE ${tableName} add COLUMN ${this.genColumnBasicSql(a)}`);
sql.push(`ALTER TABLE "${tableName}" add COLUMN ${this.genColumnBasicSql(a)}`);
if (a.remark) {
sql.push(`comment on COLUMN "${tableName}"."${a.name}" is '${a.remark}'`);
}
@@ -556,7 +559,7 @@ class DMDialect implements DbDialect {
if (changeData.upd.length > 0) {
changeData.upd.forEach((a) => {
sql.push(`ALTER TABLE ${tableName} MODIFY ${this.genColumnBasicSql(a)}`);
sql.push(`ALTER TABLE "${tableName}" MODIFY ${this.genColumnBasicSql(a)}`);
if (a.remark) {
sql.push(`comment on COLUMN "${tableName}"."${a.name}" is '${a.remark}'`);
}
@@ -565,7 +568,7 @@ class DMDialect implements DbDialect {
if (changeData.del.length > 0) {
changeData.del.forEach((a) => {
sql.push(`ALTER TABLE ${tableName} DROP COLUMN ${a.name}`);
sql.push(`ALTER TABLE "${tableName}" DROP COLUMN ${a.name}`);
});
}
return sql.join(';');

View File

@@ -125,6 +125,8 @@ export interface DbDialect {
*/
getDefaultSelectSql(table: string, condition: string, orderBy: string, pageNum: number, limit: number): string;
getPageSql(pageNum: number, limit: number): string;
getDefaultRows(): RowDefinition[];
getDefaultIndex(): IndexDefinition;

View File

@@ -112,9 +112,11 @@ class MysqlDialect implements DbDialect {
}
getDefaultSelectSql(table: string, condition: string, orderBy: string, pageNum: number, limit: number) {
return `SELECT * FROM ${this.wrapName(table)} ${condition ? 'WHERE ' + condition : ''} ${orderBy ? orderBy : ''} LIMIT ${
(pageNum - 1) * limit
}, ${limit};`;
return `SELECT * FROM ${this.wrapName(table)} ${condition ? 'WHERE ' + condition : ''} ${orderBy ? orderBy : ''} ${this.getPageSql(pageNum, limit)};`;
}
getPageSql(pageNum: number, limit: number) {
return ` LIMIT ${(pageNum - 1) * limit}, ${limit}`;
}
getDefaultRows(): RowDefinition[] {

View File

@@ -133,9 +133,11 @@ class PostgresqlDialect implements DbDialect {
}
getDefaultSelectSql(table: string, condition: string, orderBy: string, pageNum: number, limit: number) {
return `SELECT * FROM ${this.wrapName(table)} ${condition ? 'WHERE ' + condition : ''} ${orderBy ? orderBy : ''} OFFSET ${
(pageNum - 1) * limit
} LIMIT ${limit};`;
return `SELECT * FROM ${this.wrapName(table)} ${condition ? 'WHERE ' + condition : ''} ${orderBy ? orderBy : ''} ${this.getPageSql(pageNum, limit)};`;
}
getPageSql(pageNum: number, limit: number) {
return ` OFFSET ${(pageNum - 1) * limit} LIMIT ${limit};`;
}
getDefaultRows(): RowDefinition[] {

View File

@@ -0,0 +1,114 @@
package api
import (
"context"
"encoding/base64"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"mayfly-go/internal/db/api/form"
"mayfly-go/internal/db/api/vo"
"mayfly-go/internal/db/application"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/pkg/biz"
"mayfly-go/pkg/ginx"
"mayfly-go/pkg/req"
"mayfly-go/pkg/utils/stringx"
"strconv"
"strings"
)
type DataSyncTask struct {
DataSyncTaskApp application.DataSyncTask
DataSyncLogApp application.DataSyncLog
}
func (d *DataSyncTask) Tasks(rc *req.Ctx) {
queryCond, page := ginx.BindQueryAndPage[*entity.DataSyncTaskQuery](rc.GinCtx, new(entity.DataSyncTaskQuery))
res, err := d.DataSyncTaskApp.GetPageList(queryCond, page, new([]vo.DataSyncTaskListVO))
biz.ErrIsNil(err)
rc.ResData = res
}
func (d *DataSyncTask) Logs(rc *req.Ctx) {
queryCond, page := ginx.BindQueryAndPage[*entity.DataSyncLogQuery](rc.GinCtx, new(entity.DataSyncLogQuery))
res, err := d.DataSyncLogApp.GetTaskLogList(queryCond, page, new([]vo.DataSyncLogListVO))
biz.ErrIsNil(err)
rc.ResData = res
}
func (d *DataSyncTask) SaveTask(rc *req.Ctx) {
form := &form.DataSyncTaskForm{}
task := ginx.BindJsonAndCopyTo[*entity.DataSyncTask](rc.GinCtx, form, new(entity.DataSyncTask))
// 解码base64 sql
sqlBytes, err := base64.StdEncoding.DecodeString(task.DataSql)
biz.ErrIsNilAppendErr(err, "sql解码失败: %s")
sql := stringx.TrimSpaceAndBr(string(sqlBytes))
task.DataSql = sql
form.DataSql = sql
key := task.TaskKey
// 判断key为空就生成随机key
if key == "" {
key = uuid.New().String()
task.TaskKey = key
}
rc.ReqParam = form
biz.ErrIsNil(d.DataSyncTaskApp.Save(rc.MetaCtx, task))
}
func (d *DataSyncTask) DeleteTask(rc *req.Ctx) {
taskId := ginx.PathParam(rc.GinCtx, "taskId")
rc.ReqParam = taskId
ids := strings.Split(taskId, ",")
for _, v := range ids {
value, err := strconv.Atoi(v)
biz.ErrIsNilAppendErr(err, "string类型转换为int异常: %s")
id := uint64(value)
_ = d.DataSyncTaskApp.Delete(rc.MetaCtx, id)
_ = d.DataSyncTaskApp.RemoveCronJobById(id)
}
}
func (d *DataSyncTask) ChangeStatus(rc *req.Ctx) {
form := &form.DataSyncTaskStatusForm{}
task := ginx.BindJsonAndCopyTo[*entity.DataSyncTask](rc.GinCtx, form, new(entity.DataSyncTask))
_ = d.DataSyncTaskApp.UpdateById(context.Background(), task)
if task.Status == entity.DataSyncTaskStatusEnable {
_ = d.DataSyncTaskApp.AddCronJobById(task.Id)
} else {
_ = d.DataSyncTaskApp.RemoveCronJobById(task.Id)
}
// 记录请求日志
rc.ReqParam = form
}
func (d *DataSyncTask) Run(rc *req.Ctx) {
taskId := getTaskId(rc.GinCtx)
rc.ReqParam = taskId
d.DataSyncTaskApp.RunCronJob(taskId)
}
func (d *DataSyncTask) Stop(rc *req.Ctx) {
taskId := getTaskId(rc.GinCtx)
rc.ReqParam = taskId
task := new(entity.DataSyncTask)
task.Id = taskId
task.RunningState = entity.DataSyncTaskRunStateStop
_ = d.DataSyncTaskApp.UpdateById(context.Background(), task)
}
func (d *DataSyncTask) GetTask(rc *req.Ctx) {
taskId := getTaskId(rc.GinCtx)
dbEntity, _ := d.DataSyncTaskApp.GetById(new(entity.DataSyncTask), taskId)
rc.ResData = dbEntity
}
func getTaskId(g *gin.Context) uint64 {
instanceId, _ := strconv.Atoi(g.Param("taskId"))
biz.IsTrue(instanceId > 0, "instanceId 错误")
return uint64(instanceId)
}

View File

@@ -0,0 +1,28 @@
package form
type DataSyncTaskForm struct {
Id uint64 `json:"id"`
TaskName string `binding:"required" json:"taskName"`
TaskCron string `binding:"required" json:"taskCron"`
TaskKey string `json:"taskKey"`
Status int `binding:"required" json:"status"`
SrcDbId int64 `binding:"required" json:"srcDbId"`
SrcDbName string `binding:"required" json:"srcDbName"`
SrcTagPath string `binding:"required" json:"srcTagPath"`
DataSql string `binding:"required" json:"dataSql"`
PageSize int `binding:"required" json:"pageSize"`
UpdField string `binding:"required" json:"updField"`
UpdFieldVal string `binding:"required" json:"updFieldVal"`
TargetDbId int64 `binding:"required" json:"targetDbId"`
TargetDbName string `binding:"required" json:"targetDbName"`
TargetTagPath string `binding:"required" json:"targetTagPath"`
TargetTableName string `binding:"required" json:"targetTableName"`
FieldMap string `binding:"required" json:"fieldMap"`
}
type DataSyncTaskStatusForm struct {
Id uint64 `binding:"required" json:"taskId"`
Status int `json:"status"`
}

View File

@@ -0,0 +1,22 @@
package vo
import "time"
type DataSyncTaskListVO struct {
Id *int64 `json:"id"`
TaskName *string `json:"taskName"`
UpdateTime *time.Time `json:"updateTime"`
ModifierId uint64 `json:"modifierId"`
Modifier string `json:"modifier"`
RecentState *int `json:"recentState"`
RunningState *int `json:"runningState"`
Status *int `json:"status"`
}
type DataSyncLogListVO struct {
CreateTime *time.Time `json:"createTime"`
DataSqlFull string `json:"dataSqlFull"`
ResNum string `json:"resNum"`
ErrText string `json:"errText"`
Status *int `json:"status"`
}

View File

@@ -18,6 +18,8 @@ var (
dbRestoreApp *DbRestoreApp
dbRestoreHistoryApp *DbRestoreHistoryApp
dbBinlogApp *DbBinlogApp
dataSyncApp DataSyncTask
dataSyncLogApp DataSyncLog
)
var repositories *repository.Repositories
@@ -39,6 +41,8 @@ func Init() {
dbApp = newDbApp(persistence.GetDbRepo(), persistence.GetDbSqlRepo(), instanceApp, tagapp.GetTagTreeApp())
dbSqlExecApp = newDbSqlExecApp(persistence.GetDbSqlExecRepo())
dbSqlApp = newDbSqlApp(persistence.GetDbSqlRepo())
dataSyncApp = newDataSyncApp(persistence.GetDataSyncTaskRepo())
dataSyncLogApp = newDataSyncLogApp(persistence.GetDataSyncLogRepo())
dbBackupApp, err = newDbBackupApp(repositories, dbApp)
if err != nil {
@@ -60,6 +64,8 @@ func Init() {
if err != nil {
panic(fmt.Sprintf("初始化 dbBinlogApp 失败: %v", err))
}
dataSyncApp.InitCronJob()
})()
}
@@ -98,3 +104,11 @@ func GetDbRestoreHistoryApp() *DbRestoreHistoryApp {
func GetDbBinlogApp() *DbBinlogApp {
return dbBinlogApp
}
func GetDataSyncTaskApp() DataSyncTask {
return dataSyncApp
}
func GetDataSyncLogApp() DataSyncLog {
return dataSyncLogApp
}

View File

@@ -156,7 +156,7 @@ func (d *dbAppImpl) GetDbConn(dbId uint64, dbName string) (*dbm.DbConn, error) {
checkDb := dbName
// 兼容pgsql/dm db/schema模式
if dbm.DbTypePostgres.Equal(instance.Type) || dbm.DM.Equal(instance.Type) {
if dbm.DbTypePostgres.Equal(instance.Type) || dbm.DbTypeDM.Equal(instance.Type) {
ss := strings.Split(dbName, "/")
if len(ss) > 1 {
checkDb = ss[0]

View File

@@ -0,0 +1,359 @@
package application
import (
"context"
"encoding/json"
"fmt"
"mayfly-go/internal/db/dbm"
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/base"
"mayfly-go/pkg/gormx"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/scheduler"
"strings"
"time"
)
type DataSyncTask interface {
base.App[*entity.DataSyncTask]
// GetPageList 分页获取数据库实例
GetPageList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
Save(ctx context.Context, instanceEntity *entity.DataSyncTask) error
// Delete 删除数据库信息
Delete(ctx context.Context, id uint64) error
InitCronJob()
AddCronJob(taskEntity *entity.DataSyncTask)
AddCronJobById(id uint64) error
RemoveCronJob(taskEntity *entity.DataSyncTask)
RemoveCronJobById(id uint64) error
RemoveCronJobByKey(taskKey string)
RunCronJob(id uint64)
}
func newDataSyncApp(dataSyncRepo repository.DataSyncTask) DataSyncTask {
app := new(dataSyncAppImpl)
app.Repo = dataSyncRepo
return app
}
type dataSyncAppImpl struct {
base.AppImpl[*entity.DataSyncTask, repository.DataSyncTask]
}
func (app *dataSyncAppImpl) GetPageList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
return app.GetRepo().GetTaskList(condition, pageParam, toEntity, orderBy...)
}
func (app *dataSyncAppImpl) Save(ctx context.Context, taskEntity *entity.DataSyncTask) error {
app.AddCronJob(taskEntity)
if taskEntity.Id == 0 {
return app.Insert(ctx, taskEntity)
}
return app.UpdateById(ctx, taskEntity)
}
func (app *dataSyncAppImpl) Delete(ctx context.Context, id uint64) error {
return app.DeleteById(ctx, id)
}
func (app *dataSyncAppImpl) AddCronJob(taskEntity *entity.DataSyncTask) {
key := taskEntity.TaskKey
// 先移除旧的任务
scheduler.RemoveByKey(key)
// 根据状态添加新的任务
if taskEntity.Status == entity.DataSyncTaskStatusEnable {
scheduler.AddFunByKey(key, taskEntity.TaskCron, func() {
go app.RunCronJob(taskEntity.Id)
})
}
}
func (app *dataSyncAppImpl) AddCronJobById(id uint64) error {
task, err := app.GetById(new(entity.DataSyncTask), id)
if err != nil {
return err
}
app.AddCronJob(task)
return nil
}
func (app *dataSyncAppImpl) RemoveCronJob(taskEntity *entity.DataSyncTask) {
app.RemoveCronJobByKey(taskEntity.TaskKey)
}
func (app *dataSyncAppImpl) RemoveCronJobById(id uint64) error {
task, err := app.GetById(new(entity.DataSyncTask), id)
if err != nil {
return err
}
app.RemoveCronJob(task)
return nil
}
func (app *dataSyncAppImpl) RemoveCronJobByKey(taskKey string) {
if taskKey != "" {
scheduler.RemoveByKey(taskKey)
}
}
func (app *dataSyncAppImpl) changeRunningState(id uint64, state int8) {
task := new(entity.DataSyncTask)
task.Id = id
task.RunningState = state
_ = app.UpdateById(context.Background(), task)
}
func (app *dataSyncAppImpl) RunCronJob(id uint64) {
// 查询最新的任务信息
task, err := app.GetById(new(entity.DataSyncTask), id)
if task.RunningState == entity.DataSyncTaskRunStateRunning {
logx.Warnf("数据同步任务正在执行中:%s => %s", task.TaskName, task.TaskKey)
return
}
// 开始运行时,修改状态为运行中
app.changeRunningState(id, entity.DataSyncTaskRunStateRunning)
logx.Warnf("开始执行数据同步任务:%s => %s", task.TaskName, task.TaskKey)
// 获取源数据库连接
srcConn, err := GetDbApp().GetDbConn(uint64(task.SrcDbId), task.SrcDbName)
if err != nil {
app.endRunning(task, entity.DataSyncTaskStateFail, "连接源数据库失败", "", 0)
return
}
// 获取目标数据库连接
targetConn, err := GetDbApp().GetDbConn(uint64(task.TargetDbId), task.TargetDbName)
if err != nil {
app.endRunning(task, entity.DataSyncTaskStateFail, "连接目标数据库失败", "", 0)
return
}
// 当前分页
page := 1
// 记录每次分页返回数据条数
resSize := task.PageSize
// 记录本次同步数据总数
total := 0
srcDialect := srcConn.GetDialect()
// 记录更新字段最新值
updFieldVal := task.UpdFieldVal
targetDialect := targetConn.GetDialect()
for {
if resSize < task.PageSize {
break
}
// 通过占位符格式化sql
updSql := ""
orderSql := ""
if task.UpdFieldVal != "0" && task.UpdFieldVal != "" && task.UpdField != "" {
updSql = fmt.Sprintf("and %s > '%s'", task.UpdField, task.UpdFieldVal)
orderSql = "order by " + task.UpdField + " asc "
}
pageSql := srcDialect.PageSql(page, task.PageSize)
// 组装查询sql
sql := fmt.Sprintf("select * from (%s) t where 1 = 1 %s %s %s", task.DataSql, updSql, orderSql, pageSql)
logx.Infof("同步任务:[%s]执行sql[%s]", task.TaskName, sql)
// 源数据库执行sql查询结果
columns, res, err := srcConn.Query(sql)
if err != nil {
app.endRunning(task, entity.DataSyncTaskStateFail, fmt.Sprintf("查询源数据库失败:%s", err.Error()), sql, 0)
return
}
if len(res) == 0 {
app.endRunning(task, entity.DataSyncTaskStateSuccess, fmt.Sprintf("执行成功,新数据:%d 条", total), sql, 0)
return
}
// 每次分页查询成功后,记录一些数据
resSize = len(res)
total += resSize
page++
index := 0
// task.FieldMap为json数组字符串 [{"src":"id","target":"id"}]转为map
var fieldMap []map[string]string
err = json.Unmarshal([]byte(task.FieldMap), &fieldMap)
if err != nil {
app.endRunning(task, entity.DataSyncTaskStateFail, fmt.Sprintf("解析字段映射json出错"), sql, resSize)
return
}
// 遍历columns 取task.UpdField的字段类型
updFieldType := dbm.DataTypeString
for _, column := range columns {
if column.Name == task.UpdField {
updFieldType = srcDialect.GetDataType(column.Type)
break
}
}
var data = make([]map[string]any, 0)
// 遍历res组装插入sql
for _, record := range res {
index++
// 获取查询结果最后一条数据的UpdField字段值
if index == resSize {
updFieldVal = fmt.Sprintf("%v", record[task.UpdField])
updFieldVal = srcDialect.FormatStrData(updFieldVal, updFieldType)
}
var rowData = make(map[string]any)
// 遍历字段映射, target字段的值为src字段取值
for _, item := range fieldMap {
srcField := item["src"]
targetField := item["target"]
// target字段的值为src字段取值
rowData[targetField] = record[srcField]
}
data = append(data, rowData)
}
// 获取目标库字段数组
targetWrapColumns := make([]string, 0)
// 获取源库字段数组
srcColumns := make([]string, 0)
for _, item := range fieldMap {
targetField := item["target"]
srcField := item["target"]
targetWrapColumns = append(targetWrapColumns, targetDialect.WrapName(targetField))
srcColumns = append(srcColumns, srcField)
}
// 从目标库数据中取出源库字段对应的值
values := make([][]any, 0)
for _, record := range data {
rawValue := make([]any, 0)
for _, column := range srcColumns {
rawValue = append(rawValue, record[column])
}
values = append(values, rawValue)
}
// 生成占位符字符串:如:(?,?)
// 重复字符串并用逗号连接
repeated := strings.Repeat("?,", len(targetWrapColumns))
// 去除最后一个逗号,占位符由括号包裹
placeholder := fmt.Sprintf("(%s)", strings.TrimSuffix(repeated, ","))
// 目标数据库执行sql批量插入
err = targetDialect.SaveBatch(targetConn, task.TargetTableName, strings.Join(targetWrapColumns, ","), placeholder, values)
if err != nil {
// 保存执行成功日志
logx.Errorf("保存记录失败:%s", err.Error())
app.endRunning(task, entity.DataSyncTaskStateFail, err.Error(), sql, resSize)
return
}
// 保存运行时日志
logx.Infof("同步任务:[%s],保存记录成功:[%d]条", task.TaskName, total)
app.saveLog(task.Id, entity.DataSyncTaskStateSuccess, fmt.Sprintf("分页执行成功,新数据:%d 条", total), sql, total)
// 运行过程中,判断状态是否为已关闭,是则结束运行,否则继续运行
taskParam, _ := app.GetById(new(entity.DataSyncTask), id)
if taskParam.RunningState == entity.DataSyncTaskRunStateStop {
app.endRunning(task, entity.DataSyncTaskStateFail, "手动停止任务", sql, resSize)
return
}
// 记录一次数据状态
taskParam = new(entity.DataSyncTask)
taskParam.Id = task.Id
taskParam.UpdFieldVal = updFieldVal
taskParam.RecentState = entity.DataSyncTaskStateSuccess
taskParam.RunningState = entity.DataSyncTaskRunStateRunning
_ = app.UpdateById(context.Background(), taskParam)
}
logx.Infof("同步任务:[%s],执行完毕,保存记录成功:[%d]条", task.TaskName, total)
// 记录更新字段最新值
task.UpdFieldVal = updFieldVal
// 保存执行成功日志
app.endRunning(task, entity.DataSyncTaskStateSuccess, fmt.Sprintf("本次任务执行成功,新数据:%d 条", total), "", total)
}
func (app *dataSyncAppImpl) endRunning(taskEntity *entity.DataSyncTask, state int8, msg string, sql string, resNum int) {
logx.Info(msg)
task := new(entity.DataSyncTask)
task.Id = taskEntity.Id
task.RecentState = state
task.UpdFieldVal = taskEntity.UpdFieldVal
task.RunningState = entity.DataSyncTaskRunStateReady
// 运行失败之后设置任务状态为禁用
//if state == entity.DataSyncTaskStateFail {
// taskEntity.Status = entity.DataSyncTaskStatusDisable
// app.RemoveCronJob(taskEntity)
//}
_ = app.UpdateById(context.Background(), task)
// 保存执行日志
app.saveLog(taskEntity.Id, state, msg, sql, resNum)
}
func (app *dataSyncAppImpl) saveLog(taskId uint64, state int8, msg string, sql string, resNum int) {
now := time.Now()
_ = GetDataSyncLogApp().Insert(context.Background(), &entity.DataSyncLog{
TaskId: taskId,
CreateTime: &now,
DataSqlFull: sql,
ResNum: resNum,
ErrText: msg,
Status: state,
})
}
func (app *dataSyncAppImpl) InitCronJob() {
defer func() {
if err := recover(); err != nil {
logx.ErrorTrace("数据同步任务初始化失败: %s", err.(error))
}
}()
// 修改执行状态为待执行
updateMap := map[string]interface{}{
"running_state": entity.DataSyncTaskRunStateReady,
}
taskParam := new(entity.DataSyncTask)
taskParam.RunningState = 1
_ = gormx.Updates(taskParam, taskParam, updateMap)
// 把所有正常任务添加到定时任务中
pageParam := &model.PageParam{
PageSize: 100,
PageNum: 1,
}
cond := new(entity.DataSyncTaskQuery)
cond.Status = entity.DataSyncTaskStatusEnable
jobs := new([]entity.DataSyncTask)
pr, _ := app.GetPageList(cond, pageParam, jobs)
total := pr.Total
add := 0
for {
for _, job := range *jobs {
app.AddCronJob(&job)
add++
}
if add >= int(total) {
return
}
pageParam.PageNum++
_, _ = app.GetPageList(cond, pageParam, jobs)
}
}

View File

@@ -0,0 +1,29 @@
package application
import (
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/base"
"mayfly-go/pkg/model"
)
type DataSyncLog interface {
base.App[*entity.DataSyncLog]
// GetTaskLogList 分页获取数据库实例
GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
}
func newDataSyncLogApp(dataSyncRepo repository.DataSyncLog) DataSyncLog {
app := new(dataSyncLogAppImpl)
app.Repo = dataSyncRepo
return app
}
type dataSyncLogAppImpl struct {
base.AppImpl[*entity.DataSyncLog, repository.DataSyncLog]
}
func (app *dataSyncLogAppImpl) GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
return app.GetRepo().GetTaskLogList(condition, pageParam, toEntity, orderBy...)
}

View File

@@ -0,0 +1,11 @@
package dbm
type DataType string
const (
DataTypeString DataType = "string"
DataTypeNumber DataType = "number"
DataTypeDate DataType = "date"
DataTypeTime DataType = "time"
DataTypeDateTime DataType = "datetime"
)

View File

@@ -69,14 +69,14 @@ func (d *DbConn) WalkTableRecord(ctx context.Context, selectSql string, walk fun
// 执行 update, insert, delete建表等sql
// 返回影响条数和错误
func (d *DbConn) Exec(sql string) (int64, error) {
return d.ExecContext(context.Background(), sql)
func (d *DbConn) Exec(sql string, args ...any) (int64, error) {
return d.ExecContext(context.Background(), sql, args...)
}
// 执行 update, insert, delete建表等sql
// 返回影响条数和错误
func (d *DbConn) ExecContext(ctx context.Context, sql string) (int64, error) {
res, err := d.db.ExecContext(ctx, sql)
func (d *DbConn) ExecContext(ctx context.Context, sql string, args ...any) (int64, error) {
res, err := d.db.ExecContext(ctx, sql, args...)
if err != nil {
return 0, wrapSqlError(err)
}
@@ -90,7 +90,7 @@ func (d *DbConn) GetDialect() DbDialect {
return &MysqlDialect{dc: d}
case DbTypePostgres:
return &PgsqlDialect{dc: d}
case DM:
case DbTypeDM:
return &DMDialect{dc: d}
default:
panic(fmt.Sprintf("invalid database type: %s", d.Info.Type))
@@ -104,7 +104,7 @@ func (d *DbConn) Close() {
logx.Errorf("关闭数据库实例[%s]连接失败: %s", d.Id, err.Error())
}
// 如果是达梦并且使用了ssh隧道则需要手动将其关闭
if d.Info.Type == DM && d.Info.SshTunnelMachineId > 0 {
if d.Info.Type == DbTypeDM && d.Info.SshTunnelMachineId > 0 {
mcm.CloseSshTunnelMachine(d.Info.SshTunnelMachineId, fmt.Sprintf("db:%d", d.Info.Id))
}
d.db = nil

View File

@@ -13,7 +13,7 @@ type DbType string
const (
DbTypeMysql DbType = "mysql"
DbTypePostgres DbType = "postgres"
DM DbType = "dm"
DbTypeDM DbType = "dm"
)
func ToDbType(dbType string) DbType {
@@ -30,7 +30,7 @@ func (dbType DbType) MetaDbName() string {
return ""
case DbTypePostgres:
return "postgres"
case DM:
case DbTypeDM:
return ""
default:
panic(fmt.Sprintf("invalid database type: %s", dbType))
@@ -67,7 +67,7 @@ func (dbType DbType) Dialect() sqlparser.Dialect {
return sqlparser.MysqlDialect{}
case DbTypePostgres:
return sqlparser.PostgresDialect{}
case DM:
case DbTypeDM:
return sqlparser.PostgresDialect{}
default:
panic(fmt.Sprintf("invalid database type: %s", dbType))

View File

@@ -82,6 +82,19 @@ type DbDialect interface {
// GetDbProgram 获取数据库程序模块,用于数据库备份与恢复
GetDbProgram() DbProgram
// 封装名字mysql: `table_name`, dm: "table_name"
WrapName(name string) string
// 分页sqlmysql: limit 1 ,10, dm: limit 10 offset 1
PageSql(pageNum int, pageSize int) string
// 批量保存数据
SaveBatch(conn *DbConn, tableName string, columns string, placeholder string, values [][]any) error
GetDataType(dbColumnType string) DataType
FormatStrData(dbColumnValue string, dataType DataType) string
}
// ------------------------- 元数据sql操作 -------------------------

View File

@@ -6,8 +6,11 @@ import (
"fmt"
machineapp "mayfly-go/internal/machine/application"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/utils/anyx"
"regexp"
"strings"
"time"
_ "gitee.com/chunanyong/dm"
)
@@ -208,7 +211,7 @@ func (dd *DMDialect) GetTableDDL(tableName string) (string, error) {
// 表注释
_, res, err = dd.dc.Query(fmt.Sprintf(`
select OWNER, COMMENTS from DBA_TAB_COMMENTS where TABLE_TYPE='TABLE' and TABLE_NAME = '%s'
select OWNER, COMMENTS from ALL_TAB_COMMENTS where TABLE_TYPE='TABLE' and TABLE_NAME = '%s'
and owner = (SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID))
`, tableName))
if err != nil {
@@ -245,8 +248,8 @@ func (dd *DMDialect) GetTableDDL(tableName string) (string, error) {
// 索引信息
indexSql := fmt.Sprintf(`
select indexdef(b.object_id,1) as INDEX_DEF from DBA_INDEXES a
join dba_objects b on a.owner = b.owner and b.object_name = a.index_name and b.object_type = 'INDEX'
select indexdef(b.object_id,1) as INDEX_DEF from ALL_INDEXES a
join ALL_objects b on a.owner = b.owner and b.object_name = a.index_name and b.object_type = 'INDEX'
where a.owner = (SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID))
and a.table_name = '%s'
and indexdef(b.object_id,1) != '禁止查看系统定义的索引信息'
@@ -288,3 +291,61 @@ func (dd *DMDialect) GetSchemas() ([]string, error) {
func (dd *DMDialect) GetDbProgram() DbProgram {
panic("implement me")
}
func (pd *DMDialect) WrapName(name string) string {
return "\"" + name + "\""
}
func (pd *DMDialect) PageSql(pageNum int, pageSize int) string {
return fmt.Sprintf("LIMIT %d OFFSET %d", pageSize, (pageNum-1)*pageSize)
}
func (pd *DMDialect) GetDataType(dbColumnType string) DataType {
if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) {
return DataTypeNumber
}
// 日期时间类型
if regexp.MustCompile(`(?i)datetime|timestamp`).MatchString(dbColumnType) {
return DataTypeDateTime
}
// 日期类型
if regexp.MustCompile(`(?i)date`).MatchString(dbColumnType) {
return DataTypeDate
}
// 时间类型
if regexp.MustCompile(`(?i)time`).MatchString(dbColumnType) {
return DataTypeTime
}
return DataTypeString
}
func (pd *DMDialect) SaveBatch(conn *DbConn, tableName string, columns string, placeholder string, values [][]any) error {
// 执行批量insert sql
// insert into "table_name" ("column1", "column2", ...) values (value1, value2, ...)
sqlTemp := fmt.Sprintf("insert into %s (%s) values %s", pd.WrapName(tableName), columns, placeholder)
for _, value := range values {
// 达梦数据库只能一条条的执行insert
_, err := conn.Exec(sqlTemp, value...)
if err != nil {
logx.Errorf("执行sql失败%s", err.Error())
return err
}
}
// 执行批量insert sql
return nil
}
func (pd *DMDialect) FormatStrData(dbColumnValue string, dataType DataType) string {
switch dataType {
case DataTypeDateTime: // "2024-01-02T22:08:22.275697+08:00"
res, _ := time.Parse(time.RFC3339, dbColumnValue)
return res.Format(time.DateTime)
case DataTypeDate: // "2024-01-02T00:00:00+08:00"
res, _ := time.Parse(time.RFC3339, dbColumnValue)
return res.Format(time.DateOnly)
case DataTypeTime: // "0000-01-01T22:08:22.275688+08:00"
res, _ := time.Parse(time.RFC3339, dbColumnValue)
return res.Format(time.TimeOnly)
}
return dbColumnValue
}

View File

@@ -4,12 +4,13 @@ import (
"context"
"database/sql"
"fmt"
"github.com/go-sql-driver/mysql"
machineapp "mayfly-go/internal/machine/application"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/utils/anyx"
"net"
"github.com/go-sql-driver/mysql"
"regexp"
"strings"
)
func getMysqlDB(d *DbInfo) (*sql.DB, error) {
@@ -202,3 +203,53 @@ func (md *MysqlDialect) GetSchemas() ([]string, error) {
func (md *MysqlDialect) GetDbProgram() DbProgram {
return NewDbProgramMysql(md.dc)
}
func (pd *MysqlDialect) WrapName(name string) string {
return "`" + name + "`"
}
func (pd *MysqlDialect) PageSql(pageNum int, pageSize int) string {
return fmt.Sprintf("limit %d, %d", (pageNum-1)*pageSize, pageSize)
}
func (pd *MysqlDialect) GetDataType(dbColumnType string) DataType {
if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) {
return DataTypeNumber
}
// 日期时间类型
if regexp.MustCompile(`(?i)datetime|timestamp`).MatchString(dbColumnType) {
return DataTypeDateTime
}
// 日期类型
if regexp.MustCompile(`(?i)date`).MatchString(dbColumnType) {
return DataTypeDate
}
// 时间类型
if regexp.MustCompile(`(?i)time`).MatchString(dbColumnType) {
return DataTypeTime
}
return DataTypeString
}
func (pd *MysqlDialect) SaveBatch(conn *DbConn, tableName string, columns string, placeholder string, values [][]any) error {
// 执行批量insert sqlmysql支持批量insert语法
// insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ...
// 重复占位符字符串n遍
repeated := strings.Repeat(placeholder+",", len(values))
// 去除最后一个逗号
placeholder = strings.TrimSuffix(repeated, ",")
sqlStr := fmt.Sprintf("insert into %s (%s) values %s", pd.WrapName(tableName), columns, placeholder)
// 执行批量insert sql
// 把二维数组转为一维数组
var args []any
for _, v := range values {
args = append(args, v...)
}
_, err := conn.Exec(sqlStr, args...)
return err
}
func (pd *MysqlDialect) FormatStrData(dbColumnValue string, dataType DataType) string {
// mysql不需要格式化时间日期等
return dbColumnValue
}

View File

@@ -11,6 +11,7 @@ import (
"mayfly-go/pkg/utils/collx"
"mayfly-go/pkg/utils/netx"
"net"
"regexp"
"strings"
"time"
@@ -282,3 +283,63 @@ func (pd *PgsqlDialect) GetSchemas() ([]string, error) {
func (pd *PgsqlDialect) GetDbProgram() DbProgram {
panic("implement me")
}
func (pd *PgsqlDialect) WrapName(name string) string {
return name
}
func (pd *PgsqlDialect) PageSql(pageNum int, pageSize int) string {
return fmt.Sprintf("LIMIT %d OFFSET %d", pageSize, (pageNum-1)*pageSize)
}
func (pd *PgsqlDialect) GetDataType(dbColumnType string) DataType {
if regexp.MustCompile(`(?i)int|double|float|number|decimal|byte|bit`).MatchString(dbColumnType) {
return DataTypeNumber
}
// 日期时间类型
if regexp.MustCompile(`(?i)datetime|timestamp`).MatchString(dbColumnType) {
return DataTypeDateTime
}
// 日期类型
if regexp.MustCompile(`(?i)date`).MatchString(dbColumnType) {
return DataTypeDate
}
// 时间类型
if regexp.MustCompile(`(?i)time`).MatchString(dbColumnType) {
return DataTypeTime
}
return DataTypeString
}
func (pd *PgsqlDialect) SaveBatch(conn *DbConn, tableName string, columns string, placeholder string, values [][]any) error {
// 执行批量insert sql跟mysql一样 pg或高斯支持批量insert语法
// insert into table_name (column1, column2, ...) values (value1, value2, ...), (value1, value2, ...), ...
// 重复占位符字符串n遍
repeated := strings.Repeat(placeholder+",", len(values))
// 去除最后一个逗号
placeholder = strings.TrimSuffix(repeated, ",")
sqlStr := fmt.Sprintf("insert into %s (%s) values %s", pd.WrapName(tableName), columns, placeholder)
// 执行批量insert sql
// 把二维数组转为一维数组
var args []any
for _, v := range values {
args = append(args, v...)
}
_, err := conn.Exec(sqlStr, args...)
return err
}
func (pd *PgsqlDialect) FormatStrData(dbColumnValue string, dataType DataType) string {
switch dataType {
case DataTypeDateTime: // "2024-01-02T22:16:28.545377+08:00"
res, _ := time.Parse(time.RFC3339, dbColumnValue)
return res.Format(time.DateTime)
case DataTypeDate: // "2024-01-02T00:00:00Z"
res, _ := time.Parse(time.RFC3339, dbColumnValue)
return res.Format(time.DateOnly)
case DataTypeTime: // "0000-01-01T22:16:28.545075+08:00"
res, _ := time.Parse(time.RFC3339, dbColumnValue)
return res.Format(time.TimeOnly)
}
return dbColumnValue
}

View File

@@ -41,7 +41,7 @@ func (dbInfo *DbInfo) Conn() (*DbConn, error) {
conn, err = getMysqlDB(dbInfo)
case DbTypePostgres:
conn, err = getPgsqlDB(dbInfo)
case DM:
case DbTypeDM:
conn, err = getDmDB(dbInfo)
default:
return nil, errorx.NewBiz("invalid database type: %s", dbInfo.Type)

View File

@@ -1,7 +1,7 @@
--DM_DB_SCHEMAS schemas
select
distinct owner as SCHEMA_NAME
from dba_objects
from all_objects
---------------------------------------
--DM_TABLE_INFO 表详细信息
SELECT a.object_name as TABLE_NAME,
@@ -14,16 +14,16 @@ SELECT a.object_name as TABLE_NAME,
(SELECT sum(INDEX_USED_PAGES(id))* page()
FROM SYSOBJECTS
WHERE NAME IN (SELECT INDEX_NAME
FROM DBA_INDEXES
FROM ALL_INDEXES
WHERE OWNER = 'wxb'
AND TABLE_NAME = a.object_name)) as INDEX_LENGTH,
c.num_rows as TABLE_ROWS
FROM dba_objects a
LEFT JOIN DBA_TAB_COMMENTS b ON b.TABLE_TYPE = 'TABLE'
FROM all_objects a
LEFT JOIN ALL_TAB_COMMENTS b ON b.TABLE_TYPE = 'TABLE'
AND a.object_name = b.TABLE_NAME
AND b.owner = a.owner
LEFT JOIN (SELECT a.owner, a.table_name, a.num_rows FROM dba_tables a) c
LEFT JOIN (SELECT a.owner, a.table_name, a.num_rows FROM all_tables a) c
ON c.owner = a.owner AND c.table_name = a.object_name
WHERE a.owner = (SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID))
@@ -39,9 +39,10 @@ select
c.column_name as COLUMN_NAME,
c.column_position as SEQ_IN_INDEX,
'' as INDEX_COMMENT
FROM DBA_INDEXES a
LEFT JOIN dba_objects b on a.owner = b.owner and b.object_name = a.index_name and b.object_type = 'INDEX'
LEFT JOIN DBA_IND_COLUMNS c on a.owner = c.table_owner and a.index_name = c.index_name and a.TABLE_NAME = c.table_name
FROM ALL_INDEXES a
LEFT JOIN all_objects b on a.owner = b.owner and b.object_name = a.index_name and b.object_type = 'INDEX'
LEFT JOIN ALL_IND_COLUMNS c
on a.owner = c.table_owner and a.index_name = c.index_name and a.TABLE_NAME = c.table_name
WHERE a.owner = (SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID))
and a.TABLE_NAME = '%s'
@@ -62,13 +63,13 @@ select a.table_name
a.data_default as COLUMN_DEFAULT,
a.data_scale as NUM_SCALE,
case when t.COL_NAME = a.column_name then 'PRI' else '' end as COLUMN_KEY
from dba_tab_columns a
from all_tab_columns a
left join user_col_comments b
on b.owner = (SELECT SF_GET_SCHEMA_NAME_BY_ID(CURRENT_SCHID)) and b.table_name = a.table_name and
a.column_name = b.column_name
left join (select b.owner, b.table_name, a.name COL_NAME
from SYS.SYSCOLUMNS a,
dba_tables b,
all_tables b,
sys.sysobjects c,
sys.sysobjects d
where a.INFO2 & 0x01 = 0x01

View File

@@ -0,0 +1,68 @@
package entity
import (
"mayfly-go/pkg/model"
"time"
)
type DataSyncTask struct {
model.Model
// 基本信息
TaskName string `orm:"column(task_name)" json:"taskName"` // 任务名
TaskCron string `orm:"column(task_cron)" json:"taskCron"` // 任务Cron表达式
Status int8 `orm:"column(status)" json:"status"` // 状态 1启用 2禁用
TaskKey string `orm:"column(key)" json:"taskKey"` // 任务唯一标识
RecentState int8 `orm:"column(recent_state)" json:"recentState"` // 最近执行状态 1成功 -1失败
RunningState int8 `orm:"column(running_state)" json:"runningState"` // 运行时状态 1运行中、2待运行、3已停止
// 源数据库信息
SrcDbId int64 `orm:"column(src_db_id)" json:"srcDbId"`
SrcDbName string `orm:"column(src_db_name)" json:"srcDbName"`
SrcTagPath string `orm:"column(src_tag_path)" json:"srcTagPath"`
DataSql string `orm:"column(data_sql)" json:"dataSql"` // 数据源查询sql
PageSize int `orm:"column(page_size)" json:"pageSize"` // 配置分页sql查询的条数
UpdField string `orm:"column(upd_field)" json:"updField"` //更新字段, 选择由哪个字段为更新字段查询数据源的时候会带上这个字段where update_time > {最近更新的最大值}
UpdFieldVal string `orm:"column(upd_field_val)" json:"updFieldVal"` // 更新字段当前值
// 目标数据库信息
TargetDbId int64 `orm:"column(target_db_id)" json:"targetDbId"`
TargetDbName string `orm:"column(target_db_name)" json:"targetDbName"`
TargetTagPath string `orm:"column(target_tag_path)" json:"targetTagPath"`
TargetTableName string `orm:"column(target_table_name)" json:"targetTableName"`
FieldMap string `orm:"column(field_map)" json:"fieldMap"` // 字段映射json
}
func (d *DataSyncTask) TableName() string {
return "t_db_data_sync_task"
}
type DataSyncLog struct {
Id uint64 `json:"id"` // 自增主键
TaskId uint64 `orm:"column(task_id)" json:"taskId"` // 任务表id
CreateTime *time.Time `orm:"column(create_time)" json:"createTime"`
DataSqlFull string `orm:"column(data_sql_full)" json:"dataSqlFull"` // 执行的完整sql
ResNum int `orm:"column(res_num)" json:"resNum"` // 收到数据条数
ErrText string `orm:"column(err_text)" json:"errText"` // 错误日志
Status int8 `orm:"column(status)" json:"status"` // 状态:1.成功 -1.失败
}
func (d *DataSyncLog) SetBaseInfo(account *model.LoginAccount) {
//TODO implement me
}
func (d *DataSyncLog) TableName() string {
return "t_db_data_sync_log"
}
const (
DataSyncTaskStatusEnable int8 = 1 // 启用状态
DataSyncTaskStatusDisable int8 = -1 // 禁用状态
DataSyncTaskStateSuccess int8 = 1 // 执行成功状态
DataSyncTaskStateFail int8 = -1 // 执行失败状态
DataSyncTaskRunStateRunning int8 = 1 // 运行中状态
DataSyncTaskRunStateReady int8 = 2 // 待运行状态
DataSyncTaskRunStateStop int8 = 3 // 手动停止状态
)

View File

@@ -9,6 +9,14 @@ type InstanceQuery struct {
Host string `json:"host" form:"host"`
}
type DataSyncTaskQuery struct {
Name string `json:"name" form:"name"`
Status int8 `json:"status" form:"status"`
}
type DataSyncLogQuery struct {
TaskId uint64 `json:"task_id" form:"taskId"`
}
// 数据库查询实体,不与数据库表字段一一对应
type DbQuery struct {
model.Model

View File

@@ -0,0 +1,21 @@
package repository
import (
"mayfly-go/internal/db/domain/entity"
"mayfly-go/pkg/base"
"mayfly-go/pkg/model"
)
type DataSyncTask interface {
base.Repo[*entity.DataSyncTask]
// 分页获取数据库实例信息列表
GetTaskList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
}
type DataSyncLog interface {
base.Repo[*entity.DataSyncLog]
// 分页获取数据库实例信息列表
GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error)
}

View File

@@ -0,0 +1,39 @@
package persistence
import (
"mayfly-go/internal/db/domain/entity"
"mayfly-go/internal/db/domain/repository"
"mayfly-go/pkg/base"
"mayfly-go/pkg/gormx"
"mayfly-go/pkg/model"
)
type dataSyncTaskRepoImpl struct {
base.RepoImpl[*entity.DataSyncTask]
}
func newDataSyncTaskRepo() repository.DataSyncTask {
return &dataSyncTaskRepoImpl{base.RepoImpl[*entity.DataSyncTask]{M: new(entity.DataSyncTask)}}
}
// 分页获取数据库信息列表
func (d *dataSyncTaskRepoImpl) GetTaskList(condition *entity.DataSyncTaskQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
qd := gormx.NewQuery(new(entity.DataSyncTask)).
Like("task_name", condition.Name)
return gormx.PageQuery(qd, pageParam, toEntity)
}
type dataSyncLogRepoImpl struct {
base.RepoImpl[*entity.DataSyncLog]
}
// 分页获取数据库信息列表
func (d *dataSyncLogRepoImpl) GetTaskLogList(condition *entity.DataSyncLogQuery, pageParam *model.PageParam, toEntity any, orderBy ...string) (*model.PageResult[any], error) {
qd := gormx.NewQuery(new(entity.DataSyncLog)).
Eq("task_id", condition.TaskId)
return gormx.PageQuery(qd, pageParam, toEntity)
}
func newDataSyncLogRepo() repository.DataSyncLog {
return &dataSyncLogRepoImpl{base.RepoImpl[*entity.DataSyncLog]{M: new(entity.DataSyncLog)}}
}

View File

@@ -3,12 +3,14 @@ package persistence
import "mayfly-go/internal/db/domain/repository"
var (
instanceRepo repository.Instance = newInstanceRepo()
dbRepo repository.Db = newDbRepo()
dbSqlRepo repository.DbSql = newDbSqlRepo()
dbSqlExecRepo repository.DbSqlExec = newDbSqlExecRepo()
dbBackupHistoryRepo = NewDbBackupHistoryRepo()
dbRestoreHistoryRepo = NewDbRestoreHistoryRepo()
instanceRepo repository.Instance = newInstanceRepo()
dbRepo repository.Db = newDbRepo()
dbSqlRepo repository.DbSql = newDbSqlRepo()
dbSqlExecRepo repository.DbSqlExec = newDbSqlExecRepo()
dbBackupHistoryRepo = NewDbBackupHistoryRepo()
dbRestoreHistoryRepo = NewDbRestoreHistoryRepo()
dbDataSyncTaskRepo repository.DataSyncTask = newDataSyncTaskRepo()
dbDataSyncLogRepo repository.DataSyncLog = newDataSyncLogRepo()
)
func GetInstanceRepo() repository.Instance {
@@ -34,3 +36,11 @@ func GetDbBackupHistoryRepo() repository.DbBackupHistory {
func GetDbRestoreHistoryRepo() repository.DbRestoreHistory {
return dbRestoreHistoryRepo
}
func GetDataSyncLogRepo() repository.DataSyncLog {
return dbDataSyncLogRepo
}
func GetDataSyncTaskRepo() repository.DataSyncTask {
return dbDataSyncTaskRepo
}

View File

@@ -0,0 +1,45 @@
package router
import (
"mayfly-go/internal/db/api"
"mayfly-go/internal/db/application"
"mayfly-go/pkg/req"
"github.com/gin-gonic/gin"
)
func InitDbDataSyncRouter(router *gin.RouterGroup) {
instances := router.Group("/datasync/tasks")
d := &api.DataSyncTask{
DataSyncTaskApp: application.GetDataSyncTaskApp(),
DataSyncLogApp: application.GetDataSyncLogApp(),
}
reqs := [...]*req.Conf{
// 获取任务列表 /datasync
req.NewGet("", d.Tasks),
req.NewGet(":taskId/logs", d.Logs).RequiredPermissionCode("db:sync:log"),
// 保存任务 /datasync/save
req.NewPost("save", d.SaveTask).Log(req.NewLogSave("datasync-保存数据同步任务信息")).RequiredPermissionCode("db:sync:save"),
// 获取单个详情 /datasync/:taskId
req.NewGet(":taskId", d.GetTask),
// 删除任务 /datasync/:taskId/del
req.NewDelete(":taskId/del", d.DeleteTask).Log(req.NewLogSave("datasync-删除数据同步任务信息")).RequiredPermissionCode("db:sync:del"),
// 启停用任务 /datasync/status
req.NewPost(":taskId/status", d.ChangeStatus).Log(req.NewLogSave("datasync-启停任务")).RequiredPermissionCode("db:sync:status"),
// 立即执行任务 /datasync/run
req.NewPost(":taskId/run", d.Run),
// 停止正在执行中的任务
req.NewPost(":taskId/stop", d.Stop),
}
req.BatchSetGroup(instances, reqs[:])
}

View File

@@ -11,4 +11,5 @@ func Init(router *gin.RouterGroup) {
InitDbBackupHistoryRouter(router)
InitDbRestoreRouter(router)
InitDbRestoreHistoryRouter(router)
InitDbDataSyncRouter(router)
}

View File

@@ -9,14 +9,14 @@ import (
"time"
)
const MachineStatCackeKey = "mayfly:machine:%d:stat"
const MachineStatCacheKey = "mayfly:machine:%d:stat"
func SaveMachineStats(machineId uint64, stat *mcm.Stats) error {
return global_cache.SetStr(fmt.Sprintf(MachineStatCackeKey, machineId), jsonx.ToStr(stat), 10*time.Minute)
return global_cache.SetStr(fmt.Sprintf(MachineStatCacheKey, machineId), jsonx.ToStr(stat), 10*time.Minute)
}
func GetMachineStats(machineId uint64) (*mcm.Stats, error) {
cacheStr := global_cache.GetStr(fmt.Sprintf(MachineStatCackeKey, machineId))
cacheStr := global_cache.GetStr(fmt.Sprintf(MachineStatCacheKey, machineId))
if cacheStr == "" {
return nil, errors.New("不存在该值")
}

View File

@@ -11,6 +11,7 @@ import (
func Init() {
application.GetMachineCronJobApp().InitCronJob()
application.GetMachineApp().TimerUpdateStats()
global.EventBus.Subscribe(consts.DeleteMachineEventTopic, "machineFile", func(ctx context.Context, event *eventbus.Event) error {

View File

@@ -139,5 +139,96 @@ CREATE TABLE `t_db_binlog_history` (
INSERT INTO `t_sys_config` (`name`, `key`, `params`, `value`, `remark`, `permission`, `create_time`, `creator_id`, `creator`, `update_time`, `modifier_id`, `modifier`, `is_deleted`, `delete_time`) VALUES('Mysql可执行文件', 'MysqlBin', '[{"model":"path","name":"路径","placeholder":"可执行文件路径","required":true},{"model":"mysql","name":"mysql","placeholder":"mysql命令路径(空则为 路径/mysql)","required":false},{"model":"mysqldump","name":"mysqldump","placeholder":"mysqldump命令路径(空则为 路径/mysqldump)","required":false},{"model":"mysqlbinlog","name":"mysqlbinlog","placeholder":"mysqlbinlog命令路径(空则为 路径/mysqlbinlog)","required":false}]', '{"mysql":"","mysqldump":"","mysqlbinlog":"","path":""}', '', 'admin,', '2023-12-29 10:01:33', 1, 'admin', '2023-12-29 13:34:40', 1, 'admin', 0, NULL);
INSERT INTO `t_sys_config` (`name`, `key`, `params`, `value`, `remark`, `permission`, `create_time`, `creator_id`, `creator`, `update_time`, `modifier_id`, `modifier`, `is_deleted`, `delete_time`) VALUES('数据库备份恢复', 'DbBackupRestore', '[{"model":"backupPath","name":"备份路径","placeholder":"备份文件存储路径"}]', '{"backupPath":"./db/backup"}', '', 'admin,', '2023-12-29 09:55:26', 1, 'admin', '2023-12-29 15:45:24', 1, 'admin', 0, NULL);
DELETE FROM `t_sys_config` WHERE `key` = 'UseWatermark'
INSERT INTO `t_sys_config` (`name`, `key`, `params`, `value`, `remark`, `permission`, `create_time`, `creator_id`, `creator`, `update_time`, `modifier_id`, `modifier`, `is_deleted`, `delete_time`) VALUES('系统全局样式设置', 'SysStyleConfig', '[{"model":"logoIcon","name":"logo图标","placeholder":"系统logo图标base64编码, 建议svg格式不超过10k","required":false},{"model":"title","name":"菜单栏标题","placeholder":"系统菜单栏标题展示","required":false},{"model":"viceTitle","name":"登录页标题","placeholder":"登录页标题展示","required":false},{"model":"useWatermark","name":"是否启用水印","placeholder":"是否启用系统水印","options":"true,false","required":false},{"model":"watermarkContent","name":"水印补充信息","placeholder":"额外水印信息","required":false}]', '{"title":"mayfly-go","viceTitle":"mayfly-go","logoIcon":"","useWatermark":"true","watermarkContent":""}', '系统icon、标题、水印信息等配置', 'all', '2024-01-04 15:17:18', 1, 'admin', '2024-01-05 09:40:44', 1, 'admin', 0, NULL);
DELETE
FROM `t_sys_config`
WHERE `key` = 'UseWatermark';
INSERT INTO `t_sys_config` (`name`, `key`, `params`, `value`, `remark`, `permission`, `create_time`, `creator_id`, `creator`, `update_time`, `modifier_id`, `modifier`, `is_deleted`, `delete_time`) VALUES('系统全局样式设置', 'SysStyleConfig', '[{"model":"logoIcon","name":"logo图标","placeholder":"系统logo图标base64编码, 建议svg格式不超过10k","required":false},{"model":"title","name":"菜单栏标题","placeholder":"系统菜单栏标题展示","required":false},{"model":"viceTitle","name":"登录页标题","placeholder":"登录页标题展示","required":false},{"model":"useWatermark","name":"是否启用水印","placeholder":"是否启用系统水印","options":"true,false","required":false},{"model":"watermarkContent","name":"水印补充信息","placeholder":"额外水印信息","required":false}]', '{"title":"mayfly-go","viceTitle":"mayfly-go","logoIcon":"","useWatermark":"true","watermarkContent":""}', '系统icon、标题、水印信息等配置', 'all', '2024-01-04 15:17:18', 1, 'admin', '2024-01-05 09:40:44', 1, 'admin', 0, NULL);
-- ----------------------------
-- Table structure for t_db_data_sync_task
-- ----------------------------
DROP TABLE IF EXISTS `t_db_data_sync_task`;
CREATE TABLE `t_db_data_sync_task`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`creator_id` bigint(20) NOT NULL COMMENT '创建人id',
`creator` varchar(100) NOT NULL COMMENT '创建人姓名',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`modifier` varchar(100) NOT NULL COMMENT '修改人姓名',
`modifier_id` bigint(20) NOT NULL COMMENT '修改人id',
`task_name` varchar(500) NOT NULL COMMENT '任务名',
`task_cron` varchar(50) NOT NULL COMMENT '任务Cron表达式',
`src_db_id` bigint(20) NOT NULL COMMENT '源数据库ID',
`src_db_name` varchar(100) DEFAULT NULL COMMENT '源数据库名',
`src_tag_path` varchar(200) DEFAULT NULL COMMENT '源数据库tag路径',
`target_db_id` bigint(20) NOT NULL COMMENT '目标数据库ID',
`target_db_name` varchar(100) DEFAULT NULL COMMENT '目标数据库名',
`target_tag_path` varchar(200) DEFAULT NULL COMMENT '目标数据库tag路径',
`target_table_name` varchar(100) DEFAULT NULL COMMENT '目标数据库表名',
`data_sql` text NOT NULL COMMENT '数据查询sql',
`page_size` int(11) NOT NULL COMMENT '数据同步分页大小',
`upd_field` varchar(100) NOT NULL DEFAULT 'id' COMMENT '更新字段,默认"id"',
`upd_field_val` varchar(100) DEFAULT NULL COMMENT '当前更新值',
`id_rule` tinyint(2) NOT NULL DEFAULT '1' COMMENT 'id生成规则1、MD5(时间戳+更新字段的值)。2、无(不自动生成id选择无的时候需要指定主键ID字段是数据源哪个字段)',
`pk_field` varchar(100) DEFAULT 'id' COMMENT '主键id字段名默认"id"',
`field_map` text COMMENT '字段映射json',
`is_deleted` tinyint(8) DEFAULT '0',
`delete_time` datetime DEFAULT NULL,
`status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '状态 1启用 2停用',
`recent_state` tinyint(1) NOT NULL DEFAULT '0' COMMENT '最近一次状态 0未执行 1成功 2失败',
`task_key` varchar(100) DEFAULT NULL COMMENT '定时任务唯一uuid key',
`running_state` tinyint(1) DEFAULT '2' COMMENT '运行时状态 1运行中、2待运行、3已停止',
PRIMARY KEY (`id`)
) COMMENT='数据同步';
-- ----------------------------
-- Table structure for t_db_data_sync_log
-- ----------------------------
DROP TABLE IF EXISTS `t_db_data_sync_log`;
CREATE TABLE `t_db_data_sync_log`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`task_id` bigint(20) NOT NULL COMMENT '同步任务表id',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`data_sql_full` text NOT NULL COMMENT '执行的完整sql',
`res_num` int(11) DEFAULT NULL COMMENT '收到数据条数',
`err_text` text COMMENT '错误日志',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '状态:1.成功 0.失败',
`is_deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否删除 1是 0 否',
PRIMARY KEY (`id`),
KEY `t_db_data_sync_log_taskid_idx` (`task_id`) USING BTREE COMMENT 't_db_data_sync_log表(taskid)普通索引'
) COMMENT='数据同步日志';
INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`,
`modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`,
`delete_time`)
VALUES (150, 36, 1, 1, '数据同步', 'sync', 1693040707,
'{\"component\":\"ops/db/SyncTaskList\",\"icon\":\"Coin\",\"isKeepAlive\":true,\"routeName\":\"SyncTaskList\"}',
12, 'liuzongyang', 12, 'liuzongyang', '2023-12-22 09:51:34', '2023-12-27 10:16:57', 'Jra0n7De/', 0, NULL);
INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`,
`modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`,
`delete_time`)
VALUES (151, 150, 2, 1, '基本权限', 'db:sync', 1703641202, 'null', 12, 'liuzongyang', 12, 'liuzongyang',
'2023-12-27 09:40:02', '2023-12-27 09:40:02', 'Jra0n7De/uAnHZxEV/', 0, NULL);
INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`,
`modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`,
`delete_time`)
VALUES (152, 150, 2, 1, '编辑', 'db:sync:save', 1703641320, 'null', 12, 'liuzongyang', 12, 'liuzongyang',
'2023-12-27 09:42:00', '2023-12-27 09:42:12', 'Jra0n7De/zvAMo2vk/', 0, NULL);
INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`,
`modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`,
`delete_time`)
VALUES (153, 150, 2, 1, '删除', 'db:sync:del', 1703641342, 'null', 12, 'liuzongyang', 12, 'liuzongyang',
'2023-12-27 09:42:22', '2023-12-27 09:42:22', 'Jra0n7De/pLOA2UYz/', 0, NULL);
INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`,
`modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`,
`delete_time`)
VALUES (154, 150, 2, 1, '启停', 'db:sync:status', 1703641364, 'null', 12, 'liuzongyang', 12, 'liuzongyang',
'2023-12-27 09:42:45', '2023-12-27 09:42:45', 'Jra0n7De/VBt68CDx/', 0, NULL);
INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`,
`modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`,
`delete_time`)
VALUES (155, 150, 2, 1, '日志', 'db:sync:log', 1704266866, 'null', 12, 'liuzongyang', 12, 'liuzongyang',
'2024-01-03 15:27:47', '2024-01-03 15:27:47', 'Jra0n7De/PigmSGVg/', 0, NULL);