修复时间戳与数据表入库时间差1秒问题,开发重新计算接口,交换机详情增加业务代码,业务名称,修复各个表业务信息不匹配问题
This commit is contained in:
@@ -23,6 +23,7 @@ import org.springframework.util.CollectionUtils;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
@@ -94,9 +95,6 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) {
|
||||
// 拿到信息
|
||||
DeviceMessage message = JSON.parseObject(body, DeviceMessage.class);
|
||||
String duringTime = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",message.getReceiveTime());
|
||||
message.setDuringTime(duringTime);
|
||||
message.setReceiveTime(DateUtils.dateTime("yyyy-MM-dd HH:mm:ss",duringTime));
|
||||
switch (message.getDataType()){
|
||||
case "NET":
|
||||
handleNetMessage(message);
|
||||
@@ -164,18 +162,23 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
private void handleNetMessage(DeviceMessage message) {
|
||||
List<InitialBandwidthTraffic> interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class);
|
||||
if(!interfaces.isEmpty()){
|
||||
// 时间戳转换
|
||||
long timestamp = interfaces.get(0).getTimestamp();
|
||||
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
|
||||
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
|
||||
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
|
||||
InitialBandwidthTraffic data = new InitialBandwidthTraffic();
|
||||
interfaces.forEach(iface -> {
|
||||
iface.setClientId(message.getClientId());
|
||||
iface.setCreateTime(message.getReceiveTime());
|
||||
iface.setCreateTime(createTime);
|
||||
});
|
||||
// 批量入库集合
|
||||
data.setList(interfaces);
|
||||
// 初始流量数据入库
|
||||
initialBandwidthTrafficService.batchInsert(data);
|
||||
EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote();
|
||||
epsInitialTrafficDataRemote.setStartTime(message.getDuringTime());
|
||||
epsInitialTrafficDataRemote.setEndTime(message.getDuringTime());
|
||||
epsInitialTrafficDataRemote.setStartTime(timeStr);
|
||||
epsInitialTrafficDataRemote.setEndTime(timeStr);
|
||||
// 复制到业务初始库
|
||||
remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER);
|
||||
}else{
|
||||
@@ -189,9 +192,13 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
private void handleDockerMessage(DeviceMessage message) {
|
||||
List<InitialDockerInfo> dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class);
|
||||
if(!dockers.isEmpty()){
|
||||
// 时间戳转换
|
||||
long timestamp = dockers.get(0).getTimestamp();
|
||||
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
|
||||
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
|
||||
dockers.forEach(iface -> {
|
||||
iface.setClientId(message.getClientId());
|
||||
iface.setCreateTime(message.getReceiveTime());
|
||||
iface.setCreateTime(createTime);
|
||||
});
|
||||
// 初始容器数据入库
|
||||
initialDockerInfoService.batchInsertInitialDockerInfo(dockers);
|
||||
@@ -205,10 +212,14 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
*/
|
||||
private void handleCpuMessage(DeviceMessage message) {
|
||||
List<InitialCpuInfo> cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class);
|
||||
// 时间戳转换
|
||||
long timestamp = cpus.get(0).getTimestamp();
|
||||
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
|
||||
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
|
||||
if(!cpus.isEmpty()){
|
||||
cpus.forEach(iface -> {
|
||||
iface.setClientId(message.getClientId());
|
||||
iface.setCreateTime(message.getReceiveTime());
|
||||
iface.setCreateTime(createTime);
|
||||
});
|
||||
// 初始CPU数据入库
|
||||
initialCpuInfoService.batchInsertInitialCpuInfo(cpus);
|
||||
@@ -222,10 +233,14 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
*/
|
||||
private void handleDiskMessage(DeviceMessage message) {
|
||||
List<InitialDiskInfo> disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class);
|
||||
// 时间戳转换
|
||||
long timestamp = disks.get(0).getTimestamp();
|
||||
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
|
||||
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
|
||||
if(!disks.isEmpty()){
|
||||
disks.forEach(iface -> {
|
||||
iface.setClientId(message.getClientId());
|
||||
iface.setCreateTime(message.getReceiveTime());
|
||||
iface.setCreateTime(createTime);
|
||||
});
|
||||
// 初始磁盘数据入库
|
||||
initialDiskInfoService.batchInsertInitialDiskInfo(disks);
|
||||
@@ -240,9 +255,13 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
private void handleMemoryMessage(DeviceMessage message) {
|
||||
List<InitialMemoryInfo> memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class);
|
||||
if(!memorys.isEmpty()){
|
||||
// 时间戳转换
|
||||
long timestamp = memorys.get(0).getTimestamp();
|
||||
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
|
||||
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
|
||||
memorys.forEach(iface -> {
|
||||
iface.setClientId(message.getClientId());
|
||||
iface.setCreateTime(message.getReceiveTime());
|
||||
iface.setCreateTime(createTime);
|
||||
});
|
||||
// 初始内存数据入库
|
||||
initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys);
|
||||
@@ -257,9 +276,13 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
private void handleMountPointMessage(DeviceMessage message) {
|
||||
List<InitialMountPointInfo> mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class);
|
||||
if(!mountPointInfos.isEmpty()){
|
||||
// 时间戳转换
|
||||
long timestamp = mountPointInfos.get(0).getTimestamp();
|
||||
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
|
||||
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
|
||||
mountPointInfos.forEach(iface -> {
|
||||
iface.setClientId(message.getClientId());
|
||||
iface.setCreateTime(message.getReceiveTime());
|
||||
iface.setCreateTime(createTime);
|
||||
});
|
||||
// 初始挂载点数据入库
|
||||
initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos);
|
||||
@@ -274,6 +297,11 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
private void handleSwitchMessage(DeviceMessage message) {
|
||||
List<InitialSwitchInfo> switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class);
|
||||
if(!switchInfos.isEmpty()){
|
||||
// 时间戳转换
|
||||
long timestamp = switchInfos.get(0).getTimestamp();
|
||||
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
|
||||
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
|
||||
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
|
||||
// 查询临时表信息,计算实际流量值
|
||||
InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp();
|
||||
temp.setClientId(message.getClientId());
|
||||
@@ -293,7 +321,7 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
// 3. 计算速度
|
||||
switchInfos.forEach(switchInfo -> {
|
||||
switchInfo.setClientId(message.getClientId());
|
||||
switchInfo.setCreateTime(message.getReceiveTime());
|
||||
switchInfo.setCreateTime(createTime);
|
||||
InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName());
|
||||
if (tempInfo != null) {
|
||||
// 计算inSpeed
|
||||
@@ -319,8 +347,8 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
// 业务表入库
|
||||
InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote();
|
||||
detailsRemote.setClientId(message.getClientId());
|
||||
detailsRemote.setStartTime(message.getDuringTime());
|
||||
detailsRemote.setEndTime(message.getDuringTime());
|
||||
detailsRemote.setStartTime(timeStr);
|
||||
detailsRemote.setEndTime(timeStr);
|
||||
remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER);
|
||||
}else{
|
||||
throw new RuntimeException("交换机data数据为空");
|
||||
@@ -333,9 +361,13 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
private void handleSystemMessage(DeviceMessage message) {
|
||||
List<InitialSystemInfo> systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class);
|
||||
if(!systemInfos.isEmpty()){
|
||||
// 时间戳转换
|
||||
long timestamp = systemInfos.get(0).getTimestamp();
|
||||
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
|
||||
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
|
||||
systemInfos.forEach(iface -> {
|
||||
iface.setClientId(message.getClientId());
|
||||
iface.setCreateTime(message.getReceiveTime());
|
||||
iface.setCreateTime(createTime);
|
||||
});
|
||||
// 初始系统数据入库
|
||||
initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos);
|
||||
|
||||
Reference in New Issue
Block a user