rocketmq,收益管理,资源管理模块代码

This commit is contained in:
gaoyutao
2025-08-21 09:28:46 +08:00
parent 499e93b68a
commit 80dbba70fe
110 changed files with 9430 additions and 68 deletions

View File

@@ -0,0 +1,25 @@
package com.ruoyi.rocketmq;
import com.ruoyi.common.security.annotation.EnableCustomConfig;
import com.ruoyi.common.security.annotation.EnableRyFeignClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* 平台管理模块
*
* @author ruoyi
*/
@EnableCustomConfig
@EnableRyFeignClients
@SpringBootApplication
@EnableAsync
public class RocketMQApplication
{
public static void main(String[] args)
{
SpringApplication.run(RocketMQApplication.class, args);
System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙");
}
}

View File

@@ -0,0 +1,75 @@
package com.ruoyi.rocketmq.config;
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
import com.ruoyi.rocketmq.enums.MessageTopic;
import com.ruoyi.rocketmq.model.ConsumerMode;
import com.ruoyi.rocketmq.consumer.RocketMsgListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* 消费者配置
*/
@RefreshScope
@Configuration
@Slf4j
public class ConsumerConfig {
@Autowired
private ConsumerMode consumerMode;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() {
//构建客户端连接
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName());
//
consumer.setNamesrvAddr(consumerMode.getNamesrvAddr());
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin());
consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax());
consumer.registerMessageListener(new RocketMsgListener());
/**
* 1. CONSUME_FROM_LAST_OFFSET第一次启动从队列最后位置消费后续再启动接着上次消费的进度开始消费
* 2. CONSUME_FROM_FIRST_OFFSET第一次启动从队列初始位置消费后续再启动接着上次消费的进度开始消费
* 3. CONSUME_FROM_TIMESTAMP第一次启动从指定时间点位置消费后续再启动接着上次消费的进度开始消费
* 以上所说的第一次启动是指从来没有消费过的消费者如果该消费者消费过那么会在broker端记录该消费者的消费位置如果该消费者挂了再启动那么自动从上次消费的进度开始
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* CLUSTERING (集群模式) 默认模式同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容同一个ConsumerGroup里所有的Consumer消息加起来才是所
* 订阅topic整体从而达到负载均衡的目的
* BROADCASTING (广播模式) 同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息也就是一个消费会被多次分发被多个consumer消费。
* 需要注意的是在广播模式下每个Consumer都会独立地处理相同的消息副本。这可能会导致一些潜在的问题例如消息重复处理或者资源浪费。因此在使用广播模式时请确保消息的处理逻辑是幂等的并仔细考虑系统资源的消耗。
*/
// consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setVipChannelEnabled(false);
consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize());
try {
/**
* 订阅topic可以对指定消息进行过滤例如"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息
* 由于官方并没有给直接订阅全部消息示例 所以使用list列表循环订阅所有topic
*/
// 获取所有topic列表
MessageTopic messageTopic = new MessageTopic();
List<String> allTopics = messageTopic.RocketMQTopicList();
//订阅所有topic
for (String topic : allTopics) {
consumer.subscribe(topic,"*");
}
consumer.start();
log.info("消费者初始化成功:{}", consumer);
} catch (MQClientException e) {
e.printStackTrace();
log.error("消费者初始化失败:{}",e.getMessage());
}
return consumer;
}
}

View File

@@ -0,0 +1,56 @@
package com.ruoyi.rocketmq.config;
import com.ruoyi.rocketmq.model.ProducerMode;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* mq搭建地址连接
* 生产者初者连接信息 具体看nacos配置
*/
@Configuration
@Slf4j
public class ProducerConfig {
/**
* 远程调用连接信息
*/
public static DefaultMQProducer producer;
/**
* 连接客户端信息配置 具体看nacos配置
*/
@Autowired
private ProducerMode producerMode;
@Bean
public DefaultMQProducer getRocketMQProducer() {
producer = new DefaultMQProducer(producerMode.getGroupName());
producer.setNamesrvAddr(producerMode.getNamesrvAddr());
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息需要设置不同的instanceName
if(producerMode.getMaxMessageSize()!=null){
producer.setMaxMessageSize(producerMode.getMaxMessageSize());
}
if(producerMode.getSendMsgTimeout()!=null){
producer.setSendMsgTimeout(producerMode.getSendMsgTimeout());
}
//如果发送消息失败设置重试次数默认为2次
if(producerMode.getRetryTimesWhenSendFailed()!=null){
producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed());
}
producer.setVipChannelEnabled(false);
try {
producer.start();
log.info("生产者初始化成功:{}",producer.toString());
} catch (MQClientException e) {
log.error("生产者初始化失败:{}",e.getMessage());
}
return producer;
}
}

