初始流量数据库读取mq入库

This commit is contained in:
gaoyutao
2025-08-21 18:29:22 +08:00
parent 80dbba70fe
commit 34049d1c78
30 changed files with 912 additions and 446 deletions

View File

@@ -78,5 +78,12 @@ public interface RemoteUserService
*/
@PostMapping("/batch")
public R<Boolean> batchInitialTraffic(@RequestBody EpsInitialTrafficDataRemote queryParam, @RequestHeader(SecurityConstants.FROM_SOURCE) String source);
/**
* 保存流量数据
* @param queryParam 流量数据列表
* @return 操作结果
*/
@PostMapping("/revenueConfig/autoSaveServiceTrafficData")
public R<Boolean> autoSaveServiceTrafficData(@RequestBody EpsInitialTrafficDataRemote queryParam, @RequestHeader(SecurityConstants.FROM_SOURCE) String source);
}

View File

@@ -1,10 +1,10 @@
package com.ruoyi.system.api.domain;
import com.ruoyi.common.core.annotation.Excel;
import com.ruoyi.common.core.web.domain.BaseEntity;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.LinkedHashSet;
import java.util.Set;
import java.math.BigDecimal;
/**
* EPS初始流量数据实体类
@@ -12,98 +12,47 @@ import java.util.Set;
* 支持按时间自动分表存储每月分成3个表
*/
@Data
public class EpsInitialTrafficDataRemote {
/** 主键ID */
public class EpsInitialTrafficDataRemote extends BaseEntity {
private static final long serialVersionUID = 1L;
/** 唯一标识ID */
private Long id;
/**
* 动态表名
* 格式eps_traffic_[年]_[月]_[日期范围]
* 示例eps_traffic_2023_08_1_10
*/
private String tableName;
/** 节点名称 */
@Excel(name = "节点名称")
private String nodeName;
/** 流量统计开始时间 */
private LocalDateTime startTime;
/** 收益方式(1.流量,2包端) */
@Excel(name = "收益方式(1.流量,2包端)")
private String revenueMethod;
/** 流量统计结束时间 */
private LocalDateTime endTime;
/** 硬件SN */
@Excel(name = "硬件SN")
private String hardwareSn;
/** 接收流量单位MB */
private Double receiveTraffic;
/** 流量网口 */
@Excel(name = "流量网口")
private String trafficPort;
/** 发送流量单位MB */
private Double sendTraffic;
/** 95带宽值(Mbps) */
@Excel(name = "95带宽值(Mbps)")
private BigDecimal bandwidth95;
/** 设备序列号 */
private String deviceSn;
/** 包端带宽值 */
@Excel(name = "包端带宽值")
private BigDecimal packageBandwidth;
/** 数据创建时间 */
private LocalDateTime createTime;
/** 业务名称 */
@Excel(name = "业务名称")
private String businessName;
/** 备用字段1 */
private String remark1;
/** 业务代码(12位) */
@Excel(name = "业务代码(12位)")
private String businessCode;
/** 备用字段2 */
private String remark2;
/** 注册状态 */
@Excel(name = "注册状态")
private String registrationStatus;
/** 备用字段3 */
private String remark3;
/** 备用字段4 */
private String remark4;
/**
* 根据createTime自动计算表名
* 分表规则每月分成3个表1-10日、11-20日、21-31日
*/
public void calculateTableName() {
if (this.createTime != null) {
int year = createTime.getYear();
int month = createTime.getMonthValue();
int day = createTime.getDayOfMonth();
String range;
if (day <= 10) {
range = "1_10";
} else if (day <= 20) {
range = "11_20";
} else {
range = "21_31";
}
this.tableName = String.format("eps_traffic_%d_%02d_%s", year, month, range);
}
}
/**
* 获取时间范围内涉及的所有表名
* @param start 开始时间
* @param end 结束时间
* @return 涉及的动态表名集合
*/
public static Set<String> getAffectedTables(LocalDateTime start, LocalDateTime end) {
Set<String> tables = new LinkedHashSet<>();
LocalDateTime current = start;
while (!current.isAfter(end)) {
int year = current.getYear();
int month = current.getMonthValue();
int day = current.getDayOfMonth();
String range;
if (day <= 10) {
range = "1_10";
} else if (day <= 20) {
range = "11_20";
} else {
range = "21_31";
}
tables.add(String.format("eps_traffic_%d_%02d_%s", year, month, range));
current = current.plusDays(1);
}
return tables;
}
}

View File

@@ -63,6 +63,10 @@ public class RemoteUserFallbackFactory implements FallbackFactory<RemoteUserServ
public R<Boolean> batchInitialTraffic(EpsInitialTrafficDataRemote queryParam, String source) {
return R.fail("新增初始流量数据失败:" + throwable.getMessage());
}
@Override
public R<Boolean> autoSaveServiceTrafficData(EpsInitialTrafficDataRemote queryParam, String source) {
return R.fail("保存流量数据失败:" + throwable.getMessage());
}
};
}
}

View File

@@ -1,15 +1,14 @@
package com.ruoyi.common.core.utils;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.lang.management.ManagementFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.*;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.TimeZone;
/**
* 时间工具类
@@ -40,7 +39,9 @@ public class DateUtils extends org.apache.commons.lang3.time.DateUtils
*/
public static Date getNowDate()
{
return new Date();
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("Asia/Shanghai"));
Date now = calendar.getTime();
return now;
}
/**

View File

@@ -22,4 +22,5 @@ public class TableScheduleConfig {
public void createNextMonthTables() {
epsInitialTrafficDataService.createNextMonthTables();
}
}

View File

@@ -7,6 +7,7 @@ import com.ruoyi.common.core.web.page.PageDomain;
import com.ruoyi.common.core.web.page.TableDataInfo;
import com.ruoyi.common.log.annotation.Log;
import com.ruoyi.common.log.enums.BusinessType;
import com.ruoyi.common.security.annotation.InnerAuth;
import com.ruoyi.common.security.annotation.RequiresPermissions;
import com.ruoyi.system.domain.EpsServerRevenueConfig;
import com.ruoyi.system.service.IEpsServerRevenueConfigService;
@@ -70,4 +71,15 @@ public class EpsServerRevenueConfigController extends BaseController
return toAjax(epsServerRevenueConfigService.updateEpsServerRevenueConfig(epsServerRevenueConfig));
}
/**
* 流量相关数据入库
*/
@Log(title = "流量相关数据入库", businessType = BusinessType.EXPORT)
@InnerAuth
@PostMapping("/autoSaveServiceTrafficData")
public void autoSaveServiceTrafficData(@RequestBody EpsServerRevenueConfig epsServerRevenueConfig)
{
epsServerRevenueConfigService.autoSaveServiceTrafficData(epsServerRevenueConfig);
}
}

View File

