完善初始流量数据库读取mq入库

This commit is contained in:
gaoyutao
2025-08-22 18:13:10 +08:00
parent 34049d1c78
commit 259398bfcc
19 changed files with 334 additions and 264 deletions

View File

@@ -8,7 +8,7 @@ import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
import com.ruoyi.rocketmq.producer.ConsumeException;
import com.ruoyi.rocketmq.service.IInitialBandwidthTrafficService;
import com.ruoyi.system.api.RemoteUserService;
import com.ruoyi.system.api.RemoteRevenueConfigService;
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -31,13 +31,13 @@ public class RocketMsgListener implements MessageListenerConcurrently {
private final IInitialBandwidthTrafficService initialBandwidthTrafficService;
private final RemoteUserService remoteUserService;
private final RemoteRevenueConfigService remoteRevenueConfigService;
@Autowired
public RocketMsgListener(IInitialBandwidthTrafficService initialBandwidthTrafficService,
RemoteUserService remoteUserService) {
RemoteRevenueConfigService remoteRevenueConfigService) {
this.initialBandwidthTrafficService = initialBandwidthTrafficService;
this.remoteUserService = remoteUserService;
this.remoteRevenueConfigService = remoteRevenueConfigService;
}
/**
@@ -77,6 +77,16 @@ public class RocketMsgListener implements MessageListenerConcurrently {
case "NET":
handleNetMessage(message);
break;
case "CPU":
break;
case "SYSTEM":
break;
case "DISK":
break;
case "POINT":
break;
case "MEMORY":
break;
default:
log.warn("未知数据类型:{}",message.getDataType());
@@ -111,6 +121,10 @@ public class RocketMsgListener implements MessageListenerConcurrently {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
/**
* 网络流量数据入库
* @param message
*/
private void handleNetMessage(DeviceMessage message) {
List<InitialBandwidthTraffic> interfaces = JSON.parseArray(message.getData(), InitialBandwidthTraffic.class);
InitialBandwidthTraffic data = new InitialBandwidthTraffic();
@@ -119,12 +133,17 @@ public class RocketMsgListener implements MessageListenerConcurrently {
});
// 批量入库集合
data.setList(interfaces);
// 开始时间
String startTime = DateUtils.getTime();
// 初始流量数据入库
initialBandwidthTrafficService.batchInsert(data);
// 结束时间
String endTime = DateUtils.getTime();
EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote();
epsInitialTrafficDataRemote.setCreateTime(DateUtils.getNowDate());
epsInitialTrafficDataRemote.setStartTime(startTime);
epsInitialTrafficDataRemote.setEndTime(endTime);
// 复制到业务初始库
remoteUserService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.FROM_SOURCE);
remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER);
}
}

View File

@@ -1,26 +1,19 @@
package com.ruoyi.rocketmq.controller;
import java.util.List;
import java.io.IOException;
import javax.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.core.utils.poi.ExcelUtil;
import com.ruoyi.common.core.web.controller.BaseController;
import com.ruoyi.common.core.web.domain.AjaxResult;
import com.ruoyi.common.core.web.page.TableDataInfo;
import com.ruoyi.common.log.annotation.Log;
import com.ruoyi.common.log.enums.BusinessType;
import com.ruoyi.common.security.annotation.RequiresPermissions;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import com.ruoyi.rocketmq.service.IInitialBandwidthTrafficService;
import com.ruoyi.common.core.web.controller.BaseController;
import com.ruoyi.common.core.web.domain.AjaxResult;
import com.ruoyi.common.core.utils.poi.ExcelUtil;
import com.ruoyi.common.core.web.page.TableDataInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletResponse;
import java.util.List;
/**
* 初始带宽流量Controller
@@ -34,7 +27,6 @@ public class InitialBandwidthTrafficController extends BaseController
{
@Autowired
private IInitialBandwidthTrafficService initialBandwidthTrafficService;
/**
* 查询初始带宽流量列表
*/