View File

@@ -0,0 +1,87 @@
package com.ruoyi.rocketmq.consumer;
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
import com.ruoyi.rocketmq.producer.ConsumeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* 消息监听
*/
@Slf4j
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
/**
* 消费消息
* @param list msgs.size() >= 1
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1you can modify here
* 这里只设置为1当设置为多个时list中只要有一条消息消费失败就会整体重试
* @param consumeConcurrentlyContext 上下文信息
* @return 消费状态 成功CONSUME_SUCCESS或者 重试 (RECONSUME_LATER)
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try{
//消息不等于空情况
if (!CollectionUtils.isEmpty(list)) {
//获取topic
for (MessageExt messageExt : list) {
// 解析消息内容
String body = new String(messageExt.getBody());
log.info("接受到的消息为:{}", body);
String tags = messageExt.getTags();
String topic = messageExt.getTopic();
String msgId = messageExt.getMsgId();
String keys = messageExt.getKeys();
int reConsume = messageExt.getReconsumeTimes();
// 消息已经重试了3次如果不需要再次消费则返回成功
if (reConsume == 3) {
// TODO 补偿信息
log.error("消息消费三次失败,消息内容:{}", body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常
}
// 根据不同的topic处理不同的业务 这里以订单消息为例子
if (MessageCodeEnum.ORDER_MESSAGE_TOPIC.getCode().equals(topic)) {
if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) {
//处理你的业务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
} else {
log.info("未匹配到Tag【{}】" + tags);
}
}
}
}
// 消息消费失败
//broker会根据设置的messageDelayLevel发起重试默认16次
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} catch (Exception e) {
// 调用 handleException 方法处理异常并返回处理结果
return handleException(e);
}
}
/**
* 异常处理
*
* @param e 捕获的异常
* @return 消息消费结果
*/
private static ConsumeConcurrentlyStatus handleException(final Exception e) {
Class exceptionClass = e.getClass();
if (exceptionClass.equals(UnsupportedEncodingException.class)) {
log.error(e.getMessage());
} else if (exceptionClass.equals(ConsumeException.class)) {
log.error(e.getMessage());
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

View File

@@ -0,0 +1,30 @@
package com.ruoyi.rocketmq.consumer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* 事物消息监听
*/
public class RocketMsgTransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 在这里执行本地事务,比如数据库操作等
// 如果本地事务执行成功,返回 COMMIT_MESSAGE
// 如果本地事务执行失败,返回 ROLLBACK_MESSAGE
// 如果本地事务执行中,可以返回 UNKNOWRocketMQ 将会检查事务状态,并根据状态处理消息
return LocalTransactionState.COMMIT_MESSAGE; // 根据实际情况返回对应的状态
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态,如果本地事务执行成功,返回 COMMIT_MESSAGE
// 如果本地事务执行失败,返回 ROLLBACK_MESSAGE
// 如果本地事务仍在执行中,返回 UNKNOWRocketMQ 将会继续检查事务状态
return LocalTransactionState.COMMIT_MESSAGE; // 根据实际情况返回对应的状态
}
}

View File