@@ -1,8 +1,10 @@
package com.ruoyi.system.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.ruoyi.common.core.annotation.Excel;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
@@ -13,15 +15,82 @@ import java.util.List;
*/
@Data
public class EpsInitialTrafficData {
/** 主键ID */
private static final long serialVersionUID = 1L;
/** 唯一标识ID */
private Long id;
/** 接口名称 */
@Excel(name = "接口名称")
private String interfaceName;
/** MAC地址 */
@Excel(name = "MAC地址")
private String macAddress;
/** 运行状态 */
@Excel(name = "运行状态")
private String operationStatus;
/** 接口类型 */
@Excel(name = "接口类型")
private String interfaceType;
/** IPv4地址 */
@Excel(name = "IPv4地址")
private String ipv4Address;
/** 入站丢包率(%) */
@Excel(name = "入站丢包率(%)")
private BigDecimal inboundPacketLoss;
/** 出站丢包率(%) */
@Excel(name = "出站丢包率(%)")
private BigDecimal outboundPacketLoss;
/** 接收带宽(Mbps) */
@Excel(name = "接收带宽(Mbps)")
private BigDecimal receiveBandwidth;
/** 发送带宽(Mbps) */
@Excel(name = "发送带宽(Mbps)")
private BigDecimal sendBandwidth;
/** 业务代码 */
@Excel(name = "业务代码")
private String businessId;
/** 业务名称 */
@Excel(name = "业务名称")
private String businessName;
/** 服务器SN */
@Excel(name = "服务器SN")
private String serviceSn;
/** 服务器名称 */
@Excel(name = "服务器名称")
private String nodeName;
/** 收益方式(1.流量,2包端) */
@Excel(name = "收益方式(1.流量,2包端)")
private String revenueMethod;
/** 包端带宽值 */
@Excel(name = "包端带宽值")
private BigDecimal packageBandwidth;
/** 批量插入集合 **/
private List<EpsInitialTrafficData> dataList;
/**
* 动态表名
* 格式eps_traffic_[年]_[月]_[日期范围]
* 示例eps_traffic_2023_08_1_10
*/
private String tableName;
/** 设备唯一标识 */
private String clientId;
/** 流量统计开始时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@@ -31,36 +100,11 @@ public class EpsInitialTrafficData {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime endTime;
/** 接收流量单位MB */
private Double receiveTraffic;
/** 发送流量单位MB */
private Double sendTraffic;
/** 设备序列号 */
private String deviceSn;
/** 流量端口 */
private String trafficPort;
/** 业务代码 */
private String businesId;
/** 数据创建时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime;
/** 备用字段1 */
private String remark1;
/** 备用字段2 */
private String remark2;
/** 备用字段3 */
private String remark3;
/** 备用字段4 */
private String remark4;
/** 批量插入集合 **/
private List<EpsInitialTrafficData> dataList;
}

View File

@@ -34,4 +34,11 @@ public interface EpsInitialTrafficDataMapper {
* @return 流量数据列表
*/
List<EpsInitialTrafficData> selectByCondition(EpsInitialTrafficData condition);
/**
* 查询初始流量信息
* @param condition 查询条件实体
* @return 初始流量数据列表
*/
List<EpsInitialTrafficData> getAllTraficMsg(EpsInitialTrafficData condition);
}

View File

@@ -4,6 +4,7 @@ import com.ruoyi.system.domain.EpsServerRevenueConfig;
import org.springframework.data.repository.query.Param;
import java.util.List;
import java.util.Map;
/**
* 服务器收益方式配置Mapper接口
@@ -68,4 +69,12 @@ public interface EpsServerRevenueConfigMapper
* @return 数据条数
*/
public int countBySn(@Param("hardwareSn") String hardwareSn);
/**
* 查询服务器信息
*
* @param ipAddress ipv4地址
* @return 服务器收益方式配置
*/
public Map getNodeMsgByIp(@Param("ipAddress") String ipAddress);
}

View File

@@ -33,5 +33,11 @@ public interface EpsInitialTrafficDataService {
* @return 流量数据列表
*/
List<EpsInitialTrafficData> query(EpsInitialTrafficData queryParam);
/**
* 查询初始流量信息
* @param queryParam 查询参数实体
* @return 初始流量数据列表
*/
List<EpsInitialTrafficData> getAllTraficMsg(EpsInitialTrafficData queryParam);
}

View File

@@ -1,8 +1,9 @@
package com.ruoyi.system.service;
import java.util.List;
import com.ruoyi.system.domain.EpsServerRevenueConfig;
import java.util.List;
/**
* 服务器收益方式配置Service接口
*
@@ -58,4 +59,11 @@ public interface IEpsServerRevenueConfigService
* @return 结果
*/
public int deleteEpsServerRevenueConfigById(Long id);
/**
* 保存业务相关流量表
* 每5分钟自动执行
* @param epsServerRevenueConfig
*/
void autoSaveServiceTrafficData(EpsServerRevenueConfig epsServerRevenueConfig);
}

View File

@@ -151,14 +151,35 @@ public class EpsInitialTrafficDataServiceImpl implements EpsInitialTrafficDataSe
.flatMap(tableName -> {
EpsInitialTrafficData condition = new EpsInitialTrafficData();
condition.setTableName(tableName);
condition.setDeviceSn(queryParam.getDeviceSn());
condition.setServiceSn(queryParam.getServiceSn());
condition.setStartTime(queryParam.getStartTime());
condition.setEndTime(queryParam.getEndTime());
condition.setReceiveTraffic(queryParam.getReceiveTraffic());
condition.setSendTraffic(queryParam.getSendTraffic());
condition.setBusinessId(queryParam.getBusinessId());
condition.setBusinessName(queryParam.getBusinessName());
return epsInitialTrafficDataMapper.selectByCondition(condition).stream();
})
.collect(Collectors.toList());
}
/**
* 查询初始流量信息
*/
@Override
public List<EpsInitialTrafficData> getAllTraficMsg(EpsInitialTrafficData queryParam) {
// 获取涉及的表名
Set<String> tableNames = TableRouterUtil.getTableNamesBetweenInitial(queryParam.getStartTime(), queryParam.getEndTime());
// 并行查询各表
return tableNames.parallelStream()
.flatMap(tableName -> {
EpsInitialTrafficData condition = new EpsInitialTrafficData();
condition.setTableName(tableName);
condition.setStartTime(queryParam.getStartTime());
condition.setEndTime(queryParam.getEndTime());
condition.setInterfaceName(queryParam.getInterfaceName());
return epsInitialTrafficDataMapper.getAllTraficMsg(condition).stream();
})
.collect(Collectors.toList());
}
}

View File

@@ -6,6 +6,7 @@ import com.ruoyi.system.domain.EpsNodeBandwidth;
import com.ruoyi.system.mapper.EpsNodeBandwidthMapper;
import com.ruoyi.system.service.EpsInitialTrafficDataService;
import com.ruoyi.system.service.IEpsNodeBandwidthService;
import com.ruoyi.system.util.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -13,8 +14,10 @@ import org.webjars.NotFoundException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.*;
import java.util.Date;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.YearMonth;
import java.util.List;
/**
@@ -60,10 +63,10 @@ public class EpsNodeBandwidthServiceImpl implements IEpsNodeBandwidthService
// 2. 设置查询条件
EpsInitialTrafficData queryParams = new EpsInitialTrafficData();
queryParams.setDeviceSn(epsNodeBandwidth.getHardwareSn());
queryParams.setServiceSn(epsNodeBandwidth.getHardwareSn());
// 3. 根据带宽类型设置时间范围
LocalDateTime createTime = dateToLocalDateTime(epsNodeBandwidth.getCreateTime());
LocalDateTime createTime = DateUtil.dateToLocalDateTime(epsNodeBandwidth.getCreateTime());
setTimeRangeByBandwidthType(queryParams, epsNodeBandwidth.getBandwidthType(), createTime);
// 4. 查询并返回数据
@@ -176,16 +179,6 @@ public class EpsNodeBandwidthServiceImpl implements IEpsNodeBandwidthService
return epsNodeBandwidthMapper.getAvgDetailMsg(epsNodeBandwidth);
}
/**
* Date 类型 转为 LocalDateTime 类型
* @param date
* @return
*/
public static LocalDateTime dateToLocalDateTime(Date date) {
return date.toInstant()
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
}
/**
* 根据带宽类型设置时间范围
*/

View File

