!148 refactor: support Kafka operations

* fix: fix Dameng (DM) database connection issue
* refactor: support Kafka operations
This commit is contained in:
zongyangleo
2026-03-18 09:00:55 +00:00
committed by Coder慌
parent 84ab496308
commit bfa41c3621
55 changed files with 3895 additions and 29 deletions

View File

@@ -47,6 +47,7 @@ function convertSvgToSymbol(svgString, symbolId) {
iconNames.push(`icon ${name}`);
svgsymbols += convertSvgToSymbol(allSvgIcons[path].default, name);
}
svgsymbols += '</svg>';
var t = (t = document.getElementsByTagName('script'))[t.length - 1],

View File

@@ -0,0 +1 @@
<svg class="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" width="64" height="64"><path d="M668.8 460.8c64 0 118.4-54.4 118.4-118.4S732.8 224 668.8 224s-118.4 54.4-118.4 118.4c0 12.8 3.2 22.4 6.4 35.2L502.4 416c-22.4-28.8-57.6-48-92.8-57.6v-64c54.4-12.8 92.8-60.8 92.8-115.2 0-64-54.4-118.4-118.4-118.4s-118.4 54.4-118.4 118.4c0 54.4 38.4 102.4 92.8 115.2v67.2c-80 16-134.4 96-118.4 176 12.8 60.8 57.6 105.6 118.4 118.4v70.4c-54.4 12.8-92.8 57.6-92.8 115.2 0 64 54.4 118.4 118.4 118.4s118.4-54.4 118.4-118.4c0-54.4-38.4-102.4-92.8-115.2v-70.4c35.2-6.4 70.4-22.4 92.8-54.4l54.4 41.6c-3.2 9.6-6.4 22.4-6.4 35.2 3.2 64 54.4 115.2 121.6 115.2 64-3.2 115.2-54.4 115.2-121.6-3.2-64-54.4-115.2-118.4-115.2-35.2 0-67.2 16-89.6 41.6l-54.4-41.6c6.4-16 9.6-35.2 9.6-54.4 0-16-3.2-35.2-9.6-48l54.4-38.4c19.2 32 54.4 48 89.6 44.8z m0-176c32 0 54.4 25.6 54.4 54.4s-25.6 54.4-54.4 54.4c-32 0-54.4-25.6-54.4-54.4-3.2-28.8 22.4-54.4 54.4-54.4z m-345.6-105.6c0-32 25.6-54.4 54.4-54.4 32 0 54.4 25.6 54.4 54.4s-25.6 54.4-54.4 54.4c-28.8 3.2-54.4-22.4-54.4-54.4z m115.2 662.4c0 32-22.4 57.6-54.4 57.6s-57.6-22.4-57.6-54.4 22.4-57.6 54.4-57.6 57.6 25.6 57.6 54.4z m-54.4-249.6c-44.8-3.2-76.8-41.6-73.6-86.4 3.2-38.4 35.2-70.4 73.6-73.6 44.8 0 80 35.2 80 80 0 41.6-38.4 76.8-80 80z m284.8 32c28.8 0 54.4 22.4 54.4 54.4s-22.4 54.4-51.2 54.4h-3.2c-32 0-54.4-25.6-54.4-57.6s25.6-54.4 54.4-54.4v3.2z" fill="#0171F1"></path></svg>


View File

@@ -0,0 +1 @@
<svg class="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" width="64" height="64"><path d="M750.08 657.92c-16.384 0-30.208 2.56-44.032 8.192l-44.032-52.224c33.28-38.4 55.296-90.624 55.296-145.92s-19.456-101.888-52.224-140.288l27.648-24.576c16.384 8.192 33.28 13.824 52.224 13.824 63.488 0 115.712-52.224 115.712-115.712s-52.224-115.712-115.712-115.712c-63.488 0-115.712 52.224-115.712 115.712 0 13.824 2.56 30.208 8.192 41.472l-35.84 30.208c-34.304-19.968-73.216-30.208-112.64-30.208-44.032 0-85.504 13.824-121.344 32.768l-41.472-52.224c0-5.632 2.56-13.824 2.56-19.456 0-40.96-32.768-74.24-73.728-74.24h-0.512c-40.96 0-74.24 32.768-74.24 73.728v0.512c0 40.96 32.768 74.24 73.728 74.24H262.656L306.688 332.8c-27.648 38.4-46.592 85.504-46.592 134.656 0 35.84 8.192 71.68 24.576 101.888l-35.84 30.208c-8.192-5.632-19.456-5.632-30.208-5.632-49.664 0-90.624 41.472-90.624 90.624 0 49.664 41.472 90.624 90.624 90.624s90.624-41.472 90.624-90.624c0-8.192 0-16.384-2.56-24.576l30.208-27.648c41.472 35.84 93.696 57.856 154.112 57.856 33.28 0 63.488-5.632 90.624-19.456l46.592 57.856c-11.264 19.456-19.456 44.032-19.456 68.608 0 77.312 63.488 140.288 143.36 140.288s143.36-63.488 143.36-140.288c0.512-75.776-65.536-139.264-145.408-139.264z m-261.632-11.264c-99.328 0-181.76-79.872-181.76-178.688C306.688 368.64 389.12 289.28 488.448 289.28s181.76 79.872 181.76 178.688c0 99.328-82.432 178.688-181.76 178.688zM430.592 465.408c0.512 18.432-13.824 33.28-32.256 33.792-18.432 0.512-33.28-13.824-33.792-32.256v-1.536c0-18.432 14.848-32.768 33.28-32.768 18.432-0.512 32.768 14.336 32.768 32.768z m88.064 0c0 18.432-14.848 33.28-32.768 33.28-18.432 0-33.28-14.848-33.28-32.768 0-18.432 14.848-32.768 33.28-32.768s32.768 13.824 32.768 32.256z m91.136 0c0 18.432-14.848 33.28-32.768 33.28s-33.28-14.848-33.28-32.768c0-18.432 14.848-32.768 33.28-32.768 17.92-1.024 32.768 13.824 32.768 32.256z" fill="#2B85FB"></path></svg>


View File

@@ -23,6 +23,7 @@ export const ResourceTypeEnum = {
AuthCert: EnumValue.of(5, 'ac.ac').setExtra({ icon: 'Ticket', iconColor: 'var(--el-color-success)' }),
Es: EnumValue.of(6, 'tag.es').setExtra({ icon: 'icon es/es-color', iconColor: 'var(--el-color-warning)' }).tagTypeWarning(),
Container: EnumValue.of(7, 'tag.container').setExtra({ icon: 'icon docker/docker', iconColor: 'var(--el-color-primary)' }),
MqKafka: EnumValue.of(8, 'tag.mq.kafka').setExtra({ icon: 'icon mq/kafka', iconColor: 'var(--el-color-primary)' }),
};
// Resource types that tags can be associated with
@@ -38,6 +39,8 @@ export const TagResourceTypeEnum = {
AuthCert: ResourceTypeEnum.AuthCert,
Container: ResourceTypeEnum.Container,
MqKafka: ResourceTypeEnum.MqKafka,
Db: EnumValue.of(22, '数据库').setExtra({ icon: 'icon db/db' }),
};
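The new `MqKafka` enum value is what the Kafka pages use to scope tag filtering; a minimal sketch mirroring the usage in the Kafka list page later in this diff:

import { TagResourceTypeEnum } from '@/common/commonEnum';
import { getTagPathSearchItem } from '@/views/ops/component/tag';

// Tag-path search item scoped to Kafka resources (enum value 8, per the diff above).
const kafkaTagSearch = getTagPathSearchItem(TagResourceTypeEnum.MqKafka.value);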

View File

@@ -49,6 +49,7 @@ export default {
reset: 'Reset',
success: 'Success',
fail: 'Fail',
requestFail: 'Request Fail',
previousStep: 'Previous Step',
nextStep: 'Next Step',
copy: 'Copy',

View File

@@ -0,0 +1,89 @@
export default {
mq: {
kafka: {
hosts: 'Hosts',
username: 'Username',
sasl_mechanism: 'SASL Mechanism',
sasl_mechanism_placeholder: 'please select SASL Mechanism',
key: 'Key',
keywordPlaceholder: 'ip/name',
nodeManage: 'Node Management',
topicManage: 'Topic Management',
produceMessage: 'Produce Message',
consumeMessage: 'Consume Message',
nodes: 'Nodes',
config: 'Config',
nodeId: 'Node ID',
addr: 'Addr',
rack: 'Rack',
viewConfig: 'View Config',
brokerConfig: 'Broker Config',
topicConfig: 'Topic Config',
selectTopic: 'Select Topic',
selectTopicPlaceholder: 'Select topic',
selectTopicWarning: 'Please select topic',
selectBrokerViewConfig: 'Please select a broker to view config',
configName: 'Config Name',
configValue: 'Config Value',
configSource: 'Config Source',
configSensitive: 'Sensitive',
searchTopic: 'Enter topic name',
createTopic: 'Create Topic',
createPartitions: 'Create Partitions',
topicName: 'Topic Name',
topicNamePlaceholder: 'Please enter topic name',
partitions: 'Partitions',
topicPartitions: 'Topic Partitions',
replicationFactor: 'Replication Factor',
topicStatus: 'Status',
topicIsInternal: 'Is Internal',
viewPartitions: 'View Partitions',
delete: 'Delete',
messageKey: 'Message Key',
messageKeyPlaceholder: 'Optional: Enter message key',
partition: 'Partition',
partitionPlaceholder: 'Optional: Specify partition number, -1 for auto',
messageBody: 'Message Body',
messageHeaders: 'Message Headers',
addHeader: 'Add Header',
headerKey: 'Header Key',
headerValue: 'Header Value',
sendTimes: 'Send Times',
compression: 'Compression',
compressionPlaceholder: 'Please select compression',
sendMessage: 'Send Message',
messageNumber: 'Number',
consumerGroup: 'Consumer Group',
consumerGroupPlaceholder: 'Enter consumer group; leave empty to auto-generate',
consumerOnlyTip: 'Note: only takes effect the first time the group consumes; later changes are ignored',
pullTimeout: 'Pull Timeout (s)',
decompression: 'Decompression',
decompressionPlaceholder: 'Please select decompression',
decode: 'Decode',
decodePlaceholder: 'Please select decode',
isolationLevel: 'Isolation Level',
isolationLevelPlaceholder: 'Please select isolation level',
readUncommitted: 'Read Uncommitted',
readCommitted: 'Read Committed',
commitOffset: 'Commit Offset',
loadOffsets: 'Load Offsets',
defaultConsumePosition: 'Default Consume Position',
earliest: 'Earliest',
latest: 'Latest',
defaultConsumeStartTime: 'Default Consume Start Time',
selectDateTime: 'Select date time',
offset: 'Offset',
timestamp: 'Timestamp',
headers: 'Headers',
messageDetail: 'Message Detail',
groupId: 'Group ID',
coordinator: 'Coordinator',
state: 'State',
protocolType: 'Protocol Type',
searchGroup: 'Enter group name',
selectGroupPlaceholder: 'Select group',
Members: 'Members',
partitionsFeatureComingSoon: 'Partitions feature coming soon',
},
},
};
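These keys resolve under the `mq.kafka` namespace via vue-i18n; a minimal usage sketch, assuming the standard useI18n setup the components in this change already use:

import { useI18n } from 'vue-i18n';

const { t } = useI18n();
t('mq.kafka.topicManage'); // 'Topic Management' (en) / '主题管理' (zh-cn)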

View File

@@ -21,6 +21,10 @@ export default {
esDataOp: 'Es Operation',
mongoDataOp: 'Mongo Operation',
allResource: 'All Resource',
mq: {
kafka: 'Kafka',
kafkaOp: 'Kafka Operation',
},
},
team: {
team: 'Team',

View File

@@ -49,6 +49,7 @@ export default {
reset: '重置',
success: '成功',
fail: '失败',
requestFail: '请求失败',
previousStep: '上一步',
nextStep: '下一步',
copy: '复制',

View File

@@ -0,0 +1,89 @@
export default {
mq: {
kafka: {
hosts: 'Hosts',
username: '用户名',
sasl_mechanism: 'SASL机制',
sasl_mechanism_placeholder: '请选择SASL机制',
key: 'Key',
keywordPlaceholder: 'ip/名称',
nodeManage: '节点管理',
topicManage: '主题管理',
produceMessage: '生产消息',
consumeMessage: '消费消息',
nodes: '节点',
config: '配置',
nodeId: '节点 ID',
addr: 'Addr',
rack: '机架',
viewConfig: '查看配置',
brokerConfig: 'Broker 配置',
topicConfig: 'Topic 配置',
selectTopic: '选择 topic',
selectTopicPlaceholder: '选择 topic',
selectTopicWarning: '请选择 topic',
selectBrokerViewConfig: '请选择 Broker 查看配置',
configName: '配置项',
configValue: '配置值',
configSource: '配置来源',
configSensitive: '敏感',
searchTopic: '输入主题名称',
createTopic: '创建主题',
createPartitions: '添加分区',
topicName: '主题名称',
topicNamePlaceholder: '请输入主题名称',
partitions: '分区',
topicPartitions: 'Topic 分区信息',
replicationFactor: '副本数',
topicStatus: '状态',
topicIsInternal: '是否内建',
viewPartitions: '查看分区',
delete: '删除',
messageKey: '消息 Key',
messageKeyPlaceholder: '可选:输入消息 Key',
partition: '分区号',
partitionPlaceholder: '可选:指定分区号,-1 表示自动分配',
messageBody: '消息内容',
messageHeaders: '消息 Headers',
addHeader: '添加 Header',
headerKey: 'Header Key',
headerValue: 'Header Value',
sendTimes: '发送次数',
compression: '压缩',
compressionPlaceholder: '请选择压缩方式',
sendMessage: '发送消息',
messageNumber: '消息数量',
consumerGroup: '消费者组',
consumerGroupPlaceholder: '请输入消费者组,为空则自动生成',
consumerOnlyTip: '注意:仅在Group首次消费时生效,后续改动无效',
pullTimeout: '拉取超时 (秒)',
decompression: '解压',
decompressionPlaceholder: '请选择解压方式',
decode: '解码',
decodePlaceholder: '请选择解码方式',
isolationLevel: '隔离级别',
isolationLevelPlaceholder: '请选择隔离级别',
readUncommitted: '读未提交',
readCommitted: '读已提交',
commitOffset: '提交 Offset',
loadOffsets: '读取 Offsets',
defaultConsumePosition: '默认消费位置',
earliest: '最早',
latest: '最新',
defaultConsumeStartTime: '默认消费起始时间',
selectDateTime: '选择日期时间',
offset: '偏移量',
timestamp: '时间戳',
headers: 'Headers',
messageDetail: '消息详情',
groupId: '组 ID',
coordinator: '协调器',
state: '状态',
protocolType: '协议类型',
searchGroup: '输入组名称',
selectGroupPlaceholder: '选择分组',
Members: '成员',
partitionsFeatureComingSoon: '分区详情功能即将上线',
},
},
};

View File

@@ -24,6 +24,10 @@ export default {
mongoDataOp: 'Mongo操作',
containerOp: '容器操作',
allResource: '所有资源',
mq: {
kafka: 'Kafka',
kafkaOp: 'Kafka操作',
},
},
team: {
team: '团队',

View File

@@ -125,7 +125,7 @@
<!-- row number column -->
<div v-if="column.key == rowNoColumn.key">
<b class="el-text el-text--small">
{{ rowIndex + 1 }}
{{ (pageNum - 1) * pageSize + rowIndex + 1 }}
</b>
</div>
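The row number column now offsets by page; a quick worked check of the new formula (`pageSize` and `pageNum` are the props added below):

// (pageNum - 1) * pageSize + rowIndex + 1
// pageNum = 1, pageSize = 25, rowIndex = 0  -> row 1
// pageNum = 2, pageSize = 25, rowIndex = 0  -> row 26
// pageNum = 3, pageSize = 25, rowIndex = 24 -> row 75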
@@ -259,6 +259,14 @@ const props = defineProps({
type: String,
default: '600px',
},
pageSize: {
type: Number,
default: 25,
},
pageNum: {
type: Number,
default: 1,
},
});
const contextmenuRef = ref();

View File

@@ -140,6 +140,8 @@
:columns="columns"
:loading="loading"
:height="tableHeight"
:page-size="pageSize"
:page-num="pageNum"
:show-column-tip="true"
@sort-change="(sort: any) => onTableSortChange(sort)"
@selection-change="onDataSelectionChange"

View File

@@ -232,7 +232,7 @@
</template>
</el-table-column>
<el-table-column min-width="35">
<el-table-column min-width="40">
<template #default="scope">
<el-button class="mt-1" link type="primary" @click="handleDevicesDelete(scope.$index)">
{{ $t('common.delete') }}

View File

@@ -102,7 +102,7 @@
</template>
</el-table-column>
<el-table-column prop="networks" :label="$t('docker.ip')" :min-width="90">
<el-table-column prop="networks" :label="$t('docker.ip')" :min-width="100" >
<template #default="scope">
<el-tag v-for="network in scope.row.networks" :key="network" type="primary">{{ network || '-' }}</el-tag>
</template>

View File

@@ -0,0 +1,21 @@
import Api from '@/common/Api';
export const mqApi = {
kafkaList: Api.newGet('/mq/kafka'),
kafkaTestConn: Api.newPost('/mq/kafka/test-conn'),
kafkaSave: Api.newPost('/mq/kafka'),
kafkaDel: Api.newDelete('/mq/kafka/{id}'),
kafkaGetPwd: Api.newGet('/mq/kafka/{id}/pwd'),
kafkaTopicList: Api.newGet('/mq/kafka/{id}/getTopics'),
kafkaTopicCreate: Api.newPost('/mq/kafka/{id}/createTopic'),
kafkaTopicInfo: Api.newGet('/mq/kafka/{id}/{topic}/getTopicConfig'),
kafkaTopicDelete: Api.newDelete('/mq/kafka/{id}/{topic}/deleteTopic'),
kafkaTopicCreatePartitions: Api.newPost('/mq/kafka/{id}/createPartitions'),
kafkaTopicProduce: Api.newPost('/mq/kafka/{id}/{topic}/produce'),
kafkaTopicConsume: Api.newPost('/mq/kafka/{id}/{topic}/consume'),
kafkaTopicBrokers: Api.newGet('/mq/kafka/{id}/getBrokers'),
kafkaTopicBrokerConfig: Api.newGet('/mq/kafka/{id}/getBrokerConfig/{brokerId}'),
kafkaGetGroups: Api.newGet('/mq/kafka/{id}/getGroups'),
kafkaDeleteGroup: Api.newDelete('/mq/kafka/{id}/deleteGroup/{group}'),
kafkaGetGroupMembers: Api.newGet('/mq/kafka/{id}/getGroupMembers/{group}'),
};
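Path placeholders such as `{id}` and `{topic}` are filled from the params object passed to `request`; a minimal sketch matching how these endpoints are invoked in the components below (the instance id `1` and topic `orders` are illustrative values):

// GET /mq/kafka/1/getTopics
const topics = await mqApi.kafkaTopicList.request({ id: 1 });

// DELETE /mq/kafka/1/orders/deleteTopic
await mqApi.kafkaTopicDelete.request({ id: 1, topic: 'orders' });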

View File

@@ -0,0 +1,165 @@
<template>
<div>
<el-drawer :title="title" v-model="dialogVisible" :before-close="onCancel" :close-on-click-modal="false" size="40%" :destroy-on-close="true">
<el-form :model="form" ref="kafkaFormRef" :rules="rules" label-width="auto">
<el-form-item prop="tagCodePaths" :label="$t('tag.relateTag')" required>
<tag-tree-select multiple v-model="form.tagCodePaths" />
</el-form-item>
<el-form-item prop="name" :label="$t('common.name')" required>
<el-input v-model.trim="form.name" auto-complete="off"></el-input>
</el-form-item>
<el-form-item prop="hosts" label="Hosts" required>
<el-input
type="textarea"
:rows="2"
v-model.trim="form.hosts"
placeholder="Kafka 连接地址,格式: host1:port1,host2:port2 或单个 broker"
auto-complete="off"
/>
</el-form-item>
<el-form-item prop="saslMechanism" :label="$t('mq.kafka.sasl_mechanism')">
<el-select
v-model="form.saslMechanism"
:options="sasl_mechanism_options"
:placeholder="t('mq.kafka.sasl_mechanism_placeholder')"
filterable
clearable
/>
</el-form-item>
<el-form-item prop="username" :label="$t('mq.kafka.username')">
<el-input v-model.trim="form.username" auto-complete="off"></el-input>
</el-form-item>
<el-form-item prop="password" :label="$t('common.password')">
<el-input v-model.trim="form.password" auto-complete="off"></el-input>
</el-form-item>
<el-form-item prop="sshTunnelMachineId" :label="$t('machine.sshTunnel')">
<ssh-tunnel-select v-model="form.sshTunnelMachineId" />
</el-form-item>
</el-form>
<template #footer>
<div class="dialog-footer">
<el-button @click="onTestConn" :loading="testConnBtnLoading" type="success">{{ $t('ac.testConn') }}</el-button>
<el-button @click="onCancel()">{{ $t('common.cancel') }}</el-button>
<el-button type="primary" :loading="saveBtnLoading" @click="onConfirm">{{ $t('common.confirm') }}</el-button>
</div>
</template>
</el-drawer>
</div>
</template>
<script lang="ts" setup>
import { toRefs, reactive, watchEffect, useTemplateRef } from 'vue';
import { ElMessage } from 'element-plus';
import TagTreeSelect from '../../component/TagTreeSelect.vue';
import SshTunnelSelect from '../../component/SshTunnelSelect.vue';
import { useI18nFormValidate, useI18nSaveSuccessMsg } from '@/hooks/useI18n';
import { useI18n } from 'vue-i18n';
import { Rules } from '@/common/rule';
import { mqApi } from '@/views/ops/mq/api';
const { t } = useI18n();
const props = defineProps({
kafka: {
type: [Boolean, Object],
},
title: {
type: String,
},
});
const sasl_mechanism_options = [
{
label: 'PLAIN',
value: 'PLAIN',
},
{
label: 'SCRAM-SHA-256',
value: 'SCRAM-SHA-256',
},
{
label: 'SCRAM-SHA-512',
value: 'SCRAM-SHA-512',
},
];
const dialogVisible = defineModel<boolean>('visible', { default: false });
// emitted events
const emit = defineEmits(['cancel', 'val-change']);
const rules = {
tagCodePaths: [Rules.requiredSelect('tag.relateTag')],
name: [Rules.requiredInput('common.name')],
hosts: [Rules.requiredInput('mq.kafka.hosts')],
};
const kafkaFormRef: any = useTemplateRef('kafkaFormRef');
const state = reactive({
tabActiveName: 'basic',
form: {
id: null,
code: '',
name: null,
hosts: '',
username: '',
saslMechanism: 'PLAIN',
password: '',
sshTunnelMachineId: null as any,
tagCodePaths: [],
},
submitForm: {},
});
const { tabActiveName, form, submitForm } = toRefs(state);
const { isFetching: testConnBtnLoading, execute: testConnExec } = mqApi.kafkaTestConn.useApi(submitForm);
const { isFetching: saveBtnLoading, execute: saveKafkaExec } = mqApi.kafkaSave.useApi(submitForm);
watchEffect(() => {
if (!dialogVisible.value) {
return;
}
state.tabActiveName = 'basic';
const kafka: any = props.kafka;
if (kafka) {
state.form = { ...kafka };
state.form.tagCodePaths = kafka.tags.map((t: any) => t.codePath);
} else {
state.form = { saslMechanism: 'PLAIN', tagCodePaths: [] } as any;
}
});
const getReqForm = () => {
const reqForm = { ...state.form };
if (!state.form.sshTunnelMachineId || state.form.sshTunnelMachineId <= 0) {
reqForm.sshTunnelMachineId = -1;
}
return reqForm;
};
const onTestConn = async () => {
await useI18nFormValidate(kafkaFormRef);
state.submitForm = getReqForm();
await testConnExec();
ElMessage.success(t('ac.connSuccess'));
};
const onConfirm = async () => {
await useI18nFormValidate(kafkaFormRef);
state.submitForm = getReqForm();
await saveKafkaExec();
useI18nSaveSuccessMsg();
emit('val-change', state.form);
onCancel();
};
const onCancel = () => {
dialogVisible.value = false;
emit('cancel');
};
</script>
<style lang="scss"></style>

View File

@@ -0,0 +1,140 @@
<template>
<div class="h-full">
<page-table
ref="pageTableRef"
:page-api="mqApi.kafkaList"
:before-query-fn="checkRouteTagPath"
:search-items="searchItems"
v-model:query-form="query"
:show-selection="true"
v-model:selection-data="selectionData"
:columns="columns"
lazy
>
<template #tableHeader>
<el-button v-auth="'mq:kafka:save'" type="primary" icon="plus" @click="editKafka(false)" plain>{{ $t('common.create') }}</el-button>
<el-button v-auth="'mq:kafka:del'" type="danger" icon="delete" :disabled="selectionData.length < 1" @click="deleteKafka" plain>
{{ $t('common.delete') }}
</el-button>
</template>
<template #tagPath="{ data }">
<resource-tags :tags="data.tags" />
</template>
<template #action="{ data }">
<el-button v-auth="'mq:kafka:save'" @click="editKafka(data)" link type="primary">{{ $t('common.edit') }}</el-button>
</template>
</page-table>
<kafka-edit @val-change="search()" :title="kafkaEditDialog.title" v-model:visible="kafkaEditDialog.visible" :kafka="kafkaEditDialog.data" />
</div>
</template>
<script lang="ts" setup>
import { defineAsyncComponent, onMounted, reactive, ref, Ref, toRefs } from 'vue';
import ResourceTags from '../../component/ResourceTags.vue';
import PageTable from '@/components/pagetable/PageTable.vue';
import { TableColumn } from '@/components/pagetable';
import { TagResourceTypeEnum } from '@/common/commonEnum';
import { useRoute } from 'vue-router';
import { SearchItem } from '@/components/pagetable/SearchForm';
import { useI18nCreateTitle, useI18nDeleteConfirm, useI18nDeleteSuccessMsg, useI18nEditTitle } from '@/hooks/useI18n';
import { mqApi } from '@/views/ops/mq/api';
import { getTagPathSearchItem } from '@/views/ops/component/tag';
const KafkaEdit = defineAsyncComponent(() => import('./KafkaEdit.vue'));
const props = defineProps({
lazy: {
type: [Boolean],
default: false,
},
});
const route = useRoute();
const pageTableRef: Ref<any> = ref(null);
const searchItems = [
SearchItem.input('keyword', 'common.keyword').withPlaceholder('mq.kafka.keywordPlaceholder'),
getTagPathSearchItem(TagResourceTypeEnum.MqKafka.value),
];
const columns = [
TableColumn.new('tags[0].tagPath', 'tag.relateTag').isSlot('tagPath').setAddWidth(20),
TableColumn.new('name', 'common.name'),
TableColumn.new('hosts', 'Hosts'),
TableColumn.new('username', 'mq.kafka.username'),
TableColumn.new('password', 'common.password'),
TableColumn.new('createTime', 'common.createTime').isTime(),
TableColumn.new('creator', 'common.creator'),
TableColumn.new('code', 'common.code'),
TableColumn.new('action', 'common.operation').isSlot().setMinWidth(170).fixedRight().alignCenter(),
];
const state = reactive({
dbOps: {
dbId: 0,
db: '',
},
selectionData: [],
query: {
pageNum: 1,
pageSize: 0,
tagPath: '',
},
kafkaEditDialog: {
visible: false,
data: null as any,
title: '',
},
});
const { selectionData, query, kafkaEditDialog } = toRefs(state);
const checkRouteTagPath = (query: any) => {
if (route.query.tagPath) {
query.tagPath = route.query.tagPath as string;
}
return query;
};
const deleteKafka = async () => {
try {
await useI18nDeleteConfirm(state.selectionData.map((x: any) => x.name).join('、'));
await mqApi.kafkaDel.request({ id: state.selectionData.map((x: any) => x.id).join(',') });
useI18nDeleteSuccessMsg();
search();
} catch (err) {
//
}
};
const search = async (tagPath: string = '') => {
if (tagPath) {
state.query.tagPath = tagPath;
}
pageTableRef.value.search();
};
const editKafka = async (data: any) => {
if (!data) {
state.kafkaEditDialog.data = null;
state.kafkaEditDialog.title = useI18nCreateTitle('Kafka');
} else {
state.kafkaEditDialog.data = data;
state.kafkaEditDialog.title = useI18nEditTitle('Kafka');
}
state.kafkaEditDialog.visible = true;
};
onMounted(() => {
if (!props.lazy) {
search();
}
});
defineExpose({ search });
</script>
<style></style>

View File

@@ -0,0 +1,278 @@
<template>
<div class="kafka-consume-message h-full card !p-1">
<el-form ref="consumeFormRef" :model="form" label-width="auto" size="small">
<el-row :gutter="10">
<el-col :span="10">
<el-form-item :label="$t('mq.kafka.selectTopic')" required>
<el-select v-model="form.topic" filterable :placeholder="$t('mq.kafka.selectTopicPlaceholder')" clearable>
<el-option v-for="topic in topics" :key="topic" :label="topic" :value="topic" />
</el-select>
</el-form-item>
</el-col>
<el-col :span="5">
<el-form-item :label="$t('mq.kafka.messageNumber')" required>
<el-input-number v-model="form.number" :min="1" :max="1000" />
</el-form-item>
</el-col>
<el-col :span="9">
<el-form-item :label="$t('mq.kafka.consumerGroup')">
<el-select v-model="form.group" filterable :placeholder="$t('mq.kafka.consumerGroupPlaceholder')" clearable allow-create>
<el-option label="(auto generate)" value="" />
<el-option v-for="g in groups" :key="g.Group" :label="g.Group" :value="g.Group" />
</el-select>
</el-form-item>
</el-col>
</el-row>
<el-row :gutter="10">
<el-col :span="5">
<el-form-item :label="$t('mq.kafka.pullTimeout')">
<el-input-number v-model="form.pullTimeout" :min="1" :max="100" />
</el-form-item>
</el-col>
<el-col :span="5">
<el-form-item :label="$t('mq.kafka.decompression')">
<el-select v-model="form.decompression" :placeholder="$t('mq.kafka.decompressionPlaceholder')" clearable>
<el-option label="none" value="" />
<el-option label="gzip" value="gzip" />
<el-option label="lz4" value="lz4" />
<el-option label="zstd" value="zstd" />
<el-option label="snappy" value="snappy" />
</el-select>
</el-form-item>
</el-col>
<el-col :span="5">
<el-form-item :label="$t('mq.kafka.decode')">
<el-select v-model="form.decode" :placeholder="$t('mq.kafka.decodePlaceholder')" clearable>
<el-option label="None" value="" />
<el-option label="Base64" value="base64" />
</el-select>
</el-form-item>
</el-col>
<el-col :span="5">
<el-form-item :label="$t('mq.kafka.isolationLevel')">
<el-select v-model="form.isolationLevel" :placeholder="$t('mq.kafka.isolationLevelPlaceholder')">
<el-option :label="$t('mq.kafka.readUncommitted')" value="read_uncommitted" />
<el-option :label="$t('mq.kafka.readCommitted')" value="read_committed" />
</el-select>
</el-form-item>
</el-col>
</el-row>
<el-row :gutter="10">
<el-col :span="5">
<el-form-item :label="$t('mq.kafka.commitOffset')">
<el-switch v-model="form.commitOffset" />
</el-form-item>
</el-col>
<el-col :span="7">
<el-tooltip :content="$t('mq.kafka.consumerOnlyTip')">
<el-form-item :label="$t('mq.kafka.defaultConsumePosition')">
<el-switch v-model="form.earliest" :active-text="$t('mq.kafka.earliest')" :inactive-text="$t('mq.kafka.latest')" />
</el-form-item>
</el-tooltip>
</el-col>
<el-col :span="8">
<el-tooltip :content="$t('mq.kafka.consumerOnlyTip')">
<el-form-item :label="$t('mq.kafka.defaultConsumeStartTime')">
<el-date-picker
v-model="form.startTime"
type="datetime"
:placeholder="$t('mq.kafka.selectDateTime')"
value-format="YYYY-MM-DD HH:mm:ss"
size="small"
/>
</el-form-item>
</el-tooltip>
</el-col>
</el-row>
<el-form-item>
<el-button @click="resetForm" icon="refresh">{{ $t('common.reset') }}</el-button>
<el-button @click="consumeMessage" type="primary" icon="download" :loading="consuming">
{{ $t('mq.kafka.consumeMessage') }}
</el-button>
</el-form-item>
</el-form>
<el-table :data="messages" stripe style="width: 100%" v-loading="consuming" max-height="700">
<el-table-column prop="offset" :label="$t('mq.kafka.offset')" min-width="100" />
<el-table-column prop="partition" :label="$t('mq.kafka.partition')" min-width="80" />
<el-table-column prop="key" :label="$t('mq.kafka.key')" min-width="150" />
<el-table-column prop="timestamp" :label="$t('mq.kafka.timestamp')" min-width="180" />
<el-table-column prop="value" :label="$t('mq.kafka.messageBody')" min-width="300">
<template #default="{ row }">
<div class="flex items-center">
<el-input v-model="row.displayValue" type="textarea" :rows="1" size="small" class="flex-1" />
<SvgIcon
v-if="row.value && row.value.length > 50"
@click="viewMessageDetail(row)"
class="string-input-container-icon ml-1 cursor-pointer"
name="FullScreen"
:size="10"
/>
</div>
</template>
</el-table-column>
<el-table-column prop="headers" :label="$t('mq.kafka.headers')" min-width="150">
<template #default="{ row }">
{{ JSON.stringify(row.headers) }}
</template>
</el-table-column>
</el-table>
</div>
</template>
<script lang="ts" setup>
import { ref, reactive, toRefs, onMounted, defineAsyncComponent, watch } from 'vue';
import { mqApi } from '../../api';
import { ElMessage } from 'element-plus';
import { useI18n } from 'vue-i18n';
import SvgIcon from '@/components/svgIcon/index.vue';
import MonacoEditorBox from '@/components/monaco/MonacoEditorBox';
import type { ConsumerGroup } from '@/views/ops/mq/kafka/component/ConsumerGroup.vue';
import { randomUuid } from '@/common/utils/string';
const { t } = useI18n();
const props = defineProps({
kafkaId: {
type: Number,
required: true,
},
defaultTopic: {
type: String,
default: '',
},
topics: {
type: Array as () => string[],
default: () => [],
},
groups: {
type: Array as () => ConsumerGroup[],
default: () => [],
},
});
const consumeFormRef = ref();
const consuming = ref(false);
const state = reactive({
form: {
topic: '',
number: 10,
group: '',
pullTimeout: 10,
decompression: '',
decode: '',
isolationLevel: 'read_uncommitted',
commitOffset: false,
earliest: true,
startTime: '',
},
messages: [] as any[],
});
const { form, messages } = toRefs(state);
onMounted(() => {
if (props.defaultTopic) {
state.form.topic = props.defaultTopic;
}
});
watch(
() => props.defaultTopic,
(newTopic) => {
state.form.topic = newTopic || '';
}
);
const resetForm = () => {
state.form = {
topic: props.defaultTopic || '',
number: 10,
group: '',
pullTimeout: 10,
decompression: '',
decode: '',
isolationLevel: 'read_uncommitted',
commitOffset: false,
earliest: true,
startTime: '',
};
state.messages = [];
};
const consumeMessage = async () => {
if (!consumeFormRef.value) return;
consuming.value = true;
if (!state.form.group) {
state.form.group = '__mayfly-server__' + randomUuid();
}
try {
const param = {
id: props.kafkaId,
...state.form,
};
const res = await mqApi.kafkaTopicConsume.request(param);
state.messages = (res || []).map((msg: any) => ({
...msg,
displayValue: typeof msg.value === 'object' ? JSON.stringify(msg.value, null, 2) : String(msg.value),
}));
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
} finally {
consuming.value = false;
}
};
const viewMessageDetail = (row: any) => {
const value = typeof row.value === 'object' ? JSON.stringify(row.value, null, 2) : String(row.value);
const editorLang = getEditorLangByValue(value);
MonacoEditorBox({
content: value,
title: `${t('mq.kafka.messageBody')} - Offset ${row.offset}`,
language: editorLang,
showConfirmButton: false,
closeFn: () => {},
});
};
const getEditorLangByValue = (value: any) => {
try {
if (typeof JSON.parse(value) === 'object') {
return 'json';
}
} catch (e) {
/* empty */
}
try {
const doc = new DOMParser().parseFromString(value, 'text/html');
if (Array.from(doc.body.childNodes).some((node) => node.nodeType === 1)) {
return 'html';
}
} catch (e) {
/* empty */
}
return 'text';
};
</script>
<style lang="scss" scoped>
.kafka-consume-message {
overflow: auto;
.string-input-container-icon {
color: var(--el-color-primary);
&:hover {
color: var(--el-color-success);
}
}
}
</style>

View File

@@ -0,0 +1,127 @@
<template>
<div class="kafka-consumer-group h-full card !p-1">
<div class="toolbar flex items-center justify-between mb-2">
<div class="flex items-center">
<el-input v-model="searchGroup" :placeholder="$t('mq.kafka.searchGroup')" clearable size="small" class="w-60" @clear="loadGroups" />
<el-button @click="loadGroups" icon="refresh" :loading="loading" size="small" plain class="ml-2">
{{ $t('common.refresh') }}
</el-button>
</div>
<div class="flex items-center">
<span class="text-sm text-gray-500 mr-2">{{ $t('count') + ` ${groups.length}` }}</span>
</div>
</div>
<el-table :data="filteredGroups" stripe style="width: 100%" v-loading="loading">
<el-table-column prop="Group" :label="$t('mq.kafka.groupId')" min-width="250" />
<el-table-column prop="Coordinator" :label="$t('mq.kafka.coordinator')" min-width="150" />
<el-table-column prop="State" :label="$t('mq.kafka.state')" min-width="120">
<template #default="{ row }">
<el-tag :type="getStateTagType(row.State)" size="small">{{ row.State }}</el-tag>
</template>
</el-table-column>
<el-table-column prop="ProtocolType" :label="$t('mq.kafka.protocolType')" min-width="150" />
<el-table-column :label="$t('common.operation')" width="150" fixed="right">
<template #default="{ row }">
<el-button @click="handleGetGroupMembers(row)" size="small" icon="setting" link>
{{ $t('mq.kafka.Members') }}
</el-button>
<el-button @click="handleDeleteGroup(row)" type="danger" size="small" icon="delete" link v-auth="'kafka:group:delete'">
{{ $t('common.delete') }}
</el-button>
</template>
</el-table-column>
</el-table>
</div>
</template>
<script lang="ts" setup>
import { ref, reactive, toRefs, computed } from 'vue';
import { mqApi } from '../../api';
import { ElMessage } from 'element-plus';
import { useI18n } from 'vue-i18n';
import { useI18nDeleteConfirm, useI18nSaveSuccessMsg } from '@/hooks/useI18n';
export interface ConsumerGroup {
Coordinator: number;
State: string;
ProtocolType: string;
Group: string;
}
const { t } = useI18n();
const props = defineProps({
kafkaId: {
type: Number,
required: true,
},
groups: {
type: Array as () => ConsumerGroup[],
default: () => [],
},
loading: {
type: Boolean,
default: false,
},
});
const emits = defineEmits(['refresh']);
const searchGroup = ref('');
const filteredGroups = computed(() => {
if (!searchGroup.value) {
return props.groups;
}
return props.groups.filter((group: ConsumerGroup) => group.Group.toLowerCase().includes(searchGroup.value.toLowerCase()));
});
const loadGroups = () => {
emits('refresh');
};
const handleDeleteGroup = async (group: ConsumerGroup) => {
await useI18nDeleteConfirm(`Group: ${group.Group}`);
try {
await mqApi.kafkaDeleteGroup.request({
id: props.kafkaId,
group: group.Group,
});
useI18nSaveSuccessMsg();
emits('refresh');
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
}
};
const handleGetGroupMembers = async (group: ConsumerGroup) => {
try {
let res = await mqApi.kafkaGetGroupMembers.request({
id: props.kafkaId,
group: group.Group,
});
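// Group members are fetched but not rendered yet; log the response for now.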
console.log(res);
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
}
};
const getStateTagType = (state: string) => {
switch (state?.toLowerCase()) {
case 'stable':
return 'success';
default:
return '';
}
};
</script>
<style lang="scss" scoped>
.kafka-consumer-group {
.toolbar {
display: flex;
justify-content: space-between;
align-items: center;
}
}
</style>

View File

@@ -0,0 +1,147 @@
<template>
<div class="kafka-node-manage h-full card !p-1">
<div class="toolbar flex items-center mb-2">
<el-button @click="refreshBrokers" icon="refresh" :loading="loading" size="small" plain>
{{ $t('common.refresh') }}
</el-button>
</div>
<el-table :data="brokers" stripe style="width: 100%" v-loading="loading">
<el-table-column prop="id" :label="$t('mq.kafka.nodeId')" min-width="100" />
<el-table-column prop="addr" :label="$t('mq.kafka.addr')" min-width="100" />
<el-table-column prop="rack" :label="$t('mq.kafka.rack')" min-width="150" />
<el-table-column :label="$t('common.operation')" width="120" fixed="right">
<template #default="{ row }">
<el-button @click="viewBrokerConfig(row)" type="primary" size="small" icon="setting" link>
{{ $t('mq.kafka.viewConfig') }}
</el-button>
</template>
</el-table-column>
</el-table>
<el-drawer
v-model="openDrawer"
:before-close="cancel"
:destroy-on-close="true"
:close-on-click-modal="false"
size="80%"
:title="$t('mq.kafka.brokerConfig') + selectedBroker?.addr"
>
<div class="toolbar">
<div class="">
<el-input v-model="searchConfig" :placeholder="$t('mq.kafka.configName')" clearable size="small" class="w-60 mb-2" />
</div>
<span class="text-sm text-gray-500">{{ `count: ${filteredBrokerConfigs.length}` }}</span>
</div>
<el-table :data="filteredBrokerConfigs" stripe style="width: 100%" v-loading="loading">
<el-table-column type="index" label="#" width="50" />
<el-table-column prop="Key" :label="$t('mq.kafka.configName')" min-width="200" />
<el-table-column prop="Value" :label="$t('mq.kafka.configValue')" min-width="300" />
<el-table-column prop="Source" :label="$t('mq.kafka.configSource')" min-width="150" />
<el-table-column prop="Sensitive" :label="$t('mq.kafka.configSensitive')" min-width="150" />
</el-table>
</el-drawer>
</div>
</template>
<script lang="ts" setup>
import { ref, reactive, toRefs, computed, onMounted } from 'vue';
import { mqApi } from '../../api';
import { ElMessage } from 'element-plus';
import { useI18n } from 'vue-i18n';
interface Broker {
id: number;
addr: string;
rack: string;
}
interface BrokerConfig {
Key: string;
Value: string;
Source: number;
Sensitive: boolean;
}
const { t } = useI18n();
const props = defineProps({
kafkaId: {
type: Number,
required: true,
},
});
const loading = ref(false);
const selectedBroker = ref<Broker | null>(null);
const openDrawer = ref(false);
const searchConfig = ref('');
const state = reactive({
brokers: [] as Broker[],
brokerConfigs: [] as BrokerConfig[],
});
const cancel = () => {
state.brokerConfigs = [];
openDrawer.value = false;
searchConfig.value = '';
};
const { brokers, brokerConfigs } = toRefs(state);
const filteredBrokerConfigs = computed(() => {
if (!searchConfig.value) {
return state.brokerConfigs;
}
return state.brokerConfigs.filter((config: BrokerConfig) => config.Key.toLowerCase().includes(searchConfig.value.toLowerCase()));
});
onMounted(() => {
refreshBrokers();
});
const refreshBrokers = async () => {
loading.value = true;
try {
const res = await mqApi.kafkaTopicBrokers.request({ id: props.kafkaId });
state.brokers = res || [];
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
} finally {
loading.value = false;
}
};
const viewBrokerConfig = async (broker: Broker) => {
selectedBroker.value = broker;
openDrawer.value = true;
loading.value = true;
try {
const res = await mqApi.kafkaTopicBrokerConfig.request({
id: props.kafkaId,
brokerId: broker.id,
});
const configs = res?.[broker.id]?.Configs;
if (configs) {
configs.sort((a: any, b: any) => (a['Key'] > b['Key'] ? 1 : -1));
state.brokerConfigs = configs;
} else {
state.brokerConfigs = [];
}
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
} finally {
loading.value = false;
}
};
</script>
<style lang="scss" scoped>
.toolbar {
display: flex;
justify-content: space-between;
align-items: center;
}
</style>
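`viewBrokerConfig` above assumes the getBrokerConfig response is keyed by broker id, each entry carrying a `Configs` array; sketched as a type (inferred from the handler, not a documented contract):

// Assumed shape of the getBrokerConfig response consumed in viewBrokerConfig.
type BrokerConfigResponse = {
    [brokerId: number]: {
        Configs: BrokerConfig[];
    };
};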

View File

@@ -0,0 +1,197 @@
<template>
<div class="kafka-produce-message h-full card !p-1">
<el-form ref="produceFormRef" :model="form" label-width="auto" size="small">
<el-row :gutter="10">
<el-col :span="8">
<el-form-item :label="$t('mq.kafka.selectTopic')" required>
<template #label>
<el-space>
<span>{{ $t('mq.kafka.selectTopic') }}</span>
<el-button icon="refresh" link />
</el-space>
</template>
<el-select v-model="form.topic" filterable :placeholder="$t('mq.kafka.selectTopicPlaceholder')">
<el-option v-for="topic in topics" :key="topic" :label="topic" :value="topic" />
</el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item :label="$t('mq.kafka.messageKey')">
<el-input v-model="form.key" :placeholder="$t('mq.kafka.messageKeyPlaceholder')" />
</el-form-item>
</el-col>
<el-col :span="6">
<el-form-item :label="$t('mq.kafka.partition')">
<el-tooltip :content="$t('mq.kafka.partitionPlaceholder')">
<el-input-number v-model="form.partition" :min="0" :max="100" />
</el-tooltip>
</el-form-item>
</el-col>
</el-row>
<el-form-item :label="$t('mq.kafka.messageBody')" required>
<monaco-editor v-model="form.value" language="json" height="200px" :can-change-mode="true" />
</el-form-item>
<el-form-item :label="$t('mq.kafka.messageHeaders')">
<div class="w-full">
<el-button @click="addHeader" type="primary" size="small" icon="plus">
{{ $t('mq.kafka.addHeader') }}
</el-button>
<div class="mt-2" v-if="form.headers && form.headers.length > 0">
<div v-for="(header, index) in form.headers" :key="index" class="flex items-center mb-2">
<el-input v-model="header.key" :placeholder="$t('mq.kafka.headerKey')" size="small" class="w-60 mr-2" />
<el-input v-model="header.value" :placeholder="$t('mq.kafka.headerValue')" size="small" class="w-80 mr-2" />
<el-button @click="removeHeader(index)" type="danger" size="small" icon="delete" />
</div>
</div>
</div>
</el-form-item>
<el-row :gutter="10">
<el-col :span="6">
<el-form-item :label="$t('mq.kafka.sendTimes')">
<el-input-number v-model="form.times" :min="1" :max="100" />
</el-form-item>
</el-col>
<el-col :span="6">
<el-form-item :label="$t('mq.kafka.compression')">
<el-select v-model="form.compression" :placeholder="$t('mq.kafka.compressionPlaceholder')">
<el-option label="none" value="" />
<el-option label="gzip" value="gzip" />
<el-option label="lz4" value="lz4" />
<el-option label="zstd" value="zstd" />
<el-option label="snappy" value="snappy" />
</el-select>
</el-form-item>
</el-col>
</el-row>
<el-form-item>
<el-button @click="resetForm" icon="refresh">{{ $t('common.reset') }}</el-button>
<el-button @click="sendMessage" type="primary" icon="upload" :loading="sending">
{{ $t('mq.kafka.sendMessage') }}
</el-button>
</el-form-item>
</el-form>
</div>
</template>
<script lang="ts" setup>
import { ref, reactive, toRefs, onMounted, defineAsyncComponent, watch } from 'vue';
import { mqApi } from '../../api';
import { ElMessage } from 'element-plus';
import { useI18n } from 'vue-i18n';
import { Rules } from '@/common/rule';
import { useI18nOperateSuccessMsg, useI18nSaveSuccessMsg } from '@/hooks/useI18n';
const MonacoEditor = defineAsyncComponent(() => import('@/components/monaco/MonacoEditor.vue'));
interface Header {
key: string;
value: string;
}
const { t } = useI18n();
const props = defineProps({
kafkaId: {
type: Number,
required: true,
},
defaultTopic: {
type: String,
default: '',
},
topics: {
type: Array as () => string[],
default: () => [],
},
});
const produceFormRef = ref();
const sending = ref(false);
const state = reactive({
form: {
topic: '',
key: '',
value: '',
partition: 0,
headers: [] as Header[],
times: 1,
compression: '',
},
});
const { form } = toRefs(state);
onMounted(() => {
if (props.defaultTopic) {
state.form.topic = props.defaultTopic;
}
});
watch(
() => props.defaultTopic,
(newTopic) => {
state.form.topic = newTopic || '';
}
);
const addHeader = () => {
if (!state.form.headers) {
state.form.headers = [];
}
state.form.headers.push({ key: '', value: '' });
};
const removeHeader = (index: number) => {
state.form.headers.splice(index, 1);
};
const resetForm = () => {
state.form = {
topic: props.defaultTopic || '',
key: '',
value: '',
partition: -1,
headers: [] as Header[],
times: 1,
compression: '',
};
};
const sendMessage = async () => {
if (!produceFormRef.value) return;
await produceFormRef.value.validate();
sending.value = true;
try {
const param = {
id: props.kafkaId,
topic: state.form.topic,
key: state.form.key,
value: state.form.value,
partition: state.form.partition,
headers: state.form.headers.filter((h: any) => h.key || h.value),
times: state.form.times,
compression: state.form.compression,
};
await mqApi.kafkaTopicProduce.request(param);
useI18nOperateSuccessMsg();
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
} finally {
sending.value = false;
}
};
</script>
<style lang="scss" scoped>
.kafka-produce-message {
overflow: auto;
}
</style>

View File

@@ -0,0 +1,555 @@
<template>
<div class="kafka-topic-manage h-full card !p-1">
<div class="toolbar flex items-center justify-between mb-2">
<div class="flex items-center">
<el-input v-model="searchTopic" :placeholder="$t('mq.kafka.searchTopic')" clearable size="small" class="w-60" @clear="loadTopics" />
<el-button @click="loadTopics" icon="refresh" :loading="loading" size="small" plain class="ml-2">
{{ $t('common.refresh') }}
</el-button>
<el-button @click="showCreateTopicDialog" type="primary" size="small" icon="plus" v-auth="'kafka:topic:create'">
{{ $t('mq.kafka.createTopic') }}
</el-button>
<el-button @click="openViewPartitions" type="primary" size="small" icon="data-line">
{{ $t('mq.kafka.partitions') }}
</el-button>
</div>
<span class="text-sm text-gray-500">{{ `count: ${filteredTopics.length}` }}</span>
</div>
<el-table :data="filteredTopics" stripe style="width: 100%" v-loading="loading" @row-contextmenu="handleRowContextmenu">
<el-table-column prop="name" :label="$t('mq.kafka.topicName')" min-width="200">
<template #default="{ row }">
<el-link type="primary" @click="viewTopicConfig(row)">{{ row.name }}</el-link>
</template>
</el-table-column>
<el-table-column prop="partitionCount" :label="$t('mq.kafka.partitions')" min-width="100">
<template #default="{ row }">
<el-link type="primary" @click="viewPartitions(row)">{{ row.partitionCount }}</el-link>
</template>
</el-table-column>
<el-table-column prop="replicationFactor" :label="$t('mq.kafka.replicationFactor')" min-width="120" />
<el-table-column prop="status" :label="$t('mq.kafka.topicStatus')" min-width="100">
<template #default="{ row }">
<el-tag :type="row.status === 'HEALTHY' ? 'success' : 'danger'" size="small">{{ row.status }}</el-tag>
</template>
</el-table-column>
<el-table-column prop="isInternal" :label="$t('mq.kafka.topicIsInternal')" min-width="100">
<template #default="{ row }">
<el-tag :type="row.isInternal ? 'success' : 'danger'" size="small">{{ row.isInternal ? 'Y' : 'N' }}</el-tag>
</template>
</el-table-column>
<el-table-column :label="$t('common.operation')" width="130" fixed="right">
<template #default="{ row }">
<el-dropdown trigger="click" @command="handleTopicCommand($event, row)">
<el-button size="small" icon="more">
{{ $t('common.operation') }}
</el-button>
<template #dropdown>
<el-dropdown-menu>
<el-dropdown-item command="produce" icon="upload" v-auth="'kafka:topic:produce'">
{{ $t('mq.kafka.produceMessage') }}
</el-dropdown-item>
<el-dropdown-item command="consume" icon="download" v-auth="'kafka:topic:consume'">
{{ $t('mq.kafka.consumeMessage') }}
</el-dropdown-item>
<el-dropdown-item command="partitions" icon="data-line">
{{ $t('mq.kafka.viewPartitions') }}
</el-dropdown-item>
<el-dropdown-item command="config" icon="setting">
{{ $t('mq.kafka.viewConfig') }}
</el-dropdown-item>
<el-dropdown-item command="delete" icon="delete" v-auth="'kafka:topic:delete'" divided>
{{ $t('common.delete') }}
</el-dropdown-item>
</el-dropdown-menu>
</template>
</el-dropdown>
</template>
</el-table-column>
</el-table>
<contextmenu :dropdown="contextmenu.dropdown" :items="contextmenu.items" ref="contextmenuRef" />
<!-- Create Topic dialog -->
<el-dialog :title="$t('mq.kafka.createTopic')" v-model="createTopicDialog.visible" width="600px" :close-on-click-modal="false">
<el-form ref="createTopicFormRef" :model="createTopicDialog.form" :rules="createTopicFormRules" label-width="auto">
<el-form-item :label="$t('mq.kafka.topicName')" prop="topic">
<el-input v-model="createTopicDialog.form.topic" :placeholder="$t('mq.kafka.topicNamePlaceholder')" />
</el-form-item>
<el-form-item :label="$t('mq.kafka.partitions')" prop="numPartitions">
<el-input-number v-model="createTopicDialog.form.numPartitions" :min="1" :max="100" />
</el-form-item>
<el-form-item :label="$t('mq.kafka.replicationFactor')" prop="replicationFactor">
<el-input-number v-model="createTopicDialog.form.replicationFactor" :min="1" :max="10" />
</el-form-item>
</el-form>
<template #footer>
<el-button @click="createTopicDialog.visible = false">{{ $t('common.cancel') }}</el-button>
<el-button type="primary" @click="confirmCreateTopic" :loading="createTopicDialog.loading">
{{ $t('common.confirm') }}
</el-button>
</template>
</el-dialog>
<!-- Create Partitions dialog -->
<el-dialog :title="$t('mq.kafka.createPartitions')" v-model="createPartitionsDialog.visible" width="600px" :close-on-click-modal="false">
<el-form ref="createPartitionsFormRef" :model="createPartitionsDialog.form" :rules="createPartitionsFormRules" label-width="auto">
<el-form-item :label="$t('mq.kafka.partitions')" prop="numPartitions">
<el-input-number v-model="createPartitionsDialog.form.numPartitions" :min="1" :max="100" />
</el-form-item>
</el-form>
<template #footer>
<el-button @click="createPartitionsDialog.visible = false">{{ $t('common.cancel') }}</el-button>
<el-button type="primary" @click="confirmCreatePartitions" :loading="createPartitionsDialog.loading">
{{ $t('common.confirm') }}
</el-button>
</template>
</el-dialog>
<!-- Topic config dialog -->
<el-drawer
v-model="topicConfigDialog.visible"
:before-close="cancelViewTopicConfig"
:destroy-on-close="true"
:close-on-click-modal="false"
size="80%"
:title="`${$t('mq.kafka.topicConfig')} [${topicConfigDialog.topic}] `"
>
<div class="toolbar">
<div class="">
<el-input v-model="searchTopicConfig" :placeholder="$t('mq.kafka.configName')" clearable size="small" class="mb-2" />
</div>
<span class="text-sm text-gray-500">{{ `count: ${filteredTopicConfigs.length}` }}</span>
</div>
<el-table :data="filteredTopicConfigs" stripe style="width: 100%" v-loading="loading">
<el-table-column type="index" label="#" width="50" />
<el-table-column prop="Key" :label="$t('mq.kafka.configName')" min-width="200" />
<el-table-column prop="Value" :label="$t('mq.kafka.configValue')" min-width="300" />
<el-table-column prop="Source" :label="$t('mq.kafka.configSource')" min-width="150" />
<el-table-column prop="Sensitive" :label="$t('mq.kafka.configSensitive')" min-width="150" />
</el-table>
</el-drawer>
<!-- Topic partition info -->
<el-drawer
v-model="topicPartitionsDialog.visible"
:before-close="cancelViewTopicPartitions"
:destroy-on-close="true"
:close-on-click-modal="false"
size="80%"
:title="`${$t('mq.kafka.topicPartitions')} [${topicPartitionsDialog.topic}] `"
>
<div class="toolbar flex items-center justify-between mb-2">
<div class="flex items-center">
<el-select
v-model="topicPartitionsDialog.topic"
:placeholder="$t('mq.kafka.selectTopicPlaceholder')"
clearable
size="small"
filterable
style="width: 150px"
@change="loadTopicPartitions"
>
<el-option v-for="topic in topics" :key="topic.name" :label="topic.name" :value="topic.name" @select="viewPartitions(topic)" />
</el-select>
<el-button @click="showCreatePartitionsDialog" class="ml-3" type="primary" size="small" icon="plus" v-auth="'kafka:topic:create'">
{{ $t('mq.kafka.createPartitions') }}
</el-button>
<el-select
v-model="topicPartitionsDialog.group"
:placeholder="$t('mq.kafka.selectGroupPlaceholder')"
clearable
size="small"
filterable
class="ml-3"
style="width: 150px"
@change="loadOffsets"
>
<el-option v-for="g in groups" :key="g.Group" :label="g.Group" :value="g.Group" />
</el-select>
<el-button @click="loadGroups" icon="refresh" :loading="groupLoading" size="small" plain class="ml-3">
{{ $t('common.refresh') }} Group
</el-button>
<el-button @click="loadOffsets" icon="refresh" :loading="groupLoading" size="small" plain class="ml-3">
{{ $t('mq.kafka.loadOffsets') }}
</el-button>
</div>
<span class="text-sm text-gray-500">{{ `count: ${topicPartitionsDialog.topicPartitions.length}` }}</span>
</div>
<el-table :data="topicPartitionsDialog.topicPartitions" style="width: 100%" v-loading="loading">
<el-table-column type="index" label="ID" width="50" :index="topicPartitionsIndexMethod" />
<el-table-column prop="leader" label="Leader" min-width="150" />
<el-table-column prop="Health" label="Health" min-width="150">
<template #default="{ row }">
<el-tag :type="row.err == 0 ? 'success' : 'danger'" size="small">{{ row.err == 0 ? 'HEALTHY' : `Error: ${row.err}` }}</el-tag>
</template>
</el-table-column>
<el-table-column prop="LeaderEpoch" label="LeaderEpoch" min-width="150" />
<el-table-column prop="OfflineReplicas" label="OfflineReplicas" min-width="150" />
<el-table-column prop="replicas" label="replicas" min-width="150">
<template #default="{ row }">
<el-tag v-for="(replica, index) in row.replicas" :key="index">
{{ replica }}
</el-tag>
</template>
</el-table-column>
<el-table-column prop="isr" label="isr" min-width="150">
<template #default="{ row }">
<el-tag v-for="(isr, index) in row.isr" :key="index">
{{ isr }}
</el-tag>
</template>
</el-table-column>
</el-table>
</el-drawer>
</div>
</template>
<script lang="ts" setup>
import { ref, reactive, toRefs, computed, nextTick } from 'vue';
import { mqApi } from '../../api';
import { ElMessage } from 'element-plus';
import { useI18n } from 'vue-i18n';
import { Contextmenu, ContextmenuItem } from '@/components/contextmenu';
import { Rules } from '@/common/rule';
import { useI18nDeleteConfirm, useI18nSaveSuccessMsg } from '@/hooks/useI18n';
import type { ConsumerGroup } from '@/views/ops/mq/kafka/component/ConsumerGroup.vue';
interface Partitions {
LeaderEpoch: number;
OfflineReplicas: null;
err: number;
isr: number[];
leader: number;
partition: number;
replicas: number[];
}
interface Topic {
name: string;
partitionCount: number;
replicationFactor: number;
status: string;
partitions: Partitions[];
}
interface TopicConfig {
Key: string;
Value: string;
ReadOnly: boolean;
Default: boolean;
Source: number;
Sensitive: boolean;
}
const { t } = useI18n();
const props = defineProps({
kafkaId: {
type: Number,
required: true,
},
topics: {
type: Array as () => Topic[],
default: () => [],
},
groups: {
type: Array as () => ConsumerGroup[],
default: () => [],
},
loading: {
type: Boolean,
default: false,
},
});
const emits = defineEmits(['produce', 'consume', 'refresh']);
const searchTopic = ref('');
const searchTopicConfig = ref('');
const groupLoading = ref(false);
const state = reactive({
createTopicDialog: {
visible: false,
loading: false,
form: {
topic: '',
numPartitions: 1,
replicationFactor: 1,
},
},
createPartitionsDialog: {
visible: false,
loading: false,
form: {
topic: '',
numPartitions: 1,
},
},
topicConfigDialog: {
visible: false,
topic: '',
topicConfigs: [] as TopicConfig[],
},
topicPartitionsDialog: {
visible: false,
topic: '',
group: '',
topicPartitions: [] as Partitions[],
},
contextmenu: {
dropdown: {
x: 0,
y: 0,
},
items: [
new ContextmenuItem('produce', 'mq.kafka.produceMessage')
.withIcon('upload')
.withPermission('kafka:topic:produce')
.withOnClick((data: any) => handleProduceMessage(data)),
new ContextmenuItem('consume', 'mq.kafka.consumeMessage')
.withIcon('download')
.withPermission('kafka:topic:consume')
.withOnClick((data: any) => handleConsumeMessage(data)),
new ContextmenuItem('partitions', 'mq.kafka.viewPartitions').withIcon('data-line').withOnClick((data: any) => viewPartitions(data)),
new ContextmenuItem('config', 'mq.kafka.viewConfig').withIcon('setting').withOnClick((data: any) => viewTopicConfig(data)),
new ContextmenuItem('delete', 'common.delete')
.withIcon('delete')
.withPermission('kafka:topic:delete')
.withOnClick((data: any) => handleDeleteTopic(data)),
] as ContextmenuItem[],
},
});
const { createTopicDialog, createPartitionsDialog, topicConfigDialog, topicPartitionsDialog, contextmenu } = toRefs(state);
const contextmenuRef = ref();
const topicPartitionsIndexMethod = (index: number) => {
return index;
};
// Wrap props in computed to keep reactivity in the template
const topics = computed(() => props.topics);
const groups = computed(() => props.groups);
const loading = computed(() => props.loading);
const filteredTopicConfigs = computed(() => {
if (!searchTopicConfig.value) {
return state.topicConfigDialog.topicConfigs;
}
return state.topicConfigDialog.topicConfigs.filter((config: TopicConfig) => config.Key.toLowerCase().includes(searchTopicConfig.value.toLowerCase()));
});
const createTopicFormRef = ref();
const createPartitionsFormRef = ref();
const createTopicFormRules = {
topic: [Rules.requiredInput('mq.kafka.topicName')],
numPartitions: [Rules.requiredInput('mq.kafka.partitions')],
replicationFactor: [Rules.requiredInput('mq.kafka.replicationFactor')],
};
const createPartitionsFormRules = {
numPartitions: [Rules.requiredInput('mq.kafka.partitions')],
};
const filteredTopics = computed(() => {
if (!searchTopic.value) {
return props.topics;
}
return props.topics.filter((topic: Topic) => topic.name.toLowerCase().includes(searchTopic.value.toLowerCase()));
});
const loadTopics = () => {
emits('refresh');
};
const loadGroups = () => {
emits('refresh');
};
const loadTopicPartitions = (topicName: string) => {
// Look up the selected topic by name and refresh its partition info
const selectedTopic = props.topics.find((t: Topic) => t.name === topicName);
if (selectedTopic) {
state.topicPartitionsDialog.topicPartitions = selectedTopic.partitions;
}
};
const loadOffsets = async () => {
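// TODO: offset loading is not implemented yet; log the dialog state for now.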
console.log(state.topicPartitionsDialog);
};
const showCreateTopicDialog = () => {
state.createTopicDialog.form = {
topic: '',
numPartitions: 1,
replicationFactor: 1,
};
state.createTopicDialog.visible = true;
};
const showCreatePartitionsDialog = () => {
if (!state.topicPartitionsDialog.topic) {
ElMessage.warning(t('mq.kafka.selectTopicWarning'));
return;
}
state.createPartitionsDialog.form = {
topic: state.topicPartitionsDialog.topic,
numPartitions: 1,
};
state.createPartitionsDialog.visible = true;
};
const confirmCreateTopic = async () => {
if (!createTopicFormRef.value) return;
await createTopicFormRef.value.validate();
state.createTopicDialog.loading = true;
try {
await mqApi.kafkaTopicCreate.request({
id: props.kafkaId,
...state.createTopicDialog.form,
});
useI18nSaveSuccessMsg();
state.createTopicDialog.visible = false;
emits('refresh');
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
} finally {
state.createTopicDialog.loading = false;
}
};
const confirmCreatePartitions = async () => {
if (!createPartitionsFormRef.value) return;
await createPartitionsFormRef.value.validate();
state.createPartitionsDialog.loading = true;
try {
await mqApi.kafkaTopicCreatePartitions.request({
id: props.kafkaId,
...state.createPartitionsDialog.form,
});
useI18nSaveSuccessMsg();
state.createPartitionsDialog.visible = false;
emits('refresh');
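// Give the parent's refresh a moment to complete before re-reading the topic's partitions.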
await nextTick(() => {
setTimeout(() => {
loadTopicPartitions(state.createPartitionsDialog.form.topic);
}, 200);
});
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
} finally {
state.createPartitionsDialog.loading = false;
}
};
const viewTopicConfig = async (topic: Topic) => {
try {
const res = await mqApi.kafkaTopicInfo.request({
id: props.kafkaId,
topic: topic.name,
});
state.topicConfigDialog.topic = topic.name;
const configs = res?.[0]?.Configs;
if (configs) {
configs.sort((a: any, b: any) => (a['Key'] > b['Key'] ? 1 : -1));
state.topicConfigDialog.topicConfigs = configs;
} else {
state.topicConfigDialog.topicConfigs = [];
}
state.topicConfigDialog.visible = true;
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
}
};
const cancelViewTopicConfig = () => {
state.topicConfigDialog.visible = false;
searchTopicConfig.value = '';
state.topicConfigDialog.topicConfigs = [];
state.topicConfigDialog.topic = '';
};
const cancelViewTopicPartitions = () => {
state.topicPartitionsDialog.visible = false;
state.topicPartitionsDialog.topicPartitions = [];
state.topicPartitionsDialog.topic = '';
};
const handleDeleteTopic = async (topic: Topic) => {
await useI18nDeleteConfirm(`Topic: ${topic.name}`);
try {
await mqApi.kafkaTopicDelete.request({
id: props.kafkaId,
topic: topic.name,
});
useI18nSaveSuccessMsg();
emits('refresh');
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
}
};
const viewPartitions = (topic: Topic) => {
state.topicPartitionsDialog.topicPartitions = topic.partitions;
state.topicPartitionsDialog.topic = topic.name;
openViewPartitions();
};
const openViewPartitions = () => {
state.topicPartitionsDialog.visible = true;
};
const handleProduceMessage = (topic: Topic) => {
emits('produce', topic.name);
};
const handleConsumeMessage = (topic: Topic) => {
emits('consume', topic.name);
};
const handleTopicCommand = (command: string, topic: Topic) => {
switch (command) {
case 'produce':
handleProduceMessage(topic);
break;
case 'consume':
handleConsumeMessage(topic);
break;
case 'partitions':
viewPartitions(topic);
break;
case 'config':
viewTopicConfig(topic);
break;
case 'delete':
handleDeleteTopic(topic);
break;
}
};
const handleRowContextmenu = (row: any, column: any, event: any) => {
event.preventDefault();
event.stopPropagation();
const { clientX, clientY } = event;
state.contextmenu.dropdown.x = clientX;
state.contextmenu.dropdown.y = clientY;
contextmenuRef.value.openContextmenu(row);
};
</script>
<style lang="scss" scoped>
.kafka-topic-manage {
.toolbar {
display: flex;
justify-content: space-between;
align-items: center;
}
}
</style>

View File

@@ -0,0 +1,140 @@
<template>
<div class="kafka-op h-full">
<el-tabs v-model="activeTab" class="h-full" @tab-click="handleTabClick">
<el-tab-pane :label="$t('mq.kafka.nodeManage')" name="node">
<node-manage v-show="activeTab === 'node'" :kafka-id="kafkaId" />
</el-tab-pane>
<el-tab-pane :label="$t('mq.kafka.topicManage')" name="topic">
<topic-manage
v-show="activeTab === 'topic'"
:kafka-id="kafkaId"
:topics="topics"
:groups="groups"
:loading="loading"
@produce="handleProduceMessage"
@consume="handleConsumeMessage"
@refresh="loadData"
/>
</el-tab-pane>
<el-tab-pane :label="$t('mq.kafka.produceMessage')" name="produce">
<produce-message v-show="activeTab === 'produce'" :kafka-id="kafkaId" :default-topic="selectedTopic" :topics="topicNames" />
</el-tab-pane>
<el-tab-pane :label="$t('mq.kafka.consumeMessage')" name="consume">
<consume-message v-show="activeTab === 'consume'" :kafka-id="kafkaId" :default-topic="selectedTopic" :topics="topicNames" :groups="groups" />
</el-tab-pane>
<el-tab-pane :label="$t('mq.kafka.consumerGroup')" name="group">
<consumer-group v-show="activeTab === 'group'" :kafka-id="kafkaId" :groups="groups" :loading="loading" @refresh="loadData" />
</el-tab-pane>
</el-tabs>
</div>
</template>
<script lang="ts" setup>
import { ref, defineAsyncComponent, onMounted, getCurrentInstance, computed } from 'vue';
import { KafkaOpComp } from '@/views/ops/mq/kafka/resource';
import { mqApi } from '../../api';
import { ElMessage } from 'element-plus';
import { useI18n } from 'vue-i18n';
const NodeManage = defineAsyncComponent(() => import('../component/NodeManage.vue'));
const TopicManage = defineAsyncComponent(() => import('../component/TopicManage.vue'));
const ProduceMessage = defineAsyncComponent(() => import('../component/ProduceMessage.vue'));
const ConsumeMessage = defineAsyncComponent(() => import('../component/ConsumeMessage.vue'));
const ConsumerGroup = defineAsyncComponent(() => import('../component/ConsumerGroup.vue'));
const { t } = useI18n();
interface Topic {
name: string;
partitionCount: number;
replicationFactor: number;
status: string;
isInternal: boolean;
partitions: Partitions[];
}
interface Partitions {}
const activeTab = ref('node');
const kafkaId = ref<number>(0);
const selectedTopic = ref<string>('');
const loading = ref(false);
const topics = ref<any[]>([]);
const groups = ref<any[]>([]);
// computed: extract the list of topic names
const topicNames = computed(() => topics.value.map((item: any) => item.name));
const emits = defineEmits(['init']);
const initKafka = (params: any) => {
kafkaId.value = params.id;
selectedTopic.value = '';
loadData();
};
const loadData = async () => {
if (!kafkaId.value) return;
loading.value = true;
try {
const [topicsRes, groupsRes] = await Promise.all([
mqApi.kafkaTopicList.request({ id: kafkaId.value }),
mqApi.kafkaGetGroups.request({ id: kafkaId.value }),
]);
// normalize the topics payload into the Topic shape
topics.value = (topicsRes || []).map(
(topic: any) =>
({
name: topic.topic,
partitionCount: topic.partition_count || 0,
replicationFactor: topic.replication_factor || 0,
partitions: topic.partitions || [],
isInternal: topic.IsInternal,
status: topic.Err === '' ? 'HEALTHY' : `ERROR: ${topic.Err}`,
}) as Topic
);
groups.value = groupsRes || [];
} catch (error: any) {
ElMessage.error(error.message || t('common.requestFail'));
} finally {
loading.value = false;
}
};
const handleTabClick = (tab: any) => {
// clear the selected topic when switching away from the produce/consume tabs
if (tab.props.name !== 'produce' && tab.props.name !== 'consume') {
selectedTopic.value = '';
}
};
const handleProduceMessage = (topic: string) => {
selectedTopic.value = topic;
activeTab.value = 'produce';
};
const handleConsumeMessage = (topic: string) => {
selectedTopic.value = topic;
activeTab.value = 'consume';
};
onMounted(() => {
emits('init', { name: KafkaOpComp.name, ref: getCurrentInstance()?.exposed });
});
defineExpose({
initKafka,
});
</script>
<style lang="scss" scoped>
.kafka-op {
:deep(.el-tabs) {
height: 100%;
.el-tabs__content {
height: calc(100% - 55px);
overflow: auto;
}
}
}
</style>

View File

@@ -0,0 +1,32 @@
<template>
<BaseTreeNode v-bind="$attrs">
<template #prefix="{ data }">
<el-popover :show-after="500" placement="right-start" :title="$t('common.detail')" trigger="hover" :width="250">
<template #reference>
<SvgIcon :name="ResourceTypeEnum.MqKafka.extra.icon" :color="ResourceTypeEnum.MqKafka.extra.iconColor" :size="13" />
</template>
<template #default>
<el-descriptions :column="1" size="small">
<el-descriptions-item :label="$t('common.name')">
{{ data.params.name }}
</el-descriptions-item>
<el-descriptions-item label="hosts">
{{ data.params.hosts }}
</el-descriptions-item>
<el-descriptions-item :label="$t('common.remark')" label-align="right">
{{ data.params.remark }}
</el-descriptions-item>
</el-descriptions>
</template>
</el-popover>
</template>
</BaseTreeNode>
</template>
<script lang="ts" setup>
import { ResourceTypeEnum } from '@/common/commonEnum';
import BaseTreeNode from '@/views/ops/resource/BaseTreeNode.vue';
</script>
<style lang="scss"></style>

View File

@@ -0,0 +1,54 @@
import { defineAsyncComponent } from 'vue';
import { NodeType, TagTreeNode, ResourceComponentConfig, ResourceConfig } from '../../../component/tag';
import { ResourceTypeEnum, TagResourceTypeEnum } from '@/common/commonEnum';
import { sleep } from '@/common/utils/loading';
import { mqApi } from '@/views/ops/mq/api';
export const KafkaIcon = {
name: ResourceTypeEnum.MqKafka.extra.icon,
color: ResourceTypeEnum.MqKafka.extra.iconColor,
};
const KafkaList = defineAsyncComponent(() => import('../KafkaList.vue'));
const KafkaOp = defineAsyncComponent(() => import('./KafkaOp.vue'));
const NodeKafka = defineAsyncComponent(() => import('./NodeKafka.vue'));
export const KafkaOpComp: ResourceComponentConfig = {
name: 'tag.mq.kafkaOp',
component: KafkaOp,
icon: KafkaIcon,
};
const NodeTypeKafka = new NodeType(TagResourceTypeEnum.MqKafka.value).withNodeClickFunc(async (node: TagTreeNode) => {
(await node.ctx?.addResourceComponent(KafkaOpComp)).initKafka(node.params);
});
// tagpath node type
const NodeTypeKafkaTag = new NodeType(TagTreeNode.TagPath).withLoadNodesFunc(async (parentNode: TagTreeNode) => {
const tagPath = parentNode.params.tagPath;
const res = await mqApi.kafkaList.request({ tagPath });
if (!res.total) {
return [];
}
const kafkaInfos = res.list;
await sleep(100);
return kafkaInfos.map((x: any) => {
return TagTreeNode.new(parentNode, `${x.code}`, x.name, NodeTypeKafka).withIsLeaf(true).withParams(x).withNodeComponent(NodeKafka);
});
});
export default {
order: 6,
resourceType: TagResourceTypeEnum.MqKafka.value,
rootNodeType: NodeTypeKafkaTag,
manager: {
componentConf: {
component: KafkaList,
icon: KafkaIcon,
name: 'kafka',
},
countKey: 'kafka',
permCode: 'mq:kafka:base',
},
} as ResourceConfig;

View File

@@ -23,8 +23,6 @@ export const RedisOpComp: ResourceComponentConfig = {
// tagpath node type
const NodeTypeRedisTag = new NodeType(TagTreeNode.TagPath).withLoadNodesFunc(async (parentNode: TagTreeNode) => {
parentNode.ctx?.addResourceComponent(RedisOpComp);
const res = await redisApi.redisList.request({ tagPath: parentNode.params.tagPath });
if (!res.total) {
return [];

View File

@@ -371,6 +371,7 @@ const onResizeOpPanel = () => {
const loadTags = async () => {
const tags = await tagApi.getTagTrees.request({
type: getResourceTypes().join(','),
flatten: '1',
});
const tagNodes = [];
for (let tag of tags) {
@@ -381,7 +382,7 @@ const loadTags = async () => {
};
const processTagNode = (tag: any): TagTreeNode => {
const tagNode = new TagTreeNode(tag.codePath, tag.name, tag.type);
const tagNode = new TagTreeNode(tag.codePath, tag.namePath, tag.type);
if (!tag.children || !Array.isArray(tag.children) || tag.children.length == 0) {
return tagNode;

View File

@@ -21,10 +21,12 @@ require (
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/klauspost/compress v1.18.4
github.com/lionsoul2014/ip2region/binding/golang v0.0.0-20250930013652-2d71241a3bb9
github.com/microsoft/go-mssqldb v1.9.6
github.com/mojocn/base64Captcha v1.3.8 //
github.com/opencontainers/image-spec v1.1.1
github.com/pierrec/lz4/v4 v4.1.25
github.com/pkg/sftp v1.13.10
github.com/pquerna/otp v1.5.0
github.com/redis/go-redis/v9 v9.18.0
@@ -33,6 +35,8 @@ require (
github.com/spf13/cast v1.10.0
github.com/stretchr/testify v1.11.1
github.com/tidwall/gjson v1.18.0
github.com/twmb/franz-go v1.20.7
github.com/twmb/franz-go/pkg/kadm v1.17.2
go.mongodb.org/mongo-driver/v2 v2.5.0 // mongo
golang.org/x/crypto v0.48.0
golang.org/x/oauth2 v0.35.0
@@ -84,7 +88,6 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
@@ -113,6 +116,7 @@ require (
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.12.0 // indirect
github.com/ugorji/go/codec v1.3.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect

View File

@@ -26,20 +26,31 @@ type Meta struct {
func (dm *Meta) GetSqlDb(ctx context.Context, d *dbi.DbInfo) (*sql.DB, error) {
driverName := "dm"
db := d.Database
schema := ""
dbParam := "?escapeProcess=true"
if db != "" {
// for DM the database can be written as db/schema to connect to a specific schema; without a schema part the default schema is used
ss := strings.Split(db, "/")
if len(ss) > 1 {
dbParam = fmt.Sprintf("%s&schema=\"%s\"", dbParam, ss[len(ss)-1])
schema = ss[1]
}
}
if d.Params != "" {
dbParam += "&" + d.Params
}
dsn := fmt.Sprintf("dm://%s:%s@%s:%d%s&appName=mayfly-go", d.Username, url.PathEscape(d.Password), d.Host, d.Port, dbParam)
conn, err := sql.Open(driverName, dsn)
if err != nil {
return nil, err
}
if schema != "" {
_, err := conn.Exec(fmt.Sprintf("SET SCHEMA %s", schema))
if err != nil {
return nil, err
}
}
dsn := fmt.Sprintf("dm://%s:%s@%s:%d%s", d.Username, url.PathEscape(d.Password), d.Host, d.Port, dbParam)
return sql.Open(driverName, dsn)
return conn, nil
}
func (dm *Meta) GetDialect(conn *dbi.DbConn) dbi.Dialect {
@@ -52,7 +63,7 @@ func (dm *Meta) GetMetadata(conn *dbi.DbConn) dbi.Metadata {
}
}
func (sm *Meta) GetDbDataTypes() []*dbi.DbDataType {
func (dm *Meta) GetDbDataTypes() []*dbi.DbDataType {
return collx.AsArray[*dbi.DbDataType](CHAR, VARCHAR, TEXT, LONG, LONGVARCHAR, IMAGE, LONGVARBINARY, CLOB,
BLOB,
NUMERIC, DECIMAL, NUMBER, INTEGER, INT, BIGINT, TINYINT, BYTE, SMALLINT, BIT, DOUBLE, FLOAT,
@@ -63,6 +74,6 @@ func (sm *Meta) GetDbDataTypes() []*dbi.DbDataType {
)
}
func (mm *Meta) GetCommonTypeConverter() dbi.CommonTypeConverter {
func (dm *Meta) GetCommonTypeConverter() dbi.CommonTypeConverter {
return &commonTypeConverter{}
}
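For reference, a standalone sketch of the DSN this fix produces (not part of the commit; the helper name and sample values are made up): the schema is no longer embedded as a DSN parameter but applied with SET SCHEMA after connecting.

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildDmDsn mirrors the fixed GetSqlDb above: db may be "db" or "db/schema";
// the schema part is returned separately and applied after the connection opens.
func buildDmDsn(username, password, host string, port int, db, params string) (string, string) {
	schema := ""
	dbParam := "?escapeProcess=true"
	if ss := strings.Split(db, "/"); len(ss) > 1 {
		schema = ss[1]
	}
	if params != "" {
		dbParam += "&" + params
	}
	dsn := fmt.Sprintf("dm://%s:%s@%s:%d%s&appName=mayfly-go", username, url.PathEscape(password), host, port, dbParam)
	return dsn, schema
}

func main() {
	dsn, schema := buildDmDsn("SYSDBA", "secret", "127.0.0.1", 5236, "MYDB/MYSCHEMA", "")
	fmt.Println(dsn)    // dm://SYSDBA:secret@127.0.0.1:5236?escapeProcess=true&appName=mayfly-go
	fmt.Println(schema) // MYSCHEMA
}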

View File

@@ -0,0 +1,5 @@
package init
import (
_ "mayfly-go/internal/mq/kafka/init"
)

View File

@@ -0,0 +1,7 @@
package api
import "mayfly-go/pkg/ioc"
func InitIoc() {
ioc.Register(new(Kafka))
}

View File

@@ -0,0 +1,18 @@
package form
import "mayfly-go/pkg/model"
type Kafka struct {
model.IdModel
Code string `json:"code"`
Name string `json:"name" binding:"required"`
Hosts string `json:"hosts" binding:"required"` // Kafka bootstrap addresses, e.g. host1:port1,host2:port2, or a single broker
Username *string `json:"username"`
Password *string `json:"password"`
SaslMechanism *string `json:"saslMechanism"`
SshTunnelMachineId int `json:"sshTunnelMachineId"`
TagCodePaths []string `json:"tagCodePaths" binding:"required"`
}

View File

@@ -0,0 +1,268 @@
package api
import (
"mayfly-go/internal/mq/kafka/api/form"
"mayfly-go/internal/mq/kafka/api/vo"
"mayfly-go/internal/mq/kafka/application"
"mayfly-go/internal/mq/kafka/domain/entity"
"mayfly-go/internal/mq/kafka/imsg"
"mayfly-go/internal/mq/kafka/mgm"
"mayfly-go/internal/pkg/consts"
tagapp "mayfly-go/internal/tag/application"
tagentity "mayfly-go/internal/tag/domain/entity"
"mayfly-go/pkg/biz"
"mayfly-go/pkg/model"
"mayfly-go/pkg/req"
"mayfly-go/pkg/utils/collx"
"strings"
"github.com/spf13/cast"
)
type Kafka struct {
kafkaApp application.Kafka `inject:"T"`
tagTreeApp tagapp.TagTree `inject:"T"`
}
func (k *Kafka) ReqConfs() *req.Confs {
createTopicPerm := req.NewPermission("kafka:topic:create")
deleteTopicPerm := req.NewPermission("kafka:topic:delete")
deleteGroupPerm := req.NewPermission("kafka:group:delete")
reqs := [...]*req.Conf{
// list all kafka instances
req.NewGet("", k.Kafkas),
req.NewPost("/test-conn", k.TestConn),
req.NewPost("", k.Save).Log(req.NewLogSaveI(imsg.LogKafkaSave)),
req.NewDelete(":id", k.DeleteById).Log(req.NewLogSaveI(imsg.LogKafkaDelete)),
req.NewGet(":id/getTopics", k.GetTopics),
// CreateTopic: create a topic
req.NewPost(":id/createTopic", k.CreateTopic).RequiredPermission(createTopicPerm).Log(req.NewLogSaveI(imsg.LogKafkaCreateTopic)),
req.NewDelete(":id/:topic/deleteTopic", k.DeleteTopic).RequiredPermission(deleteTopicPerm).Log(req.NewLogSaveI(imsg.LogKafkaDeleteTopic)),
req.NewGet(":id/:topic/getTopicConfig", k.GetTopicConfig),
// CreatePartitions: add partitions to a topic
req.NewPost(":id/createPartitions", k.CreatePartitions),
req.NewPost(":id/:topic/produce", k.Produce),
req.NewPost(":id/:topic/consume", k.Consume),
// cluster info
req.NewGet(":id/getBrokers", k.GetBrokers),
// GetBrokerConfig: broker configuration
req.NewGet(":id/getBrokerConfig/:brokerId", k.GetBrokerConfig),
// GetGroups: consumer group info
req.NewGet(":id/getGroups", k.GetGroups),
req.NewGet(":id/getGroupMembers/:group", k.GetGroupMembers),
req.NewDelete(":id/deleteGroup/:group", k.DeleteGroup).RequiredPermission(deleteGroupPerm).Log(req.NewLogSaveI(imsg.LogKafkaDeleteGroup)),
}
return req.NewConfs("mq/kafka", reqs[:]...)
}
func (k *Kafka) Kafkas(rc *req.Ctx) {
queryCond := req.BindQuery[entity.KafkaQuery](rc)
// no accessible tag ids means there is no operable data
tags := k.tagTreeApp.GetAccountTags(rc.GetLoginAccount().Id, &tagentity.TagTreeQuery{
TypePaths: collx.AsArray(tagentity.NewTypePaths(tagentity.TagTypeMqKafka)),
CodePathLikes: []string{queryCond.TagPath},
})
if len(tags) == 0 {
rc.ResData = model.NewEmptyPageResult[any]()
return
}
queryCond.Codes = tags.GetCodes()
res, err := k.kafkaApp.GetPageList(queryCond)
biz.ErrIsNil(err)
resVo := model.PageResultConv[*entity.Kafka, *vo.Kafka](res)
kafkavos := resVo.List
// fill in tag info
k.tagTreeApp.FillTagInfo(tagentity.TagType(consts.ResourceTypeMqKafka), collx.ArrayMap(kafkavos, func(mvo *vo.Kafka) tagentity.ITagResource {
return mvo
})...)
rc.ResData = resVo
}
func (k *Kafka) TestConn(rc *req.Ctx) {
_, kafka := req.BindJsonAndCopyTo[form.Kafka, entity.Kafka](rc)
biz.ErrIsNilAppendErr(k.kafkaApp.TestConn(kafka), "connection error: %s")
}
func (k *Kafka) Save(rc *req.Ctx) {
f, kafka := req.BindJsonAndCopyTo[form.Kafka, entity.Kafka](rc)
// mask the password before logging the request
masked := "***"
f.Password = &masked
rc.ReqParam = f
biz.ErrIsNil(k.kafkaApp.SaveKafka(rc.MetaCtx, kafka, f.TagCodePaths...))
}
func (k *Kafka) DeleteById(rc *req.Ctx) {
idsStr := rc.PathParam("id")
rc.ReqParam = idsStr
ids := strings.Split(idsStr, ",")
for _, v := range ids {
biz.ErrIsNil(k.kafkaApp.Delete(rc.MetaCtx, cast.ToUint64(v)))
}
}
func (k *Kafka) GetTopics(rc *req.Ctx) {
id := k.GetKafkaId(rc)
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.ResData, rc.Error = conn.GetTopicDetails()
}
func (k *Kafka) CreateTopic(rc *req.Ctx) {
id := k.GetKafkaId(rc)
param := req.BindJson[mgm.CreateTopicParam](rc)
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.Error = conn.CreateTopic(param)
}
func (k *Kafka) DeleteTopic(rc *req.Ctx) {
id := k.GetKafkaId(rc)
topic := rc.PathParam("topic")
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.Error = conn.DeleteTopic(topic)
}
func (k *Kafka) GetTopicConfig(rc *req.Ctx) {
id := k.GetKafkaId(rc)
topic := rc.PathParam("topic")
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.ResData, rc.Error = conn.GetTopicConfig(topic)
}
func (k *Kafka) CreatePartitions(rc *req.Ctx) {
id := k.GetKafkaId(rc)
param := req.BindJson[mgm.CreatePartitionsParam](rc)
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.Error = conn.CreatePartitions(param)
}
func (k *Kafka) Produce(rc *req.Ctx) {
id := k.GetKafkaId(rc)
topic := rc.PathParam("topic")
param := req.BindJson[mgm.ProduceMessageParam](rc)
param.Topic = topic
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.Error = conn.ProduceMessage(rc.MetaCtx, param)
}
func (k *Kafka) Consume(rc *req.Ctx) {
id := k.GetKafkaId(rc)
topic := rc.PathParam("topic")
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
param := req.BindJson[mgm.ConsumeMessageParam](rc)
param.Topic = topic
rc.ResData, rc.Error = conn.ConsumeMessage(rc.MetaCtx, param)
}
func (k *Kafka) GetBrokers(rc *req.Ctx) {
id := k.GetKafkaId(rc)
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.ResData = conn.GetBrokers()
}
func (k *Kafka) GetBrokerConfig(rc *req.Ctx) {
id := k.GetKafkaId(rc)
brokerId := rc.PathParamInt("brokerId")
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.ResData, rc.Error = conn.GetBrokerConfig(brokerId)
}
func (k *Kafka) GetGroups(rc *req.Ctx) {
id := k.GetKafkaId(rc)
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.ResData, rc.Error = conn.GetConsumerGroups()
}
func (k *Kafka) GetGroupMembers(rc *req.Ctx) {
id := k.GetKafkaId(rc)
group := rc.PathParam("group")
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.ResData, rc.Error = conn.GetGroupMembers(group)
}
func (k *Kafka) DeleteGroup(rc *req.Ctx) {
id := k.GetKafkaId(rc)
group := rc.PathParam("group")
conn, err := k.kafkaApp.GetKafkaConn(rc, id)
if err != nil {
rc.Error = err
return
}
rc.Error = conn.DeleteGroup(group)
}
// get the kafka id from the request path
func (k *Kafka) GetKafkaId(rc *req.Ctx) uint64 {
dbId := rc.PathParamInt("id")
biz.IsTrue(dbId > 0, "kafkaId error")
return uint64(dbId)
}
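The create-topic route binds mgm.CreateTopicParam from the JSON body. A sketch of a payload a client might POST to /mq/kafka/:id/createTopic (field names follow the struct tags defined in mgm; the topic name and values here are made up):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	retention := "604800000" // config entry values are *string in CreateTopicParam
	payload := map[string]any{
		"topic":             "orders",
		"numPartitions":     3,
		"replicationFactor": 2,
		"configEntries":     map[string]*string{"retention.ms": &retention},
	}
	b, _ := json.Marshal(payload)
	fmt.Println(string(b)) // request body for POST /mq/kafka/:id/createTopic
}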

View File

@@ -0,0 +1,24 @@
package vo
import (
tagentity "mayfly-go/internal/tag/domain/entity"
"mayfly-go/pkg/model"
)
type Kafka struct {
model.Model
tagentity.ResourceTags
Code string `json:"code"`
Name string `json:"name"`
Hosts string `json:"hosts"`
Username string `json:"username"`
Password string `json:"password"`
SshTunnelMachineId int `json:"sshTunnelMachineId"` // ssh tunnel machine id
SaslMechanism string `json:"saslMechanism"` // sasl mechanism
}
func (m *Kafka) GetCode() string {
return m.Code
}

View File

@@ -0,0 +1,9 @@
package application
import (
"mayfly-go/pkg/ioc"
)
func InitIoc() {
ioc.Register(new(kafkaAppImpl))
}

View File

@@ -0,0 +1,141 @@
package application
import (
"context"
"mayfly-go/internal/mq/kafka/domain/entity"
"mayfly-go/internal/mq/kafka/domain/repository"
"mayfly-go/internal/mq/kafka/imsg"
"mayfly-go/internal/mq/kafka/mgm"
tagapp "mayfly-go/internal/tag/application"
tagdto "mayfly-go/internal/tag/application/dto"
tagentity "mayfly-go/internal/tag/domain/entity"
"mayfly-go/pkg/base"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/model"
"mayfly-go/pkg/utils/stringx"
)
type Kafka interface {
base.App[*entity.Kafka]
// paginate the kafka instance list
GetPageList(condition *entity.KafkaQuery, orderBy ...string) (*model.PageResult[*entity.Kafka], error)
TestConn(entity *entity.Kafka) error
SaveKafka(ctx context.Context, entity *entity.Kafka, tagCodePaths ...string) error
// delete a kafka instance
Delete(ctx context.Context, id uint64) error
// get a kafka connection instance
// - id: kafka id
GetKafkaConn(ctx context.Context, id uint64) (*mgm.KafkaConn, error)
}
type kafkaAppImpl struct {
base.AppImpl[*entity.Kafka, repository.Kafka]
tagTreeApp tagapp.TagTree `inject:"T"`
}
var _ Kafka = (*kafkaAppImpl)(nil)
// paginate the kafka instance list
func (d *kafkaAppImpl) GetPageList(condition *entity.KafkaQuery, orderBy ...string) (*model.PageResult[*entity.Kafka], error) {
return d.GetRepo().GetList(condition, orderBy...)
}
func (d *kafkaAppImpl) Delete(ctx context.Context, id uint64) error {
kafkaEntity, err := d.GetById(id)
if err != nil {
return errorx.NewBiz("kafka not found")
}
mgm.CloseConn(id)
return d.Tx(ctx,
func(ctx context.Context) error {
return d.DeleteById(ctx, id)
},
func(ctx context.Context) error {
return d.tagTreeApp.SaveResourceTag(ctx, &tagdto.SaveResourceTag{ResourceTag: &tagdto.ResourceTag{
Type: tagentity.TagTypeMqKafka,
Code: kafkaEntity.Code,
}})
})
}
func (d *kafkaAppImpl) TestConn(me *entity.Kafka) error {
conn, err := me.ToKafkaInfo().Conn()
if err != nil {
return err
}
conn.Close()
return nil
}
func (d *kafkaAppImpl) SaveKafka(ctx context.Context, m *entity.Kafka, tagCodePaths ...string) error {
oldKafka := &entity.Kafka{Hosts: m.Hosts, SshTunnelMachineId: m.SshTunnelMachineId}
err := d.GetByCond(oldKafka)
if m.Id == 0 {
if err == nil {
return errorx.NewBizI(ctx, imsg.ErrKafkaInfoExist)
}
// generate a random code
m.Code = stringx.Rand(10)
return d.Tx(ctx, func(ctx context.Context) error {
return d.Insert(ctx, m)
}, func(ctx context.Context) error {
return d.tagTreeApp.SaveResourceTag(ctx, &tagdto.SaveResourceTag{
ResourceTag: &tagdto.ResourceTag{
Type: tagentity.TagTypeMqKafka,
Code: m.Code,
Name: m.Name,
},
ParentTagCodePaths: tagCodePaths,
})
})
}
// if a record with the same hosts already exists, ensure it is the one being updated
if err == nil && oldKafka.Id != m.Id {
return errorx.NewBizI(ctx, imsg.ErrKafkaInfoExist)
}
// if hosts/ssh changed, the lookup above misses the old record, so reload it by id and use its code to link the tags
if oldKafka.Code == "" {
oldKafka, _ = d.GetById(m.Id)
}
// close the cached connection first
mgm.CloseConn(m.Id)
m.Code = ""
return d.Tx(ctx, func(ctx context.Context) error {
return d.UpdateById(ctx, m)
}, func(ctx context.Context) error {
if oldKafka.Name != m.Name {
if err := d.tagTreeApp.UpdateTagName(ctx, tagentity.TagTypeMqKafka, oldKafka.Code, m.Name); err != nil {
return err
}
}
return d.tagTreeApp.SaveResourceTag(ctx, &tagdto.SaveResourceTag{
ResourceTag: &tagdto.ResourceTag{
Type: tagentity.TagTypeMqKafka,
Code: oldKafka.Code,
},
ParentTagCodePaths: tagCodePaths,
})
})
}
func (d *kafkaAppImpl) GetKafkaConn(ctx context.Context, id uint64) (*mgm.KafkaConn, error) {
return mgm.GetKafkaConn(ctx, id, func() (*mgm.KafkaInfo, error) {
me, err := d.GetById(id)
if err != nil {
return nil, errorx.NewBiz("kafka not found")
}
return me.ToKafkaInfo(), nil
})
}

View File

@@ -0,0 +1,26 @@
package entity
import (
"mayfly-go/internal/mq/kafka/mgm"
"mayfly-go/pkg/model"
"mayfly-go/pkg/utils/structx"
)
type Kafka struct {
model.Model
Code string `json:"code" gorm:"size:32;comment:code"`
Name string `json:"name" gorm:"not null;size:50;comment:名称"`
Hosts string `json:"hosts" gorm:"not null;size:500;comment:Kafka 连接地址,格式: host1:port1,host2:port2 或单个 broker"`
Username *string `json:"username" gorm:"size:100;comment:用户名"`
Password *string `json:"password" gorm:"size:100;comment:密码"`
SshTunnelMachineId int `json:"sshTunnelMachineId" gorm:"comment:ssh隧道的机器id"`
SaslMechanism *string `json:"saslMechanism" gorm:"comment:sasl机制"`
}
// convert to KafkaInfo for establishing connections
func (k *Kafka) ToKafkaInfo() *mgm.KafkaInfo {
kafkaInfo := new(mgm.KafkaInfo)
_ = structx.Copy(kafkaInfo, k)
return kafkaInfo
}

View File

@@ -0,0 +1,17 @@
package entity
import "mayfly-go/pkg/model"
type KafkaQuery struct {
model.Model
model.PageParam
Code string `json:"code" form:"code"`
Name string
Keyword string `json:"keyword" form:"keyword"`
Uri string
SshTunnelMachineId uint64 // ssh tunnel machine id
TagPath string `json:"tagPath" form:"tagPath"`
Codes []string
}

View File

@@ -0,0 +1,14 @@
package repository
import (
"mayfly-go/internal/mq/kafka/domain/entity"
"mayfly-go/pkg/base"
"mayfly-go/pkg/model"
)
type Kafka interface {
base.Repo[*entity.Kafka]
// paginate the list
GetList(condition *entity.KafkaQuery, orderBy ...string) (*model.PageResult[*entity.Kafka], error)
}

View File

@@ -0,0 +1,14 @@
package imsg
import "mayfly-go/pkg/i18n"
var En = map[i18n.MsgId]string{
LogKafkaSave: "Kafka - Save",
LogKafkaDelete: "Kafka - Delete",
LogKafkaRunCmd: "Kafka - Run Cmd",
LogUpdateDocs: "Kafka - Update Documents",
LogDelDocs: "Kafka - Delete Documents",
LogInsertDocs: "Kafka - Insert Documents",
LogKafkaCreateTopic: "Kafka - Create Topic",
LogKafkaDeleteTopic: "Kafka - Delete Topic",
LogKafkaDeleteGroup: "Kafka - Delete Group",
ErrKafkaInfoExist: "that information already exists",
}

View File

@@ -0,0 +1,25 @@
package imsg
import (
"mayfly-go/internal/pkg/consts"
"mayfly-go/pkg/i18n"
)
func init() {
i18n.AppendLangMsg(i18n.Zh_CN, Zh_CN)
i18n.AppendLangMsg(i18n.En, En)
}
const (
LogKafkaSave = iota + consts.ImsgNumMqKafka
LogKafkaDelete
LogKafkaRunCmd
LogUpdateDocs
LogDelDocs
LogInsertDocs
LogKafkaCreateTopic
LogKafkaDeleteTopic
LogKafkaDeleteGroup
ErrKafkaInfoExist
)

View File

@@ -0,0 +1,14 @@
package imsg
import "mayfly-go/pkg/i18n"
var Zh_CN = map[i18n.MsgId]string{
LogKafkaSave: "Kafka-保存",
LogKafkaDelete: "Kafka-删除",
LogKafkaRunCmd: "Kafka-执行命令",
LogUpdateDocs: "Kafka-更新文档",
LogDelDocs: "Kafka-删除文档",
LogInsertDocs: "Kafka-插入文档",
LogKafkaCreateTopic: "Kafka-创建Topic",
LogKafkaDeleteTopic: "Kafka-删除Topic",
LogKafkaDeleteGroup: "Kafka-删除消费组",
ErrKafkaInfoExist: "该信息已存在",
}

View File

@@ -0,0 +1,31 @@
package persistence
import (
"mayfly-go/internal/mq/kafka/domain/entity"
"mayfly-go/internal/mq/kafka/domain/repository"
"mayfly-go/pkg/base"
"mayfly-go/pkg/model"
)
type kafkaRepoImpl struct {
base.RepoImpl[*entity.Kafka]
}
func newKafkaRepo() repository.Kafka {
return &kafkaRepoImpl{}
}
// paginate the kafka instance list
func (d *kafkaRepoImpl) GetList(condition *entity.KafkaQuery, orderBy ...string) (*model.PageResult[*entity.Kafka], error) {
qd := model.NewCond().
Like("name", condition.Name).
Eq("code", condition.Code).
In("code", condition.Codes)
keyword := condition.Keyword
if keyword != "" {
keyword = "%" + keyword + "%"
qd.And("name like ? or code like ? or hosts like ?", keyword, keyword, keyword)
}
return d.PageByCond(qd, condition.PageParam)
}

View File

@@ -0,0 +1,9 @@
package persistence
import (
"mayfly-go/pkg/ioc"
)
func InitIoc() {
ioc.Register(newKafkaRepo())
}

View File

@@ -0,0 +1,16 @@
package init
import (
"mayfly-go/internal/mq/kafka/api"
"mayfly-go/internal/mq/kafka/application"
"mayfly-go/internal/mq/kafka/infra/persistence"
"mayfly-go/pkg/starter"
)
func init() {
starter.AddInitIocFunc(func() {
persistence.InitIoc()
application.InitIoc()
api.InitIoc()
})
}

View File

@@ -0,0 +1,478 @@
package mgm
import (
"cmp"
"context"
"encoding/base64"
"errors"
"fmt"
"mayfly-go/pkg/errorx"
"mayfly-go/pkg/logx"
"sort"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
)
type KafkaConn struct {
Id string
Info *KafkaInfo
Configs []kgo.Opt
Client *kgo.Client
Ac *kadm.Client
}
/******************* pool.Conn impl *******************/
func (kc *KafkaConn) Close() error {
if kc.Client != nil {
kc.Client.Close()
kc.Client = nil
}
if kc.Ac != nil {
kc.Ac.Close()
kc.Ac = nil
}
return nil
}
func (kc *KafkaConn) Ping() error {
if kc == nil {
return errorx.NewBiz("kafka connection is nil")
}
if kc.Client == nil {
return errorx.NewBiz("kafka client is nil")
}
if kc.Ac == nil {
return errorx.NewBiz("kafka admin client is nil")
}
brokers, err := kc.Ac.ListBrokers(context.Background())
if err != nil {
return err
}
if len(brokers) == 0 {
return errorx.NewBiz("no available brokers")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := kc.Client.Ping(ctx); err != nil {
return err
}
return nil
}
func (kc *KafkaConn) GetTopics() ([]string, error) {
if kc.Ac == nil {
return nil, errorx.NewBiz("kafka admin client is closed")
}
list, err := kc.Ac.ListTopics(context.Background())
if err != nil {
return nil, err
}
topics := make([]string, 0, len(list))
for topic := range list {
topics = append(topics, topic)
}
return topics, nil
}
// GetTopicConfig gets the configuration of a topic
func (kc *KafkaConn) GetTopicConfig(topic string) (*kadm.ResourceConfigs, error) {
configs, err := kc.Ac.DescribeTopicConfigs(context.Background(), topic)
if err != nil {
return nil, err
}
return &configs, nil
}
// GetTopicDetails gets detailed topic info
func (kc *KafkaConn) GetTopicDetails() ([]any, error) {
topics, err := kc.Ac.ListTopics(context.Background())
if err != nil {
return nil, err
}
return buildTopicsResp(topics), nil
}
func buildTopicsResp(topics kadm.TopicDetails) []any {
// FIX: sort the map keys so the output order is stable
topicNames := make([]string, 0, len(topics))
for name := range topics {
topicNames = append(topicNames, name)
}
sort.Strings(topicNames)
result := make([]any, 0, len(topicNames))
for _, topicName := range topicNames {
topicDetail := topics[topicName]
partitionErrs := ""
var partitions []any
for _, partition := range topicDetail.Partitions {
errMsg := ""
if partition.Err != nil {
errMsg = partition.Err.Error()
partitionErrs += fmt.Sprintf("partition %d: %s\n", partition.Partition, errMsg)
}
partitions = append(partitions, map[string]any{
"partition": partition.Partition,
"leader": partition.Leader,
"replicas": partition.Replicas,
"isr": partition.ISR,
"err": errMsg,
"LeaderEpoch": partition.LeaderEpoch,
"OfflineReplicas": partition.OfflineReplicas,
})
}
if topicDetail.Err != nil {
partitionErrs = topicDetail.Err.Error() + "\n" + partitionErrs
}
replicationFactor := 0
if len(topicDetail.Partitions) > 0 {
replicationFactor = len(topicDetail.Partitions[0].Replicas)
}
result = append(result, map[string]any{
"ID": topicDetail.ID,
"topic": topicName,
"partition_count": len(topicDetail.Partitions),
"replication_factor": replicationFactor,
"IsInternal": topicDetail.IsInternal,
"Err": partitionErrs,
"partitions": partitions,
})
}
return result
}
// GetConsumerGroups lists the consumer groups
func (kc *KafkaConn) GetConsumerGroups() ([]kadm.ListedGroup, error) {
groups, err := kc.Ac.ListGroups(context.Background())
if err != nil {
return nil, err
}
sortedGroups := groups.Sorted()
return sortedGroups, nil
}
// CreateTopic creates a topic
func (kc *KafkaConn) CreateTopic(p *CreateTopicParam) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := kc.Ac.CreateTopics(ctx, p.NumPartitions, p.ReplicationFactor, p.ConfigEntries, p.TopicName)
if err != nil {
return err
}
err = resp.Error()
if err != nil {
return err
}
return nil
}
// DeleteTopic deletes a topic
func (kc *KafkaConn) DeleteTopic(topic string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := kc.Ac.DeleteTopics(ctx, topic)
if err != nil {
return err
}
if resp.Error() != nil {
return resp.Error()
}
return nil
}
// ConsumeMessage consumes messages from a topic
func (kc *KafkaConn) ConsumeMessage(ctx context.Context, param *ConsumeMessageParam) ([]*ConsumeMessageResult, error) {
param.Number = cmp.Or(param.Number, 10)
param.PullTimeout = cmp.Or(param.PullTimeout, 10)
param.Group = cmp.Or(param.Group, "__mayfly-server__"+uuid.New().String())
st := time.Now()
// build the consume options
consumeOpts := []kgo.Opt{
kgo.ConsumeTopics(param.Topic),
kgo.DisableAutoCommit(),
}
// isolation level
if strings.ToLower(param.IsolationLevel) == "read_committed" {
consumeOpts = append(consumeOpts, kgo.FetchIsolationLevel(kgo.ReadCommitted()))
} else {
consumeOpts = append(consumeOpts, kgo.FetchIsolationLevel(kgo.ReadUncommitted()))
}
if param.StartTime != "" {
parse, err := time.Parse(time.DateTime, param.StartTime)
if err != nil {
return nil, err
}
consumeOpts = append(consumeOpts, kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(parse.UnixMilli())))
} else if param.Earliest {
consumeOpts = append(consumeOpts, kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()))
} else {
consumeOpts = append(consumeOpts, kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
}
// get or create the cached consumer-group client
cl, err := GetOrCreateConsumerGroupClient(ctx, kc.Info.Id, param.Group, kc.Info, consumeOpts)
if err != nil {
return nil, err
}
// start polling; derive the poll context from the caller's so cancellation propagates
pollCtx, cancel := context.WithTimeout(ctx, time.Duration(param.PullTimeout)*time.Second)
defer cancel()
fetches := cl.PollRecords(pollCtx, param.Number)
if fetches.IsClientClosed() {
return nil, errorx.NewBiz("Client Closed, Please Retry")
}
if errors.Is(pollCtx.Err(), context.DeadlineExceeded) {
if len(fetches.Records()) == 0 {
return nil, errorx.NewBiz("Consume Timeout, Maybe No Message")
}
}
if errs := fetches.Errors(); len(errs) > 0 {
return nil, errorx.NewBiz(fmt.Sprint(errs))
}
logx.Infof("poll 完成... %v", len(fetches.Records()))
res := make([]*ConsumeMessageResult, 0)
for i, v := range fetches.Records() {
if v == nil {
continue
}
var data []byte
var err error
switch param.Decompression {
case "gzip":
data, err = GzipDecompress(v.Value)
case "lz4":
data, err = Lz4Decompress(v.Value)
case "zstd":
data, err = ZstdDecompress(v.Value)
case "snappy":
data, err = SnappyDecompress(v.Value)
default:
data = v.Value
}
if err != nil {
return nil, errorx.NewBiz(fmt.Sprintf("failed to decompress data: %s", err.Error()))
}
// decode according to the decode param
var decodedData []byte
switch strings.ToLower(param.Decode) {
case "base64":
decodedData, err = base64.StdEncoding.DecodeString(string(data))
if err != nil {
return nil, errorx.NewBiz(fmt.Sprintf("Failed to decode base64 data: %s", err.Error()))
}
default:
decodedData = data
}
res = append(res, &ConsumeMessageResult{
Id: i,
Offset: v.Offset,
Partition: v.Partition,
Key: string(v.Key),
Value: string(decodedData),
Timestamp: v.Timestamp.Format(time.DateTime),
Topic: v.Topic,
Headers: getHeadersString(v.Headers),
LeaderEpoch: v.LeaderEpoch,
ProducerEpoch: v.ProducerEpoch,
ProducerID: v.ProducerID,
})
}
logx.Infof("耗时:%.4f秒 , topic: %s, group: %s, num: %v", time.Since(st).Seconds(), param.Topic, param.Group, param.Number)
if param.Group != "" && param.CommitOffset {
logx.Infof("开始提交 offset...")
if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
return nil, err
}
logx.Infof("提交 offset 完成...")
}
return res, nil
}
func getHeadersString(headers []kgo.RecordHeader) map[string]string {
headersMap := make(map[string]string)
if len(headers) == 0 {
return headersMap
}
for _, h := range headers {
headersMap[h.Key] = string(h.Value)
}
return headersMap
}
// ProduceMessage produces messages to a topic
func (kc *KafkaConn) ProduceMessage(ctx context.Context, param *ProduceMessageParam) error {
logx.Infof("开始生产消息...")
st := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
headers2 := make([]kgo.RecordHeader, len(param.Headers))
for i := 0; i < len(param.Headers); i++ {
headers2[i] = kgo.RecordHeader{
Key: param.Headers[i]["key"],
Value: []byte(param.Headers[i]["value"]),
}
}
var data []byte
var err error
switch param.Compression {
case "gzip":
data, err = Gzip([]byte(param.Value))
case "lz4":
data, err = Lz4([]byte(param.Value))
case "zstd":
data, err = Zstd([]byte(param.Value))
case "snappy":
data, err = Snappy([]byte(param.Value))
default:
data = []byte(param.Value)
}
if err != nil {
return errorx.NewBiz("Failed to compress data: " + err.Error())
}
var records []*kgo.Record
for i := 0; i < param.Times; i++ {
records = append(records, &kgo.Record{
Topic: param.Topic,
Value: data,
Key: []byte(param.Key),
Headers: headers2,
Partition: param.Partition,
})
}
res := kc.Client.ProduceSync(ctx, records...)
if err := res.FirstErr(); err != nil {
return errorx.NewBiz("Produce Error: " + err.Error())
}
logx.Infof("耗时:%.4f秒 topic: %s, key:%s, num:%v", time.Since(st).Seconds(), param.Topic, param.Key, param.Times)
return nil
}
func (kc *KafkaConn) CreatePartitions(p *CreatePartitionsParam) error {
res, err := kc.Ac.CreatePartitions(context.Background(), p.NumPartitions, p.TopicName)
if err != nil {
return err
}
err = res.Error()
if err != nil {
return errorx.NewBiz("CreatePartitions Error" + err.Error())
}
return nil
}
func (kc *KafkaConn) GetBrokers() []BrokerInfo {
brokers, err := kc.Ac.ListBrokers(context.Background())
if err != nil {
return nil
}
var bs []BrokerInfo
for _, b := range brokers {
bs = append(bs, BrokerInfo{
Id: b.NodeID,
Addr: b.Host + ":" + strconv.Itoa(int(b.Port)),
Rack: b.Rack,
})
}
return bs
}
func (kc *KafkaConn) GetBrokerConfig(id int) (*kadm.ResourceConfigs, error) {
configs, err := kc.Ac.DescribeBrokerConfigs(context.Background(), int32(id))
if err != nil {
return nil, err
}
return &configs, nil
}
func (kc *KafkaConn) DeleteGroup(group string) error {
resp, err := kc.Ac.DeleteGroup(context.Background(), group)
if err != nil {
return errorx.NewBiz("DeleteGroup Error: " + err.Error())
}
if resp.Err != nil {
return errorx.NewBiz("DeleteGroup Error: " + resp.Err.Error())
}
return nil
}
func (kc *KafkaConn) GetGroupMembers(group string) (any, error) {
ctx := context.Background()
resp, err := kc.Ac.DescribeGroups(ctx, group)
if err != nil {
return nil, errorx.NewBiz("DescribeGroups Error: " + err.Error())
}
err = resp.Error()
if err != nil {
return nil, errorx.NewBiz("DescribeGroups Error: " + err.Error())
}
sortedGroups := resp.Sorted()
membersLst := make([]any, 0)
for _, describedGroup := range sortedGroups {
if describedGroup.Err != nil {
return nil, errorx.NewBiz(fmt.Sprintf("Error describing group %s: %v", describedGroup.Group, describedGroup.Err))
}
for _, member := range describedGroup.Members {
subscribedTPs := make(map[string][]int32)
if consumerMetadata, ok := member.Assigned.AsConsumer(); ok {
tps := consumerMetadata.Topics
for _, tp := range tps {
subscribedTPs[tp.Topic] = tp.Partitions
}
}
membersLst = append(membersLst, map[string]any{
"MemberID": member.MemberID,
"InstanceID": member.InstanceID,
"ClientID": member.ClientID,
"ClientHost": member.ClientHost,
"TPs": subscribedTPs, // e.g. map[topicName:[0 1]]
})
}
}
return membersLst, nil
}
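ConsumeMessage above fills request defaults with cmp.Or (Go 1.22+ standard library), which returns its first non-zero argument. A minimal standalone sketch of that behavior:

package main

import (
	"cmp"
	"fmt"
)

func main() {
	fmt.Println(cmp.Or(0, 10))       // 10: a zero value falls back to the default
	fmt.Println(cmp.Or(25, 10))      // 25: an explicit value wins
	fmt.Println(cmp.Or("", "grp-a")) // "grp-a": works for any comparable type
}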

View File

@@ -0,0 +1,110 @@
package mgm
import (
"context"
"fmt"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/pool"
"sync"
"time"
"github.com/twmb/franz-go/pkg/kgo"
)
var (
poolGroup = pool.NewPoolGroup[*KafkaConn]()
// consumer-group client cache; key: kafkaId:group, value: *kgo.Client
consumerGroupClients = make(map[string]*kgo.Client)
cgClientMu sync.RWMutex
cgClientCleanupDelay = 5 * time.Minute // clients are closed 5 minutes after creation (see scheduleConsumerGroupCleanup)
)
// get the cached kafka connection; on a cache miss the callback supplies the KafkaInfo used to connect, and the result is cached
func GetKafkaConn(ctx context.Context, kafkaId uint64, getKafkaInfo func() (*KafkaInfo, error)) (*KafkaConn, error) {
cachePool, err := poolGroup.GetCachePool(getConnId(kafkaId), func() (*KafkaConn, error) {
// cache miss: obtain the KafkaInfo via the callback
mi, err := getKafkaInfo()
if err != nil {
return nil, err
}
// connect to kafka
return mi.Conn()
})
if err != nil {
return nil, err
}
// take an available connection from the pool
return cachePool.Get(ctx)
}
// get or create a cached consumer client bound to the given group
func GetOrCreateConsumerGroupClient(ctx context.Context, kafkaId uint64, group string, info *KafkaInfo, consumeOpts []kgo.Opt) (*kgo.Client, error) {
cacheKey := getConsumerGroupCacheKey(kafkaId, group)
cgClientMu.RLock()
if cl, exists := consumerGroupClients[cacheKey]; exists {
cgClientMu.RUnlock()
return cl, nil
}
cgClientMu.RUnlock()
cgClientMu.Lock()
defer cgClientMu.Unlock()
// double check
if cl, exists := consumerGroupClients[cacheKey]; exists {
return cl, nil
}
// create a new client: base options plus the consumer group and consume options
opts := info.BuildBaseOpts()
opts = append(opts, kgo.ConsumerGroup(group))
opts = append(opts, consumeOpts...)
cl, err := kgo.NewClient(opts...)
if err != nil {
return nil, err
}
// cache the client
consumerGroupClients[cacheKey] = cl
// schedule delayed cleanup
go scheduleConsumerGroupCleanup(cacheKey, cl)
logx.Debugf("created consumer-group client: %s", cacheKey)
return cl, nil
}
// close and evict the consumer-group client after the fixed delay
func scheduleConsumerGroupCleanup(cacheKey string, cl *kgo.Client) {
time.Sleep(cgClientCleanupDelay)
cgClientMu.Lock()
defer cgClientMu.Unlock()
// only close if the cache still holds this exact client
if cachedCl, exists := consumerGroupClients[cacheKey]; exists && cachedCl == cl {
cl.Close()
delete(consumerGroupClients, cacheKey)
logx.Debugf("cleaned up consumer-group client: %s", cacheKey)
}
}
// build the consumer-group cache key
func getConsumerGroupCacheKey(kafkaId uint64, group string) string {
return fmt.Sprintf("%s:group:%s", getConnId(kafkaId), group)
}
// close the connection and evict it from the cache
func CloseConn(kafkaId uint64) {
err := poolGroup.Close(getConnId(kafkaId))
if err != nil {
logx.Errorf("failed to close kafka connection: %v", err)
}
}
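A sketch of how the cache is meant to be used (the loader below is hypothetical and stands in for the repository lookup done in kafkaAppImpl.GetKafkaConn): the callback runs only on a cache miss, so repeated calls for the same kafkaId reuse the pooled connection.

package mgm

import "context"

// kafkaInfoFromDB is a hypothetical loader standing in for the repository lookup.
func kafkaInfoFromDB(id uint64) (*KafkaInfo, error) {
	return &KafkaInfo{Id: id, Hosts: "127.0.0.1:9092"}, nil
}

// ListTopicsOnce fetches a pooled connection and lists topic names; the
// KafkaInfo callback is only invoked when no cached connection exists.
func ListTopicsOnce(ctx context.Context, kafkaId uint64) ([]string, error) {
	conn, err := GetKafkaConn(ctx, kafkaId, func() (*KafkaInfo, error) {
		return kafkaInfoFromDB(kafkaId)
	})
	if err != nil {
		return nil, err
	}
	return conn.GetTopics()
}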

View File

@@ -0,0 +1,212 @@
package mgm
import (
"archive/tar"
"bytes"
"compress/gzip"
"fmt"
"io"
"os"
"path/filepath"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
)
// Tar creates a tar archive from a directory
func Tar(sourceDir string, destFile string) error {
// create the destination file
file, err := os.Create(destFile)
if err != nil {
return fmt.Errorf("create destination tar file failed: %w", err)
}
defer file.Close()
tarWriter := tar.NewWriter(file)
defer tarWriter.Close()
// walk the source directory, adding files to the archive
err = filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
header, err := tar.FileInfoHeader(info, "") // build the tar header from the file info
if err != nil {
return fmt.Errorf("tar: create tar header failed: %w", err)
}
// paths inside the archive are relative to the archive root
header.Name, err = filepath.Rel(sourceDir, path)
if err != nil {
return fmt.Errorf("tar: get relative path failed: %w", err)
}
if header.Name == "." { // the root's relative path is "."; normalize it to an empty string
header.Name = ""
}
if err := tarWriter.WriteHeader(header); err != nil { // write the header
return fmt.Errorf("write tar header failed: %w", err)
}
if info.IsDir() { // for a directory the header is enough, no content to write
return nil
}
// open the file and copy its content; buffered writes improve throughput
srcFile, err := os.Open(path)
if err != nil {
return fmt.Errorf("tar: open source file failed: %w", err)
}
defer srcFile.Close()
buf := make([]byte, 32*1024) // 32KB buffer
_, err = io.CopyBuffer(tarWriter, srcFile, buf)
if err != nil {
return fmt.Errorf("tar: copy file content failed: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("tar: walk source directory failed: %w", err)
}
return nil
}
// TarDecompress extracts a tar archive into the given directory
func TarDecompress(tarFile string, destDir string) error {
file, err := os.Open(tarFile) // open the tar file
if err != nil {
return fmt.Errorf("open tar file failed: %w", err)
}
defer file.Close()
tarReader := tar.NewReader(file)
for {
header, err := tarReader.Next() // read the next header
if err == io.EOF { // end of archive
break
}
if err != nil {
return fmt.Errorf("read tar header failed: %w", err)
}
targetPath := filepath.Join(destDir, header.Name) // build the target path
if header.Typeflag == tar.TypeDir { // create directories as needed
if err := os.MkdirAll(targetPath, os.FileMode(header.Mode)); err != nil {
return fmt.Errorf("tarDecompress: create directory failed: %w", err)
}
continue
}
// create the file
outFile, err := os.Create(targetPath)
if err != nil {
return fmt.Errorf("tarDecompress: create target file failed: %w", err)
}
// copy the file content
if _, err := io.Copy(outFile, tarReader); err != nil {
_ = outFile.Close()
return fmt.Errorf("tarDecompress: copy file content failed: %w", err)
}
_ = outFile.Close()
}
return nil
}
// Gzip compresses data
func Gzip(data []byte) ([]byte, error) {
var buf bytes.Buffer
gzipWriter := gzip.NewWriter(&buf) // gzip writer writing into the buffer
_, err := gzipWriter.Write(data) // push the raw data through the gzip writer
if err != nil {
return nil, fmt.Errorf("gzip write error: %w", err)
}
err = gzipWriter.Close() // important: Close finalizes the gzip stream
if err != nil {
return nil, fmt.Errorf("gzip close error: %w", err)
}
return buf.Bytes(), nil // return the compressed bytes
}
// GzipDecompress decompresses data
func GzipDecompress(compressedData []byte) ([]byte, error) {
buf := bytes.NewReader(compressedData) // reader over the compressed data
gzipReader, err := gzip.NewReader(buf) // gzip reader pulling from the buffer
if err != nil {
return nil, fmt.Errorf("gzipDecompress: gzip reader creation error: %w", err)
}
defer gzipReader.Close() // make sure the reader is closed after use
decompressedData, err := io.ReadAll(gzipReader) // read all decompressed data
if err != nil {
return nil, fmt.Errorf("gzipDecompress: gzip read error: %w", err)
}
return decompressedData, nil // return the decompressed bytes
}
// Zstd compresses data with zstd
func Zstd(data []byte) ([]byte, error) {
var buf bytes.Buffer
zstdWriter, err := zstd.NewWriter(&buf) // zstd writer writing into the buffer
if err != nil {
return nil, fmt.Errorf("zstd writer creation failed: %w", err)
}
defer zstdWriter.Close() // also close on error paths
_, err = zstdWriter.Write(data) // push the raw data through the zstd writer
if err != nil {
return nil, fmt.Errorf("zstd write error: %w", err)
}
err = zstdWriter.Close() // finalize the zstd stream
if err != nil {
return nil, fmt.Errorf("zstd close error: %w", err)
}
return buf.Bytes(), nil // return the compressed bytes
}
// ZstdDecompress decompresses zstd data
func ZstdDecompress(compressedData []byte) ([]byte, error) {
buf := bytes.NewReader(compressedData) // reader over the compressed data
zstdReader, err := zstd.NewReader(buf) // zstd reader pulling from the buffer
if err != nil {
return nil, fmt.Errorf("zstd reader creation failed: %w", err)
}
defer zstdReader.Close() // make sure the reader is closed
decompressedData, err := io.ReadAll(zstdReader) // read all decompressed data
if err != nil {
return nil, fmt.Errorf("zstd read error: %w", err)
}
return decompressedData, nil // return the decompressed bytes
}
// Lz4 compresses data with lz4
func Lz4(data []byte) ([]byte, error) {
var buf bytes.Buffer
lz4Writer := lz4.NewWriter(&buf) // lz4 writer writing into the buffer
defer lz4Writer.Close() // also close on error paths
_, err := lz4Writer.Write(data) // push the raw data through the lz4 writer
if err != nil {
return nil, fmt.Errorf("lz4 write error: %w", err)
}
err = lz4Writer.Close() // finalize the lz4 stream
if err != nil {
return nil, fmt.Errorf("lz4 close error: %w", err)
}
return buf.Bytes(), nil // return the compressed bytes
}
// Lz4Decompress decompresses lz4 data
func Lz4Decompress(compressedData []byte) ([]byte, error) {
buf := bytes.NewReader(compressedData) // reader over the compressed data
lz4Reader := lz4.NewReader(buf) // lz4 reader pulling from the buffer
decompressedData, err := io.ReadAll(lz4Reader) // read all decompressed data
if err != nil {
return nil, fmt.Errorf("lz4 read error: %w", err)
}
return decompressedData, nil // return the decompressed bytes
}
// Snappy compresses data with snappy
func Snappy(data []byte) ([]byte, error) {
compressedData := snappy.Encode(nil, data) // snappy.Encode compresses directly; no writer needed
return compressedData, nil // snappy.Encode itself cannot fail, so the error is always nil
}
// SnappyDecompress decompresses snappy data
func SnappyDecompress(compressedData []byte) ([]byte, error) {
decompressedData, err := snappy.Decode(nil, compressedData) // snappy.Decode decompresses directly; no reader needed
if err != nil {
return nil, fmt.Errorf("snappy decompress failed: %w", err)
}
return decompressedData, nil
}
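A quick round-trip check tying the produce and consume paths together (a sketch; any of the codec pairs above can be swapped in the same way):

package mgm

import "bytes"

// RoundTripGzip verifies that a payload compressed for produce is restored
// byte-for-byte by the consume path's decompressor.
func RoundTripGzip(msg []byte) (bool, error) {
	compressed, err := Gzip(msg)
	if err != nil {
		return false, err
	}
	restored, err := GzipDecompress(compressed)
	if err != nil {
		return false, err
	}
	return bytes.Equal(msg, restored), nil
}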

View File

@@ -0,0 +1,221 @@
package mgm
import (
"context"
"fmt"
machineapp "mayfly-go/internal/machine/application"
"mayfly-go/pkg/logx"
"mayfly-go/pkg/utils/netx"
"net"
"strings"
"time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
)
type KafkaInfo struct {
Id uint64 `json:"id"`
Name string `json:"name"`
// Kafka bootstrap addresses, e.g. host1:port1,host2:port2, or a single broker
Hosts string `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
SshTunnelMachineId int `json:"-"` // ssh tunnel machine id
// SASL configuration
SaslEnabled bool `json:"-"`
SaslMechanism string `json:"-"` // PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
}
func (mi *KafkaInfo) Conn() (*KafkaConn, error) {
opts := mi.BuildBaseOpts()
// create the client
client, err := kgo.NewClient(opts...)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client: %w", err)
}
kac := kadm.NewClient(client)
// verify the connection by listing the brokers
brokers, err := kac.ListBrokers(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to list brokers: %w", err)
}
if len(brokers) == 0 {
client.Close()
return nil, fmt.Errorf("no available brokers")
}
logx.Infof("连接 kafka: %s", mi.Hosts)
return &KafkaConn{
Id: getConnId(mi.Id),
Info: mi,
Client: client,
Configs: opts,
Ac: kac,
}, nil
}
func (mi *KafkaInfo) BuildBaseOpts() []kgo.Opt {
// base client options shared by Conn and the consumer-group clients
opts := []kgo.Opt{
kgo.SeedBrokers(strings.Split(mi.Hosts, ",")...),
kgo.DialTimeout(8 * time.Second),
kgo.RecordPartitioner(kgo.ManualPartitioner()),
}
// authentication
if mi.Username != "" {
scramAuth := &scram.Auth{
User: mi.Username,
Pass: mi.Password,
}
plainAuth := &plain.Auth{
User: mi.Username,
Pass: mi.Password,
}
if mi.SaslMechanism != "" {
switch strings.ToUpper(mi.SaslMechanism) {
case "SCRAM-SHA-256":
opts = append(opts, kgo.SASL(scramAuth.AsSha256Mechanism()))
case "SCRAM-SHA-512":
opts = append(opts, kgo.SASL(scramAuth.AsSha512Mechanism()))
default:
opts = append(opts, kgo.SASL(plainAuth.AsMechanism()))
}
} else {
opts = append(opts, kgo.SASL(plainAuth.AsMechanism()))
}
}
// SSH tunnel
if mi.SshTunnelMachineId > 0 {
stm, err := machineapp.GetMachineApp().GetSshTunnelMachine(context.Background(), mi.SshTunnelMachineId)
if err != nil {
logx.Errorf("failed to get ssh tunnel: %v", err)
} else {
dialFn := func(ctx context.Context, network, address string) (net.Conn, error) {
sshConn, err := stm.GetDialConn(network, address)
if err != nil {
return nil, err
}
return &netx.WrapSshConn{Conn: sshConn}, nil
}
opts = append(opts, kgo.Dialer(dialFn))
}
}
return opts
}
type KafkaSshProxyDialer struct {
machineId int
}
func (sd *KafkaSshProxyDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
stm, err := machineapp.GetMachineApp().GetSshTunnelMachine(ctx, sd.machineId)
if err != nil {
return nil, err
}
if sshConn, err := stm.GetDialConn(network, address); err == nil {
// wrap the ssh conn; it does not support deadlines, so setting one internally would otherwise fail with: ssh: tcpChan: deadline not supported
return &netx.WrapSshConn{Conn: sshConn}, nil
} else {
return nil, err
}
}
// Dial implements the sarama.Dialer interface
func (sd *KafkaSshProxyDialer) Dial(network, address string) (net.Conn, error) {
return sd.DialContext(context.Background(), network, address)
}
// build the kafka connection id
func getConnId(id uint64) string {
if id == 0 {
return ""
}
return fmt.Sprintf("kafka:%d", id)
}
// CreateTopicParam - parameters for creating a topic
type CreateTopicParam struct {
TopicName string `json:"topic" binding:"required" ` // topic name
NumPartitions int32 `json:"numPartitions" binding:"required" ` // number of partitions
ReplicationFactor int16 `json:"replicationFactor" binding:"required" ` // replication factor
ConfigEntries map[string]*string `json:"configEntries"` // config entries
}
// CreatePartitionsParam - parameters for adding partitions
type CreatePartitionsParam struct {
TopicName string `json:"topic"` // topic name
NumPartitions int `json:"numPartitions"` // target number of partitions
}
// ConsumeMessageParam - parameters for consuming messages
type ConsumeMessageParam struct {
Topic string `json:"topic"` // topic name
Group string `json:"group"` // consumer group id; if empty a throwaway group is generated
Partition int32 `json:"partition" default:"-1"` // partition number, -1 means all partitions
Number int `json:"number" default:"10"` // number of messages to consume
PullTimeout int `json:"pullTimeout" default:"10"` // pull timeout in seconds
Decompression string `json:"decompression"` // decompression: gzip, lz4, zstd, snappy
Decode string `json:"decode"` // decoding: base64
Earliest bool `json:"earliest"` // consume from the earliest offset; false means from the latest
StartTime string `json:"startTime"` // start time (format 2006-01-02 15:04:05); empty means unused; takes precedence over Earliest
CommitOffset bool `json:"commitOffset"` // whether to commit the consumed offsets
IsolationLevel string `json:"isolationLevel"` // isolation level: read_committed or read_uncommitted (default)
}
// ConsumeMessageResult - a consumed message
type ConsumeMessageResult struct {
Id int `json:"id"`
Offset int64 `json:"offset"`
Partition int32 `json:"partition"`
Key string `json:"key"`
Value string `json:"value"`
Timestamp string `json:"timestamp"`
Topic string `json:"topic"`
Headers map[string]string `json:"headers"`
LeaderEpoch int32 `json:"leaderEpoch"`
ProducerEpoch int16 `json:"producerEpoch"`
ProducerID int64 `json:"producerID"`
}
// ProduceMessageParam - parameters for producing messages
type ProduceMessageParam struct {
Topic string `json:"topic"` // topic name
Key string `json:"key"` // message key (optional)
Partition int32 `json:"partition"` // target partition (optional, -1 means automatic)
Value string `json:"value"` // message payload
Headers []map[string]string `json:"headers"` // message headers (optional)
Times int `json:"times"` // how many times to send
Compression string `json:"compression"` // compression: gzip, lz4, zstd, snappy
}
type RecordHeader struct {
Key string
Value string
}
// ProduceMessageResult - produce result
type ProduceMessageResult struct {
Partition int32 `json:"partition"`
Offset int64 `json:"offset"`
}
type BrokerInfo struct {
Id int32 `json:"id"`
Addr string `json:"addr"`
Rack *string `json:"rack"`
}
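For reference, how the SASL branch above is exercised (the values are hypothetical): BuildBaseOpts picks SCRAM-SHA-256/512 from SaslMechanism and falls back to PLAIN for any other non-empty value, and no SASL at all when Username is empty.

package mgm

// exampleScramConn sketches connecting to a SCRAM-secured cluster.
func exampleScramConn() (*KafkaConn, error) {
	info := &KafkaInfo{
		Id:            1,
		Name:          "demo-cluster",
		Hosts:         "broker1:9092,broker2:9092",
		Username:      "mayfly",
		Password:      "secret",
		SaslMechanism: "SCRAM-SHA-256", // "" or "PLAIN" would select plain SASL (with Username set)
	}
	return info.Conn()
}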

View File

@@ -10,6 +10,7 @@ const (
ResourceTypeAuthCert int8 = 5
ResourceTypeEsInstance int8 = 6
ResourceTypeContainer int8 = 7
ResourceTypeMqKafka int8 = 8
// starting numbers for imsg ids
ImsgNumSys = 10000
@@ -23,4 +24,5 @@ const (
ImsgNumMsg = 90000
ImsgNumEs = 100000
ImsgNumDocker = 110000
ImsgNumMqKafka = 120000
)

View File

@@ -1,6 +1,7 @@
package api
import (
"cmp"
"fmt"
"mayfly-go/internal/tag/api/form"
"mayfly-go/internal/tag/api/vo"
@@ -45,6 +46,9 @@ func (t *TagTree) ReqConfs() *req.Confs {
func (p *TagTree) GetTagTree(rc *req.Ctx) {
tagTypesStr := rc.Query("type")
flatten := rc.Query("flatten") // whether to return the flattened tree
flatten = cmp.Or(flatten, "0")
var typePaths []entity.TypePath
if tagTypesStr != "" {
typePaths = collx.ArrayMap(strings.Split(tagTypesStr, ","), func(val string) entity.TypePath {
@@ -63,7 +67,15 @@ func (p *TagTree) GetTagTree(rc *req.Ctx) {
for _, tag := range allTags {
tagTrees = append(tagTrees, tag)
}
rc.ResData = tagTrees.ToTrees(0)
var tree []*vo.TagTreeItem
if flatten != "0" {
tree = tagTrees.ToFlattenTrees(0)
} else {
tree = tagTrees.ToTrees(0)
}
rc.ResData = tree
}
// complteTags completes the tag info so it can be assembled into a tree
@@ -163,22 +175,34 @@ func (p *TagTree) CountTagResource(rc *req.Ctx) {
CodePathLikes: collx.AsArray(tagPath),
}).GetCodePaths()...)
redisCodes := p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
Types: collx.AsArray(entity.TagTypeRedis),
CodePathLikes: collx.AsArray(tagPath),
}).GetCodes()
mongoCodes := p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
Types: collx.AsArray(entity.TagTypeMongo),
CodePathLikes: collx.AsArray(tagPath),
}).GetCodes()
containerCodes := p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
Types: collx.AsArray(entity.TagTypeContainer),
CodePathLikes: collx.AsArray(tagPath),
}).GetCodes()
kafkaCodes := p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
Types: collx.AsArray(entity.TagTypeMqKafka),
CodePathLikes: collx.AsArray(tagPath),
}).GetCodes()
rc.ResData = collx.M{
"machine": len(machineCodes),
"db": len(dbCodes),
"es": len(esCodes),
"redis": len(p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
Types: collx.AsArray(entity.TagTypeRedis),
CodePathLikes: collx.AsArray(tagPath),
}).GetCodes()),
"mongo": len(p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
Types: collx.AsArray(entity.TagTypeMongo),
CodePathLikes: collx.AsArray(tagPath),
}).GetCodes()),
"container": len(p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
Types: collx.AsArray(entity.TagTypeContainer),
CodePathLikes: collx.AsArray(tagPath),
}).GetCodes()),
"machine": len(machineCodes),
"db": len(dbCodes),
"es": len(esCodes),
"redis": len(redisCodes),
"mongo": len(mongoCodes),
"container": len(containerCodes),
"kafka": len(kafkaCodes),
}
}

View File

@@ -2,6 +2,7 @@ package vo
import (
"mayfly-go/internal/tag/application/dto"
"strings"
)
type TagTreeVOS []*dto.SimpleTagTree
@@ -9,6 +10,7 @@ type TagTreeVOS []*dto.SimpleTagTree
type TagTreeItem struct {
*dto.SimpleTagTree
Children []*TagTreeItem `json:"children"`
NamePath string `json:"namePath"`
}
func (m *TagTreeVOS) ToTrees(pid uint64) []*TagTreeItem {
@@ -30,7 +32,6 @@ func (m *TagTreeVOS) ToTrees(pid uint64) []*TagTreeItem {
}
for _, node := range ttis {
// skip root nodes
if node.Root {
continue
}
@@ -43,3 +44,64 @@ func (m *TagTreeVOS) ToTrees(pid uint64) []*TagTreeItem {
return roots
}
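// ToFlattenTrees collapses chains of pure tag nodes (Type == -1) into single
// levels: a tag node is emitted only once it has at least one non-tag child,
// and NamePath joins the names of all collapsed ancestors with "/".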
func (m *TagTreeVOS) ToFlattenTrees(pid uint64) []*TagTreeItem {
trees := m.ToTrees(pid)
result := make([]*TagTreeItem, 0)
var flatten func(node *TagTreeItem, namePath []string)
flatten = func(node *TagTreeItem, namePath []string) {
currentNamePath := append(namePath, node.Name)
if node.Type != -1 {
return
}
hasNonMinus1Child := false
for _, child := range node.Children {
if child.Type != -1 {
hasNonMinus1Child = true
break
}
}
if hasNonMinus1Child {
newNode := &TagTreeItem{
SimpleTagTree: node.SimpleTagTree,
Children: make([]*TagTreeItem, 0),
NamePath: strings.Join(currentNamePath, "/"),
}
for _, child := range node.Children {
if child.Type != -1 {
childCopy := &TagTreeItem{
SimpleTagTree: child.SimpleTagTree,
Children: make([]*TagTreeItem, 0),
NamePath: strings.Join(append(currentNamePath, child.Name), "/"),
}
for _, grandchild := range child.Children {
grandchildCopy := &TagTreeItem{
SimpleTagTree: grandchild.SimpleTagTree,
NamePath: strings.Join(append(currentNamePath, child.Name, grandchild.Name), "/"),
}
childCopy.Children = append(childCopy.Children, grandchildCopy)
}
newNode.Children = append(newNode.Children, childCopy)
} else {
flatten(child, currentNamePath)
}
}
result = append(result, newNode)
return
}
for _, child := range node.Children {
flatten(child, currentNamePath)
}
}
for _, tree := range trees {
flatten(tree, []string{})
}
return result
}
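
For illustration, a minimal caller of ToFlattenTrees might look like the sketch below; the vos variable and the fmt import are assumptions for the example, not part of the commit:

// Minimal usage sketch: vos is assumed to be a populated TagTreeVOS.
for _, item := range vos.ToFlattenTrees(0) {
// NamePath is the slash-joined chain of ancestor names down to this node
fmt.Println(item.NamePath, len(item.Children))
}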

View File

@@ -37,6 +37,7 @@ const (
TagTypeMongo TagType = TagType(consts.ResourceTypeMongo)
TagTypeAuthCert TagType = TagType(consts.ResourceTypeAuthCert) // authorization credential type
TagTypeContainer TagType = TagType(consts.ResourceTypeContainer)
TagTypeMqKafka TagType = TagType(consts.ResourceTypeMqKafka)
TagTypeDb TagType = 22 // database name
)

View File

@@ -12,6 +12,7 @@ import (
_ "mayfly-go/internal/flow/init"
_ "mayfly-go/internal/machine/init"
_ "mayfly-go/internal/mongo/init"
_ "mayfly-go/internal/mq/init"
_ "mayfly-go/internal/msg/init"
"mayfly-go/internal/pkg/config"
_ "mayfly-go/internal/redis/init"

View File

@@ -7,6 +7,7 @@ import (
esentity "mayfly-go/internal/es/domain/entity"
flowentity "mayfly-go/internal/flow/domain/entity"
machineentity "mayfly-go/internal/machine/domain/entity"
mqentity "mayfly-go/internal/mq/kafka/domain/entity"
msgentity "mayfly-go/internal/msg/domain/entity"
sysentity "mayfly-go/internal/sys/domain/entity"
"mayfly-go/pkg/model"
@@ -25,6 +26,7 @@ func V1_10() []*gormigrate.Migration {
migrations = append(migrations, V1_10_4()...)
migrations = append(migrations, V1_10_5()...)
migrations = append(migrations, V1_10_6()...)
migrations = append(migrations, V1_10_7()...)
return migrations
}
@@ -493,3 +495,44 @@ func V1_10_6() []*gormigrate.Migration {
},
}
}
func V1_10_7() []*gormigrate.Migration {
return []*gormigrate.Migration{
{
ID: "20260313-v1.10.7",
Migrate: func(tx *gorm.DB) error {
if err := tx.AutoMigrate(&mqentity.Kafka{}); err != nil {
return err
}
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773796612, 1773796555, 2, 1, 'kafka', 'res:mq:kafka', 1773796612, 'null', 12, 'admin', 12, 'admin', '2026-03-18 09:16:53', '2026-03-18 09:16:53', 'UX7Sa6oG/cC4TRerV/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773796555, 94, 2, 1, 'MQ', 'res:mq', 1773796555, 'null', 12, 'admin', 12, 'admin', '2026-03-18 09:15:55', '2026-03-18 09:16:34', 'UX7Sa6oG/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773714634, 1773365877, 2, 1, 'group delete', 'kafka:group:delete', 1773714634, 'null', 12, 'admin', 12, 'admin', '2026-03-17 10:30:35', '2026-03-17 10:30:35', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/QvLeQGrM/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773647239, 1773365877, 2, 1, 'topic consume', 'kafka:topic:consume', 1773647239, 'null', 12, 'admin', 12, 'admin', '2026-03-16 15:47:20', '2026-03-16 15:47:20', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/xgP900Sy/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773647223, 1773365877, 2, 1, 'topic produce', 'kafka:topic:produce', 1773647223, 'null', 12, 'admin', 12, 'admin', '2026-03-16 15:47:04', '2026-03-16 15:47:04', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/dPHcNo2n/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773647053, 1773365877, 2, 1, 'topic delete', 'kafka:topic:delete', 1773647053, 'null', 12, 'admin', 12, 'admin', '2026-03-16 15:44:13', '2026-03-16 15:44:13', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/nzD93hsk/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773647032, 1773365877, 2, 1, 'topic create', 'kafka:topic:create', 1773647032, 'null', 12, 'admin', 12, 'admin', '2026-03-16 15:43:53', '2026-03-16 15:43:53', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/7ZtYUWEn/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773365930, 1773796612, 2, 1, 'kafka-删除', 'mq:kafka:del', 0, 'null', 12, 'admin', 12, 'admin', '2026-03-13 09:38:50', '2026-03-18 09:17:39', 'UX7Sa6oG/cC4TRerV/qdaX1R6b/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773365913, 1773796612, 2, 1, 'kafka-保存', 'mq:kafka:save', 1, 'null', 12, 'admin', 12, 'admin', '2026-03-13 09:38:33', '2026-03-18 09:17:34', 'UX7Sa6oG/cC4TRerV/xaL2Zi9m/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773365877, 1773365853, 2, 1, 'mq:kafka', 'mq:kafka:base', 1773365877, 'null', 12, 'admin', 12, 'admin', '2026-03-13 09:37:57', '2026-03-13 09:37:57', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773365853, 1756122788, 2, 1, 'MQ', 'mq:base', 1773365853, 'null', 12, 'admin', 12, 'admin', '2026-03-13 09:37:33', '2026-03-13 09:37:42', 'ocdrUNaa/6f6tY0WZ/', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773365853, 12, 'admin', '2026-03-13 09:45:24', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773365877, 12, 'admin', '2026-03-13 09:45:24', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773365913, 12, 'admin', '2026-03-13 09:45:24', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773365930, 12, 'admin', '2026-03-13 09:45:24', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773647032, 12, 'admin', '2026-03-16 15:47:27', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773647053, 12, 'admin', '2026-03-16 15:47:27', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773647239, 12, 'admin', '2026-03-16 15:47:27', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773647223, 12, 'admin', '2026-03-16 15:47:27', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773714634, 12, 'admin', '2026-03-17 10:31:00', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773796555, 12, 'admin', '2026-03-18 09:18:09', 0, NULL)")
tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773796612, 12, 'admin', '2026-03-18 09:18:09', 0, NULL)")
return nil
},
Rollback: func(tx *gorm.DB) error {
return nil
},
},
}
}
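
The Rollback above is a no-op. If a reversible migration were wanted, one possible sketch (using only the tables and resource IDs from the Migrate step above; not part of this commit) would be:

// Sketch of a fuller Rollback under the same tables as Migrate.
Rollback: func(tx *gorm.DB) error {
// IDs of the t_sys_resource rows inserted by Migrate above
ids := []uint64{1773796612, 1773796555, 1773714634, 1773647239, 1773647223,
1773647053, 1773647032, 1773365930, 1773365913, 1773365877, 1773365853}
tx.Exec("DELETE FROM `t_sys_role_resource` WHERE resource_id IN ?", ids)
tx.Exec("DELETE FROM `t_sys_resource` WHERE id IN ?", ids)
return tx.Migrator().DropTable(&mqentity.Kafka{})
},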