@@ -0,0 +1,105 @@
package com.ruoyi.rocketmq.controller;
import java.util.List;
import java.io.IOException;
import javax.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.log.annotation.Log;
import com.ruoyi.common.log.enums.BusinessType;
import com.ruoyi.common.security.annotation.RequiresPermissions;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import com.ruoyi.rocketmq.service.IInitialBandwidthTrafficService;
import com.ruoyi.common.core.web.controller.BaseController;
import com.ruoyi.common.core.web.domain.AjaxResult;
import com.ruoyi.common.core.utils.poi.ExcelUtil;
import com.ruoyi.common.core.web.page.TableDataInfo;
/**
* 初始带宽流量Controller
*
* @author gyt
* @date 2025-08-20
*/
@RestController
@RequestMapping("/traffic")
public class InitialBandwidthTrafficController extends BaseController
{
@Autowired
private IInitialBandwidthTrafficService initialBandwidthTrafficService;
/**
* 查询初始带宽流量列表
*/
@RequiresPermissions("rocketmq:traffic:list")
@GetMapping("/list")
public TableDataInfo list(InitialBandwidthTraffic initialBandwidthTraffic)
{
startPage();
List<InitialBandwidthTraffic> list = initialBandwidthTrafficService.selectInitialBandwidthTrafficList(initialBandwidthTraffic);
return getDataTable(list);
}
/**
* 导出初始带宽流量列表
*/
@RequiresPermissions("rocketmq:traffic:export")
@Log(title = "初始带宽流量", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, InitialBandwidthTraffic initialBandwidthTraffic)
{
List<InitialBandwidthTraffic> list = initialBandwidthTrafficService.selectInitialBandwidthTrafficList(initialBandwidthTraffic);
ExcelUtil<InitialBandwidthTraffic> util = new ExcelUtil<InitialBandwidthTraffic>(InitialBandwidthTraffic.class);
util.exportExcel(response, list, "初始带宽流量数据");
}
/**
* 获取初始带宽流量详细信息
*/
@RequiresPermissions("rocketmq:traffic:query")
@GetMapping(value = "/{id}")
public AjaxResult getInfo(@PathVariable("id") Long id)
{
return success(initialBandwidthTrafficService.selectInitialBandwidthTrafficById(id));
}
/**
* 新增初始带宽流量
*/
@RequiresPermissions("rocketmq:traffic:add")
@Log(title = "初始带宽流量", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody InitialBandwidthTraffic initialBandwidthTraffic)
{
return toAjax(initialBandwidthTrafficService.insertInitialBandwidthTraffic(initialBandwidthTraffic));
}
/**
* 修改初始带宽流量
*/
@RequiresPermissions("rocketmq:traffic:edit")
@Log(title = "初始带宽流量", businessType = BusinessType.UPDATE)
@PutMapping
public AjaxResult edit(@RequestBody InitialBandwidthTraffic initialBandwidthTraffic)
{
return toAjax(initialBandwidthTrafficService.updateInitialBandwidthTraffic(initialBandwidthTraffic));
}
/**
* 删除初始带宽流量
*/
@RequiresPermissions("rocketmq:traffic:remove")
@Log(title = "初始带宽流量", businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
public AjaxResult remove(@PathVariable Long[] ids)
{
return toAjax(initialBandwidthTrafficService.deleteInitialBandwidthTrafficByIds(ids));
}
}

View File

@@ -0,0 +1,137 @@
package com.ruoyi.rocketmq.controller;
import com.ruoyi.rocketmq.producer.MessageProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 消息测试类Controller
*/
@RestController
@RequestMapping("/api/rocketMessage")
public class RocketMqController {
/**
* 发送同步消息
*/
@PostMapping("/sendSynchronizeMessage")
private Map sendSynchronizeMessage(){
MessageProducer messageProducer = new MessageProducer();
//调用MessageProducer配置好的消息方法
SendResult sendResult = messageProducer.sendSynchronizeMessage("order-message","order_message_tag","title","content");
Map<String,Object> result = new HashMap<>();
result.put("data",sendResult);
return result;
}
/**
* 发送单向消息
*/
@PostMapping("/sendOnewayMessage")
private Map sendOnewayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){
MessageProducer messageProducer = new MessageProducer();
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
messageProducer.sendOnewayMessage("order-message","order_timeout_tag","title","content");
Map<String,Object> result = new HashMap<>();
result.put("msg","发送成功");
result.put("code",200);
return result;
}
/**
* 批量发送消息
*/
@PostMapping("/sendBatchMessage")
private Map sendBatchMessage(){
// 根据实际需求创建消息列表并返回
List<Message> messages = new ArrayList<>();
// 添加消息到列表
messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes()));
messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes()));
messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes()));
MessageProducer messageProducer = new MessageProducer();
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
SendResult sendResult = messageProducer.sendBatchMessage(messages);
Map<String,Object> result = new HashMap<>();
result.put("data",sendResult);
return result;
}
/**
* 发送事物消息
*/
@PostMapping("/sendThingMessage")
private Map sendThingMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){
MessageProducer messageProducer = new MessageProducer();
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
SendResult sendResult = messageProducer.sendThingMessage("order-message","order_timeout_tag","title","content");
Map<String,Object> result = new HashMap<>();
result.put("data",sendResult);
return result;
}
/**
* 发送有序的消息
*/
@PostMapping("/sendOrderlyMessage")
private Map sendOrderlyMessage(){
// 根据实际需求创建消息列表并返回
List<Message> messages = new ArrayList<>();
// 添加消息到列表
messages.add(new Message("order-message", "order_timeout_tag", "Message 1".getBytes()));
messages.add(new Message("order-message", "order_timeout_tag", "Message 2".getBytes()));
messages.add(new Message("order-message", "order_timeout_tag", "Message 3".getBytes()));
Integer messageQueueNumber = 3;
MessageProducer messageProducer = new MessageProducer();
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
SendResult sendResult = messageProducer.sendOrderlyMessage(messages,messageQueueNumber);
Map<String,Object> result = new HashMap<>();
result.put("data",sendResult);
return result;
}
/**
* 发送延迟消息
*/
@PostMapping("/sendDelayMessage")
private Map sendDelayMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){
MessageProducer messageProducer = new MessageProducer();
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
SendResult sendResult = messageProducer.sendDelayMessage("order-message","order_timeout_tag","title","content");
Map<String,Object> result = new HashMap<>();
result.put("data",sendResult);
return result;
}
/**
* 发送异步的消息
*/
@PostMapping("/sendAsyncProducerMessage")
private Map sendAsyncProducerMessage(@RequestParam("topic") String topic,@RequestParam("tag") String tag,@RequestParam("key") String key,@RequestParam("value") String value){
MessageProducer messageProducer = new MessageProducer();
//调用MessageProducer配置好的消息方法 topic需要你根据你们业务定制相应的
SendResult sendResult = messageProducer.sendAsyncProducerMessage("order-message","order_timeout_tag","title","content");
Map<String,Object> result = new HashMap<>();
result.put("data",sendResult);
return result;
}
}