@@ -2,14 +2,18 @@ package com.ruoyi.system.service.impl;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.system.domain.EpsInitialTrafficData;
import com.ruoyi.system.domain.EpsMethodChangeRecord;
import com.ruoyi.system.domain.EpsServerRevenueConfig;
import com.ruoyi.system.mapper.EpsServerRevenueConfigMapper;
import com.ruoyi.system.service.EpsInitialTrafficDataService;
import com.ruoyi.system.service.IEpsServerRevenueConfigService;
import com.ruoyi.system.util.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
/**
* 服务器收益方式配置Service业务层处理
@@ -22,6 +26,8 @@ public class EpsServerRevenueConfigServiceImpl implements IEpsServerRevenueConfi
{
@Autowired
private EpsServerRevenueConfigMapper epsServerRevenueConfigMapper;
@Autowired
private EpsInitialTrafficDataService epsInitialTrafficDataService;
/**
* 查询服务器收益方式配置
@@ -78,34 +84,88 @@ public class EpsServerRevenueConfigServiceImpl implements IEpsServerRevenueConfi
// 修改条数
int rows = epsServerRevenueConfigMapper.updateEpsServerRevenueConfig(epsServerRevenueConfig);
if(rows>=0){
EpsMethodChangeRecord epsMethodChangeRecord = new EpsMethodChangeRecord();
// 节点名称
epsMethodChangeRecord.setNodeName(epsServerRevenueConfig.getNodeName());
// 硬件sn
epsMethodChangeRecord.setHardwareSn(epsServerRevenueConfig.getHardwareSn());
// 修改时间
epsMethodChangeRecord.setCreateTime(DateUtils.getNowDate());
// 修改人
epsMethodChangeRecord.setCreatBy(SecurityUtils.getUsername());
// 修改内容
String content = "";
if("流量".equals(nowRevenueMethod)){
// 修改内容
content = "收益方式变为【"+nowRevenueMethod+"】,流量网口设置为"+epsServerRevenueConfig.getTrafficPort()+
",业务为"+epsServerRevenueConfig.getBusinessName();
}
if("包端".equals(nowRevenueMethod)){
// 修改内容
content = "收益方式变为【"+nowRevenueMethod+"】,流量网口设置为"+epsServerRevenueConfig.getTrafficPort()+
",带宽值设置为"+epsServerRevenueConfig.getPackageBandwidth()+"Mbps,"+"业务为"+epsServerRevenueConfig.getBusinessName();
}
epsMethodChangeRecord.setChangeContent(content);
// 添加操作记录
epsServerRevenueConfigMapper.insertEpsServerRevenueConfig(epsServerRevenueConfig);
addRecord(epsServerRevenueConfig);
// 添加包端数据到相关数据表
saveServiceTrafficData(epsServerRevenueConfig);
}
return rows;
}
/**
* 添加服务器配置操作记录
*/
private void addRecord(EpsServerRevenueConfig epsServerRevenueConfig) {
String nowRevenueMethod = getMethodName(epsServerRevenueConfig.getRevenueMethod());
EpsMethodChangeRecord changeRecord = new EpsMethodChangeRecord();
// 设置基本信息
changeRecord.setNodeName(epsServerRevenueConfig.getNodeName());
changeRecord.setHardwareSn(epsServerRevenueConfig.getHardwareSn());
changeRecord.setCreateTime(DateUtils.getNowDate());
changeRecord.setCreatBy(SecurityUtils.getUsername());
// 修改内容
String content = "";
if("流量".equals(nowRevenueMethod)){
// 修改内容
content = "收益方式变为【"+nowRevenueMethod+"】,流量网口设置为"+epsServerRevenueConfig.getTrafficPort()+
",业务为"+epsServerRevenueConfig.getBusinessName();
}
if("包端".equals(nowRevenueMethod)){
// 修改内容
content = "收益方式变为【"+nowRevenueMethod+"】,流量网口设置为"+epsServerRevenueConfig.getTrafficPort()+
",带宽值设置为"+epsServerRevenueConfig.getPackageBandwidth()+"Mbps,"+"业务为"+epsServerRevenueConfig.getBusinessName();
}
changeRecord.setChangeContent(content);
// 添加操作记录
epsServerRevenueConfigMapper.insertEpsServerRevenueConfig(epsServerRevenueConfig);
}
/**
* 保存流量信息
* @param epsServerRevenueConfig
*/
private void saveServiceTrafficData(EpsServerRevenueConfig epsServerRevenueConfig) {
// 查询初始流量数据
EpsInitialTrafficData epsInitialTrafficData = new EpsInitialTrafficData();
epsInitialTrafficData.setServiceSn(epsServerRevenueConfig.getHardwareSn());
epsInitialTrafficData.setBusinessId(epsServerRevenueConfig.getBusinessCode());
epsInitialTrafficData.setBusinessName(epsServerRevenueConfig.getBusinessName());
epsInitialTrafficData.setInterfaceName(epsServerRevenueConfig.getTrafficPort());
epsInitialTrafficData.setPackageBandwidth(epsServerRevenueConfig.getPackageBandwidth());
epsInitialTrafficData.setRevenueMethod(epsServerRevenueConfig.getRevenueMethod());
epsInitialTrafficData.setNodeName(epsServerRevenueConfig.getNodeName());
epsInitialTrafficData.setStartTime(DateUtil.dateToLocalDateTime(epsServerRevenueConfig.getCreateTime()));
epsInitialTrafficData.setEndTime(DateUtil.dateToLocalDateTime(epsServerRevenueConfig.getCreateTime()));
epsInitialTrafficDataService.save(epsInitialTrafficData);
}
/**
* 保存流量信息
* @param epsServerRevenueConfig
*/
@Override
public void autoSaveServiceTrafficData(EpsServerRevenueConfig epsServerRevenueConfig) {
// 查询初始流量数据
EpsInitialTrafficData epsInitialTrafficData = new EpsInitialTrafficData();
epsInitialTrafficData.setStartTime(DateUtil.dateToLocalDateTime(epsServerRevenueConfig.getCreateTime()));
epsInitialTrafficData.setEndTime(DateUtil.dateToLocalDateTime(epsServerRevenueConfig.getCreateTime()));
List<EpsInitialTrafficData> dataList = epsInitialTrafficDataService.getAllTraficMsg(epsInitialTrafficData);
for (EpsInitialTrafficData initialTrafficData : dataList) {
// 根据ip查询节点名称
String ip = initialTrafficData.getIpv4Address();
Map nodeMsg = epsServerRevenueConfigMapper.getNodeMsgByIp(ip);
// 赋值
initialTrafficData.setServiceSn(nodeMsg.get("hardwareSn") + "");
initialTrafficData.setNodeName(nodeMsg.get("resourceName") + "");
initialTrafficData.setRevenueMethod(nodeMsg.get("revenueMethod") + "");
initialTrafficData.setBusinessId(nodeMsg.get("businessCode") + "");
initialTrafficData.setBusinessName(nodeMsg.get("businessName") + "");
initialTrafficData.setStartTime(initialTrafficData.getCreateTime());
initialTrafficData.setEndTime(initialTrafficData.getCreateTime());
// 流量相关数据入库
epsInitialTrafficDataService.save(initialTrafficData);
}
}
/**
* 批量删除服务器收益方式配置
*

View File

@@ -0,0 +1,19 @@
package com.ruoyi.system.util;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
public class DateUtil {
/**
* Date 类型 转为 LocalDateTime 类型
* @param date
* @return
*/
public static LocalDateTime dateToLocalDateTime(Date date) {
return date.toInstant()
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
}
}

View File

