package com.ruoyi.rocketmq.handler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.enums.MsgEnum;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.rocketmq.domain.*;
import com.ruoyi.rocketmq.domain.vo.CollectDataVo;
import com.ruoyi.rocketmq.domain.vo.RegisterMsgVo;
import com.ruoyi.rocketmq.domain.vo.RspVo;
import com.ruoyi.rocketmq.service.*;
import com.ruoyi.rocketmq.utils.DataProcessUtil;
import com.ruoyi.rocketmq.utils.JsonDataParser;
import com.ruoyi.rocketmq.utils.SwitchJsonDataParser;
import com.ruoyi.system.api.RemoteRevenueConfigService;
import com.ruoyi.system.api.domain.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Device message handler.
 *
 * <p>Dispatches each incoming {@link DeviceMessage} to the handler registered for its
 * data type (see {@link #init()} and {@link #handleMessage(DeviceMessage)}), and runs a
 * scheduled check that raises an alert when a client's heartbeat stays overdue.
 */
@Slf4j
@Component
@EnableScheduling
public class MessageHandler {

    /** Data type -> handler. Populated once in {@link #init()} and only read afterwards. */
    private final Map<String, Consumer<DeviceMessage>> messageHandlers = new HashMap<>();

    /** Redis key prefix: consecutive missed-heartbeat counter per client. */
    private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:";
    /** Redis key prefix: time (epoch millis) the last heartbeat was processed per client. */
    private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:";
    /** Redis key prefix: alert flag per client ("1" while an alert is active). */
    private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:";
    /** Redis key prefix: number of heartbeats received since the alert was raised. */
    static final String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:";
    /**
     * Heartbeat timeout in milliseconds (30 seconds).
     * NOTE(review): the original comment claimed "3 minutes", which does not match the
     * value; the value is kept unchanged - confirm which one is intended.
     */
    private static final long HEARTBEAT_TIMEOUT = 30000;

    /** Pattern used when passing times to the remote business services. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
    /** Multiplier used to convert a byte delta into bits. */
    private static final BigDecimal BITS_PER_BYTE = BigDecimal.valueOf(8);
    /**
     * Divisor applied to the bit delta between two switch samples to obtain a speed.
     * presumably the collection interval in seconds (5 minutes) - TODO confirm.
     */
    private static final BigDecimal SWITCH_SPEED_DIVISOR = BigDecimal.valueOf(300);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private IInitialBandwidthTrafficService initialBandwidthTrafficService;
    @Autowired
    private RemoteRevenueConfigService remoteRevenueConfigService;
    @Autowired
    private IInitialDockerInfoService initialDockerInfoService;
    @Autowired
    private IInitialCpuInfoService initialCpuInfoService;
    @Autowired
    private IInitialDiskInfoService initialDiskInfoService;
    @Autowired
    private IInitialMemoryInfoService initialMemoryInfoService;
    @Autowired
    private IInitialMountPointInfoService initialMountPointInfoService;
    @Autowired
    private IInitialSwitchInfoService initialSwitchInfoService;
    @Autowired
    private IInitialSystemInfoService initialSystemInfoService;
    @Autowired
    private IInitialSwitchInfoTempService initialSwitchInfoTempService;
    @Autowired
    private IInitialHeartbeatListenLogService initialHeartbeatListenLog;
    @Autowired
    private IInitialSwitchPowerSupplyService initialSwitchPowerSupplyService;
    @Autowired
    private IInitialSwitchFanInfoService initialSwitchFanInfoService;
    @Autowired
    private IInitialSwitchMpuInfoService initialSwitchMpuInfoService;
    @Autowired
    private IInitialSwitchOpticalModuleService initialSwitchOpticalModuleService;
    @Autowired
    private IInitialSwitchOtherCollectDataService insertInitialSwitchOtherInfo;
    @Autowired
    private IInitialSystemOtherCollectDataService iInitialSystemOtherCollectDataService;
    @Autowired
    private IRmResourceRemoteService rmResourceRemoteService;
    @Autowired
    private IRmAgentManagementService rmAgentManagementService;
    @Autowired
    private DataProcessUtil dataProcessUtil;
    @Autowired
    private IRmNetworkInterfaceService rmNetworkInterfaceService;

    /** Shared JSON mapper used for the nested-JSON parsing in {@link #handleSwitchOtherMessage}. */
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Initializes the data-type -> handler mapping.
     *
     * <p>Only the registration and "get latest policy" handlers are currently active;
     * the remaining registrations are kept commented out exactly as in the original.
     */
    @PostConstruct
    public void init() {
        // All acknowledgement-type messages share one handler
        // registerHandler(MsgEnum.注册应答.getValue(), this::handleResponseMessage);
        // registerHandler(MsgEnum.断开应答.getValue(), this::handleResponseMessage);
        // registerHandler(MsgEnum.开启系统采集应答.getValue(), this::handleResponseMessage);
        // registerHandler(MsgEnum.关闭系统采集应答.getValue(), this::handleResponseMessage);
        // registerHandler(MsgEnum.开启交换机采集应答.getValue(), this::handleResponseMessage);
        // registerHandler(MsgEnum.关闭交换机采集应答.getValue(), this::handleResponseMessage);
        // registerHandler(MsgEnum.告警设置应答.getValue(), this::handleResponseMessage);
        // registerHandler(MsgEnum.执行脚本策略应答.getValue(), this::handleScriptRspMessage);
        // registerHandler(MsgEnum.Agent版本更新应答.getValue(), this::handleAgentUpdateRspMessage);

        // Other message types can register their own handler
        registerHandler(MsgEnum.注册.getValue(), this::handleRegisterMessage);
        registerHandler(MsgEnum.获取最新策略.getValue(), this::handleNewPolicyMessage);
        // registerHandler(MsgEnum.CPU上报.getValue(), this::handleCpuMessage);
        // registerHandler(MsgEnum.磁盘上报.getValue(), this::handleDiskMessage);
        // registerHandler(MsgEnum.容器上报.getValue(), this::handleDockerMessage);
        // registerHandler(MsgEnum.内存上报.getValue(), this::handleMemoryMessage);
        // registerHandler(MsgEnum.网络上报.getValue(), this::handleNetMessage);
        // registerHandler(MsgEnum.挂载上报.getValue(), this::handleMountPointMessage);
        // registerHandler(MsgEnum.系统其他上报.getValue(), this::handleOtherSystemMessage);
        // registerHandler(MsgEnum.交换机上报.getValue(), this::handleSwitchDataMessage);
        // registerHandler(MsgEnum.心跳上报.getValue(), this::handleHeartbeatMessage);
        // registerHandler(MsgEnum.多公网IP探测.getValue(), this::handlePublicIpDetectMessage);
    }

    /**
     * Converts a message timestamp into a {@link Date}.
     *
     * <p>The arithmetic is the one the original code repeated in every handler:
     * multiply by 1000, then truncate to a whole multiple of 1000.
     * presumably the input is epoch seconds - TODO confirm against the agent protocol.
     *
     * @param timestamp timestamp carried by the message
     * @return the corresponding date with no sub-second component
     */
    private static Date toCreateTime(long timestamp) {
        long millis = timestamp * 1000;
        return new Date(millis / 1000 * 1000);
    }

    /**
     * Handles a "get latest policy" message. Currently only parses the payload; the
     * policy lookup itself is not implemented yet.
     *
     * @param deviceMessage incoming message
     */
    private void handleNewPolicyMessage(DeviceMessage deviceMessage) {
        List<RegisterMsgVo> interfaces = JsonDataParser.parseJsonData(deviceMessage.getData(), RegisterMsgVo.class);
        if (!interfaces.isEmpty()) {
            // TODO fetch the policy information for this server
        }
    }

    /**
     * Handles a server registration message: forwards the registration to the remote
     * service, then inserts or updates one network-interface row per reported NIC.
     *
     * @param message incoming registration message
     */
    private void handleRegisterMessage(DeviceMessage message) {
        List<RegisterMsgVo> interfaces = JsonDataParser.parseJsonData(message.getData(), RegisterMsgVo.class);
        if (interfaces.isEmpty()) {
            return;
        }
        RegisterMsgVo registerMsg = interfaces.get(0);

        // Auto-register the server
        RmRegisterMsgRemote rmRegisterMsgRemote = new RmRegisterMsgRemote();
        BeanUtils.copyProperties(registerMsg, rmRegisterMsgRemote);
        remoteRevenueConfigService.innerAddRegist(rmRegisterMsgRemote, SecurityConstants.INNER);

        Date createTime = toCreateTime(registerMsg.getTimestamp());

        List<NetworkInfo> networkInfoList = registerMsg.getNetworkInfo();
        if (networkInfoList == null || networkInfoList.isEmpty()) {
            // Nothing to synchronize (a missing list is treated like an empty one)
            return;
        }
        // A message with exactly one NIC follows slightly different rules than one with
        // several NICs; see syncNetworkInterface.
        boolean singleInterface = networkInfoList.size() == 1;
        for (NetworkInfo networkInfo : networkInfoList) {
            syncNetworkInterface(registerMsg, networkInfo, createTime, singleInterface);
        }
    }

    /**
     * Inserts or updates the stored row for one reported NIC, looked up by MAC address.
     *
     * <ul>
     *   <li>No stored row: insert; in single-NIC mode the row also gets bindIp "3".</li>
     *   <li>Stored row, single-NIC mode: update only when the public IP changed.</li>
     *   <li>Stored row, multi-NIC mode: always update; when the public IP changed, also
     *       set bindIp "0" and set the resource's multi-public-IP status to "0".</li>
     * </ul>
     *
     * @param registerMsg     registration payload (source of the client id)
     * @param networkInfo     NIC reported by the agent
     * @param createTime      creation time to stamp on inserted rows
     * @param singleInterface whether the message carried exactly one NIC
     */
    private void syncNetworkInterface(RegisterMsgVo registerMsg, NetworkInfo networkInfo,
                                      Date createTime, boolean singleInterface) {
        RmNetworkInterface queryParam = new RmNetworkInterface();
        queryParam.setMacAddress(networkInfo.getMac());
        List<RmNetworkInterface> existing = rmNetworkInterfaceService.selectRmNetworkInterfaceList(queryParam);

        if (existing.isEmpty()) {
            RmNetworkInterface insertData = copyNetworkInfo(networkInfo);
            if (singleInterface) {
                // NOTE(review): the original comment reads "business IP and management IP";
                // the exact meaning of bindIp "3" is not visible here - confirm.
                insertData.setBindIp("3");
            }
            insertData.setClientId(registerMsg.getClientId());
            insertData.setCreateTime(createTime);
            rmNetworkInterfaceService.insertRmNetworkInterface(insertData);
            return;
        }

        boolean publicIpChanged =
                !StringUtils.equals(networkInfo.getPublicIp(), existing.get(0).getPublicIp());

        if (singleInterface) {
            if (publicIpChanged) {
                rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(copyNetworkInfo(networkInfo));
            }
            return;
        }

        RmNetworkInterface updateData = copyNetworkInfo(networkInfo);
        if (publicIpChanged) {
            updateData.setBindIp("0");
            // Update the resource's multi-public-IP binding status
            RmResourceRegistrationRemote updateParam = new RmResourceRegistrationRemote();
            updateParam.setClientId(registerMsg.getClientId());
            updateParam.setMultiPublicIpStatus("0");
            remoteRevenueConfigService.updateStatusByResource(updateParam, SecurityConstants.INNER);
        }
        rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(updateData);
    }

    /**
     * Builds an {@link RmNetworkInterface} carrying the fields copied from the agent's
     * NIC report (carrier, city, gateway, name, IPv4, MAC, province, public IP).
     *
     * @param networkInfo NIC reported by the agent
     * @return a new, not yet persisted entity
     */
    private static RmNetworkInterface copyNetworkInfo(NetworkInfo networkInfo) {
        RmNetworkInterface target = new RmNetworkInterface();
        target.setMacAddress(networkInfo.getMac());
        target.setIsp(networkInfo.getCarrier());
        target.setCity(networkInfo.getCity());
        target.setGateway(networkInfo.getGateway());
        target.setInterfaceName(networkInfo.getName());
        target.setIpv4Address(networkInfo.getIpv4());
        target.setProvince(networkInfo.getProvince());
        target.setPublicIp(networkInfo.getPublicIp());
        return target;
    }

    /**
     * Handles a multi-public-IP detection message. Currently only parses the payload.
     *
     * @param message incoming message
     */
    private void handlePublicIpDetectMessage(DeviceMessage message) {
        List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
        // TODO persist the public-network information
        // TODO query the stored public-network information
        // TODO if it changed, set the public IP status to "needs binding"
    }

    /**
     * Handles an agent-update acknowledgement: stores "1" as the last update result when
     * the response code is 1, otherwise "0".
     *
     * @param message incoming acknowledgement
     */
    private void handleAgentUpdateRspMessage(DeviceMessage message) {
        List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
        if (rspVoList.isEmpty()) {
            return;
        }
        RspVo rsp = rspVoList.get(0);
        RmAgentManagement rmAgentManagement = new RmAgentManagement();
        rmAgentManagement.setHardwareSn(message.getClientId());
        rmAgentManagement.setLastUpdateResult(rsp.getResCode() == 1 ? "1" : "0");
        rmAgentManagementService.updateRmAgentManagementByHardwareSn(rmAgentManagement);
    }

    /**
     * Registers a handler for a data type, replacing any previous one.
     *
     * @param dataType message data type
     * @param handler  handler to invoke for that type
     */
    private void registerHandler(String dataType, Consumer<DeviceMessage> handler) {
        messageHandlers.put(dataType, handler);
    }

    /**
     * Handles a device message (public entry point). Messages whose data type has no
     * registered handler are logged at WARN level and dropped.
     *
     * @param message incoming message
     */
    public void handleMessage(DeviceMessage message) {
        String dataType = message.getDataType();
        Consumer<DeviceMessage> handler = messageHandlers.get(dataType);
        if (handler != null) {
            handler.accept(message);
        } else {
            log.warn("未知数据类型:{}", dataType);
        }
    }

    // ========== Concrete message handlers ==========

    /**
     * Stores network traffic samples and triggers the copy into the business tables.
     *
     * @param message incoming message
     * @throws RuntimeException if the payload contains no samples
     */
    private void handleNetMessage(DeviceMessage message) {
        List<InitialBandwidthTraffic> interfaces =
                JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class);
        if (interfaces.isEmpty()) {
            throw new RuntimeException("NET流量data数据为空");
        }
        // All rows share the timestamp of the first sample
        Date createTime = toCreateTime(interfaces.get(0).getTimestamp());
        String timeStr = DateUtils.parseDateToStr(TIME_PATTERN, createTime);

        interfaces.forEach(iface -> {
            iface.setClientId(message.getClientId());
            iface.setCreateTime(createTime);
            // Outbound traffic
            iface.setOutSpeed(dataProcessUtil.bytesToBits(iface.getOutSpeed()));
            // Inbound traffic
            iface.setInSpeed(dataProcessUtil.bytesToBits(iface.getInSpeed()));
        });

        // Wrapper object carrying the batch
        InitialBandwidthTraffic data = new InitialBandwidthTraffic();
        data.setList(interfaces);
        initialBandwidthTrafficService.batchInsert(data);

        // Copy the same time slice into the business store
        EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote();
        epsInitialTrafficDataRemote.setStartTime(timeStr);
        epsInitialTrafficDataRemote.setEndTime(timeStr);
        remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER);
    }

    /**
     * Stores docker container samples.
     *
     * @param message incoming message
     * @throws RuntimeException if the payload contains no samples
     */
    private void handleDockerMessage(DeviceMessage message) {
        List<InitialDockerInfo> dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class);
        if (dockers.isEmpty()) {
            throw new RuntimeException("DOCKER容器data数据为空");
        }
        Date createTime = toCreateTime(dockers.get(0).getTimestamp());
        dockers.forEach(item -> {
            item.setClientId(message.getClientId());
            item.setCreateTime(createTime);
        });
        initialDockerInfoService.batchInsertInitialDockerInfo(dockers);
    }

    /**
     * Stores CPU samples.
     *
     * @param message incoming message
     * @throws RuntimeException if the payload contains no samples
     */
    private void handleCpuMessage(DeviceMessage message) {
        List<InitialCpuInfo> cpus = JsonDataParser.parseJsonData(message.getData(), InitialCpuInfo.class);
        // Check for emptiness BEFORE reading element 0; the original read it first and
        // failed with IndexOutOfBoundsException instead of the intended error below.
        if (cpus.isEmpty()) {
            throw new RuntimeException("CPUdata数据为空");
        }
        Date createTime = toCreateTime(cpus.get(0).getTimestamp());
        cpus.forEach(item -> {
            item.setClientId(message.getClientId());
            item.setCreateTime(createTime);
        });
        initialCpuInfoService.batchInsertInitialCpuInfo(cpus);
    }

    /**
     * Stores disk samples.
     *
     * @param message incoming message
     * @throws RuntimeException if the payload contains no samples
     */
    private void handleDiskMessage(DeviceMessage message) {
        List<InitialDiskInfo> disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class);
        // Check for emptiness BEFORE reading element 0 (same fix as handleCpuMessage).
        if (disks.isEmpty()) {
            throw new RuntimeException("磁盘data数据为空");
        }
        Date createTime = toCreateTime(disks.get(0).getTimestamp());
        disks.forEach(item -> {
            item.setClientId(message.getClientId());
            item.setCreateTime(createTime);
        });
        initialDiskInfoService.batchInsertInitialDiskInfo(disks);
    }

    /**
     * Stores memory samples.
     *
     * @param message incoming message
     * @throws RuntimeException if the payload contains no samples
     */
    private void handleMemoryMessage(DeviceMessage message) {
        List<InitialMemoryInfo> memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class);
        if (memorys.isEmpty()) {
            throw new RuntimeException("内存data数据为空");
        }
        Date createTime = toCreateTime(memorys.get(0).getTimestamp());
        memorys.forEach(item -> {
            item.setClientId(message.getClientId());
            item.setCreateTime(createTime);
        });
        initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys);
    }

    /**
     * Stores mount-point samples.
     *
     * @param message incoming message
     * @throws RuntimeException if the payload contains no samples
     */
    private void handleMountPointMessage(DeviceMessage message) {
        List<InitialMountPointInfo> mountPointInfos =
                JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class);
        if (mountPointInfos.isEmpty()) {
            throw new RuntimeException("挂载点data数据为空");
        }
        Date createTime = toCreateTime(mountPointInfos.get(0).getTimestamp());
        mountPointInfos.forEach(item -> {
            item.setClientId(message.getClientId());
            item.setCreateTime(createTime);
        });
        initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos);
    }

    /**
     * Routes a switch collection message to the handler matching its collect type.
     * Only the first element of the payload is processed; unknown types fall through to
     * {@link #handleSwitchOtherMessage}.
     *
     * @param message incoming message
     * @throws RuntimeException if the payload contains no data
     */
    private void handleSwitchDataMessage(DeviceMessage message) {
        List<CollectDataVo> switchData = JsonDataParser.parseJsonData(message.getData(), CollectDataVo.class);
        if (switchData.isEmpty()) {
            throw new RuntimeException("交换机data数据为空");
        }
        CollectDataVo switchDataVo = switchData.get(0);
        String clientId = message.getClientId();
        switch (switchDataVo.getType()) {
            case "switchNetCollect":
                handleSwitchNetMessage(switchDataVo, clientId);
                break;
            case "switchPwrCollect":
                handleSwitchPwrMessage(switchDataVo, clientId);
                break;
            case "switchModuleCollect":
                handleSwitchModuleMessage(switchDataVo, clientId);
                break;
            case "switchMpuCollect":
                handleSwitchMpuMessage(switchDataVo, clientId);
                break;
            case "switchFanCollect":
                handleSwitchFanMessage(switchDataVo, clientId);
                break;
            default:
                handleSwitchOtherMessage(switchDataVo, clientId);
                break;
        }
    }

    /**
     * Stores switch power-supply discovery data. An empty list is silently ignored.
     *
     * @param switchDataVo collected payload
     * @param clientId     id of the reporting client
     */
    private void handleSwitchPwrMessage(CollectDataVo switchDataVo, String clientId) {
        List<InitialSwitchPowerSupply> powerSupplyList =
                SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchPowerSupply.class);
        if (powerSupplyList.isEmpty()) {
            return;
        }
        // Same timestamp for every row, so compute it once
        Date createTime = toCreateTime(switchDataVo.getTimestamp());
        for (InitialSwitchPowerSupply row : powerSupplyList) {
            row.setClientId(clientId);
            row.setCreateTime(createTime);
        }
        initialSwitchPowerSupplyService.insertBatchInitialSwitchPowerSupply(powerSupplyList);
    }

    /**
     * Stores switch optical-module discovery data. An empty list is silently ignored.
     *
     * @param switchDataVo collected payload
     * @param clientId     id of the reporting client
     */
    private void handleSwitchModuleMessage(CollectDataVo switchDataVo, String clientId) {
        List<InitialSwitchOpticalModule> moduleList =
                SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchOpticalModule.class);
        if (moduleList.isEmpty()) {
            return;
        }
        Date createTime = toCreateTime(switchDataVo.getTimestamp());
        for (InitialSwitchOpticalModule row : moduleList) {
            row.setClientId(clientId);
            row.setCreateTime(createTime);
        }
        initialSwitchOpticalModuleService.batchInitialSwitchOpticalModule(moduleList);
    }

    /**
     * Stores switch MPU discovery data. An empty list is silently ignored.
     *
     * @param switchDataVo collected payload
     * @param clientId     id of the reporting client
     */
    private void handleSwitchMpuMessage(CollectDataVo switchDataVo, String clientId) {
        List<InitialSwitchMpuInfo> mpuList =
                SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchMpuInfo.class);
        if (mpuList.isEmpty()) {
            return;
        }
        Date createTime = toCreateTime(switchDataVo.getTimestamp());
        for (InitialSwitchMpuInfo row : mpuList) {
            row.setClientId(clientId);
            row.setCreateTime(createTime);
        }
        initialSwitchMpuInfoService.insertBatchInitialSwitchMpuInfo(mpuList);
    }

    /**
     * Stores switch fan discovery data. An empty list is silently ignored.
     *
     * @param switchDataVo collected payload
     * @param clientId     id of the reporting client
     */
    private void handleSwitchFanMessage(CollectDataVo switchDataVo, String clientId) {
        List<InitialSwitchFanInfo> fanList =
                SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchFanInfo.class);
        if (fanList.isEmpty()) {
            return;
        }
        Date createTime = toCreateTime(switchDataVo.getTimestamp());
        for (InitialSwitchFanInfo row : fanList) {
            row.setClientId(clientId);
            row.setCreateTime(createTime);
        }
        initialSwitchFanInfoService.insertBatchInitialSwitchFanInfo(fanList);
    }

    /**
     * Default handler for any other switch discovery data.
     *
     * <p>The value is expected to be a JSON array whose first element is a string that
     * itself contains a JSON object. The first field of that object becomes the collect
     * type/value pair; a value whose text is "null" is not stored. Payloads of any other
     * shape are ignored. Any exception is logged and swallowed (best effort).
     *
     * @param switchDataVo collected payload, may be {@code null}
     * @param clientId     id of the reporting client
     */
    private void handleSwitchOtherMessage(CollectDataVo switchDataVo, String clientId) {
        if (switchDataVo == null) {
            return;
        }
        try {
            InitialSwitchOtherCollectData insertData = new InitialSwitchOtherCollectData();
            insertData.setClientId(clientId);
            insertData.setCreateTime(toCreateTime(switchDataVo.getTimestamp()));

            JsonNode root = objectMapper.readTree(switchDataVo.getValue());
            if (!root.isArray() || root.size() == 0) {
                return;
            }
            JsonNode firstElement = root.get(0);
            if (!firstElement.isTextual()) {
                return;
            }
            // The array element is a JSON document encoded as a string: parse it again
            JsonNode inner = objectMapper.readTree(firstElement.asText());
            if (!inner.isObject()) {
                return;
            }
            Iterator<Map.Entry<String, JsonNode>> fields = inner.fields();
            if (!fields.hasNext()) {
                return;
            }
            Map.Entry<String, JsonNode> entry = fields.next();
            String fieldValue = entry.getValue().asText();
            insertData.setCollectType(entry.getKey());
            if (!"null".equals(fieldValue)) {
                insertData.setCollectValue(fieldValue);
                insertInitialSwitchOtherInfo.insertInitialSwitchOtherCollectData(insertData);
            }
        } catch (Exception e) {
            log.error("解析JSON数据失败: {}, value: {}", e.getMessage(), switchDataVo.getValue(), e);
        }
    }

    /**
     * Stores switch interface samples and derives in/out speed from the previous sample.
     *
     * <p>The previous sample per interface name is read from the temp table. Speed is
     * (byte delta * 8) / {@link #SWITCH_SPEED_DIVISOR}, rounded to 2 decimals. The temp
     * table is then replaced with the current samples, the samples are stored, and the
     * business-table save is triggered for the same time slice.
     *
     * @param switchDataVo collected payload
     * @param clientId     id of the reporting client
     * @throws RuntimeException if the payload contains no samples
     */
    private void handleSwitchNetMessage(CollectDataVo switchDataVo, String clientId) {
        List<InitialSwitchInfo> switchInfos =
                SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchInfo.class);
        if (switchInfos.isEmpty()) {
            throw new RuntimeException("交换机data数据为空");
        }

        // Look up the registration of this client to obtain the switch address
        RmResourceRegistrationRemote queryParam = new RmResourceRegistrationRemote();
        queryParam.setHardwareSn(clientId);
        R<RmResourceRegistrationRemote> registMsgR =
                remoteRevenueConfigService.getListByHardwareSn(queryParam, SecurityConstants.INNER);
        RmResourceRegistrationRemote registration = registMsgR == null ? null : registMsgR.getData();

        Date createTime = toCreateTime(switchDataVo.getTimestamp());
        String timeStr = DateUtils.parseDateToStr(TIME_PATTERN, createTime);

        // Previous samples, keyed by interface name (first one wins on duplicates).
        // When there is no previous sample the map is empty and no speed is computed.
        InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp();
        temp.setClientId(clientId);
        List<InitialSwitchInfoTemp> tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp);
        Map<String, InitialSwitchInfoTemp> previousByName = tempList.stream()
                .collect(Collectors.toMap(
                        InitialSwitchInfoTemp::getName,
                        Function.identity(),
                        (existing, replacement) -> existing));

        for (InitialSwitchInfo switchInfo : switchInfos) {
            switchInfo.setClientId(clientId);
            switchInfo.setCreateTime(createTime);
            if (registration != null && registration.getSnmpCollectAddr() != null) {
                switchInfo.setSwitchIp(registration.getSnmpCollectAddr());
            }

            InitialSwitchInfoTemp previous = previousByName.get(switchInfo.getName());
            if (previous == null) {
                continue;
            }
            // NOTE(review): the delta is not checked for being negative, so a counter
            // reset would yield a negative speed - confirm whether that can happen.
            if (switchInfo.getInBytes() != null && previous.getInBytes() != null) {
                switchInfo.setInSpeed(bitsPerInterval(switchInfo.getInBytes(), previous.getInBytes()));
            }
            if (switchInfo.getOutBytes() != null && previous.getOutBytes() != null) {
                switchInfo.setOutSpeed(bitsPerInterval(switchInfo.getOutBytes(), previous.getOutBytes()));
            }
        }

        // Replace this client's rows in the temp table with the current samples
        initialSwitchInfoTempService.truncateSwitchInfoTemp(clientId);
        initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos);
        // Store the initial switch data
        initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos);

        // Trigger the save into the business tables
        InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote();
        detailsRemote.setClientId(clientId);
        detailsRemote.setStartTime(timeStr);
        detailsRemote.setEndTime(timeStr);
        remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER);
    }

    /**
     * Converts the difference of two byte counters into a speed value.
     *
     * @param currentBytes  current counter value
     * @param previousBytes previous counter value
     * @return (current - previous) * 8, rounded to an integer, divided by
     *         {@link #SWITCH_SPEED_DIVISOR} with 2 decimals (HALF_UP)
     */
    private static BigDecimal bitsPerInterval(BigDecimal currentBytes, BigDecimal previousBytes) {
        BigDecimal diffBits = currentBytes.subtract(previousBytes)
                .multiply(BITS_PER_BYTE)
                .setScale(0, RoundingMode.HALF_UP);
        return diffBits.divide(SWITCH_SPEED_DIVISOR, 2, RoundingMode.HALF_UP);
    }

    /**
     * Stores "other" system collection data (type/value pair of the first element).
     * An empty payload is silently ignored; failures are logged and swallowed.
     *
     * @param message incoming message
     */
    private void handleOtherSystemMessage(DeviceMessage message) {
        List<CollectDataVo> otherData = JsonDataParser.parseJsonData(message.getData(), CollectDataVo.class);
        if (otherData.isEmpty()) {
            return;
        }
        CollectDataVo systemDataVo = otherData.get(0);
        if (systemDataVo == null) {
            return;
        }
        try {
            InitialSystemOtherCollectData insertData = new InitialSystemOtherCollectData();
            insertData.setClientId(message.getClientId());
            insertData.setCreateTime(toCreateTime(systemDataVo.getTimestamp()));
            insertData.setCollectType(systemDataVo.getType());
            insertData.setCollectValue(systemDataVo.getValue());
            iInitialSystemOtherCollectDataService.insertInitialSystemOtherCollectData(insertData);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too
            log.error("解析JSON数据失败: {}, value: {}", e.getMessage(), systemDataVo.getValue(), e);
        }
    }

    /**
     * Handles a heartbeat message.
     *
     * <p>Resets the client's missed-heartbeat counter to "0" and records the current
     * time. If an alert is active for the client, counts this heartbeat towards
     * recovery; on reaching two heartbeats the alert is cleared, a log row with status
     * "2" is written and the resource is marked online ("1"). Any exception is logged
     * and swallowed.
     *
     * @param message incoming heartbeat message
     */
    private void handleHeartbeatMessage(DeviceMessage message) {
        List<InitialHeartbeatListen> heartbeats =
                JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class);
        if (heartbeats.isEmpty()) {
            return;
        }
        InitialHeartbeatListen heartbeat = heartbeats.get(0);
        String clientId = message.getClientId();
        log.debug("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());

        String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
        String timeKey = HEARTBEAT_TIME_PREFIX + clientId;
        String alertKey = HEARTBEAT_ALERT_PREFIX + clientId;
        String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId;
        try {
            // State before processing (for debugging only)
            String prevStatus = redisTemplate.opsForValue().get(statusKey);
            String prevTime = redisTemplate.opsForValue().get(timeKey);
            log.debug("客户端ID: {} 处理前状态 - status: {}, time: {}", clientId, prevStatus, prevTime);

            // MULTI/EXEC so both keys are written together
            redisTemplate.execute(new SessionCallback<Object>() {
                @Override
                @SuppressWarnings({"unchecked", "rawtypes"})
                public Object execute(RedisOperations operations) throws DataAccessException {
                    operations.multi();
                    // Reset the missed counter and record the time of this heartbeat
                    operations.opsForValue().set(statusKey, "0");
                    operations.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis()));
                    return operations.exec();
                }
            });
            log.debug("客户端ID: {} 心跳处理完成,重置状态和时间", clientId);

            // Nothing more to do unless an alert is currently active
            if (!Boolean.TRUE.equals(redisTemplate.hasKey(alertKey))) {
                return;
            }
            String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey);
            int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1;

            // ">=" rather than "==": a stale counter above 2 must still recover,
            // otherwise the client would stay in alert forever.
            if (recoveryCount >= 2) {
                log.warn("客户端ID: {} 心跳恢复达到2次,修改设备状态为在线", clientId);
                insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线");
                redisTemplate.delete(alertKey);
                redisTemplate.delete(recoveryCountKey); // clear the recovery counter
                updateResourceStatus(clientId, "1");
            } else {
                // Not recovered yet: just remember the count
                log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount);
                redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount));
            }
        } catch (Exception e) {
            log.error("处理心跳消息异常, clientId: {}", clientId, e);
        }
    }

    /**
     * Scheduled heartbeat check, run every 60 seconds.
     *
     * <p>For each client with a recorded heartbeat time and no active alert: if the last
     * heartbeat is older than {@link #HEARTBEAT_TIMEOUT}, the missed counter is
     * incremented, and on the third miss an alert is set, the time/status keys are
     * removed, a log row with status "3" is written and the resource is marked offline
     * ("0"). Otherwise the missed counter is reset to "0". Failures for one client are
     * logged and do not stop the loop.
     *
     * <p>NOTE(review): {@code keys(pattern)} scans the whole keyspace; consider SCAN if
     * the number of keys grows.
     */
    @Scheduled(fixedRate = 60000)
    public void checkHeartbeatStatus() {
        long currentTime = System.currentTimeMillis();
        log.debug("开始心跳状态检查,当前时间: {}", currentTime);

        Set<String> timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*");
        if (timeKeys == null) {
            log.debug("未找到任何心跳时间键");
            return;
        }
        log.debug("找到 {} 个客户端需要检查", timeKeys.size());

        for (String timeKey : timeKeys) {
            String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
            String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
            String alertKey = HEARTBEAT_ALERT_PREFIX + clientId;
            try {
                // Clients that already have an alert are skipped
                String existingAlert = redisTemplate.opsForValue().get(alertKey);
                if ("1".equals(existingAlert)) {
                    log.debug("客户端ID: {} 已有告警,跳过检查", clientId);
                    continue;
                }

                String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
                if (lastTimeStr == null) {
                    log.debug("客户端ID: {} 时间键为空,跳过", clientId);
                    continue;
                }

                long lastHeartbeatTime = Long.parseLong(lastTimeStr);
                long timeDiff = currentTime - lastHeartbeatTime;
                log.debug("客户端ID: {} 最后心跳: {}, 时间差: {}ms, 超时阈值: {}ms",
                        clientId, lastHeartbeatTime, timeDiff, HEARTBEAT_TIMEOUT);

                if (timeDiff > HEARTBEAT_TIMEOUT) {
                    // Atomic increment; a first increment already stores 1, so no extra
                    // write is needed (the original re-set the key to "1" redundantly).
                    Long lostCount = redisTemplate.opsForValue().increment(statusKey);
                    if (lostCount == null) {
                        // increment may return null (e.g. inside a pipeline/transaction)
                        log.warn("客户端ID: {} 心跳丢失计数递增返回空,跳过", clientId);
                        continue;
                    }
                    log.warn("客户端ID: {} 心跳超时,连续次数: {}, 时间差: {}ms", clientId, lostCount, timeDiff);

                    if (lostCount >= 3) {
                        log.warn("客户端ID: {} 连续三次心跳丢失,触发告警", clientId);
                        insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
                        redisTemplate.opsForValue().set(alertKey, "1");
                        // Once the alert is set, drop the time and status keys
                        redisTemplate.delete(timeKey);
                        redisTemplate.delete(statusKey);
                        log.info("客户端ID: {} 已设置告警并清理心跳记录", clientId);
                        updateResourceStatus(clientId, "0");
                    }
                } else {
                    // Heartbeat is on time: make sure the missed counter is 0
                    String currentStatus = redisTemplate.opsForValue().get(statusKey);
                    if (!"0".equals(currentStatus)) {
                        redisTemplate.opsForValue().set(statusKey, "0");
                        log.debug("客户端ID: {} 心跳正常,重置丢失次数从 {} 到 0", clientId, currentStatus);
                    } else {
                        log.debug("客户端ID: {} 心跳正常,状态已是0", clientId);
                    }
                }
            } catch (Exception e) {
                log.error("检查心跳状态异常, clientId: {}", clientId, e);
            }
        }
        log.debug("心跳状态检查完成");
    }

    /**
     * Updates the online and registration status of a resource through the remote
     * service; both fields receive the same value.
     *
     * @param clientId hardware SN of the resource
     * @param status   status value to store
     */
    private void updateResourceStatus(String clientId, String status) {
        log.info("开启更新资源状态========");
        RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
        rmResourceRegistrationRemote.setOnlineStatus(status);
        rmResourceRegistrationRemote.setRegistrationStatus(status);
        rmResourceRegistrationRemote.setHardwareSn(clientId);
        remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
    }

    /**
     * Writes a heartbeat log row. Failures are logged and swallowed so that logging
     * never breaks heartbeat processing.
     *
     * @param machineId client id
     * @param status    0 = offline, 1 = online, 2 = recovered, 3 = three heartbeats lost
     * @param remark    free-text remark
     */
    private void insertHeartbeatLog(String machineId, String status, String remark) {
        try {
            InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog();
            listenLog.setClientId(machineId);
            listenLog.setStatus(status);
            listenLog.setRemark(remark);
            listenLog.setCreateTime(new Date());
            initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog);
            log.info("已记录心跳日志,客户端ID: {}, 状态: {}", machineId, status);
        } catch (Exception e) {
            log.error("插入心跳日志失败", e);
        }
    }

    /**
     * Handles a generic acknowledgement: logs and returns the first response.
     *
     * @param message incoming acknowledgement
     * @return the first parsed response, or {@code null} when the payload is empty
     */
    private RspVo handleResponseMessage(DeviceMessage message) {
        List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
        if (rspVoList.isEmpty()) {
            return null;
        }
        RspVo rsp = rspVoList.get(0);
        log.info("应答信息:{}",rsp);
        return rsp;
    }

    /**
     * Handles a script-policy acknowledgement.
     *
     * <p>A response code other than 1 is logged as a failure. A successful response
     * with a non-null result is stored together with the resource's name, IP and type,
     * which are looked up by hardware SN. If that lookup yields no data the result
     * cannot be stored; this is logged instead of failing with a
     * {@code NullPointerException}.
     *
     * @param message incoming acknowledgement
     */
    private void handleScriptRspMessage(DeviceMessage message) {
        List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
        if (rspVoList.isEmpty()) {
            return;
        }
        RspVo rsp = rspVoList.get(0);
        if (rsp.getResCode() != 1) {
            log.error("脚本执行失败:{}", rsp);
            return;
        }
        if (rsp.getResult() == null) {
            return;
        }

        Date createTime = toCreateTime(rsp.getTimestamp());

        // Look up the resource this result belongs to
        RmResourceRegistrationRemote queryParam = new RmResourceRegistrationRemote();
        queryParam.setHardwareSn(message.getClientId());
        R<RmResourceRegistrationRemote> resourceMsg =
                remoteRevenueConfigService.getListByHardwareSn(queryParam, SecurityConstants.INNER);
        RmResourceRegistrationRemote resourceMsgData = resourceMsg == null ? null : resourceMsg.getData();
        if (resourceMsgData == null) {
            log.error("未查询到资源信息,无法保存脚本执行结果, clientId: {}", message.getClientId());
            return;
        }

        // Build and store the script execution result
        RmResourceRemote insertData = new RmResourceRemote();
        insertData.setHardwareSn(message.getClientId());
        insertData.setResourceName(resourceMsgData.getResourceName());
        insertData.setExternalIp(resourceMsgData.getIpAddress());
        insertData.setManagementPort(22);
        insertData.setConnectionMethod("1");
        insertData.setResourceType(resourceMsgData.getResourceType());
        insertData.setDescription(rsp.getResult());
        insertData.setCreateTime(createTime);
        rmResourceRemoteService.insertRmResourceRemote(insertData);
        log.info("脚本执行结果入库成功:{}",rsp);
    }
}