View File

@@ -0,0 +1,177 @@
package com.ruoyi.rocketmq.domain;
import java.math.BigDecimal;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.ruoyi.common.core.annotation.Excel;
import com.ruoyi.common.core.web.domain.BaseEntity;
/**
* 初始带宽流量对象 initial_bandwidth_traffic
*
* @author gyt
* @date 2025-08-20
*/
public class InitialBandwidthTraffic extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** 唯一标识ID */
private Long id;
/** 接口名称 */
@Excel(name = "接口名称")
private String interfaceName;
/** MAC地址 */
@Excel(name = "MAC地址")
private String macAddress;
/** 运行状态 */
@Excel(name = "运行状态")
private String operationStatus;
/** 接口类型 */
@Excel(name = "接口类型")
private String interfaceType;
/** IPv4地址 */
@Excel(name = "IPv4地址")
private String ipv4Address;
/** 入站丢包率(%) */
@Excel(name = "入站丢包率(%)")
private BigDecimal inboundPacketLoss;
/** 出站丢包率(%) */
@Excel(name = "出站丢包率(%)")
private BigDecimal outboundPacketLoss;
/** 接收带宽(Mbps) */
@Excel(name = "接收带宽(Mbps)")
private BigDecimal receiveBandwidth;
/** 发送带宽(Mbps) */
@Excel(name = "发送带宽(Mbps)")
private BigDecimal sendBandwidth;
public void setId(Long id)
{
this.id = id;
}
public Long getId()
{
return id;
}
public void setInterfaceName(String interfaceName)
{
this.interfaceName = interfaceName;
}
public String getInterfaceName()
{
return interfaceName;
}
public void setMacAddress(String macAddress)
{
this.macAddress = macAddress;
}
public String getMacAddress()
{
return macAddress;
}
public void setOperationStatus(String operationStatus)
{
this.operationStatus = operationStatus;
}
public String getOperationStatus()
{
return operationStatus;
}
public void setInterfaceType(String interfaceType)
{
this.interfaceType = interfaceType;
}
public String getInterfaceType()
{
return interfaceType;
}
public void setIpv4Address(String ipv4Address)
{
this.ipv4Address = ipv4Address;
}
public String getIpv4Address()
{
return ipv4Address;
}
public void setInboundPacketLoss(BigDecimal inboundPacketLoss)
{
this.inboundPacketLoss = inboundPacketLoss;
}
public BigDecimal getInboundPacketLoss()
{
return inboundPacketLoss;
}
public void setOutboundPacketLoss(BigDecimal outboundPacketLoss)
{
this.outboundPacketLoss = outboundPacketLoss;
}
public BigDecimal getOutboundPacketLoss()
{
return outboundPacketLoss;
}
public void setReceiveBandwidth(BigDecimal receiveBandwidth)
{
this.receiveBandwidth = receiveBandwidth;
}
public BigDecimal getReceiveBandwidth()
{
return receiveBandwidth;
}
public void setSendBandwidth(BigDecimal sendBandwidth)
{
this.sendBandwidth = sendBandwidth;
}
public BigDecimal getSendBandwidth()
{
return sendBandwidth;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("interfaceName", getInterfaceName())
.append("macAddress", getMacAddress())
.append("operationStatus", getOperationStatus())
.append("interfaceType", getInterfaceType())
.append("ipv4Address", getIpv4Address())
.append("inboundPacketLoss", getInboundPacketLoss())
.append("outboundPacketLoss", getOutboundPacketLoss())
.append("receiveBandwidth", getReceiveBandwidth())
.append("sendBandwidth", getSendBandwidth())
.append("createTime", getCreateTime())
.append("updateTime", getUpdateTime())
.append("createBy", getCreateBy())
.append("updateBy", getUpdateBy())
.toString();
}
}