@@ -12,7 +12,9 @@ import java.util.Set;
public class TableRouterUtil {
// 表名前缀
private static final String TABLE_PREFIX = "eps_initial_traffic";
private static final String TABLE_PREFIX = "eps_traffic_details";
private static final String TABLE_PREFIX_INITIAL = "initial_bandwidth_traffic";
// 日期格式
private static final DateTimeFormatter YEAR_MONTH_FORMAT =
@@ -42,6 +44,19 @@ public class TableRouterUtil {
yearMonth,
getDayRange(day));
}
public static String getInitialTableName(LocalDateTime createTime) {
if (createTime == null) {
throw new IllegalArgumentException("创建时间不能为null");
}
String yearMonth = createTime.format(YEAR_MONTH_FORMAT);
int day = createTime.getDayOfMonth();
return String.format("%s_%s_%s",
TABLE_PREFIX_INITIAL,
yearMonth,
getDayRange(day));
}
/**
* 获取时间范围内涉及的所有表名
@@ -62,6 +77,25 @@ public class TableRouterUtil {
return tableNames;
}
/**
* 获取时间范围内涉及的所有表名
* @param startTime 开始时间(包含)
* @param endTime 结束时间(包含)
* @return 按时间顺序排列的表名集合
*/
public static Set<String> getTableNamesBetweenInitial(LocalDateTime startTime, LocalDateTime endTime) {
validateTimeRange(startTime, endTime);
Set<String> tableNames = new LinkedHashSet<>();
LocalDateTime current = startTime.withHour(0).withMinute(0).withSecond(0);
while (!current.isAfter(endTime)) {
tableNames.add(getInitialTableName(current));
current = current.plusDays(1);
}
return tableNames;
}
// 获取日期区间

View File

