Fix duplicate rows in some data tables caused by multi-threaded inserts.
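The duplicates came from concurrent writers inserting the same traffic records more than once. The diff below addresses this at three levels: the service de-duplicates each batch in memory on the (client_id, mac, name, create_time) business key before grouping rows by target table, the generated tables gain a matching UNIQUE KEY uk_client_mac_name_time (with client_id widened to VARCHAR(255)), and the mapper statements switch from INSERT to INSERT IGNORE so rows that already exist are skipped instead of raising errors. Both service methods also run under @Transactional(rollbackFor = Exception.class, isolation = Isolation.READ_COMMITTED), and InitialSwitchInfoDetailsServiceImpl now imports DuplicateKeyException.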
@@ -11,15 +11,14 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDate;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.time.ZoneId;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -102,45 +101,65 @@ public class EpsInitialTrafficDataServiceImpl implements EpsInitialTrafficDataSe
 * @param epsInitialTrafficData 流量数据表
 */
@Override
@Transactional
@Transactional(rollbackFor = Exception.class, isolation = Isolation.READ_COMMITTED)
public void saveBatch(EpsInitialTrafficData epsInitialTrafficData) {
if (epsInitialTrafficData == null) {
if (epsInitialTrafficData == null || epsInitialTrafficData.getDataList().isEmpty()) {
return;
}
List<EpsInitialTrafficData> dataList = epsInitialTrafficData.getDataList();
if (dataList.isEmpty()){
return;
}
// 按表名分组批量插入
Map<String, List<EpsInitialTrafficData>> groupedData = dataList.stream()
.map(data -> {
try {
EpsInitialTrafficData processed = new EpsInitialTrafficData();
BeanUtils.copyProperties(data,processed);
if (data.getCreateTime() == null) {
data.setCreateTime(DateUtils.getNowDate());
}
processed.setTableName(TableRouterUtil.getTableName(DateUtil.dateToLocalDateTime(data.getCreateTime())));
return processed;
} catch (Exception e){
log.error("数据处理失败",e.getMessage());
return null;
}
}).collect(Collectors.groupingBy(
EpsInitialTrafficData::getTableName,
LinkedHashMap::new, // 保持插入顺序
Collectors.toList()));

// 内存去重(基于唯一键)
List<EpsInitialTrafficData> distinctList = epsInitialTrafficData.getDataList().stream()
.filter(Objects::nonNull)
.map(data -> {
EpsInitialTrafficData processed = new EpsInitialTrafficData();
BeanUtils.copyProperties(data, processed);
if (data.getCreateTime() == null) {
processed.setCreateTime(DateUtils.getNowDate());
}
return processed;
})
.collect(Collectors.collectingAndThen(
// 使用TreeSet按唯一键去重
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(
d -> String.join("|",
d.getClientId(),
d.getMac(),
d.getName(),
d.getCreateTime().toString()
)
))),
ArrayList::new
));

// 按表名分组
Map<String, List<EpsInitialTrafficData>> groupedData = distinctList.stream()
.map(data -> {
data.setTableName(TableRouterUtil.getTableName(
data.getCreateTime().toInstant()
.atZone(ZoneId.systemDefault())
.toLocalDateTime()
));
return data;
})
.collect(Collectors.groupingBy(
EpsInitialTrafficData::getTableName,
LinkedHashMap::new,
Collectors.toList()
));

// 分表插入(带冲突降级)
groupedData.forEach((tableName, list) -> {
try {
EpsInitialTrafficData data = new EpsInitialTrafficData();
BeanUtils.copyProperties(epsInitialTrafficData,data);
data.setTableName(tableName);
data.setDataList(list);
epsInitialTrafficDataMapper.batchInsert(data);
EpsInitialTrafficData batchData = new EpsInitialTrafficData();
BeanUtils.copyProperties(epsInitialTrafficData, batchData);
batchData.setTableName(tableName);
batchData.setDataList(list);

// 优先尝试批量插入
epsInitialTrafficDataMapper.batchInsert(batchData);
} catch (Exception e) {
log.error("表{}插入失败", tableName, e);
throw new RuntimeException("批量插入失败", e);
log.error("表 {} 插入失败", tableName, e);
throw new RuntimeException("数据入库失败", e);
}
});
}
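For reference, a standalone sketch of the in-memory de-duplication and per-table grouping used by the new saveBatch above. TrafficRow, its fields, and the shard name are simplified stand-ins for EpsInitialTrafficData and TableRouterUtil, not the project's real types; only the stream pipeline mirrors the committed code.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class DedupSketch {

    // Simplified stand-in for EpsInitialTrafficData; field names are assumptions.
    static class TrafficRow {
        String clientId, mac, name, tableName;
        Date createTime;
        TrafficRow(String clientId, String mac, String name, Date createTime) {
            this.clientId = clientId; this.mac = mac; this.name = name; this.createTime = createTime;
        }
    }

    // Composite business key matching the new UNIQUE KEY (client_id, mac, name, create_time).
    static String businessKey(TrafficRow r) {
        return String.join("|", r.clientId, r.mac, r.name, String.valueOf(r.createTime.getTime()));
    }

    public static void main(String[] args) {
        Date now = new Date();
        List<TrafficRow> batch = List.of(
                new TrafficRow("c1", "aa:bb", "eth0", now),
                new TrafficRow("c1", "aa:bb", "eth0", now),   // duplicate within the batch
                new TrafficRow("c2", "cc:dd", "eth1", now));

        // 1) Drop in-batch duplicates: collect into a TreeSet ordered by the business key,
        //    then copy back into a list (collectingAndThen), as the new saveBatch does.
        Comparator<TrafficRow> byKey = Comparator.comparing(DedupSketch::businessKey);
        List<TrafficRow> distinct = batch.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.collectingAndThen(
                        Collectors.toCollection(() -> new TreeSet<>(byKey)),
                        ArrayList::new));

        // 2) Group by target table, keeping encounter order with a LinkedHashMap,
        //    so each shard gets one batch insert. Table routing is faked here.
        Map<String, List<TrafficRow>> grouped = distinct.stream()
                .map(r -> { r.tableName = "eps_initial_traffic_202501"; return r; })
                .collect(Collectors.groupingBy(r -> r.tableName, LinkedHashMap::new, Collectors.toList()));

        grouped.forEach((table, rows) -> System.out.println(table + " -> " + rows.size() + " row(s)"));
        // prints: eps_initial_traffic_202501 -> 2 row(s)
    }
}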
@@ -11,6 +11,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.math.RoundingMode;
@@ -119,12 +121,15 @@ public class InitialSwitchInfoDetailsServiceImpl implements IInitialSwitchInfoDe
 * 保存交换机流量信息
 * @param initialSwitchInfoDetails
 */
@Transactional(rollbackFor = Exception.class, isolation = Isolation.READ_COMMITTED)
public R<String> autoSaveSwitchTraffic(InitialSwitchInfoDetails initialSwitchInfoDetails) {
// 查询初始流量数据
List<InitialSwitchInfoDetails> dataList = initialSwitchInfoDetailsMapper.getAllSwitchInfoMsg(initialSwitchInfoDetails);
List<InitialSwitchInfoDetails> batchList = new ArrayList<>();
if(!dataList.isEmpty()){
for (InitialSwitchInfoDetails details : dataList) {
// id自增
details.setId(null);
// 根据接口名称查询交换机信息
String interfaceName = details.getName();
String switchIp = details.getSwitchIp();
@@ -160,10 +165,10 @@ public class InitialSwitchInfoDetailsServiceImpl implements IInitialSwitchInfoDe
details.setBusinessCode(null);
details.setBusinessName(null);
}
// id自增
details.setId(null);
batchList.add(details);
}
}else{
batchList.add(details);
}
}
}
@@ -200,12 +205,6 @@ public class InitialSwitchInfoDetailsServiceImpl implements IInitialSwitchInfoDe
if (StringUtils.isBlank(name) || StringUtils.isBlank(ip) || StringUtils.isBlank(clientId)) {
return;
}
AllInterfaceName allInterfaceName = new AllInterfaceName();
allInterfaceName.setClientId(clientId);
allInterfaceName.setSwitchIp(ip);
allInterfaceName.setInterfaceName(name);
allInterfaceName.setResourceType("2");
List<AllInterfaceName> existingNames = allInterfaceNameMapper.selectByNames(allInterfaceName);
AllInterfaceName record = new AllInterfaceName();
record.setInterfaceName(name);
record.setClientId(data.getClientId());
@@ -217,6 +216,7 @@ public class InitialSwitchInfoDetailsServiceImpl implements IInitialSwitchInfoDe
record.setSwitchName(data.getSwitchName());
record.setSwitchSn(data.getSwitchSn());
record.setSwitchIp(data.getSwitchIp());
List<AllInterfaceName> existingNames = allInterfaceNameMapper.selectByNames(record);
// 根据服务器sn查询业务
if(data.getServerSn() != null){
EpsServerRevenueConfig epsServerRevenueConfig = new EpsServerRevenueConfig();
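These hunks only show that org.springframework.dao.DuplicateKeyException is now imported and that autoSaveSwitchTraffic runs under READ_COMMITTED with rollbackFor = Exception.class; the call site that actually uses the exception lies outside the extracted context. As a hedged illustration of the kind of handling that import enables, and not the committed code, a sketch with hypothetical Dao and Row types:

import org.springframework.dao.DuplicateKeyException;

import java.util.List;

// Hypothetical fallback: try the batch first; if the new unique key rejects it,
// retry row by row so one duplicate does not sink the whole batch.
class SwitchTrafficSaver {

    interface Dao {                       // stand-in for the MyBatis mapper
        void saveBatch(List<Row> rows);
        void saveOne(Row row);
    }
    record Row(String clientId, String name) {}

    private final Dao dao;
    SwitchTrafficSaver(Dao dao) { this.dao = dao; }

    void save(List<Row> rows) {
        try {
            dao.saveBatch(rows);
        } catch (DuplicateKeyException e) {
            // A concurrent writer got there first; insert the rest individually
            // and tolerate the rows that already exist.
            for (Row row : rows) {
                try {
                    dao.saveOne(row);
                } catch (DuplicateKeyException ignored) {
                    // already present - exactly the situation this commit wants to tolerate
                }
            }
        }
    }
}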
@@ -6,55 +6,57 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<update id="createEpsTrafficTable">
CREATE TABLE IF NOT EXISTS ${tableName} (
id BIGINT(20) AUTO_INCREMENT COMMENT '唯一标识ID',
`name` VARCHAR(100) COMMENT '接口名称',
`mac` VARCHAR(50) COMMENT 'MAC地址',
`status` VARCHAR(20) COMMENT '运行状态',
`type` VARCHAR(30) COMMENT '接口类型',
ipV4 VARCHAR(20) COMMENT 'IPv4地址',
`in_dropped` DECIMAL(20,2) COMMENT '入站丢包率(%)',
`out_dropped` DECIMAL(20,2) COMMENT '出站丢包率(%)',
`in_speed` varchar(50) COMMENT '接收带宽(Mbps)',
`out_speed` varchar(50) COMMENT '发送带宽(Mbps)',
`speed` varchar(100) COMMENT '协商速度',
`duplex` varchar(100) COMMENT '工作模式',
business_id varchar(50) COMMENT '业务代码',
business_name varchar(200) COMMENT '业务名称',
service_sn varchar(64) COMMENT '服务器SN',
node_name varchar(200) COMMENT '服务器名称',
revenue_method varchar(10) COMMENT '收益方式(1.流量,2包端)',
package_bandwidth DECIMAL(10,2) COMMENT '包端带宽值',
create_time DATETIME COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
create_by VARCHAR(50) COMMENT '创建人',
update_by VARCHAR(50) COMMENT '修改人',
client_id VARCHAR(64) COMMENT '设备唯一标识',
PRIMARY KEY (id),
INDEX idx_name_time (`name`,create_time),
INDEX idx_revenue_method (revenue_method),
INDEX idx_service_sn (service_sn)
id BIGINT(20) AUTO_INCREMENT COMMENT '唯一标识ID',
`name` VARCHAR(100) COMMENT '接口名称',
`mac` VARCHAR(50) COMMENT 'MAC地址',
`status` VARCHAR(20) COMMENT '运行状态',
`type` VARCHAR(30) COMMENT '接口类型',
ipV4 VARCHAR(20) COMMENT 'IPv4地址',
`in_dropped` DECIMAL(20,2) COMMENT '入站丢包率(%)',
`out_dropped` DECIMAL(20,2) COMMENT '出站丢包率(%)',
`in_speed` varchar(50) COMMENT '接收带宽(Mbps)',
`out_speed` varchar(50) COMMENT '发送带宽(Mbps)',
`speed` varchar(100) COMMENT '协商速度',
`duplex` varchar(100) COMMENT '工作模式',
business_id varchar(50) COMMENT '业务代码',
business_name varchar(200) COMMENT '业务名称',
service_sn varchar(64) COMMENT '服务器SN',
node_name varchar(200) COMMENT '服务器名称',
revenue_method varchar(10) COMMENT '收益方式(1.流量,2包端)',
package_bandwidth DECIMAL(10,2) COMMENT '包端带宽值',
create_time DATETIME COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
create_by VARCHAR(50) COMMENT '创建人',
update_by VARCHAR(50) COMMENT '修改人',
client_id VARCHAR(255) COMMENT '设备唯一标识',
PRIMARY KEY (id),
UNIQUE KEY uk_client_mac_name_time (client_id, mac, name, create_time),
INDEX idx_name_time (`name`,create_time),
INDEX idx_revenue_method (revenue_method),
INDEX idx_service_sn (service_sn)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='EPS设备流量初始数据表';
</update>
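The practical effect of the new uk_client_mac_name_time key, together with the INSERT IGNORE change further down in batchInsert, is that a second insert of the same (client_id, mac, name, create_time) row is silently skipped rather than duplicated or failed. A minimal sketch of that behavior, assuming a DataSource pointing at a schema created by the statement above; the shard name and the sample values are placeholders:

import javax.sql.DataSource;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.Timestamp;

class InsertIgnoreDemo {

    // Returns how many of the two identical inserts actually landed.
    static int insertTwice(DataSource dataSource) {
        JdbcTemplate jdbc = new JdbcTemplate(dataSource);
        String sql = "INSERT IGNORE INTO eps_initial_traffic_202501 "   // placeholder shard name
                + "(client_id, mac, `name`, create_time) VALUES (?, ?, ?, ?)";
        Timestamp ts = Timestamp.valueOf("2025-01-01 00:00:00");

        int first = jdbc.update(sql, "client-1", "aa:bb:cc:dd:ee:ff", "eth0", ts);   // 1 row inserted
        int second = jdbc.update(sql, "client-1", "aa:bb:cc:dd:ee:ff", "eth0", ts);  // 0 rows: unique key hit, ignored
        return first + second;  // 1, not 2 - the race between two writers collapses to one row
    }
}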
<update id="createEpsInitialTrafficTable">
CREATE TABLE IF NOT EXISTS ${tableName} (
id BIGINT(20) AUTO_INCREMENT COMMENT '唯一标识ID',
`name` VARCHAR(100) COMMENT '接口名称',
`mac` VARCHAR(20) COMMENT 'MAC地址',
`status` VARCHAR(20) COMMENT '运行状态',
`name` VARCHAR(100) COMMENT '接口名称',
`mac` VARCHAR(20) COMMENT 'MAC地址',
`status` VARCHAR(20) COMMENT '运行状态',
`type` VARCHAR(30) COMMENT '接口类型',
ipV4 VARCHAR(20) COMMENT 'IPv4地址',
`in_dropped` DECIMAL(20,2) COMMENT '入站丢包率(%)',
`out_dropped` DECIMAL(20,2) COMMENT '出站丢包率(%)',
`in_speed` VARCHAR(20) COMMENT '接收带宽(Mbps)',
`out_speed` VARCHAR(20) COMMENT '发送带宽(Mbps)',
`in_speed` VARCHAR(50) COMMENT '接收带宽(Mbps)',
`out_speed` VARCHAR(50) COMMENT '发送带宽(Mbps)',
`speed` varchar(100) COMMENT '协商速度',
`duplex` varchar(100) COMMENT '工作模式',
create_time DATETIME COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
create_by VARCHAR(50) COMMENT '创建人',
update_by VARCHAR(50) COMMENT '修改人',
client_id VARCHAR(64) COMMENT '设备唯一标识',
client_id VARCHAR(255) COMMENT '设备唯一标识',
PRIMARY KEY (id),
UNIQUE KEY uk_client_mac_name_time (client_id, mac, name, create_time),
INDEX idx_name_time (`name`,create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='初始带宽流量表';
</update>
@@ -107,7 +109,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"

<!-- 批量插入语句 -->
<insert id="batchInsert">
INSERT INTO ${tableName} (
INSERT IGNORE INTO ${tableName} (
id,
`name`,
`mac`,

@@ -183,7 +183,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
ORDER BY create_time DESC
</select>
<insert id="saveBatchSwitchTraffic" parameterType="InitialSwitchInfoDetails">
INSERT INTO initial_switch_info_details
INSERT IGNORE INTO initial_switch_info_details
(
client_id,
`name`,