View File

@@ -0,0 +1,55 @@
package com.ruoyi.rocketmq.enums;
import lombok.Getter;
/**
* 用于传递topic和 tag
* 也用于接收消息后判断不同的消息处理不同的业务
*/
@Getter
public enum MessageCodeEnum {
/**
* 系统消息
*/
NOTE_MESSAGE_TOPIC("system-message","系统消息服务模块topic名称"),
/**
* 用户消息
*/
USER_MESSAGE_TOPIC("user-message","用户消息服务模块topic名称"),
/**
* 订单消息
*/
ORDER_MESSAGE_TOPIC("order-message","订单消息服务模块topic名称"),
/**
* 用户消息tag
*/
USER_MESSAGE_TAG("user_message_tag","用户消息推送"),
/**
* 系统消息tag
*/
NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"),
/**
* 订单消息
*/
ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"),
/**
* 订单处理编号
*/
ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理");
private final String code;
private final String msg;
MessageCodeEnum(String code, String msg){
this.code = code;
this.msg = msg;
}
}

View File

@@ -0,0 +1,24 @@
package com.ruoyi.rocketmq.enums;
import java.util.ArrayList;
import java.util.List;
/**
* 定义topic列表
*/
public class MessageTopic {
//在这里添加topic 用于批量订阅
public List<String> RocketMQTopicList(){
List<String> getTopicLists=new ArrayList<>();
//系统消息
getTopicLists.add("system-message");
//用户消息
getTopicLists.add("user-message");
//订单消息
getTopicLists.add("order-message");
return getTopicLists;
}
}

View File