@@ -6,16 +6,16 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<update id="createEpsTrafficTable">
CREATE TABLE IF NOT EXISTS ${tableName} (
id BIGINT(20) COMMENT '唯一标识ID',
interface_name VARCHAR(50) COMMENT '接口名称',
mac_address VARCHAR(17) COMMENT 'MAC地址',
operation_status VARCHAR(20) COMMENT '运行状态',
interface_type VARCHAR(30) COMMENT '接口类型',
ipv4_address VARCHAR(15) COMMENT 'IPv4地址',
inbound_packet_loss DECIMAL(5,2) COMMENT '入站丢包率(%)',
outbound_packet_loss DECIMAL(5,2) COMMENT '出站丢包率(%)',
receive_bandwidth DECIMAL(10,2) COMMENT '接收带宽(Mbps)',
send_bandwidth DECIMAL(10,2) COMMENT '发送带宽(Mbps)',
id BIGINT(20) AUTO_INCREMENT COMMENT '唯一标识ID',
`name` VARCHAR(100) COMMENT '接口名称',
`mac` VARCHAR(50) COMMENT 'MAC地址',
`status` VARCHAR(20) COMMENT '运行状态',
`type` VARCHAR(30) COMMENT '接口类型',
ipV4 VARCHAR(20) COMMENT 'IPv4地址',
`in_dropped` DECIMAL(5,2) COMMENT '入站丢包率(%)',
`out_dropped` DECIMAL(5,2) COMMENT '出站丢包率(%)',
`in_speed` varchar(20) COMMENT '接收带宽(Mbps)',
`out_speed` varchar(20) COMMENT '发送带宽(Mbps)',
business_id varchar(12) COMMENT '业务代码',
business_name varchar(50) COMMENT '业务名称',
service_sn varchar(50) COMMENT '服务器SN',
@@ -26,75 +26,125 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
create_by VARCHAR(50) COMMENT '创建人',
update_by VARCHAR(50) COMMENT '修改人',
client_id VARCHAR(64) COMMENT '设备唯一标识',
PRIMARY KEY (id),
INDEX idx_name_time (interface_name,create_time),
INDEX idx_name_time (`name`,create_time),
INDEX idx_revenue_method (revenue_method),
INDEX idx_service_sn (service_sn)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='EPS设备流量初始数据表';
</update>
<update id="createEpsInitialTrafficTable">
CREATE TABLE IF NOT EXISTS ${tableName} (
id BIGINT(20) COMMENT '唯一标识ID',
interface_name VARCHAR(50) COMMENT '接口名称',
mac_address VARCHAR(17) COMMENT 'MAC地址',
operation_status VARCHAR(20) COMMENT '运行状态',
interface_type VARCHAR(30) COMMENT '接口类型',
ipv4_address VARCHAR(15) COMMENT 'IPv4地址',
inbound_packet_loss DECIMAL(5,2) COMMENT '入站丢包率(%)',
outbound_packet_loss DECIMAL(5,2) COMMENT '出站丢包率(%)',
receive_bandwidth DECIMAL(10,2) COMMENT '接收带宽(Mbps)',
send_bandwidth DECIMAL(10,2) COMMENT '发送带宽(Mbps)',
id BIGINT(20) AUTO_INCREMENT COMMENT '唯一标识ID',
`name` VARCHAR(100) COMMENT '接口名称',
`mac` VARCHAR(20) COMMENT 'MAC地址',
`status` VARCHAR(20) COMMENT '运行状态',
`type` VARCHAR(30) COMMENT '接口类型',
ipV4 VARCHAR(20) COMMENT 'IPv4地址',
`in_dropped` DECIMAL(5,2) COMMENT '入站丢包率(%)',
`out_dropped` DECIMAL(5,2) COMMENT '出站丢包率(%)',
`in_speed` VARCHAR(20) COMMENT '接收带宽(Mbps)',
`out_speed` VARCHAR(20) COMMENT '发送带宽(Mbps)',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
create_by VARCHAR(50) COMMENT '创建人',
update_by VARCHAR(50) COMMENT '修改人',
client_id VARCHAR(64) COMMENT '设备唯一标识',
PRIMARY KEY (id),
INDEX idx_name_time (interface_name,create_time)
INDEX idx_name_time (`name`,create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='初始带宽流量表';
</update>
<!-- 单条插入 -->
<insert id="insert">
INSERT INTO ${tableName} (
receive_traffic,
send_traffic,
device_sn,
interface_name,
mac_address,
operation_status,
interface_type,
ipv4_address,
inbound_packet_loss,
outbound_packet_loss,
receive_bandwidth,
send_bandwidth,
business_id,
traffic_port,
create_time,
remark1, remark2, remark3, remark4
business_name,
service_sn,
node_name,
revenue_method,
package_bandwidth,
create_by,
update_by,
client_id
) VALUES (
#{receiveTraffic},
#{sendTraffic},
#{deviceSn},
#{interfaceName},
#{macAddress},
#{operationStatus},
#{interfaceType},
#{ipv4Address},
#{inboundPacketLoss},
#{outboundPacketLoss},
#{receiveBandwidth},
#{sendBandwidth},
#{businessId},
#{trafficPort},
#{createTime},
#{remark1}, #{remark2},
#{remark3}, #{remark4}
#{businessName},
#{serviceSn},
#{nodeName},
#{revenueMethod},
#{packageBandwidth},
#{createBy},
#{updateBy},
#{clientId}
)
</insert>
<!-- 批量插入 -->
<insert id="batchInsert">
INSERT INTO ${tableName} (
receive_traffic,
send_traffic,
device_sn,
id,
interface_name,
mac_address,
operation_status,
interface_type,
ipv4_address,
inbound_packet_loss,
outbound_packet_loss,
receive_bandwidth,
send_bandwidth,
business_id,
traffic_port,
business_name,
service_sn,
node_name,
revenue_method,
package_bandwidth,
create_time,
remark1, remark2, remark3, remark4
update_time,
create_by,
update_by,
client_id
) VALUES
<foreach collection="dataList" item="data" separator=",">
<foreach collection="list" item="data" separator=",">
(
#{data.receiveTraffic},
#{data.sendTraffic},
#{data.deviceSn},
#{data.businessId},
#{data.trafficPort},
#{data.createTime},
#{data.remark1}, #{data.remark2},
#{data.remark3}, #{data.remark4}
#{data.id,jdbcType=BIGINT},
#{data.interfaceName,jdbcType=VARCHAR},
#{data.macAddress,jdbcType=CHAR},
#{data.operationStatus,jdbcType=VARCHAR},
#{data.interfaceType,jdbcType=VARCHAR},
#{data.ipv4Address,jdbcType=VARCHAR},
#{data.inboundPacketLoss,jdbcType=DECIMAL},
#{data.outboundPacketLoss,jdbcType=DECIMAL},
#{data.receiveBandwidth,jdbcType=DECIMAL},
#{data.sendBandwidth,jdbcType=DECIMAL},
#{data.businessId,jdbcType=CHAR},
#{data.businessName,jdbcType=VARCHAR},
#{data.serviceSn,jdbcType=VARCHAR},
#{data.nodeName,jdbcType=VARCHAR},
#{data.revenueMethod,jdbcType=VARCHAR},
#{data.packageBandwidth,jdbcType=DECIMAL},
#{data.createTime,jdbcType=TIMESTAMP},
#{data.updateTime,jdbcType=TIMESTAMP},
#{data.createBy,jdbcType=VARCHAR},
#{data.updateBy,jdbcType=VARCHAR},
#{data.clientId,jdbcType=VARCHAR}
)
</foreach>
</insert>
@@ -102,18 +152,31 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<!-- 条件查询 -->
<select id="selectByCondition" resultType="EpsInitialTrafficData">
SELECT
id,
receive_traffic as receiveTraffic,
send_traffic as sendTraffic,
device_sn as deviceSn,
business_id as businessId,
traffic_port as trafficPort,
create_time as createTime,
remark1, remark2, remark3, remark4
id,
`name` AS interfaceName,
`mac` AS macAddress,
`status` AS operationStatus,
`type` AS interfaceType,
`ipV4` AS ipv4Address,
`in_dropped` AS inboundPacketLoss,
`out_dropped` AS outboundPacketLoss,
`in_speed` AS receiveBandwidth,
`out_speed` AS sendBandwidth,
business_id AS businessId,
business_name AS businessName,
service_sn AS serviceSn,
node_name AS nodeName,
revenue_method AS revenueMethod,
package_bandwidth AS packageBandwidth,
create_time AS createTime,
update_time AS updateTime,
create_by AS createBy,
update_by AS updateBy,
client_id AS clientId
FROM ${tableName}
<where>
<if test="deviceSn != '' and deviceSn != null">
and device_sn = #{deviceSn}
<if test="serviceSn != '' and serviceSn != null">
and service_sn = #{serviceSn}
</if>
<if test="startTime != null">
and create_time &gt;= #{startTime}
@@ -125,4 +188,35 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</where>
ORDER BY create_time desc
</select>
<!-- 条件查询 -->
<select id="getAllTraficMsg" resultType="EpsInitialTrafficData">
SELECT
id,
`name` AS interfaceName,
`mac` AS macAddress,
`status` AS operationStatus,
`type` AS interfaceType,
`ipV4` AS ipv4Address,
`in_dropped` AS inboundPacketLoss,
`out_dropped` AS outboundPacketLoss,
`in_speed` AS receiveBandwidth,
`out_speed` AS sendBandwidth,
create_time AS createTime,
update_time AS updateTime,
create_by AS createBy,
update_by AS updateBy,
client_id AS clientId
FROM ${tableName}
<where>
<if test="startTime != null">
and create_time &gt;= #{startTime}
</if>
<if test="endTime != null">
and create_time &lt;= #{endTime}
</if>
</where>
ORDER BY create_time desc
</select>
</mapper>

View File

@@ -116,4 +116,23 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
#{id}
</foreach>
</delete>
<select id="getNodeMsgByIp" parameterType="String" resultType="java.util.Map">
SELECT
rrr.hardware_sn AS hardwareSn,
rrr.resource_name AS resourceName,
esrc.revenue_method AS revenueMethod,
esrc.business_code AS businessCode,
esrc.business_name AS businessName
FROM
rm_resource_registration rrr
LEFT JOIN
eps_server_revenue_config esrc
ON
rrr.hardware_sn = esrc.hardware_sn
<where>
and rrr.registration_status = '1'
<if test="ipAddress != null and ipAddress != ''"> and rrr.ip_address = #{ipAddress}</if>
</where>
</select>
</mapper>

View File

@@ -1,9 +1,8 @@
package com.ruoyi.rocketmq.config;
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
import com.ruoyi.rocketmq.consumer.RocketMsgListener;
import com.ruoyi.rocketmq.enums.MessageTopic;
import com.ruoyi.rocketmq.model.ConsumerMode;
import com.ruoyi.rocketmq.consumer.RocketMsgListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -26,6 +25,9 @@ public class ConsumerConfig {
@Autowired
private ConsumerMode consumerMode;
@Autowired
private RocketMsgListener rocketMsgListener;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() {
//构建客户端连接
@@ -34,7 +36,7 @@ public class ConsumerConfig {
consumer.setNamesrvAddr(consumerMode.getNamesrvAddr());
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin());
consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax());
consumer.registerMessageListener(new RocketMsgListener());
consumer.registerMessageListener(rocketMsgListener);
/**
* 1. CONSUME_FROM_LAST_OFFSET第一次启动从队列最后位置消费后续再启动接着上次消费的进度开始消费
* 2. CONSUME_FROM_FIRST_OFFSET第一次启动从队列初始位置消费后续再启动接着上次消费的进度开始消费

View File

@@ -1,12 +1,21 @@
package com.ruoyi.rocketmq.consumer;
import com.alibaba.fastjson.JSON;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.rocketmq.domain.DeviceMessage;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
import com.ruoyi.rocketmq.producer.ConsumeException;
import com.ruoyi.rocketmq.service.IInitialBandwidthTrafficService;
import com.ruoyi.system.api.RemoteUserService;
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@@ -20,6 +29,17 @@ import java.util.List;
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
private final IInitialBandwidthTrafficService initialBandwidthTrafficService;
private final RemoteUserService remoteUserService;
@Autowired
public RocketMsgListener(IInitialBandwidthTrafficService initialBandwidthTrafficService,
RemoteUserService remoteUserService) {
this.initialBandwidthTrafficService = initialBandwidthTrafficService;
this.remoteUserService = remoteUserService;
}
/**
* 消费消息
* @param list msgs.size() >= 1
@@ -50,13 +70,19 @@ public class RocketMsgListener implements MessageListenerConcurrently {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常
}
// 根据不同的topic处理不同的业务 这里以订单消息为例子
if (MessageCodeEnum.ORDER_MESSAGE_TOPIC.getCode().equals(topic)) {
if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) {
//处理你的业务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
} else {
log.info("未匹配到Tag【{}】" + tags);
if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) {
// 拿到信息
DeviceMessage message = JSON.parseObject(body, DeviceMessage.class);
switch (message.getDataType()){
case "NET":
handleNetMessage(message);
break;
default:
log.warn("未知数据类型:{}",message.getDataType());
}
//处理你的业务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
}
}
}
@@ -84,4 +110,21 @@ public class RocketMsgListener implements MessageListenerConcurrently {
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
private void handleNetMessage(DeviceMessage message) {
List<InitialBandwidthTraffic> interfaces = JSON.parseArray(message.getData(), InitialBandwidthTraffic.class);
InitialBandwidthTraffic data = new InitialBandwidthTraffic();
interfaces.forEach(iface -> {
iface.setClientId(message.getClientId());
});
// 批量入库集合
data.setList(interfaces);
// 初始流量数据入库
initialBandwidthTrafficService.batchInsert(data);
EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote();
epsInitialTrafficDataRemote.setCreateTime(DateUtils.getNowDate());
// 复制到业务初始库
remoteUserService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.FROM_SOURCE);
}
}

View File

@@ -0,0 +1,13 @@
package com.ruoyi.rocketmq.domain;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class DeviceMessage {
private String clientId;
private String dataType;
private String data;
private LocalDateTime receiveTime = LocalDateTime.now();
}

View File

@@ -1,10 +1,11 @@
package com.ruoyi.rocketmq.domain;
import java.math.BigDecimal;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.ruoyi.common.core.annotation.Excel;
import com.ruoyi.common.core.web.domain.BaseEntity;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
/**
* 初始带宽流量对象 initial_bandwidth_traffic
@@ -12,166 +13,56 @@ import com.ruoyi.common.core.web.domain.BaseEntity;
* @author gyt
* @date 2025-08-20
*/
@Data
public class InitialBandwidthTraffic extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** 唯一标识ID */
private Long id;
/** 表名 */
private String tableName;
/** 接口名称 */
@Excel(name = "接口名称")
private String interfaceName;
private String name;
/** MAC地址 */
@Excel(name = "MAC地址")
private String macAddress;
private String mac;
/** 运行状态 */
@Excel(name = "运行状态")
private String operationStatus;
private String status;
/** 接口类型 */
@Excel(name = "接口类型")
private String interfaceType;
private String type;
/** IPv4地址 */
@Excel(name = "IPv4地址")
private String ipv4Address;
private String ipV4;
/** 入站丢包率(%) */
@Excel(name = "入站丢包率(%)")
private BigDecimal inboundPacketLoss;
private BigDecimal inDropped;
/** 出站丢包率(%) */
@Excel(name = "出站丢包率(%)")
private BigDecimal outboundPacketLoss;
private BigDecimal outDropped;
/** 接收带宽(Mbps) */
@Excel(name = "接收带宽(Mbps)")
private BigDecimal receiveBandwidth;
private String inSpeed;
/** 发送带宽(Mbps) */
@Excel(name = "发送带宽(Mbps)")
private BigDecimal sendBandwidth;
private String outSpeed;
public void setId(Long id)
{
this.id = id;
}
/** 设备唯一标识 */
@Excel(name = "设备唯一标识")
private String clientId;
/** 初始带宽流量集合 */
private List<InitialBandwidthTraffic> list;
public Long getId()
{
return id;
}
public void setInterfaceName(String interfaceName)
{
this.interfaceName = interfaceName;
}
public String getInterfaceName()
{
return interfaceName;
}
public void setMacAddress(String macAddress)
{
this.macAddress = macAddress;
}
public String getMacAddress()
{
return macAddress;
}
public void setOperationStatus(String operationStatus)
{
this.operationStatus = operationStatus;
}
public String getOperationStatus()
{
return operationStatus;
}
public void setInterfaceType(String interfaceType)
{
this.interfaceType = interfaceType;
}
public String getInterfaceType()
{
return interfaceType;
}
public void setIpv4Address(String ipv4Address)
{
this.ipv4Address = ipv4Address;
}
public String getIpv4Address()
{
return ipv4Address;
}
public void setInboundPacketLoss(BigDecimal inboundPacketLoss)
{
this.inboundPacketLoss = inboundPacketLoss;
}
public BigDecimal getInboundPacketLoss()
{
return inboundPacketLoss;
}
public void setOutboundPacketLoss(BigDecimal outboundPacketLoss)
{
this.outboundPacketLoss = outboundPacketLoss;
}
public BigDecimal getOutboundPacketLoss()
{
return outboundPacketLoss;
}
public void setReceiveBandwidth(BigDecimal receiveBandwidth)
{
this.receiveBandwidth = receiveBandwidth;
}
public BigDecimal getReceiveBandwidth()
{
return receiveBandwidth;
}
public void setSendBandwidth(BigDecimal sendBandwidth)
{
this.sendBandwidth = sendBandwidth;
}
public BigDecimal getSendBandwidth()
{
return sendBandwidth;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("id", getId())
.append("interfaceName", getInterfaceName())
.append("macAddress", getMacAddress())
.append("operationStatus", getOperationStatus())
.append("interfaceType", getInterfaceType())
.append("ipv4Address", getIpv4Address())
.append("inboundPacketLoss", getInboundPacketLoss())
.append("outboundPacketLoss", getOutboundPacketLoss())
.append("receiveBandwidth", getReceiveBandwidth())
.append("sendBandwidth", getSendBandwidth())
.append("createTime", getCreateTime())
.append("updateTime", getUpdateTime())
.append("createBy", getCreateBy())
.append("updateBy", getUpdateBy())
.toString();
}
}

View File

@@ -0,0 +1,22 @@
package com.ruoyi.rocketmq.domain;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import java.util.List;
@Data
public class NetInterfaceInfo {
private String name;
private String type;
private String status;
private String ipV4;
private String mac;
private String inSpeed;
private String outSpeed;
private int inDropped;
private int outDropped;
public static List<NetInterfaceInfo> parseNetData(String jsonData) {
return JSON.parseArray(jsonData, NetInterfaceInfo.class);
}
}

View File

@@ -10,6 +10,11 @@ import lombok.Getter;
@Getter
public enum MessageCodeEnum {
/**
* agent数据采集的信息
*/
AGENT_MESSAGE_TOPIC("agent_up","agent数据采集的信息topic"),
/**
* 系统消息
*/
@@ -44,6 +49,7 @@ public enum MessageCodeEnum {
*/
ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理");
private final String code;
private final String msg;

View File

@@ -12,12 +12,8 @@ public class MessageTopic {
//在这里添加topic 用于批量订阅
public List<String> RocketMQTopicList(){
List<String> getTopicLists=new ArrayList<>();
//系统消息
getTopicLists.add("system-message");
//用户消息
getTopicLists.add("user-message");
//订单消息
getTopicLists.add("order-message");
// agent采集消息
getTopicLists.add("agent_up");
return getTopicLists;
}

View File

@@ -1,8 +1,9 @@
package com.ruoyi.rocketmq.mapper;
import java.util.List;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import java.util.List;
/**
* 初始带宽流量Mapper接口
*
@@ -58,4 +59,16 @@ public interface InitialBandwidthTrafficMapper
* @return 结果
*/
public int deleteInitialBandwidthTrafficByIds(Long[] ids);
/**
* 按时间分表插入流量数据
* @param data
* @return
*/
int insert(InitialBandwidthTraffic data);
/**
* 批量插入数据
* @param data 流量数据实体类
*/
int batchInsert(InitialBandwidthTraffic data);
}

View File

@@ -1,8 +1,9 @@
package com.ruoyi.rocketmq.service;
import java.util.List;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import java.util.List;
/**
* 初始带宽流量Service接口
*
@@ -58,4 +59,16 @@ public interface IInitialBandwidthTrafficService
* @return 结果
*/
public int deleteInitialBandwidthTrafficById(Long id);
/**
* 保存单条流量数据
* @param data 流量数据
*/
void save(InitialBandwidthTraffic data);
/**
* 保存单条流量数据
* @param data 流量数据
*/
void batchInsert(InitialBandwidthTraffic data);
}

View File

@@ -1,12 +1,23 @@
package com.ruoyi.rocketmq.service.impl;
import java.util.List;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import com.ruoyi.rocketmq.mapper.InitialBandwidthTrafficMapper;
import com.ruoyi.rocketmq.service.IInitialBandwidthTrafficService;
import com.ruoyi.rocketmq.utils.TableRouterUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.ruoyi.rocketmq.mapper.InitialBandwidthTrafficMapper;
import com.ruoyi.rocketmq.domain.InitialBandwidthTraffic;
import com.ruoyi.rocketmq.service.IInitialBandwidthTrafficService;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 初始带宽流量Service业务层处理
@@ -15,6 +26,7 @@ import com.ruoyi.rocketmq.service.IInitialBandwidthTrafficService;
* @date 2025-08-20
*/
@Service
@Slf4j
public class InitialBandwidthTrafficServiceImpl implements IInitialBandwidthTrafficService
{
@Autowired
@@ -51,6 +63,7 @@ public class InitialBandwidthTrafficServiceImpl implements IInitialBandwidthTraf
* @return 结果
*/
@Override
@Transactional(rollbackFor = Exception.class)
public int insertInitialBandwidthTraffic(InitialBandwidthTraffic initialBandwidthTraffic)
{
initialBandwidthTraffic.setCreateTime(DateUtils.getNowDate());
@@ -93,4 +106,73 @@ public class InitialBandwidthTrafficServiceImpl implements IInitialBandwidthTraf
{
return initialBandwidthTrafficMapper.deleteInitialBandwidthTrafficById(id);
}
/**
* 保存单条数据到对应分表
* @param data 流量数据
*/
@Override
@Transactional
public void save(InitialBandwidthTraffic data) {
if (data.getCreateTime() == null) {
data.setCreateTime(DateUtils.getNowDate());
}
Date date = data.getCreateTime();
LocalDateTime createTime = date.toInstant()
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
data.setTableName(TableRouterUtil.getTableName(createTime));
initialBandwidthTrafficMapper.insert(data);
}
/**
* 保存多条数据到对应分表
* @param initialBandwidthTraffic 流量数据
*/
@Override
@Transactional
public void batchInsert(InitialBandwidthTraffic initialBandwidthTraffic) {
if (initialBandwidthTraffic == null) {
return;
}
List<InitialBandwidthTraffic> dataList = initialBandwidthTraffic.getList();
if (dataList.isEmpty()){
return;
}
// 按表名分组批量插入
Map<String, List<InitialBandwidthTraffic>> groupedData = dataList.stream()
.map(data -> {
try {
InitialBandwidthTraffic processed = new InitialBandwidthTraffic();
BeanUtils.copyProperties(data,processed);
if (data.getCreateTime() == null) {
data.setCreateTime(DateUtils.getNowDate());
}
LocalDateTime createTime = data.getCreateTime().toInstant()
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
processed.setTableName(TableRouterUtil.getTableName(createTime));
return processed;
} catch (Exception e){
log.error("数据处理失败",e.getMessage());
return null;
}
}).collect(Collectors.groupingBy(
InitialBandwidthTraffic::getTableName,
LinkedHashMap::new, // 保持插入顺序
Collectors.toList()));
groupedData.forEach((tableName, list) -> {
try {
InitialBandwidthTraffic data = new InitialBandwidthTraffic();
BeanUtils.copyProperties(initialBandwidthTraffic,data);
data.setTableName(tableName);
data.setList(list);
initialBandwidthTrafficMapper.batchInsert(data);
} catch (Exception e) {
log.error("表{}插入失败", tableName, e);
throw new RuntimeException("批量插入失败", e);
}
});
}
}

View File

@@ -0,0 +1,85 @@
package com.ruoyi.rocketmq.utils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashSet;
import java.util.Set;
public class TableRouterUtil {
// 表名前缀
private static final String TABLE_PREFIX = "eps_initial_traffic";
// 表名前缀
private static final String TABLE_PREFIX_INITIAL = "initial_bandwidth_traffic";
// 日期格式
private static final DateTimeFormatter YEAR_MONTH_FORMAT =
DateTimeFormatter.ofPattern("yyyy_MM");
/**
* 根据创建时间获取表名
* @param createTime 记录创建时间
* @return 对应的分表名称
* @throws IllegalArgumentException 如果createTime为null
*
* 示例:
* 2023-08-05 14:30:00 → eps_initial_traffic_2023_08_1_10
* 2023-08-15 09:15:00 → eps_initial_traffic_2023_08_11_20
* 2023-08-25 18:45:00 → eps_initial_traffic_2023_08_21_31
*/
public static String getTableName(LocalDateTime createTime) {
if (createTime == null) {
throw new IllegalArgumentException("创建时间不能为null");
}
String yearMonth = createTime.format(YEAR_MONTH_FORMAT);
int day = createTime.getDayOfMonth();
return String.format("%s_%s_%s",
TABLE_PREFIX_INITIAL,
yearMonth,
getDayRange(day));
}
/**
* 获取时间范围内涉及的所有表名
* @param startTime 开始时间(包含)
* @param endTime 结束时间(包含)
* @return 按时间顺序排列的表名集合
*/
public static Set<String> getTableNamesBetween(LocalDateTime startTime, LocalDateTime endTime) {
validateTimeRange(startTime, endTime);
Set<String> tableNames = new LinkedHashSet<>();
LocalDateTime current = startTime.withHour(0).withMinute(0).withSecond(0);
while (!current.isAfter(endTime)) {
tableNames.add(getTableName(current));
current = current.plusDays(1);
}
return tableNames;
}
// 获取日期区间
private static String getDayRange(int day) {
if (day < 1 || day > 31) {
throw new IllegalArgumentException("日期必须在1-31之间");
}
if (day <= 10) return "1_10";
if (day <= 20) return "11_20";
return "21_31";
}
// 验证时间范围
private static void validateTimeRange(LocalDateTime start, LocalDateTime end) {
if (start == null || end == null) {
throw new IllegalArgumentException("时间范围参数不能为null");
}
if (start.isAfter(end)) {
throw new IllegalArgumentException("开始时间不能晚于结束时间");
}
}
}

View File

@@ -5,111 +5,123 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<mapper namespace="com.ruoyi.rocketmq.mapper.InitialBandwidthTrafficMapper">
<resultMap type="InitialBandwidthTraffic" id="InitialBandwidthTrafficResult">
<result property="id" column="id" />
<result property="interfaceName" column="interface_name" />
<result property="macAddress" column="mac_address" />
<result property="operationStatus" column="operation_status" />
<result property="interfaceType" column="interface_type" />
<result property="ipv4Address" column="ipv4_address" />
<result property="inboundPacketLoss" column="inbound_packet_loss" />
<result property="outboundPacketLoss" column="outbound_packet_loss" />
<result property="receiveBandwidth" column="receive_bandwidth" />
<result property="sendBandwidth" column="send_bandwidth" />
<result property="createTime" column="create_time" />
<result property="updateTime" column="update_time" />
<result property="createBy" column="create_by" />
<result property="updateBy" column="update_by" />
<result property="id" column="id"/>
<result property="name" column="name"/>
<result property="mac" column="mac"/>
<result property="status" column="status"/>
<result property="type" column="type"/>
<result property="ipV4" column="ipV4"/>
<result property="inDropped" column="in_dropped"/>
<result property="outDropped" column="out_dropped"/>
<result property="inSpeed" column="in_speed"/>
<result property="outSpeed" column="out_speed"/>
<result property="createTime" column="create_time"/>
<result property="updateTime" column="update_time"/>
<result property="createBy" column="create_by"/>
<result property="updateBy" column="update_by"/>
<result property="clientId" column="client_id"/>
</resultMap>
<sql id="selectInitialBandwidthTrafficVo">
select id, interface_name, mac_address, operation_status, interface_type, ipv4_address, inbound_packet_loss, outbound_packet_loss, receive_bandwidth, send_bandwidth, create_time, update_time, create_by, update_by from initial_bandwidth_traffic
</sql>
<sql id="selectInitialBandwidthTrafficVo"> select id, name, mac, status, type, ipV4, in_dropped, out_dropped, in_speed, out_speed, create_time, update_time, create_by, update_by, client_id from initial_bandwidth_traffic </sql>
<select id="selectInitialBandwidthTrafficList" parameterType="InitialBandwidthTraffic" resultMap="InitialBandwidthTrafficResult">
<include refid="selectInitialBandwidthTrafficVo"/>
<where>
<if test="interfaceName != null and interfaceName != ''"> and interface_name = #{interfaceName}</if>
<if test="macAddress != null and macAddress != ''"> and mac_address = #{macAddress}</if>
<if test="operationStatus != null and operationStatus != ''"> and operation_status = #{operationStatus}</if>
<if test="interfaceType != null and interfaceType != ''"> and interface_type = #{interfaceType}</if>
<if test="ipv4Address != null and ipv4Address != ''"> and ipv4_address = #{ipv4Address}</if>
<if test="inboundPacketLoss != null "> and inbound_packet_loss = #{inboundPacketLoss}</if>
<if test="outboundPacketLoss != null "> and outbound_packet_loss = #{outboundPacketLoss}</if>
<if test="receiveBandwidth != null "> and receive_bandwidth = #{receiveBandwidth}</if>
<if test="sendBandwidth != null "> and send_bandwidth = #{sendBandwidth}</if>
<if test="name != null and name != ''"> and name like concat('%', #{name}, '%')</if>
<if test="mac != null and mac != ''"> and mac = #{mac}</if>
<if test="status != null and status != ''"> and status = #{status}</if>
<if test="type != null and type != ''"> and type = #{type}</if>
<if test="ipV4 != null and ipV4 != ''"> and ipV4 = #{ipV4}</if>
<if test="inDropped != null "> and in_dropped = #{inDropped}</if>
<if test="outDropped != null "> and out_dropped = #{outDropped}</if>
<if test="inSpeed != null and inSpeed != ''"> and in_speed = #{inSpeed}</if>
<if test="outSpeed != null and outSpeed != ''"> and out_speed = #{outSpeed}</if>
<if test="clientId != null and clientId != ''"> and client_id = #{clientId}</if>
</where>
</select>
<select id="selectInitialBandwidthTrafficById" parameterType="Long" resultMap="InitialBandwidthTrafficResult">
<include refid="selectInitialBandwidthTrafficVo"/>
where id = #{id}
</select>
<insert id="insertInitialBandwidthTraffic" parameterType="InitialBandwidthTraffic">
<insert id="insertInitialBandwidthTraffic" parameterType="InitialBandwidthTraffic" useGeneratedKeys="true" keyProperty="id">
insert into initial_bandwidth_traffic
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">id,</if>
<if test="interfaceName != null">interface_name,</if>
<if test="macAddress != null">mac_address,</if>
<if test="operationStatus != null">operation_status,</if>
<if test="interfaceType != null">interface_type,</if>
<if test="ipv4Address != null">ipv4_address,</if>
<if test="inboundPacketLoss != null">inbound_packet_loss,</if>
<if test="outboundPacketLoss != null">outbound_packet_loss,</if>
<if test="receiveBandwidth != null">receive_bandwidth,</if>
<if test="sendBandwidth != null">send_bandwidth,</if>
<if test="name != null">name,</if>
<if test="mac != null">mac,</if>
<if test="status != null">status,</if>
<if test="type != null">type,</if>
<if test="ipV4 != null">ipV4,</if>
<if test="inDropped != null">in_dropped,</if>
<if test="outDropped != null">out_dropped,</if>
<if test="inSpeed != null">in_speed,</if>
<if test="outSpeed != null">out_speed,</if>
<if test="createTime != null">create_time,</if>
<if test="updateTime != null">update_time,</if>
<if test="createBy != null">create_by,</if>
<if test="updateBy != null">update_by,</if>
</trim>
<if test="clientId != null">client_id,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">#{id},</if>
<if test="interfaceName != null">#{interfaceName},</if>
<if test="macAddress != null">#{macAddress},</if>
<if test="operationStatus != null">#{operationStatus},</if>
<if test="interfaceType != null">#{interfaceType},</if>
<if test="ipv4Address != null">#{ipv4Address},</if>
<if test="inboundPacketLoss != null">#{inboundPacketLoss},</if>
<if test="outboundPacketLoss != null">#{outboundPacketLoss},</if>
<if test="receiveBandwidth != null">#{receiveBandwidth},</if>
<if test="sendBandwidth != null">#{sendBandwidth},</if>
<if test="name != null">#{name},</if>
<if test="mac != null">#{mac},</if>
<if test="status != null">#{status},</if>
<if test="type != null">#{type},</if>
<if test="ipV4 != null">#{ipV4},</if>
<if test="inDropped != null">#{inDropped},</if>
<if test="outDropped != null">#{outDropped},</if>
<if test="inSpeed != null">#{inSpeed},</if>
<if test="outSpeed != null">#{outSpeed},</if>
<if test="createTime != null">#{createTime},</if>
<if test="updateTime != null">#{updateTime},</if>
<if test="createBy != null">#{createBy},</if>
<if test="updateBy != null">#{updateBy},</if>
</trim>
<if test="clientId != null">#{clientId},</if>
</trim>
</insert>
<update id="updateInitialBandwidthTraffic" parameterType="InitialBandwidthTraffic">
update initial_bandwidth_traffic
<trim prefix="SET" suffixOverrides=",">
<if test="interfaceName != null">interface_name = #{interfaceName},</if>
<if test="macAddress != null">mac_address = #{macAddress},</if>
<if test="operationStatus != null">operation_status = #{operationStatus},</if>
<if test="interfaceType != null">interface_type = #{interfaceType},</if>
<if test="ipv4Address != null">ipv4_address = #{ipv4Address},</if>
<if test="inboundPacketLoss != null">inbound_packet_loss = #{inboundPacketLoss},</if>
<if test="outboundPacketLoss != null">outbound_packet_loss = #{outboundPacketLoss},</if>
<if test="receiveBandwidth != null">receive_bandwidth = #{receiveBandwidth},</if>
<if test="sendBandwidth != null">send_bandwidth = #{sendBandwidth},</if>
<if test="name != null">name = #{name},</if>
<if test="mac != null">mac = #{mac},</if>
<if test="status != null">status = #{status},</if>
<if test="type != null">type = #{type},</if>
<if test="ipV4 != null">ipV4 = #{ipV4},</if>
<if test="inDropped != null">in_dropped = #{inDropped},</if>
<if test="outDropped != null">out_dropped = #{outDropped},</if>
<if test="inSpeed != null">in_speed = #{inSpeed},</if>
<if test="outSpeed != null">out_speed = #{outSpeed},</if>
<if test="createTime != null">create_time = #{createTime},</if>
<if test="updateTime != null">update_time = #{updateTime},</if>
<if test="createBy != null">create_by = #{createBy},</if>
<if test="updateBy != null">update_by = #{updateBy},</if>
<if test="clientId != null">client_id = #{clientId},</if>
</trim>
where id = #{id}
</update>
<delete id="deleteInitialBandwidthTrafficById" parameterType="Long">
delete from initial_bandwidth_traffic where id = #{id}
</delete>
<delete id="deleteInitialBandwidthTrafficById" parameterType="Long"> delete from initial_bandwidth_traffic where id = #{id} </delete>
<delete id="deleteInitialBandwidthTrafficByIds" parameterType="String">
delete from initial_bandwidth_traffic where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
<foreach item="id" collection="array" open="(" separator="," close=")"> #{id} </foreach>
</delete>
<!-- 单条插入 -->
<insert id="insert" parameterType="InitialBandwidthTraffic">
INSERT INTO ${tableName} (
`name`, `mac`, `status`, `type`, ipV4, `in_dropped`, `out_dropped`,
`in_speed`, `out_speed`, create_by, update_by, client_id
) VALUES (
#{name}, #{mac}, #{status}, #{type}, #{ipV4}, #{inDropped}, #{outDropped},
#{inSpeed}, #{outSpeed}, #{createBy}, #{updateBy}, #{clientId}
)
</insert>
<insert id="batchInsert" parameterType="InitialBandwidthTraffic">
INSERT INTO ${tableName} (
`name`, `mac`, `status`, `type`, ipV4, `in_dropped`, `out_dropped`,
`in_speed`, `out_speed`,create_by, update_by, client_id
) VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.name}, #{item.mac}, #{item.status}, #{item.type}, #{item.ipV4}, #{item.inDropped}, #{item.outDropped},
#{item.inSpeed}, #{item.outSpeed}, #{item.createBy}, #{item.updateBy}, #{item.clientId}
)
</foreach>
</insert>
</mapper>