diff --git a/frontend/src/assets/icon/icon.js b/frontend/src/assets/icon/icon.js
index d77737fe..48df420a 100644
--- a/frontend/src/assets/icon/icon.js
+++ b/frontend/src/assets/icon/icon.js
@@ -47,6 +47,7 @@ function convertSvgToSymbol(svgString, symbolId) {
iconNames.push(`icon ${name}`);
svgsymbols += convertSvgToSymbol(allSvgIcons[path].default, name);
}
+
svgsymbols += '';
var t = (t = document.getElementsByTagName('script'))[t.length - 1],
diff --git a/frontend/src/assets/icon/mq/kafka.svg b/frontend/src/assets/icon/mq/kafka.svg
new file mode 100644
index 00000000..f821c109
--- /dev/null
+++ b/frontend/src/assets/icon/mq/kafka.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/frontend/src/assets/icon/mq/mq.svg b/frontend/src/assets/icon/mq/mq.svg
new file mode 100644
index 00000000..80d9f6d3
--- /dev/null
+++ b/frontend/src/assets/icon/mq/mq.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/frontend/src/common/commonEnum.ts b/frontend/src/common/commonEnum.ts
index 8b34b6a5..56d8a95d 100644
--- a/frontend/src/common/commonEnum.ts
+++ b/frontend/src/common/commonEnum.ts
@@ -23,6 +23,7 @@ export const ResourceTypeEnum = {
AuthCert: EnumValue.of(5, 'ac.ac').setExtra({ icon: 'Ticket', iconColor: 'var(--el-color-success)' }),
Es: EnumValue.of(6, 'tag.es').setExtra({ icon: 'icon es/es-color', iconColor: 'var(--el-color-warning)' }).tagTypeWarning(),
Container: EnumValue.of(7, 'tag.container').setExtra({ icon: 'icon docker/docker', iconColor: 'var(--el-color-primary)' }),
+ MqKafka: EnumValue.of(8, 'tag.mq.kafka').setExtra({ icon: 'icon mq/kafka', iconColor: 'var(--el-color-primary)' }),
};
// 标签关联的资源类型
@@ -38,6 +39,8 @@ export const TagResourceTypeEnum = {
AuthCert: ResourceTypeEnum.AuthCert,
Container: ResourceTypeEnum.Container,
+ MqKafka: ResourceTypeEnum.MqKafka,
+
Db: EnumValue.of(22, '数据库').setExtra({ icon: 'icon db/db' }),
};
diff --git a/frontend/src/i18n/en/common.ts b/frontend/src/i18n/en/common.ts
index 76fb6acb..cb327ea3 100644
--- a/frontend/src/i18n/en/common.ts
+++ b/frontend/src/i18n/en/common.ts
@@ -49,6 +49,7 @@ export default {
reset: 'Reset',
success: 'Success',
fail: 'Fail',
+ requestFail: 'Request Fail',
previousStep: 'Previous Step',
nextStep: 'Next Step',
copy: 'Copy',
diff --git a/frontend/src/i18n/en/mq.ts b/frontend/src/i18n/en/mq.ts
new file mode 100644
index 00000000..69ecd675
--- /dev/null
+++ b/frontend/src/i18n/en/mq.ts
@@ -0,0 +1,89 @@
+export default {
+    mq: {
+        kafka: {
+            hosts: 'Hosts',
+            username: 'Username',
+            sasl_mechanism: 'SASL Mechanism',
+            sasl_mechanism_placeholder: 'Please select SASL Mechanism',
+            key: 'Key',
+            keywordPlaceholder: 'ip/name',
+            nodeManage: 'Node Management',
+            topicManage: 'Topic Management',
+            produceMessage: 'Produce Message',
+            consumeMessage: 'Consume Message',
+            nodes: 'Nodes',
+            config: 'Config',
+            nodeId: 'Node ID',
+            addr: 'Addr',
+            rack: 'Rack',
+            viewConfig: 'View Config',
+            brokerConfig: 'Broker Config',
+            topicConfig: 'Topic Config',
+            selectTopic: 'Select topic',
+            selectTopicPlaceholder: 'Select topic',
+            selectTopicWarning: 'Please select topic',
+            selectBrokerViewConfig: 'Please select a broker to view config',
+            configName: 'Config Name',
+            configValue: 'Config Value',
+            configSource: 'Config Source',
+            configSensitive: 'Sensitive',
+            searchTopic: 'Enter topic name',
+            createTopic: 'Create Topic',
+            createPartitions: 'Create Partitions',
+            topicName: 'Topic Name',
+            topicNamePlaceholder: 'Please enter topic name',
+            partitions: 'Partitions',
+            topicPartitions: 'Topic Partitions',
+            replicationFactor: 'Replication Factor',
+            topicStatus: 'Status',
+            topicIsInternal: 'IsInternal',
+            viewPartitions: 'View Partitions',
+            delete: 'Delete',
+            messageKey: 'Message Key',
+            messageKeyPlaceholder: 'Optional: Enter message key',
+            partition: 'Partition',
+            partitionPlaceholder: 'Optional: Specify partition number, -1 for auto',
+            messageBody: 'Message Body',
+            messageHeaders: 'Message Headers',
+            addHeader: 'Add Header',
+            headerKey: 'Header Key',
+            headerValue: 'Header Value',
+            sendTimes: 'Send Times',
+            compression: 'Compression',
+            compressionPlaceholder: 'Please select compression',
+            sendMessage: 'Send Message',
+            messageNumber: 'Number',
+            consumerGroup: 'Consumer Group',
+            consumerGroupPlaceholder: 'Enter consumer group, empty for auto generate',
+            consumerOnlyTip: 'Note: Only effective when Group is first consumed, subsequent changes are invalid',
+            pullTimeout: 'Pull Timeout (s)',
+            decompression: 'Decompression',
+            decompressionPlaceholder: 'Please select decompression',
+            decode: 'Decode',
+            decodePlaceholder: 'Please select decode',
+            isolationLevel: 'Isolation Level',
+            isolationLevelPlaceholder: 'Please select isolation level',
+            readUncommitted: 'Read Uncommitted',
+            readCommitted: 'Read Committed',
+            commitOffset: 'Commit Offset',
+            loadOffsets: 'Load Offsets',
+            defaultConsumePosition: 'Default Consume Position',
+            earliest: 'Earliest',
+            latest: 'Latest',
+            defaultConsumeStartTime: 'Default Consume Start Time',
+            selectDateTime: 'Select date time',
+            offset: 'Offset',
+            timestamp: 'Timestamp',
+            headers: 'Headers',
+            messageDetail: 'Message Detail',
+            groupId: 'Group ID',
+            coordinator: 'Coordinator',
+            state: 'State',
+            protocolType: 'Protocol Type',
+            searchGroup: 'Enter group name',
+            selectGroupPlaceholder: 'Select group',
+            Members: 'Members',
+            partitionsFeatureComingSoon: 'Partitions feature coming soon',
+        },
+    },
+};
diff --git a/frontend/src/i18n/en/tag.ts b/frontend/src/i18n/en/tag.ts
index 749f0936..4c23d032 100644
--- a/frontend/src/i18n/en/tag.ts
+++ b/frontend/src/i18n/en/tag.ts
@@ -21,6 +21,10 @@ export default {
esDataOp: 'Es Operation',
mongoDataOp: 'Mongo Operation',
allResource: 'All Resource',
+ mq: {
+ kafka: 'Kafka',
+ kafkaOp: 'Kafka Operation',
+ },
},
team: {
team: 'Team',
diff --git a/frontend/src/i18n/zh-cn/common.ts b/frontend/src/i18n/zh-cn/common.ts
index 13d2b58a..ad45270f 100644
--- a/frontend/src/i18n/zh-cn/common.ts
+++ b/frontend/src/i18n/zh-cn/common.ts
@@ -49,6 +49,7 @@ export default {
reset: '重置',
success: '成功',
fail: '失败',
+ requestFail: '请求失败',
previousStep: '上一步',
nextStep: '下一步',
copy: '复制',
diff --git a/frontend/src/i18n/zh-cn/mq.ts b/frontend/src/i18n/zh-cn/mq.ts
new file mode 100644
index 00000000..17363854
--- /dev/null
+++ b/frontend/src/i18n/zh-cn/mq.ts
@@ -0,0 +1,89 @@
+export default {
+ mq: {
+ kafka: {
+ hosts: 'Hosts',
+ username: '用户名',
+ sasl_mechanism: 'SASL机制',
+ sasl_mechanism_placeholder: '请选择SASL机制',
+ key: 'Key',
+ keywordPlaceholder: 'ip/名称',
+ nodeManage: '节点管理',
+ topicManage: '主题管理',
+ produceMessage: '生产消息',
+ consumeMessage: '消费消息',
+ nodes: '节点',
+ config: '配置',
+ nodeId: '节点 ID',
+ addr: 'Addr',
+ rack: '机架',
+ viewConfig: '查看配置',
+ brokerConfig: 'Broker 配置',
+ topicConfig: 'Topic 配置',
+ selectTopic: '选择 topic',
+ selectTopicPlaceholder: '选择 topic',
+ selectTopicWarning: '请选择 topic',
+ selectBrokerViewConfig: '请选择 Broker 查看配置',
+ configName: '配置项',
+ configValue: '配置值',
+ configSource: '配置来源',
+ configSensitive: '敏感',
+ searchTopic: '输入主题名称',
+ createTopic: '创建主题',
+ createPartitions: '添加分区',
+ topicName: '主题名称',
+ topicNamePlaceholder: '请输入主题名称',
+ partitions: '分区',
+ topicPartitions: 'Topic 分区信息',
+ replicationFactor: '副本数',
+ topicStatus: '状态',
+ topicIsInternal: '是否内建',
+ viewPartitions: '查看分区',
+ delete: '删除',
+ messageKey: '消息 Key',
+ messageKeyPlaceholder: '可选:输入消息 Key',
+ partition: '分区号',
+ partitionPlaceholder: '可选:指定分区号,-1 表示自动分配',
+ messageBody: '消息内容',
+ messageHeaders: '消息 Headers',
+ addHeader: '添加 Header',
+ headerKey: 'Header Key',
+ headerValue: 'Header Value',
+ sendTimes: '发送次数',
+ compression: '压缩',
+ compressionPlaceholder: '请选择压缩方式',
+ sendMessage: '发送消息',
+ messageNumber: '消息数量',
+ consumerGroup: '消费者组',
+ consumerGroupPlaceholder: '请输入消费者组,为空则自动生成',
+ consumerOnlyTip: '注意:仅在Group首次消费时生效,后续改动无效',
+ pullTimeout: '拉取超时 (秒)',
+ decompression: '解压',
+ decompressionPlaceholder: '请选择解压方式',
+ decode: '解码',
+ decodePlaceholder: '请选择解码方式',
+ isolationLevel: '隔离级别',
+ isolationLevelPlaceholder: '请选择隔离级别',
+ readUncommitted: '读未提交',
+ readCommitted: '读已提交',
+ commitOffset: '提交 Offset',
+ loadOffsets: '读取 Offsets',
+ defaultConsumePosition: '默认消费位置',
+ earliest: '最早',
+ latest: '最新',
+ defaultConsumeStartTime: '默认消费起始时间',
+ selectDateTime: '选择日期时间',
+ offset: '偏移量',
+ timestamp: '时间戳',
+ headers: 'Headers',
+ messageDetail: '消息详情',
+ groupId: '组 ID',
+ coordinator: '协调器',
+ state: '状态',
+ protocolType: '协议类型',
+ searchGroup: '输入组名称',
+ selectGroupPlaceholder: '选择分组',
+ Members: '成员',
+ partitionsFeatureComingSoon: '分区详情功能即将上线',
+ },
+ },
+};
diff --git a/frontend/src/i18n/zh-cn/tag.ts b/frontend/src/i18n/zh-cn/tag.ts
index 27c47491..e198ee61 100644
--- a/frontend/src/i18n/zh-cn/tag.ts
+++ b/frontend/src/i18n/zh-cn/tag.ts
@@ -24,6 +24,10 @@ export default {
mongoDataOp: 'Mongo操作',
containerOp: '容器操作',
allResource: '所有资源',
+ mq: {
+ kafka: 'Kafka',
+ kafkaOp: 'Kafka操作',
+ },
},
team: {
team: '团队',
diff --git a/frontend/src/views/ops/db/component/table/DbTableData.vue b/frontend/src/views/ops/db/component/table/DbTableData.vue
index c741f69d..f5ae843d 100644
--- a/frontend/src/views/ops/db/component/table/DbTableData.vue
+++ b/frontend/src/views/ops/db/component/table/DbTableData.vue
@@ -125,7 +125,7 @@
- {{ rowIndex + 1 }}
+ {{ (pageNum - 1) * pageSize + rowIndex + 1 }}
@@ -259,6 +259,14 @@ const props = defineProps({
type: String,
default: '600px',
},
+ pageSize: {
+ type: Number,
+ default: 25,
+ },
+ pageNum: {
+ type: Number,
+ default: 1,
+ },
});
const contextmenuRef = ref();
diff --git a/frontend/src/views/ops/db/component/table/DbTableDataOp.vue b/frontend/src/views/ops/db/component/table/DbTableDataOp.vue
index 46fbeae9..980d938e 100644
--- a/frontend/src/views/ops/db/component/table/DbTableDataOp.vue
+++ b/frontend/src/views/ops/db/component/table/DbTableDataOp.vue
@@ -140,6 +140,8 @@
:columns="columns"
:loading="loading"
:height="tableHeight"
+ :page-size="pageSize"
+ :page-num="pageNum"
:show-column-tip="true"
@sort-change="(sort: any) => onTableSortChange(sort)"
@selection-change="onDataSelectionChange"
diff --git a/frontend/src/views/ops/docker/container/ContainerCreate.vue b/frontend/src/views/ops/docker/container/ContainerCreate.vue
index 37a66290..76b13ed5 100644
--- a/frontend/src/views/ops/docker/container/ContainerCreate.vue
+++ b/frontend/src/views/ops/docker/container/ContainerCreate.vue
@@ -232,7 +232,7 @@
-
+
{{ $t('common.delete') }}
diff --git a/frontend/src/views/ops/docker/container/ContainerList.vue b/frontend/src/views/ops/docker/container/ContainerList.vue
index 72f695f3..41bb57f6 100644
--- a/frontend/src/views/ops/docker/container/ContainerList.vue
+++ b/frontend/src/views/ops/docker/container/ContainerList.vue
@@ -102,7 +102,7 @@
-
+
{{ network || '-' }}
diff --git a/frontend/src/views/ops/mq/api.ts b/frontend/src/views/ops/mq/api.ts
new file mode 100644
index 00000000..5d143967
--- /dev/null
+++ b/frontend/src/views/ops/mq/api.ts
@@ -0,0 +1,21 @@
+import Api from '@/common/Api';
+
+export const mqApi = {
+ kafkaList: Api.newGet('/mq/kafka'),
+ KafkaTestConn: Api.newPost('/mq/kafka/test-conn'),
+ kafkaSave: Api.newPost('/mq/kafka'),
+ kafkaDel: Api.newDelete('/mq/kafka/{id}'),
+ kafkaGetPwd: Api.newGet('/mq/kafka/{id}/pwd'),
+ kafkaTopicList: Api.newGet('/mq/kafka/{id}/getTopics'),
+ kafkaTopicCreate: Api.newPost('/mq/kafka/{id}/createTopic'),
+ kafkaTopicInfo: Api.newGet('/mq/kafka/{id}/{topic}/getTopicConfig'),
+ kafkaTopicDelete: Api.newDelete('/mq/kafka/{id}/{topic}/deleteTopic'),
+ kafkaTopicCreatePartitions: Api.newPost('/mq/kafka/{id}/createPartitions'),
+ kafkaTopicProduce: Api.newPost('/mq/kafka/{id}/{topic}/produce'),
+ kafkaTopicConsume: Api.newPost('/mq/kafka/{id}/{topic}/consume'),
+ kafkaTopicBrokers: Api.newGet('/mq/kafka/{id}/getBrokers'),
+ kafkaTopicBrokerConfig: Api.newGet('/mq/kafka/{id}/getBrokerConfig/{brokerId}'),
+ kafkaGetGroups: Api.newGet('/mq/kafka/{id}/getGroups'),
+ kafkaDeleteGroup: Api.newDelete('/mq/kafka/{id}/deleteGroup/{group}'),
+ kafkaGetGroupMembers: Api.newGet('/mq/kafka/{id}/getGroupMembers/{group}'),
+};
diff --git a/frontend/src/views/ops/mq/kafka/KafkaEdit.vue b/frontend/src/views/ops/mq/kafka/KafkaEdit.vue
new file mode 100644
index 00000000..a2a4c0ab
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/KafkaEdit.vue
@@ -0,0 +1,165 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/KafkaList.vue b/frontend/src/views/ops/mq/kafka/KafkaList.vue
new file mode 100644
index 00000000..ff0710d4
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/KafkaList.vue
@@ -0,0 +1,140 @@
+
+
+
+
+ {{ $t('common.create') }}
+
+ {{ $t('common.delete') }}
+
+
+
+
+
+
+
+
+ {{ $t('common.edit') }}
+
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/component/ConsumeMessage.vue b/frontend/src/views/ops/mq/kafka/component/ConsumeMessage.vue
new file mode 100644
index 00000000..cb06f703
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/component/ConsumeMessage.vue
@@ -0,0 +1,278 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ $t('common.reset') }}
+
+ {{ $t('mq.kafka.consumeMessage') }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ JSON.stringify(row.headers) }}
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/component/ConsumerGroup.vue b/frontend/src/views/ops/mq/kafka/component/ConsumerGroup.vue
new file mode 100644
index 00000000..dfbfe4b0
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/component/ConsumerGroup.vue
@@ -0,0 +1,127 @@
+
+
+
+
+
+
+
+
+
+ {{ row.State }}
+
+
+
+
+
+
+ {{ $t('mq.kafka.Members') }}
+
+
+ {{ $t('common.delete') }}
+
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/component/NodeManage.vue b/frontend/src/views/ops/mq/kafka/component/NodeManage.vue
new file mode 100644
index 00000000..14b6fb49
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/component/NodeManage.vue
@@ -0,0 +1,147 @@
+
+
+
+
+ {{ $t('common.refresh') }}
+
+
+
+
+
+
+
+
+
+
+ {{ $t('mq.kafka.viewConfig') }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/component/ProduceMessage.vue b/frontend/src/views/ops/mq/kafka/component/ProduceMessage.vue
new file mode 100644
index 00000000..7454853c
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/component/ProduceMessage.vue
@@ -0,0 +1,197 @@
+
+
+
+
+
+
+
+
+ {{ $t('mq.kafka.selectTopic') }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ $t('mq.kafka.addHeader') }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ $t('common.reset') }}
+
+ {{ $t('mq.kafka.sendMessage') }}
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/component/TopicManage.vue b/frontend/src/views/ops/mq/kafka/component/TopicManage.vue
new file mode 100644
index 00000000..61b0934d
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/component/TopicManage.vue
@@ -0,0 +1,555 @@
+
+
+
+
+
+
+
+ {{ row.name }}
+
+
+
+
+ {{ row.partitionCount }}
+
+
+
+
+
+ {{ row.status }}
+
+
+
+
+ {{ row.isInternal ? 'Y' : 'N' }}
+
+
+
+
+
+
+ {{ $t('common.operation') }}
+
+
+
+
+ {{ $t('mq.kafka.produceMessage') }}
+
+
+ {{ $t('mq.kafka.consumeMessage') }}
+
+
+ {{ $t('mq.kafka.viewPartitions') }}
+
+
+ {{ $t('mq.kafka.viewConfig') }}
+
+
+ {{ $t('common.delete') }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ $t('common.cancel') }}
+
+ {{ $t('common.confirm') }}
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ $t('common.cancel') }}
+
+ {{ $t('common.confirm') }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {{ row.err == 0 ? 'HEALTHY' : `Error: ${row.err}` }}
+
+
+
+
+
+
+
+ {{ replica }}
+
+
+
+
+
+
+ {{ isr }}
+
+
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/resource/KafkaOp.vue b/frontend/src/views/ops/mq/kafka/resource/KafkaOp.vue
new file mode 100644
index 00000000..9399316e
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/resource/KafkaOp.vue
@@ -0,0 +1,140 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/resource/NodeKafka.vue b/frontend/src/views/ops/mq/kafka/resource/NodeKafka.vue
new file mode 100644
index 00000000..3c290f7f
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/resource/NodeKafka.vue
@@ -0,0 +1,32 @@
+
+
+
+
+
+
+
+
+
+
+
+ {{ data.params.name }}
+
+
+ {{ data.params.hosts }}
+
+
+ {{ data.params.remark }}
+
+
+
+
+
+
+
+
+
+
+
diff --git a/frontend/src/views/ops/mq/kafka/resource/index.ts b/frontend/src/views/ops/mq/kafka/resource/index.ts
new file mode 100644
index 00000000..b0d006db
--- /dev/null
+++ b/frontend/src/views/ops/mq/kafka/resource/index.ts
@@ -0,0 +1,54 @@
+import { defineAsyncComponent } from 'vue';
+import { NodeType, TagTreeNode, ResourceComponentConfig, ResourceConfig } from '../../../component/tag';
+import { ResourceTypeEnum, TagResourceTypeEnum } from '@/common/commonEnum';
+import { sleep } from '@/common/utils/loading';
+import { mqApi } from '@/views/ops/mq/api';
+
+export const KafkaIcon = {
+ name: ResourceTypeEnum.MqKafka.extra.icon,
+ color: ResourceTypeEnum.MqKafka.extra.iconColor,
+};
+
+const KafkaList = defineAsyncComponent(() => import('../KafkaList.vue'));
+const KafkaOp = defineAsyncComponent(() => import('./KafkaOp.vue'));
+
+const NodeKafka = defineAsyncComponent(() => import('./NodeKafka.vue'));
+
+export const KafkaOpComp: ResourceComponentConfig = {
+ name: 'tag.mq.kafkaOp',
+ component: KafkaOp,
+ icon: KafkaIcon,
+};
+
+const NodeTypeKafka = new NodeType(TagResourceTypeEnum.MqKafka.value).withNodeClickFunc(async (node: TagTreeNode) => {
+    // ctx may be absent; without the trailing ?. the await resolves to undefined and .initKafka throws a TypeError
+    (await node.ctx?.addResourceComponent(KafkaOpComp))?.initKafka(node.params);
+});
+
+// tagpath 节点类型
+const NodeTypeKafkaTag = new NodeType(TagTreeNode.TagPath).withLoadNodesFunc(async (parentNode: TagTreeNode) => {
+ const tagPath = parentNode.params.tagPath;
+ const res = await mqApi.kafkaList.request({ tagPath });
+ if (!res.total) {
+ return [];
+ }
+ const kafkaInfos = res.list;
+ await sleep(100);
+ return kafkaInfos.map((x: any) => {
+ return TagTreeNode.new(parentNode, `${x.code}`, x.name, NodeTypeKafka).withIsLeaf(true).withParams(x).withNodeComponent(NodeKafka);
+ });
+});
+
+export default {
+ order: 6,
+ resourceType: TagResourceTypeEnum.MqKafka.value,
+ rootNodeType: NodeTypeKafkaTag,
+ manager: {
+ componentConf: {
+ component: KafkaList,
+ icon: KafkaIcon,
+ name: 'kafka',
+ },
+ countKey: 'kafka',
+ permCode: 'mq:kafka:base',
+ },
+} as ResourceConfig;
diff --git a/frontend/src/views/ops/redis/resource/index.ts b/frontend/src/views/ops/redis/resource/index.ts
index 92725114..ee55062c 100644
--- a/frontend/src/views/ops/redis/resource/index.ts
+++ b/frontend/src/views/ops/redis/resource/index.ts
@@ -23,8 +23,6 @@ export const RedisOpComp: ResourceComponentConfig = {
// tagpath 节点类型
const NodeTypeRedisTag = new NodeType(TagTreeNode.TagPath).withLoadNodesFunc(async (parentNode: TagTreeNode) => {
- parentNode.ctx?.addResourceComponent(RedisOpComp);
-
const res = await redisApi.redisList.request({ tagPath: parentNode.params.tagPath });
if (!res.total) {
return [];
diff --git a/frontend/src/views/ops/resource/ResourceOp.vue b/frontend/src/views/ops/resource/ResourceOp.vue
index 43ac7369..1ee92bcd 100644
--- a/frontend/src/views/ops/resource/ResourceOp.vue
+++ b/frontend/src/views/ops/resource/ResourceOp.vue
@@ -371,6 +371,7 @@ const onResizeOpPanel = () => {
const loadTags = async () => {
const tags = await tagApi.getTagTrees.request({
type: getResourceTypes().join(','),
+ flatten: '1',
});
const tagNodes = [];
for (let tag of tags) {
@@ -381,7 +382,7 @@ const loadTags = async () => {
};
const processTagNode = (tag: any): TagTreeNode => {
- const tagNode = new TagTreeNode(tag.codePath, tag.name, tag.type);
+ const tagNode = new TagTreeNode(tag.codePath, tag.namePath, tag.type);
if (!tag.children || !Array.isArray(tag.children) || tag.children.length == 0) {
return tagNode;
diff --git a/server/go.mod b/server/go.mod
index 909dc592..652ab414 100644
--- a/server/go.mod
+++ b/server/go.mod
@@ -21,10 +21,12 @@ require (
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
+ github.com/klauspost/compress v1.18.4
github.com/lionsoul2014/ip2region/binding/golang v0.0.0-20250930013652-2d71241a3bb9
github.com/microsoft/go-mssqldb v1.9.6
github.com/mojocn/base64Captcha v1.3.8 // 验证码
github.com/opencontainers/image-spec v1.1.1
+ github.com/pierrec/lz4/v4 v4.1.25
github.com/pkg/sftp v1.13.10
github.com/pquerna/otp v1.5.0
github.com/redis/go-redis/v9 v9.18.0
@@ -33,6 +35,8 @@ require (
github.com/spf13/cast v1.10.0
github.com/stretchr/testify v1.11.1
github.com/tidwall/gjson v1.18.0
+ github.com/twmb/franz-go v1.20.7
+ github.com/twmb/franz-go/pkg/kadm v1.17.2
go.mongodb.org/mongo-driver/v2 v2.5.0 // mongo
golang.org/x/crypto v0.48.0
golang.org/x/oauth2 v0.35.0
@@ -84,7 +88,6 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/json-iterator/go v1.1.12 // indirect
- github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
@@ -113,6 +116,7 @@ require (
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
+ github.com/twmb/franz-go/pkg/kmsg v1.12.0 // indirect
github.com/ugorji/go/codec v1.3.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
diff --git a/server/internal/db/dbm/dm/meta.go b/server/internal/db/dbm/dm/meta.go
index ee3973d1..5aad02d2 100644
--- a/server/internal/db/dbm/dm/meta.go
+++ b/server/internal/db/dbm/dm/meta.go
@@ -26,20 +26,31 @@ type Meta struct {
func (dm *Meta) GetSqlDb(ctx context.Context, d *dbi.DbInfo) (*sql.DB, error) {
driverName := "dm"
db := d.Database
+ schema := ""
dbParam := "?escapeProcess=true"
if db != "" {
// dm database可以使用db/schema表示,方便连接指定schema, 若不存在schema则使用默认schema
ss := strings.Split(db, "/")
if len(ss) > 1 {
- dbParam = fmt.Sprintf("%s&schema=\"%s\"", dbParam, ss[len(ss)-1])
+ schema = ss[1]
}
}
if d.Params != "" {
dbParam += "&" + d.Params
}
+ dsn := fmt.Sprintf("dm://%s:%s@%s:%d%s&appName=mayfly-go", d.Username, url.PathEscape(d.Password), d.Host, d.Port, dbParam)
+ conn, err := sql.Open(driverName, dsn)
+ if err != nil {
+ return nil, err
+ }
+ if schema != "" {
+ _, err := conn.Exec(fmt.Sprintf("SET SCHEMA %s", schema))
+ if err != nil {
+ return nil, err
+ }
+ }
- dsn := fmt.Sprintf("dm://%s:%s@%s:%d%s", d.Username, url.PathEscape(d.Password), d.Host, d.Port, dbParam)
- return sql.Open(driverName, dsn)
+ return conn, nil
}
func (dm *Meta) GetDialect(conn *dbi.DbConn) dbi.Dialect {
@@ -52,7 +63,7 @@ func (dm *Meta) GetMetadata(conn *dbi.DbConn) dbi.Metadata {
}
}
-func (sm *Meta) GetDbDataTypes() []*dbi.DbDataType {
+func (dm *Meta) GetDbDataTypes() []*dbi.DbDataType {
return collx.AsArray[*dbi.DbDataType](CHAR, VARCHAR, TEXT, LONG, LONGVARCHAR, IMAGE, LONGVARBINARY, CLOB,
BLOB,
NUMERIC, DECIMAL, NUMBER, INTEGER, INT, BIGINT, TINYINT, BYTE, SMALLINT, BIT, DOUBLE, FLOAT,
@@ -63,6 +74,6 @@ func (sm *Meta) GetDbDataTypes() []*dbi.DbDataType {
)
}
-func (mm *Meta) GetCommonTypeConverter() dbi.CommonTypeConverter {
+func (dm *Meta) GetCommonTypeConverter() dbi.CommonTypeConverter {
return &commonTypeConverter{}
}
diff --git a/server/internal/mq/init/init.go b/server/internal/mq/init/init.go
new file mode 100644
index 00000000..533bece4
--- /dev/null
+++ b/server/internal/mq/init/init.go
@@ -0,0 +1,5 @@
+package init
+
+import (
+ _ "mayfly-go/internal/mq/kafka/init"
+)
diff --git a/server/internal/mq/kafka/api/api.go b/server/internal/mq/kafka/api/api.go
new file mode 100644
index 00000000..96fead08
--- /dev/null
+++ b/server/internal/mq/kafka/api/api.go
@@ -0,0 +1,7 @@
+package api
+
+import "mayfly-go/pkg/ioc"
+
+func InitIoc() {
+ ioc.Register(new(Kafka))
+}
diff --git a/server/internal/mq/kafka/api/form/kafka.go b/server/internal/mq/kafka/api/form/kafka.go
new file mode 100644
index 00000000..d4b76bdf
--- /dev/null
+++ b/server/internal/mq/kafka/api/form/kafka.go
@@ -0,0 +1,18 @@
+package form
+
+import "mayfly-go/pkg/model"
+
+type Kafka struct {
+    model.IdModel
+
+    Code string `json:"code"` // resource code; generated server-side when empty — TODO confirm
+    Name string `json:"name" binding:"required"`
+    Hosts string `json:"hosts" binding:"required"` // Kafka connection addresses, format: host1:port1,host2:port2, or a single broker
+    Username *string `json:"username"` // optional SASL username
+    Password *string `json:"password"` // optional SASL password; masked before audit logging
+    SaslMechanism *string `json:"saslMechanism"` // optional SASL mechanism
+
+    SshTunnelMachineId int `json:"sshTunnelMachineId"` // id of the machine used as ssh tunnel, 0 = none
+
+    TagCodePaths []string `json:"tagCodePaths" binding:"required"` // tag tree code paths the resource is attached to
+}
diff --git a/server/internal/mq/kafka/api/kafka.go b/server/internal/mq/kafka/api/kafka.go
new file mode 100644
index 00000000..49603ac2
--- /dev/null
+++ b/server/internal/mq/kafka/api/kafka.go
@@ -0,0 +1,268 @@
+package api
+
+import (
+ "mayfly-go/internal/mq/kafka/api/form"
+ "mayfly-go/internal/mq/kafka/api/vo"
+ "mayfly-go/internal/mq/kafka/application"
+ "mayfly-go/internal/mq/kafka/domain/entity"
+ "mayfly-go/internal/mq/kafka/imsg"
+ "mayfly-go/internal/mq/kafka/mgm"
+ "mayfly-go/internal/pkg/consts"
+ tagapp "mayfly-go/internal/tag/application"
+ tagentity "mayfly-go/internal/tag/domain/entity"
+ "mayfly-go/pkg/biz"
+ "mayfly-go/pkg/model"
+ "mayfly-go/pkg/req"
+ "mayfly-go/pkg/utils/collx"
+ "strings"
+
+ "github.com/spf13/cast"
+)
+
+type Kafka struct {
+ kafkaApp application.Kafka `inject:"T"`
+ tagTreeApp tagapp.TagTree `inject:"T"`
+}
+
+func (k *Kafka) ReqConfs() *req.Confs {
+ createTopicPerm := req.NewPermission("kafka:topic:create")
+ deleteTopicPerm := req.NewPermission("kafka:topic:delete")
+ deleteGroupPerm := req.NewPermission("kafka:group:delete")
+
+ reqs := [...]*req.Conf{
+ // 获取所有kafka列表
+ req.NewGet("", k.Kafkas),
+
+ req.NewPost("/test-conn", k.TestConn),
+
+ req.NewPost("", k.Save).Log(req.NewLogSaveI(imsg.LogKafkaSave)),
+
+ req.NewDelete(":id", k.DeleteById).Log(req.NewLogSaveI(imsg.LogKafkaDelete)),
+
+ req.NewGet(":id/getTopics", k.GetTopics),
+ //CreateTopics 创建主题
+ req.NewPost(":id/createTopic", k.CreateTopic).RequiredPermission(createTopicPerm).Log(req.NewLogSaveI(imsg.LogKafkaCreateTopic)),
+
+ req.NewDelete(":id/:topic/deleteTopic", k.DeleteTopic).RequiredPermission(deleteTopicPerm).Log(req.NewLogSaveI(imsg.LogKafkaDeleteTopic)),
+
+ req.NewGet(":id/:topic/getTopicConfig", k.GetTopicConfig),
+ // CreatePartitions 添加分区
+ req.NewPost(":id/createPartitions", k.CreatePartitions),
+ req.NewPost(":id/:topic/produce", k.Produce),
+ req.NewPost(":id/:topic/consume", k.Consume),
+
+ // 获取集群信息
+ req.NewGet(":id/getBrokers", k.GetBrokers),
+
+ // GetBrokerConfig 获取Broker配置
+ req.NewGet(":id/getBrokerConfig/:brokerId", k.GetBrokerConfig),
+
+ // GetGroups 获取消费组信息
+ req.NewGet(":id/getGroups", k.GetGroups),
+ req.NewGet(":id/getGroupMembers/:group", k.GetGroupMembers),
+ req.NewDelete(":id/deleteGroup/:group", k.DeleteGroup).RequiredPermission(deleteGroupPerm).Log(req.NewLogSaveI(imsg.LogKafkaDeleteGroup)),
+ }
+
+ return req.NewConfs("mq/kafka", reqs[:]...)
+}
+
+func (k *Kafka) Kafkas(rc *req.Ctx) {
+ queryCond := req.BindQuery[entity.KafkaQuery](rc)
+
+ // 不存在可访问标签id,即没有可操作数据
+ tags := k.tagTreeApp.GetAccountTags(rc.GetLoginAccount().Id, &tagentity.TagTreeQuery{
+ TypePaths: collx.AsArray(tagentity.NewTypePaths(tagentity.TagTypeMqKafka)),
+ CodePathLikes: []string{queryCond.TagPath},
+ })
+ if len(tags) == 0 {
+ rc.ResData = model.NewEmptyPageResult[any]()
+ return
+ }
+ queryCond.Codes = tags.GetCodes()
+
+ res, err := k.kafkaApp.GetPageList(queryCond)
+ biz.ErrIsNil(err)
+ resVo := model.PageResultConv[*entity.Kafka, *vo.Kafka](res)
+ kafkavos := resVo.List
+
+ // 填充标签信息
+ k.tagTreeApp.FillTagInfo(tagentity.TagType(consts.ResourceTypeMqKafka), collx.ArrayMap(kafkavos, func(mvo *vo.Kafka) tagentity.ITagResource {
+ return mvo
+ })...)
+
+ rc.ResData = resVo
+}
+
+func (k *Kafka) TestConn(rc *req.Ctx) {
+ _, kafka := req.BindJsonAndCopyTo[form.Kafka, entity.Kafka](rc)
+ biz.ErrIsNilAppendErr(k.kafkaApp.TestConn(kafka), "connection error: %s")
+}
+
+// Save creates or updates a kafka resource from the submitted form.
+func (k *Kafka) Save(rc *req.Ctx) {
+    f, kafka := req.BindJsonAndCopyTo[form.Kafka, entity.Kafka](rc)
+
+    // mask the password so the raw value never reaches the operation log;
+    // the copied entity (kafka) still carries the real password for saving
+    masked := "***"
+    f.Password = &masked
+
+    // record the sanitized form as the request param for audit logging
+    rc.ReqParam = f
+
+    biz.ErrIsNil(k.kafkaApp.SaveKafka(rc.MetaCtx, kafka, f.TagCodePaths...))
+}
+
+func (k *Kafka) DeleteById(rc *req.Ctx) {
+    idsStr := rc.PathParam("id")
+    rc.ReqParam = idsStr
+    ids := strings.Split(idsStr, ",")
+    // fail fast on the first delete error instead of silently discarding it
+    for _, v := range ids {
+        biz.ErrIsNil(k.kafkaApp.Delete(rc.MetaCtx, cast.ToUint64(v)))
+    }
+}
+
+func (k *Kafka) GetTopics(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+
+ rc.ResData, rc.Error = conn.GetTopicDetails()
+
+}
+func (k *Kafka) CreateTopic(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+
+ param := req.BindJson[mgm.CreateTopicParam](rc)
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.Error = conn.CreateTopic(param)
+
+}
+func (k *Kafka) DeleteTopic(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ topic := rc.PathParam("topic")
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.Error = conn.DeleteTopic(topic)
+}
+
+func (k *Kafka) GetTopicConfig(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ topic := rc.PathParam("topic")
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.ResData, rc.Error = conn.GetTopicConfig(topic)
+}
+func (k *Kafka) CreatePartitions(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ param := req.BindJson[mgm.CreatePartitionsParam](rc)
+
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.Error = conn.CreatePartitions(param)
+
+}
+func (k *Kafka) Produce(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ topic := rc.PathParam("topic")
+
+ param := req.BindJson[mgm.ProduceMessageParam](rc)
+ param.Topic = topic
+
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+
+ rc.Error = conn.ProduceMessage(rc.MetaCtx, param)
+
+}
+func (k *Kafka) Consume(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ topic := rc.PathParam("topic")
+
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+
+ param := req.BindJson[mgm.ConsumeMessageParam](rc)
+ param.Topic = topic
+
+ rc.ResData, rc.Error = conn.ConsumeMessage(rc.MetaCtx, param)
+}
+func (k *Kafka) GetBrokers(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.ResData = conn.GetBrokers()
+}
+func (k *Kafka) GetBrokerConfig(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ brokerId := rc.PathParamInt("brokerId")
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.ResData, rc.Error = conn.GetBrokerConfig(brokerId)
+}
+
+func (k *Kafka) GetGroups(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.ResData, rc.Error = conn.GetConsumerGroups()
+}
+func (k *Kafka) GetGroupMembers(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ group := rc.PathParam("group")
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.ResData, rc.Error = conn.GetGroupMembers(group)
+}
+func (k *Kafka) DeleteGroup(rc *req.Ctx) {
+ id := k.GetKafkaId(rc)
+ group := rc.PathParam("group")
+ conn, err := k.kafkaApp.GetKafkaConn(rc, id)
+ if err != nil {
+ rc.Error = err
+ return
+ }
+ rc.Error = conn.DeleteGroup(group)
+}
+
+// GetKafkaId extracts and validates the kafka id from the request path.
+func (k *Kafka) GetKafkaId(rc *req.Ctx) uint64 {
+    kafkaId := rc.PathParamInt("id")
+    biz.IsTrue(kafkaId > 0, "kafkaId error")
+    return uint64(kafkaId)
+}
diff --git a/server/internal/mq/kafka/api/vo/kafka.go b/server/internal/mq/kafka/api/vo/kafka.go
new file mode 100644
index 00000000..7ba0d68c
--- /dev/null
+++ b/server/internal/mq/kafka/api/vo/kafka.go
@@ -0,0 +1,24 @@
+package vo
+
+import (
+ tagentity "mayfly-go/internal/tag/domain/entity"
+ "mayfly-go/pkg/model"
+)
+
// Kafka is the view object returned to the frontend for a kafka instance.
// It embeds the base model fields and the resource-tag information.
type Kafka struct {
	model.Model
	tagentity.ResourceTags

	Code string `json:"code"` // unique resource code linking the instance to its tags
	Name string `json:"name"`
	// broker addresses, format: host1:port1,host2:port2 (or a single broker)
	Hosts    string `json:"hosts"`
	Username string `json:"username"`
	// NOTE(review): the password is serialized in the response — confirm it is
	// cleared or encrypted upstream before being sent to the frontend.
	Password string `json:"password"`

	SshTunnelMachineId int    `json:"sshTunnelMachineId"` // ssh tunnel machine id
	SaslMechanism      string `json:"saslMechanism"`      // sasl mechanism
}
+
+func (m *Kafka) GetCode() string {
+ return m.Code
+}
diff --git a/server/internal/mq/kafka/application/application.go b/server/internal/mq/kafka/application/application.go
new file mode 100644
index 00000000..6786bc4f
--- /dev/null
+++ b/server/internal/mq/kafka/application/application.go
@@ -0,0 +1,9 @@
+package application
+
+import (
+ "mayfly-go/pkg/ioc"
+)
+
// InitIoc registers the kafka application service implementation into the IoC container.
func InitIoc() {
	ioc.Register(new(kafkaAppImpl))
}
diff --git a/server/internal/mq/kafka/application/kafka.go b/server/internal/mq/kafka/application/kafka.go
new file mode 100644
index 00000000..f20fa39e
--- /dev/null
+++ b/server/internal/mq/kafka/application/kafka.go
@@ -0,0 +1,141 @@
+package application
+
+import (
+ "context"
+ "mayfly-go/internal/mq/kafka/domain/entity"
+ "mayfly-go/internal/mq/kafka/domain/repository"
+ "mayfly-go/internal/mq/kafka/imsg"
+ "mayfly-go/internal/mq/kafka/mgm"
+ tagapp "mayfly-go/internal/tag/application"
+ tagdto "mayfly-go/internal/tag/application/dto"
+ tagentity "mayfly-go/internal/tag/domain/entity"
+ "mayfly-go/pkg/base"
+ "mayfly-go/pkg/errorx"
+ "mayfly-go/pkg/model"
+ "mayfly-go/pkg/utils/stringx"
+)
+
// Kafka is the application-service interface for kafka instance management.
type Kafka interface {
	base.App[*entity.Kafka]

	// GetPageList returns a paged list of kafka instances matching the condition.
	// (original comment said "machine script list" — copy-paste leftover)
	GetPageList(condition *entity.KafkaQuery, orderBy ...string) (*model.PageResult[*entity.Kafka], error)

	// TestConn verifies a connection can be established with the given config.
	TestConn(entity *entity.Kafka) error

	// SaveKafka creates or updates a kafka instance and syncs its resource tags.
	SaveKafka(ctx context.Context, entity *entity.Kafka, tagCodePaths ...string) error

	// Delete removes the kafka instance with the given id and its tag association.
	Delete(ctx context.Context, id uint64) error

	// GetKafkaConn returns a (cached) connection instance.
	// - id kafka instance id
	GetKafkaConn(ctx context.Context, id uint64) (*mgm.KafkaConn, error)
}
+
// kafkaAppImpl is the default implementation of the Kafka application service.
type kafkaAppImpl struct {
	base.AppImpl[*entity.Kafka, repository.Kafka]

	tagTreeApp tagapp.TagTree `inject:"T"` // tag-tree service, injected by the IoC container
}

// compile-time check that kafkaAppImpl satisfies the Kafka interface
var _ Kafka = (*kafkaAppImpl)(nil)
+
+// 分页获取数据库信息列表
+func (d *kafkaAppImpl) GetPageList(condition *entity.KafkaQuery, orderBy ...string) (*model.PageResult[*entity.Kafka], error) {
+ return d.GetRepo().GetList(condition, orderBy...)
+}
+
+func (d *kafkaAppImpl) Delete(ctx context.Context, id uint64) error {
+ kafkaEntity, err := d.GetById(id)
+ if err != nil {
+ return errorx.NewBiz("kafka not found")
+ }
+
+ mgm.CloseConn(id)
+ return d.Tx(ctx,
+ func(ctx context.Context) error {
+ return d.DeleteById(ctx, id)
+ },
+ func(ctx context.Context) error {
+ return d.tagTreeApp.SaveResourceTag(ctx, &tagdto.SaveResourceTag{ResourceTag: &tagdto.ResourceTag{
+ Type: tagentity.TagTypeMqKafka,
+ Code: kafkaEntity.Code,
+ }})
+ })
+}
+
+func (d *kafkaAppImpl) TestConn(me *entity.Kafka) error {
+ conn, err := me.ToKafkaInfo().Conn()
+ if err != nil {
+ return err
+ }
+ conn.Close()
+ return nil
+}
+
// SaveKafka creates a new kafka instance (m.Id == 0) or updates an existing
// one, keeping the resource-tag tree in sync within the same transaction.
func (d *kafkaAppImpl) SaveKafka(ctx context.Context, m *entity.Kafka, tagCodePaths ...string) error {
	// look up an existing instance with the same hosts + ssh tunnel;
	// err != nil below means "no duplicate found"
	oldKafka := &entity.Kafka{Hosts: m.Hosts, SshTunnelMachineId: m.SshTunnelMachineId}
	err := d.GetByCond(oldKafka)

	if m.Id == 0 {
		// create: the hosts/tunnel combination must not already exist
		if err == nil {
			return errorx.NewBizI(ctx, imsg.ErrKafkaInfoExist)
		}
		// generate a random resource code
		m.Code = stringx.Rand(10)

		return d.Tx(ctx, func(ctx context.Context) error {
			return d.Insert(ctx, m)
		}, func(ctx context.Context) error {
			return d.tagTreeApp.SaveResourceTag(ctx, &tagdto.SaveResourceTag{
				ResourceTag: &tagdto.ResourceTag{
					Type: tagentity.TagTypeMqKafka,
					Code: m.Code,
					Name: m.Name,
				},
				ParentTagCodePaths: tagCodePaths,
			})
		})
	}

	// update: if an instance with the same hosts exists, it must be this one
	if err == nil && oldKafka.Id != m.Id {
		return errorx.NewBizI(ctx, imsg.ErrKafkaInfoExist)
	}
	// if hosts/ssh changed, the lookup above found nothing, so reload the old
	// record by id to get its code for the tag association
	if oldKafka.Code == "" {
		oldKafka, _ = d.GetById(m.Id)
	}

	// close the cached connection before the config changes
	mgm.CloseConn(m.Id)
	// the code is immutable; clear it so the update does not overwrite it
	m.Code = ""
	return d.Tx(ctx, func(ctx context.Context) error {
		return d.UpdateById(ctx, m)
	}, func(ctx context.Context) error {
		if oldKafka.Name != m.Name {
			if err := d.tagTreeApp.UpdateTagName(ctx, tagentity.TagTypeMqKafka, oldKafka.Code, m.Name); err != nil {
				return err
			}
		}

		return d.tagTreeApp.SaveResourceTag(ctx, &tagdto.SaveResourceTag{
			ResourceTag: &tagdto.ResourceTag{
				Type: tagentity.TagTypeMqKafka,
				Code: oldKafka.Code,
			},
			ParentTagCodePaths: tagCodePaths,
		})
	})
}
+
+func (d *kafkaAppImpl) GetKafkaConn(ctx context.Context, id uint64) (*mgm.KafkaConn, error) {
+ return mgm.GetKafkaConn(ctx, id, func() (*mgm.KafkaInfo, error) {
+ me, err := d.GetById(id)
+ if err != nil {
+ return nil, errorx.NewBiz("kafka not found")
+ }
+ return me.ToKafkaInfo(), nil
+ })
+}
diff --git a/server/internal/mq/kafka/domain/entity/kafka.go b/server/internal/mq/kafka/domain/entity/kafka.go
new file mode 100644
index 00000000..d5baa70b
--- /dev/null
+++ b/server/internal/mq/kafka/domain/entity/kafka.go
@@ -0,0 +1,26 @@
+package entity
+
+import (
+ "mayfly-go/internal/mq/kafka/mgm"
+ "mayfly-go/pkg/model"
+ "mayfly-go/pkg/utils/structx"
+)
+
// Kafka is the persisted configuration of a kafka instance.
type Kafka struct {
	model.Model

	// Code is the unique resource code used for tag association.
	Code string `json:"code" gorm:"size:32;comment:code"`
	Name string `json:"name" gorm:"not null;size:50;comment:名称"`
	// broker addresses, format: host1:port1,host2:port2 (or a single broker)
	Hosts    string  `json:"hosts" gorm:"not null;size:500;comment:Kafka 连接地址,格式: host1:port1,host2:port2 或单个 broker"`
	Username *string `json:"username" gorm:"size:100;comment:用户名"`
	Password *string `json:"password" gorm:"size:100;comment:密码"`
	// SshTunnelMachineId is the machine id used as an ssh tunnel (0 = direct).
	SshTunnelMachineId int     `json:"sshTunnelMachineId" gorm:"comment:ssh隧道的机器id"`
	SaslMechanism      *string `json:"saslMechanism" gorm:"comment:sasl机制"`
}
+
+// 转换为kafkaInfo进行连接
+func (k *Kafka) ToKafkaInfo() *mgm.KafkaInfo {
+ mongoInfo := new(mgm.KafkaInfo)
+ _ = structx.Copy(mongoInfo, k)
+ return mongoInfo
+}
diff --git a/server/internal/mq/kafka/domain/entity/query.go b/server/internal/mq/kafka/domain/entity/query.go
new file mode 100644
index 00000000..9b1993ed
--- /dev/null
+++ b/server/internal/mq/kafka/domain/entity/query.go
@@ -0,0 +1,17 @@
+package entity
+
+import "mayfly-go/pkg/model"
+
// KafkaQuery is the query condition used when listing kafka instances.
type KafkaQuery struct {
	model.Model
	model.PageParam

	Code string `json:"code" form:"code"`
	// NOTE(review): Name and Uri carry no json/form tags so they cannot be
	// bound from a request; Uri looks like a leftover from the mongo module —
	// confirm whether they are set programmatically.
	Name               string
	Keyword            string `json:"keyword" form:"keyword"` // fuzzy match against name/code/hosts (see repo GetList)
	Uri                string
	SshTunnelMachineId uint64 // ssh tunnel machine id
	TagPath            string `json:"tagPath" form:"tagPath"`

	// Codes restricts results to these resource codes (e.g. tag-authorized codes).
	Codes []string
}
diff --git a/server/internal/mq/kafka/domain/repository/kafka.go b/server/internal/mq/kafka/domain/repository/kafka.go
new file mode 100644
index 00000000..8c3a2bc6
--- /dev/null
+++ b/server/internal/mq/kafka/domain/repository/kafka.go
@@ -0,0 +1,14 @@
+package repository
+
+import (
+ "mayfly-go/internal/mq/kafka/domain/entity"
+ "mayfly-go/pkg/base"
+ "mayfly-go/pkg/model"
+)
+
// Kafka is the persistence-layer repository for kafka entities.
type Kafka interface {
	base.Repo[*entity.Kafka]

	// GetList returns a paged list of kafka entities matching the condition.
	GetList(condition *entity.KafkaQuery, orderBy ...string) (*model.PageResult[*entity.Kafka], error)
}
diff --git a/server/internal/mq/kafka/imsg/en.go b/server/internal/mq/kafka/imsg/en.go
new file mode 100644
index 00000000..cbc8c805
--- /dev/null
+++ b/server/internal/mq/kafka/imsg/en.go
@@ -0,0 +1,14 @@
+package imsg
+
+import "mayfly-go/pkg/i18n"
+
+var En = map[i18n.MsgId]string{
+ LogKafkaSave: "Kafka - Save",
+ LogKafkaDelete: "Kafka-Delete",
+ LogKafkaRunCmd: "Kafka - Run Cmd",
+ LogUpdateDocs: "Kafka - Update Documents",
+ LogDelDocs: "Kafka - Delete Documents",
+ LogInsertDocs: "Kafka - Insert Documents",
+
+ ErrKafkaInfoExist: "that information already exists",
+}
diff --git a/server/internal/mq/kafka/imsg/imsg.go b/server/internal/mq/kafka/imsg/imsg.go
new file mode 100644
index 00000000..8c53c9ec
--- /dev/null
+++ b/server/internal/mq/kafka/imsg/imsg.go
@@ -0,0 +1,25 @@
+package imsg
+
+import (
+ "mayfly-go/internal/pkg/consts"
+ "mayfly-go/pkg/i18n"
+)
+
// init registers the kafka module's message texts for each supported language.
func init() {
	i18n.AppendLangMsg(i18n.Zh_CN, Zh_CN)
	i18n.AppendLangMsg(i18n.En, En)
}
+
// Message ids of the kafka module, offset into the module's reserved i18n
// number segment (consts.ImsgNumMqKafka) so they do not collide with other modules.
const (
	LogKafkaSave = iota + consts.ImsgNumMqKafka
	LogKafkaDelete
	LogKafkaRunCmd
	// NOTE(review): the *Docs ids below look copied from the mongo module —
	// confirm they are actually referenced by kafka endpoints.
	LogUpdateDocs
	LogDelDocs
	LogInsertDocs
	LogKafkaCreateTopic
	LogKafkaDeleteTopic
	LogKafkaDeleteGroup

	ErrKafkaInfoExist
)
diff --git a/server/internal/mq/kafka/imsg/zh_cn.go b/server/internal/mq/kafka/imsg/zh_cn.go
new file mode 100644
index 00000000..e70f07de
--- /dev/null
+++ b/server/internal/mq/kafka/imsg/zh_cn.go
@@ -0,0 +1,14 @@
+package imsg
+
+import "mayfly-go/pkg/i18n"
+
+var Zh_CN = map[i18n.MsgId]string{
+ LogKafkaSave: "Kafka-保存",
+ LogKafkaDelete: "Kafka-删除",
+ LogKafkaRunCmd: "Kafka-执行命令",
+ LogUpdateDocs: "Kafka-更新文档",
+ LogDelDocs: "Kafka-删除文档",
+ LogInsertDocs: "Kafka-插入文档",
+
+ ErrKafkaInfoExist: "该信息已存在",
+}
diff --git a/server/internal/mq/kafka/infra/persistence/kafka.go b/server/internal/mq/kafka/infra/persistence/kafka.go
new file mode 100644
index 00000000..b45d1015
--- /dev/null
+++ b/server/internal/mq/kafka/infra/persistence/kafka.go
@@ -0,0 +1,31 @@
+package persistence
+
+import (
+ "mayfly-go/internal/mq/kafka/domain/entity"
+ "mayfly-go/internal/mq/kafka/domain/repository"
+ "mayfly-go/pkg/base"
+ "mayfly-go/pkg/model"
+)
+
// kafkaRepoImpl is the gorm-backed implementation of repository.Kafka.
type kafkaRepoImpl struct {
	base.RepoImpl[*entity.Kafka]
}
+
+func newKafkaRepo() repository.Kafka {
+ return &kafkaRepoImpl{}
+}
+
+// 分页获取数据库信息列表
+func (d *kafkaRepoImpl) GetList(condition *entity.KafkaQuery, orderBy ...string) (*model.PageResult[*entity.Kafka], error) {
+ qd := model.NewCond().
+ Like("name", condition.Name).
+ Eq("code", condition.Code).
+ In("code", condition.Codes)
+
+ keyword := condition.Keyword
+ if keyword != "" {
+ keyword = "%" + keyword + "%"
+ qd.And("name like ? or code like ? or hosts like ?", keyword, keyword, keyword)
+ }
+ return d.PageByCond(qd, condition.PageParam)
+}
diff --git a/server/internal/mq/kafka/infra/persistence/persistence.go b/server/internal/mq/kafka/infra/persistence/persistence.go
new file mode 100644
index 00000000..b0f16ab6
--- /dev/null
+++ b/server/internal/mq/kafka/infra/persistence/persistence.go
@@ -0,0 +1,9 @@
+package persistence
+
+import (
+ "mayfly-go/pkg/ioc"
+)
+
// InitIoc registers the kafka repository implementation into the IoC container.
func InitIoc() {
	ioc.Register(newKafkaRepo())
}
diff --git a/server/internal/mq/kafka/init/init.go b/server/internal/mq/kafka/init/init.go
new file mode 100644
index 00000000..baba1f59
--- /dev/null
+++ b/server/internal/mq/kafka/init/init.go
@@ -0,0 +1,16 @@
+package init
+
+import (
+ "mayfly-go/internal/mq/kafka/api"
+ "mayfly-go/internal/mq/kafka/application"
+ "mayfly-go/internal/mq/kafka/infra/persistence"
+ "mayfly-go/pkg/starter"
+)
+
// init wires the kafka module into application startup: repositories,
// application services and API endpoints are registered into the IoC
// container when the starter runs its init functions.
func init() {
	starter.AddInitIocFunc(func() {
		persistence.InitIoc()
		application.InitIoc()
		api.InitIoc()
	})
}
diff --git a/server/internal/mq/kafka/mgm/conn.go b/server/internal/mq/kafka/mgm/conn.go
new file mode 100644
index 00000000..2cb8d55e
--- /dev/null
+++ b/server/internal/mq/kafka/mgm/conn.go
@@ -0,0 +1,478 @@
+package mgm
+
+import (
+ "cmp"
+ "context"
+ "encoding/base64"
+ "errors"
+ "fmt"
+ "mayfly-go/pkg/errorx"
+ "mayfly-go/pkg/logx"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/twmb/franz-go/pkg/kadm"
+ "github.com/twmb/franz-go/pkg/kgo"
+)
+
// KafkaConn bundles a franz-go client and its admin client for one kafka instance.
type KafkaConn struct {
	Id   string     // pool connection id, derived from the kafka instance id
	Info *KafkaInfo // connection info the clients were built from

	Configs []kgo.Opt    // base client options, kept for building derived clients
	Client  *kgo.Client  // producer/consumer client
	Ac      *kadm.Client // admin client wrapping Client
}
+
+/******************* pool.Conn impl *******************/
+
+func (kc *KafkaConn) Close() error {
+ if kc.Client != nil {
+ kc.Client.Close()
+ kc.Client = nil
+ }
+
+ if kc.Ac != nil {
+ kc.Ac.Close()
+ kc.Ac = nil
+ }
+ return nil
+}
+
+func (kc *KafkaConn) Ping() error {
+ if kc == nil {
+ return errorx.NewBiz("kafka connection is nil")
+ }
+
+ if kc.Client == nil {
+ return errorx.NewBiz("kafka client is nil")
+ }
+ if kc.Ac == nil {
+ return errorx.NewBiz("kafka admin client is nil")
+ }
+
+ brokers, err := kc.Ac.ListBrokers(context.Background())
+ if err != nil {
+ return err
+ }
+ if len(brokers) == 0 {
+ return errorx.NewBiz("no available brokers")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ if err := kc.Client.Ping(ctx); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (kc *KafkaConn) GetTopics() ([]string, error) {
+ if kc.Ac == nil {
+ return nil, errorx.NewBiz("kafka admin client is closed")
+ }
+
+ list, err := kc.Ac.ListTopics(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ topics := make([]string, 0, len(list))
+ for topic := range list {
+ topics = append(topics, topic)
+ }
+ return topics, nil
+}
+
+// GetTopicConfig 获取 Topic 配置信息
+func (kc *KafkaConn) GetTopicConfig(topic string) (*kadm.ResourceConfigs, error) {
+ configs, err := kc.Ac.DescribeTopicConfigs(context.Background(), topic)
+ if err != nil {
+ return nil, err
+ }
+
+ return &configs, nil
+}
+
+// GetTopicDetails 获取 Topic 详细信息
+func (kc *KafkaConn) GetTopicDetails() ([]any, error) {
+ topics, err := kc.Ac.ListTopics(context.Background())
+ if err != nil {
+ return nil, err
+ }
+ return buildTopicsResp(topics), nil
+}
+
// buildTopicsResp converts the admin topic details into the generic map shape
// returned to the frontend: one entry per topic with partition count,
// replication factor and per-partition leader/replica/ISR info. Partition
// errors are concatenated into the topic's "Err" field.
func buildTopicsResp(topics kadm.TopicDetails) []any {
	// sort the map keys so the output order is stable across calls
	topicNames := make([]string, 0, len(topics))
	for name := range topics {
		topicNames = append(topicNames, name)
	}
	sort.Strings(topicNames)

	result := make([]any, 0, len(topicNames))
	for _, topicName := range topicNames {
		topicDetail := topics[topicName]
		partitionErrs := ""
		var partitions []any
		for _, partition := range topicDetail.Partitions {
			errMsg := ""
			if partition.Err != nil {
				errMsg = partition.Err.Error()
				partitionErrs += fmt.Sprintf("partition %d: %s\n", partition.Partition, errMsg)
			}
			partitions = append(partitions, map[string]any{
				"partition":       partition.Partition,
				"leader":          partition.Leader,
				"replicas":        partition.Replicas,
				"isr":             partition.ISR,
				"err":             errMsg,
				"LeaderEpoch":     partition.LeaderEpoch,
				"OfflineReplicas": partition.OfflineReplicas,
			})
		}
		if topicDetail.Err != nil {
			// topic-level error is prepended to the partition errors
			partitionErrs = topicDetail.Err.Error() + "\n" + partitionErrs
		}
		// replication factor is derived from the first partition's replica count
		replicationFactor := 0
		if len(topicDetail.Partitions) > 0 {
			replicationFactor = len(topicDetail.Partitions[0].Replicas)
		}
		result = append(result, map[string]any{
			"ID":                 topicDetail.ID,
			"topic":              topicName,
			"partition_count":    len(topicDetail.Partitions),
			"replication_factor": replicationFactor,
			"IsInternal":         topicDetail.IsInternal,
			"Err":                partitionErrs,
			"partitions":         partitions,
		})
	}
	return result
}
+
+// GetConsumerGroups 获取消费者组列表
+func (kc *KafkaConn) GetConsumerGroups() ([]kadm.ListedGroup, error) {
+ groups, err := kc.Ac.ListGroups(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ sortedGroups := groups.Sorted()
+ return sortedGroups, nil
+}
+
+// CreateTopic 创建 Topic
+func (kc *KafkaConn) CreateTopic(p *CreateTopicParam) error {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ resp, err := kc.Ac.CreateTopics(ctx, p.NumPartitions, p.ReplicationFactor, p.ConfigEntries, p.TopicName)
+ if err != nil {
+ return err
+ }
+ err = resp.Error()
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// DeleteTopic 删除 Topic
+func (kc *KafkaConn) DeleteTopic(topic string) error {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ resp, err := kc.Ac.DeleteTopics(ctx, topic)
+ if err != nil {
+ return err
+ }
+
+ if resp.Error() != nil {
+ return resp.Error()
+ }
+
+ return nil
+}
+
// ConsumeMessage polls up to param.Number records from param.Topic within
// param.PullTimeout seconds, optionally decompresses/decodes each payload and
// returns the records in display form. When param.CommitOffset is set, the
// consumed offsets are committed for param.Group.
// NOTE(review): polling uses a fresh context.WithTimeout(context.Background(),...)
// below, so the caller's ctx cancellation is ignored — confirm intended.
func (kc *KafkaConn) ConsumeMessage(ctx context.Context, param *ConsumeMessageParam) ([]*ConsumeMessageResult, error) {
	// defaults: 10 messages, 10s poll timeout, private throw-away group
	param.Number = cmp.Or(param.Number, 10)
	param.PullTimeout = cmp.Or(param.PullTimeout, 10)
	param.Group = cmp.Or(param.Group, "__mayfly-server__"+uuid.New().String())
	st := time.Now()

	// build the consume options; auto-commit is disabled so commits only
	// happen when param.CommitOffset is explicitly requested below
	consumeOpts := []kgo.Opt{
		kgo.ConsumeTopics(param.Topic),
		kgo.DisableAutoCommit(),
	}

	// isolation level (default: read uncommitted)
	if strings.ToLower(param.IsolationLevel) == "read_committed" {
		consumeOpts = append(consumeOpts, kgo.FetchIsolationLevel(kgo.ReadCommitted()))
	} else {
		consumeOpts = append(consumeOpts, kgo.FetchIsolationLevel(kgo.ReadUncommitted()))
	}

	// starting offset: explicit timestamp > earliest > latest
	if param.StartTime != "" {
		parse, err := time.Parse(time.DateTime, param.StartTime)
		if err != nil {
			return nil, err
		}
		consumeOpts = append(consumeOpts, kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(parse.UnixMilli())))
	} else if param.Earliest {
		consumeOpts = append(consumeOpts, kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()))
	} else {
		consumeOpts = append(consumeOpts, kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
	}

	// get or create the cached consumer-group client
	cl, err := GetOrCreateConsumerGroupClient(ctx, kc.Info.Id, param.Group, kc.Info, consumeOpts)
	if err != nil {
		return nil, err
	}

	// poll records, bounded by the pull timeout
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(param.PullTimeout)*time.Second)
	defer cancel()
	fetches := cl.PollRecords(ctx, param.Number)
	if fetches.IsClientClosed() {
		return nil, errorx.NewBiz("Client Closed, Please Retry")
	}
	if errors.Is(ctx.Err(), context.DeadlineExceeded) {
		// deadline with zero records means there was nothing to consume
		if len(fetches.Records()) == 0 {
			return nil, errorx.NewBiz("Consume Timeout, Maybe No Message")
		}
	}
	if errs := fetches.Errors(); len(errs) > 0 {
		return nil, errorx.NewBiz(fmt.Sprint(errs))
	}

	logx.Infof("poll 完成... %v", len(fetches.Records()))

	res := make([]*ConsumeMessageResult, 0)
	for i, v := range fetches.Records() {
		if v == nil {
			continue
		}
		// optional payload decompression, chosen by the caller
		var data []byte
		var err error
		switch param.Decompression {
		case "gzip":
			data, err = GzipDecompress(v.Value)
		case "lz4":
			data, err = Lz4Decompress(v.Value)
		case "zstd":
			data, err = ZstdDecompress(v.Value)
		case "snappy":
			data, err = SnappyDecompress(v.Value)
		default:
			data = v.Value
		}
		if err != nil {
			return nil, errorx.NewBiz(fmt.Sprintf("failed to decompress data: %s", err.Error()))
		}

		// optional decode step applied after decompression
		var decodedData []byte
		switch strings.ToLower(param.Decode) {
		case "base64":
			decodedData, err = base64.StdEncoding.DecodeString(string(data))
			if err != nil {
				return nil, errorx.NewBiz(fmt.Sprintf("Failed to decode base64 data: %s", err.Error()))
			}
		default:
			decodedData = data
		}

		res = append(res, &ConsumeMessageResult{
			Id:            i,
			Offset:        v.Offset,
			Partition:     v.Partition,
			Key:           string(v.Key),
			Value:         string(decodedData),
			Timestamp:     v.Timestamp.Format(time.DateTime),
			Topic:         v.Topic,
			Headers:       getHeadersString(v.Headers),
			LeaderEpoch:   v.LeaderEpoch,
			ProducerEpoch: v.ProducerEpoch,
			ProducerID:    v.ProducerID,
		})
	}

	logx.Infof("耗时:%.4f秒 , topic: %s, group: %s, num: %v", time.Since(st).Seconds(), param.Topic, param.Group, param.Number)
	if param.Group != "" && param.CommitOffset {
		logx.Infof("开始提交 offset...")
		if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
			return nil, err
		}
		logx.Infof("提交 offset 完成...")
	}

	return res, nil

}
+
+func getHeadersString(headers []kgo.RecordHeader) map[string]string {
+ headersMap := make(map[string]string)
+
+ if len(headers) == 0 {
+ return headersMap
+ }
+
+ for _, h := range headers {
+ headersMap[h.Key] = string(h.Value)
+ }
+
+ return headersMap
+}
+
+// ProduceMessage 生产消息到 Topic
+func (kc *KafkaConn) ProduceMessage(ctx context.Context, param *ProduceMessageParam) error {
+ logx.Infof("开始生产消息...")
+ st := time.Now()
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ headers2 := make([]kgo.RecordHeader, len(param.Headers))
+ for i := 0; i < len(param.Headers); i++ {
+ headers2[i] = kgo.RecordHeader{
+ Key: param.Headers[i]["key"],
+ Value: []byte(param.Headers[i]["value"]),
+ }
+ }
+ var data []byte
+ var err error
+ switch param.Compression {
+ case "gzip":
+ data, err = Gzip([]byte(param.Value))
+ case "lz4":
+ data, err = Lz4([]byte(param.Value))
+ case "zstd":
+ data, err = Zstd([]byte(param.Value))
+ case "snappy":
+ data, err = Snappy([]byte(param.Value))
+ default:
+ data = []byte(param.Value)
+ }
+ if err != nil {
+ return errorx.NewBiz("Failed to compress data: " + err.Error())
+ }
+ var records []*kgo.Record
+ for i := 0; i < param.Times; i++ {
+ records = append(records, &kgo.Record{
+ Topic: param.Topic,
+ Value: data,
+ Key: []byte(param.Key),
+ Headers: headers2,
+ Partition: param.Partition,
+ })
+ }
+ res := kc.Client.ProduceSync(ctx, records...)
+ if err := res.FirstErr(); err != nil {
+ return errorx.NewBiz("Produce Error:" + err.Error())
+ }
+ logx.Infof("耗时:%.4f秒 topic: %s, key:%s, num:%v", time.Since(st).Seconds(), param.Topic, param.Key, param.Times)
+ return nil
+}
+
+func (kc *KafkaConn) CreatePartitions(p *CreatePartitionsParam) error {
+ res, err := kc.Ac.CreatePartitions(context.Background(), p.NumPartitions, p.TopicName)
+ if err != nil {
+ return err
+ }
+ err = res.Error()
+ if err != nil {
+ return errorx.NewBiz("CreatePartitions Error:" + err.Error())
+ }
+ return nil
+}
+
+func (kc *KafkaConn) GetBrokers() []BrokerInfo {
+
+ brokers, err := kc.Ac.ListBrokers(context.Background())
+ if err != nil {
+ return nil
+ }
+
+ var bs []BrokerInfo
+ for _, b := range brokers {
+ bs = append(bs, BrokerInfo{
+ Id: b.NodeID,
+ Addr: b.Host + ":" + strconv.Itoa(int(b.Port)),
+ Rack: b.Rack,
+ })
+ }
+ return bs
+}
+
+func (kc *KafkaConn) GetBrokerConfig(id int) (*kadm.ResourceConfigs, error) {
+
+ configs, err := kc.Ac.DescribeBrokerConfigs(context.Background(), int32(id))
+ if err != nil {
+ return nil, err
+ }
+
+ return &configs, nil
+}
+
+func (kc *KafkaConn) DeleteGroup(group string) error {
+
+ resp, err := kc.Ac.DeleteGroup(context.Background(), group)
+ if err != nil {
+ return errorx.NewBiz("DeleteGroup Error:" + err.Error())
+ }
+ if resp.Err != nil {
+ return errorx.NewBiz("DeleteGroup Error:" + resp.Err.Error())
+ }
+ return err
+}
+
+func (kc *KafkaConn) GetGroupMembers(group string) (any, any) {
+
+ ctx := context.Background()
+ resp, err := kc.Ac.DescribeGroups(ctx, group)
+ if err != nil {
+ return nil, errorx.NewBiz("DescribeGroups Error:" + err.Error())
+ }
+ err = resp.Error()
+ if err != nil {
+ return nil, errorx.NewBiz("DescribeGroups Error:" + err.Error())
+ }
+ sortedGroups := resp.Sorted()
+
+ membersLst := make([]any, 0)
+ for _, describedGroup := range sortedGroups {
+ if describedGroup.Err != nil {
+ return nil, errorx.NewBiz(fmt.Sprintf("Error describing group %s: %v", describedGroup.Group, describedGroup.Err))
+ }
+ for _, member := range describedGroup.Members {
+ subscribedTPs := make(map[string][]int32)
+ if consumerMetadata, ok := member.Assigned.AsConsumer(); ok {
+ tps := consumerMetadata.Topics
+ for _, tp := range tps {
+ subscribedTPs[tp.Topic] = tp.Partitions
+ }
+ }
+ membersLst = append(membersLst, map[string]any{
+ "MemberID": member.MemberID,
+ "InstanceID": member.InstanceID,
+ "ClientID": member.ClientID,
+ "ClientHost": member.ClientHost,
+ "TPs": subscribedTPs, // TPs:map[topicName:[0]]]]
+ })
+ }
+
+ return membersLst, nil
+ }
+
+ return membersLst, nil
+}
diff --git a/server/internal/mq/kafka/mgm/conn_cache.go b/server/internal/mq/kafka/mgm/conn_cache.go
new file mode 100644
index 00000000..dd820bd6
--- /dev/null
+++ b/server/internal/mq/kafka/mgm/conn_cache.go
@@ -0,0 +1,110 @@
+package mgm
+
+import (
+ "context"
+ "fmt"
+ "mayfly-go/pkg/logx"
+ "mayfly-go/pkg/pool"
+ "sync"
+ "time"
+
+ "github.com/twmb/franz-go/pkg/kgo"
+)
+
var (
	// pool of base/admin connections, one cache pool per kafka instance id
	poolGroup = pool.NewPoolGroup[*KafkaConn]()

	// consumer-group client cache, key: "<connId>:group:<group>", value: *kgo.Client
	consumerGroupClients = make(map[string]*kgo.Client)
	cgClientMu           sync.RWMutex
	// delay after which a cached consumer-group client is closed; note the
	// timer starts at creation (see scheduleConsumerGroupCleanup)
	cgClientCleanupDelay = 5 * time.Minute
)
+
+// 从缓存中获取 kafka 连接信息,若缓存中不存在则会使用回调函数获取 kafkaInfo 进行连接并缓存
+func GetKafkaConn(ctx context.Context, kafkaId uint64, getKafkaInfo func() (*KafkaInfo, error)) (*KafkaConn, error) {
+ cachePool, err := poolGroup.GetCachePool(getConnId(kafkaId), func() (*KafkaConn, error) {
+ // 若缓存中不存在,则从回调函数中获取 KafkaInfo
+ mi, err := getKafkaInfo()
+ if err != nil {
+ return nil, err
+ }
+
+ // 连接 kafka
+ return mi.Conn()
+ })
+
+ if err != nil {
+ return nil, err
+ }
+ // 从连接池中获取一个可用的连接
+ return cachePool.Get(ctx)
+}
+
// GetOrCreateConsumerGroupClient returns a cached consumer client for
// (kafkaId, group), creating and caching a new one on miss.
// NOTE(review): scheduleConsumerGroupCleanup closes the client a fixed
// cgClientCleanupDelay after creation, regardless of whether it is still in
// use — it is NOT an idle timeout; confirm this is acceptable for long
// consume sessions.
func GetOrCreateConsumerGroupClient(ctx context.Context, kafkaId uint64, group string, info *KafkaInfo, consumeOpts []kgo.Opt) (*kgo.Client, error) {
	cacheKey := getConsumerGroupCacheKey(kafkaId, group)

	// fast path: read lock only
	cgClientMu.RLock()
	if cl, exists := consumerGroupClients[cacheKey]; exists {
		cgClientMu.RUnlock()
		return cl, nil
	}
	cgClientMu.RUnlock()

	cgClientMu.Lock()
	defer cgClientMu.Unlock()

	// double check after acquiring the write lock
	if cl, exists := consumerGroupClients[cacheKey]; exists {
		return cl, nil
	}

	// build a new client: base options + group + caller's consume options
	opts := info.BuildBaseOpts()
	opts = append(opts, kgo.ConsumerGroup(group))
	opts = append(opts, consumeOpts...)

	cl, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, err
	}

	// cache the client
	consumerGroupClients[cacheKey] = cl

	// schedule its removal (fixed delay from creation)
	go scheduleConsumerGroupCleanup(cacheKey, cl)

	logx.Debugf("创建消费者组客户端:%s", cacheKey)
	return cl, nil
}
+
// scheduleConsumerGroupCleanup closes and evicts the cached client
// cgClientCleanupDelay after it was created.
// NOTE(review): the timer starts at creation, not at last use, so an actively
// used client is also closed when the delay elapses — confirm intended.
func scheduleConsumerGroupCleanup(cacheKey string, cl *kgo.Client) {
	time.Sleep(cgClientCleanupDelay)

	cgClientMu.Lock()
	defer cgClientMu.Unlock()

	// only remove if the cache still holds this exact client (it may have
	// been replaced in the meantime)
	if cachedCl, exists := consumerGroupClients[cacheKey]; exists && cachedCl == cl {
		cl.Close()
		delete(consumerGroupClients, cacheKey)
		logx.Debugf("清理空闲消费者组客户端:%s", cacheKey)
	}
}
+
+// 生成消费者组缓存 key
+func getConsumerGroupCacheKey(kafkaId uint64, group string) string {
+ return fmt.Sprintf("%s:group:%s", getConnId(kafkaId), group)
+}
+
+// 关闭连接,并移除缓存连接
+func CloseConn(kafkaId uint64) {
+ err := poolGroup.Close(getConnId(kafkaId))
+ if err != nil {
+ logx.Errorf("关闭kafka连接失败:%v", err)
+ return
+ }
+}
diff --git a/server/internal/mq/kafka/mgm/conpress.go b/server/internal/mq/kafka/mgm/conpress.go
new file mode 100644
index 00000000..46dfafd7
--- /dev/null
+++ b/server/internal/mq/kafka/mgm/conpress.go
@@ -0,0 +1,212 @@
+package mgm
+
import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"

	"github.com/klauspost/compress/snappy"
	"github.com/klauspost/compress/zstd"
	"github.com/pierrec/lz4/v4"
)
+
+// Tar 函数:创建 tar 归档
+func Tar(sourceDir string, destFile string) error {
+ // 创建目标文件
+ file, err := os.Create(destFile)
+ if err != nil {
+ return fmt.Errorf("创建目标 tar 文件失败: %w", err)
+ }
+ defer file.Close()
+ tarWriter := tar.NewWriter(file)
+ defer tarWriter.Close()
+ // 遍历源目录,将文件添加到 tar 归档
+ err = filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ header, err := tar.FileInfoHeader(info, "") // 基于文件信息创建 tar header
+ if err != nil {
+ return fmt.Errorf("tar:创建 tar header 失败: %w", err)
+ }
+ // tar 归档中,文件路径需要相对于归档根目录,这里使用相对路径
+ header.Name, err = filepath.Rel(sourceDir, path)
+ if err != nil {
+ return fmt.Errorf("tar:获取相对路径失败: %w", err)
+ }
+ if header.Name == "." { // 根目录相对路径是 ".", 需要修正为空字符串
+ header.Name = ""
+ }
+ if err := tarWriter.WriteHeader(header); err != nil { // 写入 header
+ return fmt.Errorf("写入 tar header 失败: %w", err)
+ }
+ if info.IsDir() { // 如果是目录,header 已经写入,无需写入内容
+ return nil
+ }
+ // 打开文件并写入文件内容 添加缓冲写入以提高性能
+ srcFile, err := os.Open(path)
+ if err != nil {
+ return fmt.Errorf("tar:打开源文件失败: %w", err)
+ }
+ defer srcFile.Close()
+ buf := make([]byte, 32*1024) // 32KB buffer
+ _, err = io.CopyBuffer(tarWriter, srcFile, buf)
+ if err != nil {
+ return fmt.Errorf("tar:复制文件内容到 tar 失败: %w", err)
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("tar:遍历源目录失败: %w", err)
+ }
+ return nil
+}
+
+// TarDecompress 函数:解压 tar 归档到指定目录
+func TarDecompress(tarFile string, destDir string) error {
+ file, err := os.Open(tarFile) // 打开 tar 文件
+ if err != nil {
+ return fmt.Errorf("打开 tar 文件失败: %w", err)
+ }
+ defer file.Close()
+ tarReader := tar.NewReader(file)
+ for {
+ header, err := tarReader.Next() // 读取下一个 header
+ if err == io.EOF { // 文件结束
+ break
+ }
+ if err != nil {
+ return fmt.Errorf("读取 tar header 失败: %w", err)
+ }
+ targetPath := filepath.Join(destDir, header.Name) // 构建目标路径
+ if header.Typeflag == tar.TypeDir { // 如果是目录,创建目录
+ if err := os.MkdirAll(targetPath, os.FileMode(header.Mode)); err != nil {
+ return fmt.Errorf("tarDecompress: 创建目录失败: %w", err)
+ }
+ continue
+ }
+ // 创建文件
+ outFile, err := os.Create(targetPath)
+ if err != nil {
+ return fmt.Errorf("tarDecompress: 创建目标文件失败: %w", err)
+ }
+
+ // 复制文件内容
+ if _, err := io.Copy(outFile, tarReader); err != nil {
+ _ = outFile.Close()
+ return fmt.Errorf("tarDecompress: 复制文件内容失败: %w", err)
+ }
+ _ = outFile.Close()
+ }
+ return nil
+}
+
+// Gzip 函数:压缩数据
+func Gzip(data []byte) ([]byte, error) {
+ var buf bytes.Buffer
+ gzipWriter := gzip.NewWriter(&buf) // 创建 gzip writer,写入到 buffer
+ _, err := gzipWriter.Write(data) // 将原始数据写入 gzip writer 进行压缩
+ if err != nil {
+ return nil, fmt.Errorf("gzip write error: %w", err)
+ }
+ err = gzipWriter.Close() // **重要**: 关闭 writer,完成 gzip 流
+ if err != nil {
+ return nil, fmt.Errorf("gzip close error: %w", err)
+ }
+ return buf.Bytes(), nil // 返回 buffer 中的压缩数据
+}
+
+// GzipDecompress 函数:解压缩数据
+func GzipDecompress(compressedData []byte) ([]byte, error) {
+ buf := bytes.NewReader(compressedData) // 从压缩数据创建 reader
+ gzipReader, err := gzip.NewReader(buf) // 创建 gzip reader,从 buffer 读取压缩数据
+ if err != nil {
+ return nil, fmt.Errorf("gzipDecompress: gzip reader creation error: %w", err)
+ }
+ defer gzipReader.Close() // 确保 reader 在使用后关闭
+ decompressedData, err := io.ReadAll(gzipReader) // 读取所有解压缩后的数据
+ if err != nil {
+ return nil, fmt.Errorf("gzipDecompress: gzip read error: %w", err)
+ }
+ return decompressedData, nil // 返回解压缩后的数据
+}
+
+// Zstd 函数:使用 zstd 压缩数据
+func Zstd(data []byte) ([]byte, error) {
+ var buf bytes.Buffer
+ zstdWriter, err := zstd.NewWriter(&buf) // 创建 zstd writer,写入到 buffer
+ if err != nil {
+ return nil, fmt.Errorf("zstd writer 创建失败: %w", err)
+ }
+ defer zstdWriter.Close() // 确保 writer 关闭
+ _, err = zstdWriter.Write(data) // 将原始数据写入 zstd writer 进行压缩
+ if err != nil {
+ return nil, fmt.Errorf("zstd 写入错误: %w", err)
+ }
+ err = zstdWriter.Close() // 完成 zstd 流
+ if err != nil {
+ return nil, fmt.Errorf("zstd 关闭错误: %w", err)
+ }
+ return buf.Bytes(), nil // 返回 buffer 中的压缩数据
+}
+
+// ZstdDecompress 函数:使用 zstd 解压缩数据
+func ZstdDecompress(compressedData []byte) ([]byte, error) {
+ buf := bytes.NewReader(compressedData) // 从压缩数据创建 reader
+ zstdReader, err := zstd.NewReader(buf) // 创建 zstd reader,从 buffer 读取压缩数据
+ if err != nil {
+ return nil, fmt.Errorf("zstd reader 创建失败: %w", err)
+ }
+ defer zstdReader.Close() // 确保 reader 关闭
+ decompressedData, err := io.ReadAll(zstdReader) // 读取所有解压缩后的数据
+ if err != nil {
+ return nil, fmt.Errorf("zstd 读取错误: %w", err)
+ }
+ return decompressedData, nil // 返回解压缩后的数据
+}
+
+// Lz4 函数:使用 lz4 压缩数据
+func Lz4(data []byte) ([]byte, error) {
+ var buf bytes.Buffer
+ lz4Writer := lz4.NewWriter(&buf) // 创建 lz4 writer,写入到 buffer
+ defer lz4Writer.Close() // 确保 writer 关闭
+ _, err := lz4Writer.Write(data) // 将原始数据写入 lz4 writer 进行压缩
+ if err != nil {
+ return nil, fmt.Errorf("lz4 写入错误: %w", err)
+ }
+ err = lz4Writer.Close() // 完成 lz4 流
+ if err != nil {
+ return nil, fmt.Errorf("lz4 关闭错误: %w", err)
+ }
+ return buf.Bytes(), nil // 返回 buffer 中的压缩数据
+}
+
+// Lz4Decompress 函数:使用 lz4 解压缩数据
+func Lz4Decompress(compressedData []byte) ([]byte, error) {
+ buf := bytes.NewReader(compressedData) // 从压缩数据创建 reader
+ lz4Reader := lz4.NewReader(buf) // 创建 lz4 reader,从 buffer 读取压缩数据
+ decompressedData, err := io.ReadAll(lz4Reader) // 读取所有解压缩后的数据
+ if err != nil {
+ return nil, fmt.Errorf("lz4 读取错误: %w", err)
+ }
+ return decompressedData, nil // 返回解压缩后的数据
+}
+
+// Snappy 函数:使用 snappy 压缩数据
+func Snappy(data []byte) ([]byte, error) {
+ compressedData := snappy.Encode(nil, data) // 使用 snappy.Encode 直接压缩,无需 Writer
+ return compressedData, nil // snappy.Encode 返回压缩后的 []byte,以及 error (但目前 snappy.Encode 不直接返回 error)
+}
+
+// SnappyDecompress 函数:使用 snappy 解压缩数据
+func SnappyDecompress(compressedData []byte) ([]byte, error) {
+ decompressedData, err := snappy.Decode(nil, compressedData) // 使用 snappy.Decode 解压缩,无需 Reader
+ if err != nil {
+ return nil, fmt.Errorf("snappy 解压缩失败: %w", err)
+ }
+ return decompressedData, nil
+}
diff --git a/server/internal/mq/kafka/mgm/info.go b/server/internal/mq/kafka/mgm/info.go
new file mode 100644
index 00000000..a83c6d05
--- /dev/null
+++ b/server/internal/mq/kafka/mgm/info.go
@@ -0,0 +1,221 @@
+package mgm
+
+import (
+ "context"
+ "fmt"
+ machineapp "mayfly-go/internal/machine/application"
+ "mayfly-go/pkg/logx"
+ "mayfly-go/pkg/utils/netx"
+ "net"
+ "strings"
+ "time"
+
+ "github.com/twmb/franz-go/pkg/kadm"
+ "github.com/twmb/franz-go/pkg/kgo"
+ "github.com/twmb/franz-go/pkg/sasl/plain"
+ "github.com/twmb/franz-go/pkg/sasl/scram"
+)
+
+// KafkaInfo holds the connection settings for one Kafka cluster instance.
+type KafkaInfo struct {
+	Id   uint64 `json:"id"`
+	Name string `json:"name"`
+
+	// Broker address list, format: host1:port1,host2:port2 (or a single broker)
+	Hosts string `json:"-"`
+
+	Username string `json:"-"`
+	Password string `json:"-"`
+
+	SshTunnelMachineId int `json:"-"` // id of the machine used as an SSH tunnel; 0 means no tunnel
+
+	// SASL configuration
+	SaslEnabled   bool   `json:"-"` // NOTE(review): not read in this file — Conn/BuildBaseOpts key off Username instead; confirm intended
+	SaslMechanism string `json:"-"` // PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
+}
+
+// Conn creates a franz-go client for this Kafka instance, verifies the
+// connection by listing brokers, and returns a wrapped KafkaConn.
+// The client is closed on every error path so no connection leaks.
+func (mi *KafkaInfo) Conn() (*KafkaConn, error) {
+	opts := mi.BuildBaseOpts()
+
+	// 创建客户端
+	client, err := kgo.NewClient(opts...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create kafka client: %w", err)
+	}
+
+	kac := kadm.NewClient(client)
+
+	// 测试连接 - 获取 broker 列表来验证连接
+	brokers, err := kac.ListBrokers(context.Background())
+	if err != nil {
+		client.Close() // fix: the original leaked the client on this error path
+		return nil, fmt.Errorf("failed to list brokers: %w", err)
+	}
+	if len(brokers) == 0 {
+		client.Close()
+		return nil, fmt.Errorf("no available brokers")
+	}
+
+	logx.Infof("连接 kafka: %s", mi.Hosts)
+
+	return &KafkaConn{
+		Id:      getConnId(mi.Id),
+		Info:    mi,
+		Client:  client,
+		Configs: opts,
+		Ac:      kac,
+	}, nil
+}
+
+// BuildBaseOpts assembles the base franz-go client options for this instance:
+// seed brokers, dial timeout, manual partitioner, SASL auth (when a username
+// is set) and an optional SSH-tunnel dialer.
+func (mi *KafkaInfo) BuildBaseOpts() []kgo.Opt {
+	opts := []kgo.Opt{
+		kgo.SeedBrokers(strings.Split(mi.Hosts, ",")...),
+		kgo.DialTimeout(8 * time.Second),
+		kgo.RecordPartitioner(kgo.ManualPartitioner()),
+	}
+
+	// SASL authentication: SCRAM when explicitly requested, PLAIN otherwise
+	// (including an empty or unrecognized mechanism string).
+	if mi.Username != "" {
+		switch strings.ToUpper(mi.SaslMechanism) {
+		case "SCRAM-SHA-256":
+			auth := scram.Auth{User: mi.Username, Pass: mi.Password}
+			opts = append(opts, kgo.SASL(auth.AsSha256Mechanism()))
+		case "SCRAM-SHA-512":
+			auth := scram.Auth{User: mi.Username, Pass: mi.Password}
+			opts = append(opts, kgo.SASL(auth.AsSha512Mechanism()))
+		default:
+			auth := plain.Auth{User: mi.Username, Pass: mi.Password}
+			opts = append(opts, kgo.SASL(auth.AsMechanism()))
+		}
+	}
+
+	// Route broker connections through an SSH tunnel when configured.
+	// A tunnel lookup failure is logged and the tunnel is skipped.
+	if mi.SshTunnelMachineId > 0 {
+		stm, err := machineapp.GetMachineApp().GetSshTunnelMachine(context.Background(), mi.SshTunnelMachineId)
+		if err != nil {
+			logx.Errorf("获取 ssh隧道失败:%v", err)
+		} else {
+			opts = append(opts, kgo.Dialer(func(ctx context.Context, network, address string) (net.Conn, error) {
+				sshConn, dialErr := stm.GetDialConn(network, address)
+				if dialErr != nil {
+					return nil, dialErr
+				}
+				// Wrap the ssh conn so deadline calls don't error out.
+				return &netx.WrapSshConn{Conn: sshConn}, nil
+			}))
+		}
+	}
+
+	return opts
+}
+
+// KafkaSshProxyDialer dials broker connections through the SSH tunnel of the
+// referenced machine.
+type KafkaSshProxyDialer struct {
+	machineId int // id of the machine providing the SSH tunnel
+}
+
+// DialContext opens a connection to address through the machine's SSH tunnel.
+func (sd *KafkaSshProxyDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
+	stm, err := machineapp.GetMachineApp().GetSshTunnelMachine(ctx, sd.machineId)
+	if err != nil {
+		return nil, err
+	}
+	sshConn, err := stm.GetDialConn(network, address)
+	if err != nil {
+		return nil, err
+	}
+	// Wrap the ssh conn: raw ssh conns reject deadlines with
+	// "ssh: tcpChan: deadline not supported", which breaks internal timeouts.
+	return &netx.WrapSshConn{Conn: sshConn}, nil
+}
+
+// Dial is the context-less variant of DialContext.
+// NOTE(review): the original comment said this implements the sarama.Dialer
+// interface, but this package imports franz-go, not sarama — confirm which
+// consumer actually needs this method.
+func (sd *KafkaSshProxyDialer) Dial(network, address string) (net.Conn, error) {
+	return sd.DialContext(context.Background(), network, address)
+}
+
+// getConnId builds the connection id ("kafka:<id>") for a kafka instance;
+// a zero id yields the empty string.
+func getConnId(id uint64) string {
+	if id > 0 {
+		return fmt.Sprintf("kafka:%d", id)
+	}
+	return ""
+}
+
+// CreateTopicParam — parameters for creating a topic.
+// NOTE(review): the binding tags contain a trailing space inside the backquotes
+// (`binding:"required" `) — harmless but unconventional; confirm intentional.
+type CreateTopicParam struct {
+	TopicName         string             `json:"topic" binding:"required" `             // topic name
+	NumPartitions     int32              `json:"numPartitions" binding:"required" `     // partition count
+	ReplicationFactor int16              `json:"replicationFactor" binding:"required" ` // replication factor
+	ConfigEntries     map[string]*string `json:"configEntries"`                         // topic config overrides
+}
+
+// CreatePartitionsParam — parameters for changing a topic's partition count.
+type CreatePartitionsParam struct {
+	TopicName     string `json:"topic"`         // topic name
+	NumPartitions int    `json:"numPartitions"` // new total partition count
+}
+
+// ConsumeMessageParam — parameters for consuming messages.
+type ConsumeMessageParam struct {
+	Topic          string `json:"topic"`                  // topic name
+	Group          string `json:"group"`                  // consumer group id; empty means no consumer group
+	Partition      int32  `json:"partition" default:"-1"` // partition number; -1 means all partitions
+	Number         int    `json:"number" default:"10"`    // number of messages to consume
+	PullTimeout    int    `json:"pullTimeout" default:"10"` // pull timeout in seconds
+	Decompression  string `json:"decompression"`          // decompression: gzip, lz4, zstd, snappy
+	Decode         string `json:"decode"`                 // decoding: Base64
+	Earliest       bool   `json:"earliest"`               // consume from earliest if true, otherwise from latest
+	StartTime      string `json:"startTime"`              // start timestamp (ms); 0 means unused — takes precedence over Earliest
+	CommitOffset   bool   `json:"commitOffset"`           // whether to commit the consumed offsets
+	IsolationLevel string `json:"isolationLevel"`         // read isolation level; defaults to read_committed per original comment
+}
+
+// ConsumeMessageResult — one consumed message as returned to the caller.
+type ConsumeMessageResult struct {
+	Id            int               `json:"id"`            // sequence number within this consume request
+	Offset        int64             `json:"offset"`        // record offset within the partition
+	Partition     int32             `json:"partition"`     // partition the record came from
+	Key           string            `json:"key"`           // record key
+	Value         string            `json:"value"`         // record value (possibly decoded/decompressed)
+	Timestamp     string            `json:"timestamp"`     // record timestamp, formatted as a string
+	Topic         string            `json:"topic"`         // topic name
+	Headers       map[string]string `json:"headers"`       // record headers
+	LeaderEpoch   int32             `json:"leaderEpoch"`   // leader epoch of the record
+	ProducerEpoch int16             `json:"producerEpoch"` // producer epoch of the record
+	ProducerID    int64             `json:"producerID"`    // producer id of the record
+}
+
+// ProduceMessageParam — parameters for producing messages.
+type ProduceMessageParam struct {
+	Topic       string              `json:"topic"`       // topic name
+	Key         string              `json:"key"`         // message key (optional)
+	Partition   int32               `json:"partition"`   // target partition (optional; -1 means auto-select)
+	Value       string              `json:"value"`       // message payload
+	Headers     []map[string]string `json:"headers"`     // message headers (optional)
+	Times       int                 `json:"times"`       // how many times to send the message
+	Compression string              `json:"compression"` // compression: gzip, lz4, zstd, snappy
+}
+
+// RecordHeader is a single key/value message header.
+type RecordHeader struct {
+	Key   string
+	Value string
+}
+
+// ProduceMessageResult — where a produced message landed.
+type ProduceMessageResult struct {
+	Partition int32 `json:"partition"` // partition the record was written to
+	Offset    int64 `json:"offset"`    // offset assigned to the record
+}
+
+// BrokerInfo describes one broker of the cluster.
+type BrokerInfo struct {
+	Id   int32   `json:"id"`   // broker node id
+	Addr string  `json:"addr"` // broker address
+	Rack *string `json:"rac"`  // NOTE(review): json tag "rac" looks like a typo for "rack" — changing it now would break any frontend already reading "rac"; confirm before renaming
+}
diff --git a/server/internal/pkg/consts/consts.go b/server/internal/pkg/consts/consts.go
index e93ab543..5f1b5995 100644
--- a/server/internal/pkg/consts/consts.go
+++ b/server/internal/pkg/consts/consts.go
@@ -10,6 +10,7 @@ const (
ResourceTypeAuthCert int8 = 5
ResourceTypeEsInstance int8 = 6
ResourceTypeContainer int8 = 7
+ ResourceTypeMqKafka int8 = 8
// imsg起始编号
ImsgNumSys = 10000
@@ -23,4 +24,5 @@ const (
ImsgNumMsg = 90000
ImsgNumEs = 100000
ImsgNumDocker = 110000
+ ImsgNumMqKafka = 120000
)
diff --git a/server/internal/tag/api/tag_tree.go b/server/internal/tag/api/tag_tree.go
index f3fe5811..07a79396 100644
--- a/server/internal/tag/api/tag_tree.go
+++ b/server/internal/tag/api/tag_tree.go
@@ -1,6 +1,7 @@
package api
import (
+ "cmp"
"fmt"
"mayfly-go/internal/tag/api/form"
"mayfly-go/internal/tag/api/vo"
@@ -45,6 +46,9 @@ func (t *TagTree) ReqConfs() *req.Confs {
func (p *TagTree) GetTagTree(rc *req.Ctx) {
tagTypesStr := rc.Query("type")
+ flatten := rc.Query("flatten") // 是否展开平铺树
+ flatten = cmp.Or(flatten, "0")
+
var typePaths []entity.TypePath
if tagTypesStr != "" {
typePaths = collx.ArrayMap(strings.Split(tagTypesStr, ","), func(val string) entity.TypePath {
@@ -63,7 +67,15 @@ func (p *TagTree) GetTagTree(rc *req.Ctx) {
for _, tag := range allTags {
tagTrees = append(tagTrees, tag)
}
- rc.ResData = tagTrees.ToTrees(0)
+
+ var tree []*vo.TagTreeItem
+ if flatten != "0" {
+ tree = tagTrees.ToFlattenTrees(0)
+ } else {
+ tree = tagTrees.ToTrees(0)
+ }
+
+ rc.ResData = tree
}
// complteTags 补全标签信息,使其能构造为树结构
@@ -163,22 +175,34 @@ func (p *TagTree) CountTagResource(rc *req.Ctx) {
CodePathLikes: collx.AsArray(tagPath),
}).GetCodePaths()...)
+ redisCodes := p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
+ Types: collx.AsArray(entity.TagTypeRedis),
+ CodePathLikes: collx.AsArray(tagPath),
+ }).GetCodes()
+
+ mongoCodes := p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
+ Types: collx.AsArray(entity.TagTypeMongo),
+ CodePathLikes: collx.AsArray(tagPath),
+ }).GetCodes()
+
+ containerCodes := p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
+ Types: collx.AsArray(entity.TagTypeContainer),
+ CodePathLikes: collx.AsArray(tagPath),
+ }).GetCodes()
+
+ kafkaCodes := p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
+ Types: collx.AsArray(entity.TagTypeMqKafka),
+ CodePathLikes: collx.AsArray(tagPath),
+ }).GetCodes()
+
rc.ResData = collx.M{
- "machine": len(machineCodes),
- "db": len(dbCodes),
- "es": len(esCodes),
- "redis": len(p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
- Types: collx.AsArray(entity.TagTypeRedis),
- CodePathLikes: collx.AsArray(tagPath),
- }).GetCodes()),
- "mongo": len(p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
- Types: collx.AsArray(entity.TagTypeMongo),
- CodePathLikes: collx.AsArray(tagPath),
- }).GetCodes()),
- "container": len(p.tagTreeApp.GetAccountTags(accountId, &entity.TagTreeQuery{
- Types: collx.AsArray(entity.TagTypeContainer),
- CodePathLikes: collx.AsArray(tagPath),
- }).GetCodes()),
+ "machine": len(machineCodes),
+ "db": len(dbCodes),
+ "es": len(esCodes),
+ "redis": len(redisCodes),
+ "mongo": len(mongoCodes),
+ "container": len(containerCodes),
+ "kafka": len(kafkaCodes),
}
}
diff --git a/server/internal/tag/api/vo/tag_tree.go b/server/internal/tag/api/vo/tag_tree.go
index 93714c11..f047a4cc 100644
--- a/server/internal/tag/api/vo/tag_tree.go
+++ b/server/internal/tag/api/vo/tag_tree.go
@@ -2,6 +2,7 @@ package vo
import (
"mayfly-go/internal/tag/application/dto"
+ "strings"
)
type TagTreeVOS []*dto.SimpleTagTree
@@ -9,6 +10,7 @@ type TagTreeVOS []*dto.SimpleTagTree
type TagTreeItem struct {
*dto.SimpleTagTree
Children []*TagTreeItem `json:"children"`
+ NamePath string `json:"namePath"`
}
func (m *TagTreeVOS) ToTrees(pid uint64) []*TagTreeItem {
@@ -30,7 +32,6 @@ func (m *TagTreeVOS) ToTrees(pid uint64) []*TagTreeItem {
}
for _, node := range ttis {
- // 根节点
if node.Root {
continue
}
@@ -43,3 +44,64 @@ func (m *TagTreeVOS) ToTrees(pid uint64) []*TagTreeItem {
return roots
}
+
+// ToFlattenTrees builds the tree under pid and flattens it: it returns only
+// grouping nodes (Type == -1, presumably "tag" nodes — confirm against
+// SimpleTagTree) that directly contain at least one non-grouping child, each
+// copied with up to two levels of non-grouping descendants and a "/"-joined
+// NamePath of ancestor names. Grouping-only subtrees are recursed into instead
+// of being emitted.
+// NOTE(review): the append(namePath, ...) calls can share one backing array
+// across loop iterations and recursive calls; this is only safe because every
+// appended slice is consumed immediately by strings.Join — fragile if edited.
+func (m *TagTreeVOS) ToFlattenTrees(pid uint64) []*TagTreeItem {
+	trees := m.ToTrees(pid)
+	result := make([]*TagTreeItem, 0)
+
+	var flatten func(node *TagTreeItem, namePath []string)
+	flatten = func(node *TagTreeItem, namePath []string) {
+		currentNamePath := append(namePath, node.Name)
+
+		// Non-grouping nodes are never emitted at this level; they are only
+		// copied in as children of an emitted grouping node below.
+		if node.Type != -1 {
+			return
+		}
+
+		// Does this grouping node directly contain a resource (non -1) child?
+		hasNonMinus1Child := false
+		for _, child := range node.Children {
+			if child.Type != -1 {
+				hasNonMinus1Child = true
+				break
+			}
+		}
+
+		if hasNonMinus1Child {
+			// Emit a flattened copy of this node with its resource children
+			// (and their grandchildren) attached; grouping children are
+			// recursed into separately.
+			newNode := &TagTreeItem{
+				SimpleTagTree: node.SimpleTagTree,
+				Children:      make([]*TagTreeItem, 0),
+				NamePath:      strings.Join(currentNamePath, "/"),
+			}
+			for _, child := range node.Children {
+				if child.Type != -1 {
+					childCopy := &TagTreeItem{
+						SimpleTagTree: child.SimpleTagTree,
+						Children:      make([]*TagTreeItem, 0),
+						NamePath:      strings.Join(append(currentNamePath, child.Name), "/"),
+					}
+					for _, grandchild := range child.Children {
+						grandchildCopy := &TagTreeItem{
+							SimpleTagTree: grandchild.SimpleTagTree,
+							NamePath:      strings.Join(append(currentNamePath, child.Name, grandchild.Name), "/"),
+						}
+						childCopy.Children = append(childCopy.Children, grandchildCopy)
+					}
+					newNode.Children = append(newNode.Children, childCopy)
+				} else {
+					flatten(child, currentNamePath)
+				}
+			}
+			result = append(result, newNode)
+			return
+		}
+
+		// No direct resource children: descend into grouping children only.
+		for _, child := range node.Children {
+			flatten(child, currentNamePath)
+		}
+	}
+
+	for _, tree := range trees {
+		flatten(tree, []string{})
+	}
+
+	return result
+}
diff --git a/server/internal/tag/domain/entity/tag_tree.go b/server/internal/tag/domain/entity/tag_tree.go
index 4077e254..fd782889 100644
--- a/server/internal/tag/domain/entity/tag_tree.go
+++ b/server/internal/tag/domain/entity/tag_tree.go
@@ -37,6 +37,7 @@ const (
TagTypeMongo TagType = TagType(consts.ResourceTypeMongo)
TagTypeAuthCert TagType = TagType(consts.ResourceTypeAuthCert) // 授权凭证类型
TagTypeContainer TagType = TagType(consts.ResourceTypeContainer)
+ TagTypeMqKafka TagType = TagType(consts.ResourceTypeMqKafka)
TagTypeDb TagType = 22 // 数据库名
)
diff --git a/server/main.go b/server/main.go
index 284415ac..fc12b7b1 100644
--- a/server/main.go
+++ b/server/main.go
@@ -12,6 +12,7 @@ import (
_ "mayfly-go/internal/flow/init"
_ "mayfly-go/internal/machine/init"
_ "mayfly-go/internal/mongo/init"
+ _ "mayfly-go/internal/mq/init"
_ "mayfly-go/internal/msg/init"
"mayfly-go/internal/pkg/config"
_ "mayfly-go/internal/redis/init"
diff --git a/server/migration/migrations/v1_10.go b/server/migration/migrations/v1_10.go
index 093bf83f..2b516841 100644
--- a/server/migration/migrations/v1_10.go
+++ b/server/migration/migrations/v1_10.go
@@ -7,6 +7,7 @@ import (
esentity "mayfly-go/internal/es/domain/entity"
flowentity "mayfly-go/internal/flow/domain/entity"
machineentity "mayfly-go/internal/machine/domain/entity"
+ mqentity "mayfly-go/internal/mq/kafka/domain/entity"
msgentity "mayfly-go/internal/msg/domain/entity"
sysentity "mayfly-go/internal/sys/domain/entity"
"mayfly-go/pkg/model"
@@ -25,6 +26,7 @@ func V1_10() []*gormigrate.Migration {
migrations = append(migrations, V1_10_4()...)
migrations = append(migrations, V1_10_5()...)
migrations = append(migrations, V1_10_6()...)
+ migrations = append(migrations, V1_10_7()...)
return migrations
}
@@ -493,3 +495,44 @@ func V1_10_6() []*gormigrate.Migration {
},
}
}
+
+// V1_10_7 returns the migration that introduces the MQ/Kafka module: it
+// auto-migrates the Kafka entity table, seeds the kafka menu/permission
+// resources into t_sys_resource, and grants them to role 1.
+// NOTE(review): the AutoMigrate and Exec errors are all discarded — possibly
+// deliberate so duplicate-key inserts don't fail a re-run, but an AutoMigrate
+// failure would also be silently ignored; confirm whether errors should be
+// returned. Rollback is intentionally a no-op.
+func V1_10_7() []*gormigrate.Migration {
+	return []*gormigrate.Migration{
+		{
+			ID: "20260313-v1.10.7",
+
+			Migrate: func(tx *gorm.DB) error {
+				tx.AutoMigrate(&mqentity.Kafka{})
+
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773796612, 1773796555, 2, 1, 'kafka', 'res:mq:kafka', 1773796612, 'null', 12, 'admin', 12, 'admin', '2026-03-18 09:16:53', '2026-03-18 09:16:53', 'UX7Sa6oG/cC4TRerV/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773796555, 94, 2, 1, 'MQ', 'res:mq', 1773796555, 'null', 12, 'admin', 12, 'admin', '2026-03-18 09:15:55', '2026-03-18 09:16:34', 'UX7Sa6oG/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773714634, 1773365877, 2, 1, 'group delete', 'kafka:group:delete', 1773714634, 'null', 12, 'admin', 12, 'admin', '2026-03-17 10:30:35', '2026-03-17 10:30:35', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/QvLeQGrM/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773647239, 1773365877, 2, 1, 'topic consume', 'kafka:topic:consume', 1773647239, 'null', 12, 'admin', 12, 'admin', '2026-03-16 15:47:20', '2026-03-16 15:47:20', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/xgP900Sy/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773647223, 1773365877, 2, 1, 'topic produce', 'kafka:topic:produce', 1773647223, 'null', 12, 'admin', 12, 'admin', '2026-03-16 15:47:04', '2026-03-16 15:47:04', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/dPHcNo2n/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773647053, 1773365877, 2, 1, 'topic delete', 'kafka:topic:delete', 1773647053, 'null', 12, 'admin', 12, 'admin', '2026-03-16 15:44:13', '2026-03-16 15:44:13', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/nzD93hsk/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773647032, 1773365877, 2, 1, 'topic create', 'kafka:topic:create', 1773647032, 'null', 12, 'admin', 12, 'admin', '2026-03-16 15:43:53', '2026-03-16 15:43:53', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/7ZtYUWEn/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773365930, 1773796612, 2, 1, 'kafka-删除', 'mq:kafka:del', 0, 'null', 12, 'admin', 12, 'admin', '2026-03-13 09:38:50', '2026-03-18 09:17:39', 'UX7Sa6oG/cC4TRerV/qdaX1R6b/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773365913, 1773796612, 2, 1, 'kafka-保存', 'mq:kafka:save', 1, 'null', 12, 'admin', 12, 'admin', '2026-03-13 09:38:33', '2026-03-18 09:17:34', 'UX7Sa6oG/cC4TRerV/xaL2Zi9m/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773365877, 1773365853, 2, 1, 'mq:kafka', 'mq:kafka:base', 1773365877, 'null', 12, 'admin', 12, 'admin', '2026-03-13 09:37:57', '2026-03-13 09:37:57', 'ocdrUNaa/6f6tY0WZ/nDZWgbDn/', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_resource` (`id`, `pid`, `type`, `status`, `name`, `code`, `weight`, `meta`, `creator_id`, `creator`, `modifier_id`, `modifier`, `create_time`, `update_time`, `ui_path`, `is_deleted`, `delete_time`) VALUES (1773365853, 1756122788, 2, 1, 'MQ', 'mq:base', 1773365853, 'null', 12, 'admin', 12, 'admin', '2026-03-13 09:37:33', '2026-03-13 09:37:42', 'ocdrUNaa/6f6tY0WZ/', 0, NULL)")
+
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773365853, 12, 'admin', '2026-03-13 09:45:24', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773365877, 12, 'admin', '2026-03-13 09:45:24', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773365913, 12, 'admin', '2026-03-13 09:45:24', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773365930, 12, 'admin', '2026-03-13 09:45:24', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773647032, 12, 'admin', '2026-03-16 15:47:27', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773647053, 12, 'admin', '2026-03-16 15:47:27', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773647239, 12, 'admin', '2026-03-16 15:47:27', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773647223, 12, 'admin', '2026-03-16 15:47:27', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773714634, 12, 'admin', '2026-03-17 10:31:00', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773796555, 12, 'admin', '2026-03-18 09:18:09', 0, NULL)")
+				tx.Exec("INSERT INTO `t_sys_role_resource` (`role_id`, `resource_id`, `creator_id`, `creator`, `create_time`, `is_deleted`, `delete_time`) VALUES (1, 1773796612, 12, 'admin', '2026-03-18 09:18:09', 0, NULL)")
+
+				return nil
+			},
+			// Rollback is a deliberate no-op: seeded resources are left in place.
+			Rollback: func(tx *gorm.DB) error {
+				return nil
+			},
+		},
+	}
+}