@@ -0,0 +1,61 @@
package com.ruoyi.rocketmq.mapper;
import java.util.List;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
/**
* 初始带宽流量Mapper接口
*
* @author gyt
* @date 2025-08-20
*/
public interface InitialBandwidthTrafficMapper
{
/**
* 查询初始带宽流量
*
* @param id 初始带宽流量主键
* @return 初始带宽流量
*/
public InitialBandwidthTraffic selectInitialBandwidthTrafficById(Long id);
/**
* 查询初始带宽流量列表
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 初始带宽流量集合
*/
public List<InitialBandwidthTraffic> selectInitialBandwidthTrafficList(InitialBandwidthTraffic initialBandwidthTraffic);
/**
* 新增初始带宽流量
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 结果
*/
public int insertInitialBandwidthTraffic(InitialBandwidthTraffic initialBandwidthTraffic);
/**
* 修改初始带宽流量
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 结果
*/
public int updateInitialBandwidthTraffic(InitialBandwidthTraffic initialBandwidthTraffic);
/**
* 删除初始带宽流量
*
* @param id 初始带宽流量主键
* @return 结果
*/
public int deleteInitialBandwidthTrafficById(Long id);
/**
* 批量删除初始带宽流量
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteInitialBandwidthTrafficByIds(Long[] ids);
}

View File

@@ -0,0 +1,26 @@
package com.ruoyi.rocketmq.model;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
* 消费者初始化
* 消费者连接信息 具体看nacos配置
*/
@Data
@Configuration
@Component
public class ConsumerMode {
@Value("${suning.rocketmq.namesrvAddr}")
private String namesrvAddr;
@Value("${suning.rocketmq.conumer.groupName}")
private String groupName ;
@Value("${suning.rocketmq.conumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${suning.rocketmq.conumer.consumeThreadMax}")
private int consumeThreadMax;
@Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
}

View File

@@ -0,0 +1,25 @@
package com.ruoyi.rocketmq.model;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
/**
* 生产者初始化
*/
@RefreshScope
@Data
@Configuration
public class ProducerMode {
@Value("${suning.rocketmq.producer.groupName}")
private String groupName;
@Value("${suning.rocketmq.namesrvAddr}")
private String namesrvAddr;
@Value("${suning.rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize;
@Value("${suning.rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;
@Value("${suning.rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;
}

View File

@@ -0,0 +1,23 @@
package com.ruoyi.rocketmq.producer;
/**
* @author 影子
* 用于捕捉异常非受检异常unchecked exception
* RuntimeException 和其子类的异常在编译时不需要进行强制性的异常处理,可以选择在运行时进行捕获和处理
* 可选择使用
*/
public class ConsumeException extends RuntimeException{
private static final long serialVersionUID = 4093867789628938836L;
public ConsumeException(String message) {
super(message);
}
public ConsumeException(Throwable cause) {
super(cause);
}
public ConsumeException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,224 @@
package com.ruoyi.rocketmq.producer;
import com.alibaba.fastjson.JSON;
import com.ruoyi.rocketmq.consumer.RocketMsgTransactionListenerImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static com.ruoyi.rocketmq.config.ProducerConfig.producer;
/**
* 消息发送
*/
@Slf4j
public class MessageProducer {
/**
* 同步发送消息
* @param topic 主题
* @param tag 标签
* @param key 自定义的key根据业务来定
* @param value 消息的内容
* 通过调用 send() 方法发送消息,阻塞等待服务器响应。
*/
public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){
String body = "topic"+topic+"】, tag"+tag+"】, key"+key+"】, value"+value+"";
try {
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.println("生产者发送消息:"+ JSON.toJSONString(value));
SendResult result = producer.send(msg);
return result;
} catch (UnsupportedEncodingException e) {
log.error("消息初始化失败body{}",body);
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) {
log.error("消息发送失败! body{}",body);
}
return null;
}
/**
* 单向发送消息
* @param topic 主题
* @param tag 标签
* @param key 自定义的key根据业务来定
* @param value 消息的内容
* 单向发送:通过调用 sendOneway() 方法发送消息,不关心发送结果,适用于对可靠性要求不高的场景。
*/
public void sendOnewayMessage(String topic, String tag, String key, String value){
String body = "topic"+topic+"】, tag"+tag+"】, key"+key+"】, value"+value+"";
try {
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.println("生产者发送消息:"+ JSON.toJSONString(value));
producer.sendOneway(msg);
} catch (UnsupportedEncodingException e) {
log.error("消息初始化失败body{}",body);
} catch (MQClientException | InterruptedException | RemotingException e) {
log.error("消息发送失败! body{}",body);
}
}
/**
* 批量发送消息
* @param messages 消息列表
* 批量发送:通过调用 send() 方法并传入多条消息,实现批量发送消息。
*/
public SendResult sendBatchMessage(List<Message> messages){
String body = messages.toString();
try {
System.out.println("生产者发送消息:"+ messages);
// 发送批量消息
SendResult sendResult = producer.send(messages);
return sendResult;
} catch (MQClientException | InterruptedException | RemotingException e) {
log.error("消息发送失败! body{}",body);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
}
return null;
}
/**
* 事务消息发送
* @param topic 主题
* @param tag 标签
* @param key 自定义的key根据业务来定
* @param value 消息的内容
* 事务消息发送:通过使用事务监听器实现本地事务执行和消息发送的一致性。
*/
public SendResult sendThingMessage(String topic, String tag, String key, String value){
String body = "topic"+topic+"】, tag"+tag+"】, key"+key+"】, value"+value+"";
try {
// 实例化事务生产者
TransactionMQProducer transactionMQProducer = new TransactionMQProducer(producer.getProducerGroup());
transactionMQProducer.setNamesrvAddr(producer.getNamesrvAddr());
// 设置事务监听器
transactionMQProducer.setTransactionListener(new RocketMsgTransactionListenerImpl());
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.println("生产者发送消息:"+ JSON.toJSONString(value));
// 发送事务消息
TransactionSendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, null);
return sendResult;
} catch (UnsupportedEncodingException e) {
log.error("消息初始化失败body{}",body);
} catch (MQClientException e) {
log.error("消息发送失败! body{}",body);
}
return null;
}
/**
* 发送有序的消息
* @param messagesList Message集合
* @param messageQueueNumber 消息队列数量,根据实际情况设定
* 顺序发送: messageQueueNumber 表示消息的业务标识,可以根据具体需求进行设置来保证消息按顺序发送。
*/
public SendResult sendOrderlyMessage(List<Message> messagesList, Integer messageQueueNumber) {
SendResult result = null;
for (Message message : messagesList) {
try {
result = producer.send(message, (list, msg, arg) -> {
Integer queueNumber = (Integer) arg;
//int queueIndex = queueNumber % list.size();
return list.get(queueNumber);
}, messageQueueNumber);//根据编号取模,选择消息队列
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
log.error("发送有序消息失败");
return result;
}
}
return result;
}
/**
* 发送延迟消息
* @param topic 主题
* @param tag 标签
* @param key 自定义的key根据业务来定
* @param value 消息的内容
* 延迟发送:通过设置延迟级别来实现延迟发送消息。
*/
public SendResult sendDelayMessage(String topic, String tag, String key, String value)
{
SendResult result = null;
try
{
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
//设置消息延迟级别我这里设置5对应就是延时一分钟
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
msg.setDelayTimeLevel(4);
// 发送消息到一个Broker
result = producer.send(msg);
// 通过sendResult返回消息是否成功送达
log.info("发送延迟消息结果:======sendResult{}", result);
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("发送时间:{}", format.format(new Date()));
return result;
}
catch (Exception e)
{
e.printStackTrace();
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result);
}
return result;
}
/**
* 发送异步的消息
* @param topic 主题
* @param tag 标签
* @param key 自定义的key根据业务来定
* @param value 消息的内容
* 通过调用 send() 方法,并传入一个 SendCallback 对象,在发送消息的同时可以继续处理其他逻辑,消息发送结果通过回调函数通知。
*/
public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value){
try {
//创建一个消息实例,指定主题、标签和消息体。
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.println("生产者发送消息:"+ JSON.toJSONString(value));
producer.send(msg,new SendCallback() {
// 异步回调的处理
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e);
e.printStackTrace();
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
return null;
}
}

View File

@@ -0,0 +1,61 @@
package com.ruoyi.rocketmq.service;
import java.util.List;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
/**
* 初始带宽流量Service接口
*
* @author gyt
* @date 2025-08-20
*/
public interface IInitialBandwidthTrafficService
{
/**
* 查询初始带宽流量
*
* @param id 初始带宽流量主键
* @return 初始带宽流量
*/
public InitialBandwidthTraffic selectInitialBandwidthTrafficById(Long id);
/**
* 查询初始带宽流量列表
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 初始带宽流量集合
*/
public List<InitialBandwidthTraffic> selectInitialBandwidthTrafficList(InitialBandwidthTraffic initialBandwidthTraffic);
/**
* 新增初始带宽流量
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 结果
*/
public int insertInitialBandwidthTraffic(InitialBandwidthTraffic initialBandwidthTraffic);
/**
* 修改初始带宽流量
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 结果
*/
public int updateInitialBandwidthTraffic(InitialBandwidthTraffic initialBandwidthTraffic);
/**
* 批量删除初始带宽流量
*
* @param ids 需要删除的初始带宽流量主键集合
* @return 结果
*/
public int deleteInitialBandwidthTrafficByIds(Long[] ids);
/**
* 删除初始带宽流量信息
*
* @param id 初始带宽流量主键
* @return 结果
*/
public int deleteInitialBandwidthTrafficById(Long id);
}

View File

@@ -0,0 +1,96 @@
package com.ruoyi.rocketmq.service.impl;
import java.util.List;
import com.ruoyi.common.core.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ruoyi.rocketmq.mapper.InitialBandwidthTrafficMapper;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import com.ruoyi.rocketmq.service.IInitialBandwidthTrafficService;
/**
* 初始带宽流量Service业务层处理
*
* @author gyt
* @date 2025-08-20
*/
@Service
public class InitialBandwidthTrafficServiceImpl implements IInitialBandwidthTrafficService
{
@Autowired
private InitialBandwidthTrafficMapper initialBandwidthTrafficMapper;
/**
* 查询初始带宽流量
*
* @param id 初始带宽流量主键
* @return 初始带宽流量
*/
@Override
public InitialBandwidthTraffic selectInitialBandwidthTrafficById(Long id)
{
return initialBandwidthTrafficMapper.selectInitialBandwidthTrafficById(id);
}
/**
* 查询初始带宽流量列表
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 初始带宽流量
*/
@Override
public List<InitialBandwidthTraffic> selectInitialBandwidthTrafficList(InitialBandwidthTraffic initialBandwidthTraffic)
{
return initialBandwidthTrafficMapper.selectInitialBandwidthTrafficList(initialBandwidthTraffic);
}
/**
* 新增初始带宽流量
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 结果
*/
@Override
public int insertInitialBandwidthTraffic(InitialBandwidthTraffic initialBandwidthTraffic)
{
initialBandwidthTraffic.setCreateTime(DateUtils.getNowDate());
return initialBandwidthTrafficMapper.insertInitialBandwidthTraffic(initialBandwidthTraffic);
}
/**
* 修改初始带宽流量
*
* @param initialBandwidthTraffic 初始带宽流量
* @return 结果
*/
@Override
public int updateInitialBandwidthTraffic(InitialBandwidthTraffic initialBandwidthTraffic)
{
initialBandwidthTraffic.setUpdateTime(DateUtils.getNowDate());
return initialBandwidthTrafficMapper.updateInitialBandwidthTraffic(initialBandwidthTraffic);
}
/**
* 批量删除初始带宽流量
*
* @param ids 需要删除的初始带宽流量主键
* @return 结果
*/
@Override
public int deleteInitialBandwidthTrafficByIds(Long[] ids)
{
return initialBandwidthTrafficMapper.deleteInitialBandwidthTrafficByIds(ids);
}
/**
* 删除初始带宽流量信息
*
* @param id 初始带宽流量主键
* @return 结果
*/
@Override
public int deleteInitialBandwidthTrafficById(Long id)
{
return initialBandwidthTrafficMapper.deleteInitialBandwidthTrafficById(id);
}
}