diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListenerHistory.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListenerHistory.java index 23a6e3e..c85054e 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListenerHistory.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListenerHistory.java @@ -1,616 +1,616 @@ -package com.ruoyi.rocketmq.consumer; - -import com.alibaba.fastjson.JSON; -import com.ruoyi.common.core.constant.SecurityConstants; -import com.ruoyi.common.core.domain.R; -import com.ruoyi.common.core.utils.DateUtils; -import com.ruoyi.common.core.utils.StringUtils; -import com.ruoyi.rocketmq.domain.*; -import com.ruoyi.rocketmq.domain.vo.RspVo; -import com.ruoyi.rocketmq.enums.MessageCodeEnum; -import com.ruoyi.rocketmq.handler.DeviceMessageHandler; -import com.ruoyi.rocketmq.producer.ConsumeException; -import com.ruoyi.rocketmq.service.*; -import com.ruoyi.rocketmq.utils.JsonDataParser; -import com.ruoyi.system.api.RemoteRevenueConfigService; -import com.ruoyi.system.api.domain.AllInterfaceNameRemote; -import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote; -import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote; -import com.ruoyi.system.api.domain.RmResourceRegistrationRemote; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.util.CollectionUtils; - -import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - -/** - * 消息监听 v1.0 - */ -@Slf4j -public class RocketMsgListenerHistory implements MessageListenerConcurrently { - // 心跳状态 - private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:"; - // 心跳时间 - private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:"; - // 心跳告警 - private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:"; - String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:"; - private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时 - - - @Autowired - private RedisTemplate redisTemplate; - private final IInitialBandwidthTrafficService initialBandwidthTrafficService; - private final RemoteRevenueConfigService remoteRevenueConfigService; - @Autowired - private IInitialDockerInfoService initialDockerInfoService; - @Autowired - private IInitialCpuInfoService initialCpuInfoService; - @Autowired - private IInitialDiskInfoService initialDiskInfoService; - @Autowired - private IInitialMemoryInfoService initialMemoryInfoService; - @Autowired - private IInitialMountPointInfoService initialMountPointInfoService; - @Autowired - private IInitialSwitchInfoService initialSwitchInfoService; - @Autowired - private IInitialSystemInfoService initialSystemInfoService; - @Autowired - private IInitialSwitchInfoTempService initialSwitchInfoTempService; - @Autowired - private IInitialHeartbeatListenLogService initialHeartbeatListenLog; - @Autowired - public RocketMsgListenerHistory(IInitialBandwidthTrafficService initialBandwidthTrafficService, - RemoteRevenueConfigService remoteRevenueConfigService) { - this.initialBandwidthTrafficService = initialBandwidthTrafficService; - this.remoteRevenueConfigService = remoteRevenueConfigService; - } - @Autowired - private DeviceMessageHandler deviceMessageHandler; - - /** - * 消费消息 - * @param list msgs.size() >= 1 - * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here - * 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 - * @param consumeConcurrentlyContext 上下文信息 - * @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) - */ - @Override - public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { - try{ - //消息不等于空情况 - if (!CollectionUtils.isEmpty(list)) { - //获取topic - for (MessageExt messageExt : list) { - // 解析消息内容 - String body = new String(messageExt.getBody()); - log.info("接受到的消息为:{}", body); - String tags = messageExt.getTags(); - String topic = messageExt.getTopic(); - String msgId = messageExt.getMsgId(); - String keys = messageExt.getKeys(); - int reConsume = messageExt.getReconsumeTimes(); - // 消息已经重试了3次,如果不需要再次消费,则返回成功 - if (reConsume == 3) { - // TODO 补偿信息 - log.error("消息消费三次失败,消息内容:{}", body); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常 - } - if(MessageCodeEnum.TONGRAN_AGENT_UP.getCode().equals(topic)){ - // 拿到信息 - DeviceMessage message = JSON.parseObject(body, DeviceMessage.class); - // 处理消息 - deviceMessageHandler.handleMessage(message); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 - } - // 根据不同的topic处理不同的业务 这里以订单消息为例子 - if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) { - // 拿到信息 - DeviceMessage message = JSON.parseObject(body, DeviceMessage.class); - switch (message.getDataType()){ - case "NET": - handleNetMessage(message); - break; - case "CPU": - handleCpuMessage(message); - break; - case "SYSTEM": - handleSystemMessage(message); - break; - case "DISK": - handleDiskMessage(message); - break; - case "POINT": - handleMountPointMessage(message); - break; - case "MEMORY": - handleMemoryMessage(message); - break; - case "DOCKER": - handleDockerMessage(message); - break; - case "SWITCHBOARD": - handleSwitchMessage(message); - break; - case "HEARTBEAT": - handleHeartbeatMessage(message); - break; - default: - log.warn("未知数据类型:{}",message.getDataType()); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 - } - } - } - // 消息消费失败 - //broker会根据设置的messageDelayLevel发起重试,默认16次 - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } catch (Exception e) { - // 调用 handleException 方法处理异常并返回处理结果 - return handleException(e); - } - } - - /** - * 异常处理 - * - * @param e 捕获的异常 - * @return 消息消费结果 - */ - private static ConsumeConcurrentlyStatus handleException(final Exception e) { - Class exceptionClass = e.getClass(); - if (exceptionClass.equals(UnsupportedEncodingException.class)) { - log.error(e.getMessage()); - } else if (exceptionClass.equals(ConsumeException.class)) { - log.error(e.getMessage()); - } else{ - log.error(e.getMessage()); - } - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } - - /** - * 网络流量数据入库 - * @param message - */ - private void handleNetMessage(DeviceMessage message) { - List interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class); - if(!interfaces.isEmpty()){ - // 时间戳转换 - long timestamp = interfaces.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); - InitialBandwidthTraffic data = new InitialBandwidthTraffic(); - interfaces.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 批量入库集合 - data.setList(interfaces); - // 初始流量数据入库 - initialBandwidthTrafficService.batchInsert(data); - EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote(); - epsInitialTrafficDataRemote.setStartTime(timeStr); - epsInitialTrafficDataRemote.setEndTime(timeStr); - // 复制到业务初始库 - remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER); - }else{ - throw new RuntimeException("NET流量data数据为空"); - } - } - /** - * docker数据入库 - * @param message - */ - private void handleDockerMessage(DeviceMessage message) { - List dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class); - if(!dockers.isEmpty()){ - // 时间戳转换 - long timestamp = dockers.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - dockers.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始容器数据入库 - initialDockerInfoService.batchInsertInitialDockerInfo(dockers); - }else{ - throw new RuntimeException("DOCKER容器data数据为空"); - } - } - /** - * cpu数据入库 - * @param message - */ - private void handleCpuMessage(DeviceMessage message) { - List cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class); - // 时间戳转换 - long timestamp = cpus.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - if(!cpus.isEmpty()){ - cpus.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始CPU数据入库 - initialCpuInfoService.batchInsertInitialCpuInfo(cpus); - }else{ - throw new RuntimeException("CPUdata数据为空"); - } - } - /** - * 磁盘数据入库 - * @param message - */ - private void handleDiskMessage(DeviceMessage message) { - List disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class); - // 时间戳转换 - long timestamp = disks.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - if(!disks.isEmpty()){ - disks.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始磁盘数据入库 - initialDiskInfoService.batchInsertInitialDiskInfo(disks); - }else{ - throw new RuntimeException("磁盘data数据为空"); - } - } - /** - * 内存数据入库 - * @param message - */ - private void handleMemoryMessage(DeviceMessage message) { - List memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class); - if(!memorys.isEmpty()){ - // 时间戳转换 - long timestamp = memorys.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - memorys.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始内存数据入库 - initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys); - }else{ - throw new RuntimeException("内存data数据为空"); - } - } - /** - * 挂载点数据入库 - * @param message - */ - private void handleMountPointMessage(DeviceMessage message) { - List mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class); - if(!mountPointInfos.isEmpty()){ - // 时间戳转换 - long timestamp = mountPointInfos.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - mountPointInfos.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始挂载点数据入库 - initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos); - }else{ - throw new RuntimeException("挂载点data数据为空"); - } - } - /** - * 交换机数据入库 - * @param message - */ - private void handleSwitchMessage(DeviceMessage message) { - List switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class); - if(!switchInfos.isEmpty()){ - // 时间戳转换 - long timestamp = switchInfos.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); - // 查询临时表信息,计算实际流量值 - InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp(); - temp.setClientId(message.getClientId()); - List tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp); - if(!tempList.isEmpty()){ - // 1. 构建快速查找的Map - Map tempMap = tempList.stream() - .collect(Collectors.toMap( - InitialSwitchInfoTemp::getName, - Function.identity(), - (existing, replacement) -> existing - )); - - // 2. 预计算除数(避免重复创建对象) - BigDecimal divisor = new BigDecimal(300); - - // 3. 计算速度 - switchInfos.forEach(switchInfo -> { - switchInfo.setClientId(message.getClientId()); - switchInfo.setCreateTime(createTime); - InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName()); - if (tempInfo != null) { - // 计算inSpeed - if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) { - BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes()); - switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP)); - } - - // 计算outSpeed - if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) { - BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes()); - switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP)); - } - } - }); - }else{ - switchInfos.forEach(switchInfo -> { - switchInfo.setClientId(message.getClientId()); - switchInfo.setCreateTime(createTime); - }); - } - // 清空临时表对应switch信息 - initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId()); - // 临时表 用来计算inSpeed outSeppd - initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos); - // 初始交换机数据入库 - initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos); - // 业务表入库 - InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote(); - detailsRemote.setClientId(message.getClientId()); - detailsRemote.setStartTime(timeStr); - detailsRemote.setEndTime(timeStr); - remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER); - }else{ - throw new RuntimeException("交换机data数据为空"); - } - } - /** - * 系统数据入库 - * @param message - */ - private void handleSystemMessage(DeviceMessage message) { - List systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class); - if(!systemInfos.isEmpty()){ - // 时间戳转换 - long timestamp = systemInfos.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - systemInfos.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始系统数据入库 - initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos); - }else{ - throw new RuntimeException("系统data数据为空"); - } - } - /** - * 监听心跳 - * @param message - */ - private void handleHeartbeatMessage(DeviceMessage message) { - List heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class); - if(!heartbeats.isEmpty()){ - InitialHeartbeatListen heartbeat = heartbeats.get(0); - String clientId = message.getClientId(); - log.info("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp()); - // 使用Redis存储状态 - String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; - String timeKey = HEARTBEAT_TIME_PREFIX + clientId; - String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器 - try { - // 重置丢失计数为0,设置最后心跳时间 - redisTemplate.opsForValue().set(statusKey, "0"); - redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis())); - - // 检查是否之前有告警状态 - if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) { - // 获取当前恢复次数 - String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey); - int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1; - - if (recoveryCount == 2) { - // 达到2次恢复,执行状态修改 - log.warn("客户端ID: {} 心跳恢复达到2次,修改设备状态为在线", clientId); - insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线"); - redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId); - redisTemplate.delete(recoveryCountKey); // 清除恢复计数器 - // 修改资源状态 - getResourceMsg(clientId, "1"); - } else { - // 未达到2次,只记录恢复次数 - log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount); - redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount)); - } - } - } catch (Exception e) { - log.error("处理心跳消息异常, clientId: {}", clientId, e); - } - } - } - - // 添加一个定时任务方法,定期检查心跳状态 - @Scheduled(fixedRate = 60000) // 每分钟检查一次 - public void checkHeartbeatStatus() { - long currentTime = System.currentTimeMillis(); - // 获取所有客户端时间键 - Set timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*"); - if (timeKeys == null) return; - - for (String timeKey : timeKeys) { - String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length()); - String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; - String alertKey = HEARTBEAT_ALERT_PREFIX + clientId; - - try { - // 检查是否已经存在告警 - String existingAlert = redisTemplate.opsForValue().get(alertKey); - if ("1".equals(existingAlert)) { - continue; // 如果已有告警,跳过处理 - } - String lastTimeStr = redisTemplate.opsForValue().get(timeKey); - if (lastTimeStr == null) continue; - - long lastHeartbeatTime = Long.parseLong(lastTimeStr); - - if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) { - // 心跳超时处理 - String lostCountStr = redisTemplate.opsForValue().get(statusKey); - int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1; - redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount)); - - log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount); - - if (lostCount >= 3) { - insertHeartbeatLog(clientId, "3", "连续三次心跳丢失"); - redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1"); - // 设置告警后删除timeKey和statusKey - redisTemplate.delete(timeKey); - redisTemplate.delete(statusKey); - - log.info("客户端ID: {} 已设置告警并清理心跳记录", clientId); - // 修改资源状态 - getResourceMsg(clientId, "0"); - } - }else { - // 如果心跳正常,重置丢失次数 - redisTemplate.opsForValue().set(statusKey, "0"); - log.debug("客户端ID: {} 心跳正常,重置丢失次数", clientId); - } - } catch (Exception e) { - log.error("检查心跳状态异常, clientId: {}", clientId, e); - } - } - } - - /** - * 修改资源在线状态 - * @param clientId - * @param status - */ - private void getResourceMsg(String clientId, String status){ - String ipAddress = null; - AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote(); - interfaceNameRemote.setClientId(clientId); - - // 1. 先获取交换机IP - interfaceNameRemote.setResourceType("2"); - R switchResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (switchResult != null && switchResult.getData() != null && - StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) { - // 更新交换机状态 - ipAddress = switchResult.getData().getSwitchIp(); - updateResourceStatus(ipAddress, status); - - // 2. 再获取服务器IP - interfaceNameRemote.setResourceType("1"); - R serverResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (serverResult != null && serverResult.getData() != null && - StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { - // 更新服务器状态 - updateResourceStatus(serverResult.getData().getServerIp(), status); - } - } else { - // 3. 如果没有交换机IP,只获取服务器IP - interfaceNameRemote.setResourceType("1"); - R serverResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (serverResult != null && serverResult.getData() != null && - StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { - // 更新服务器状态 - updateResourceStatus(serverResult.getData().getServerIp(), status); - } else { - log.warn("未找到客户端ID: {} 对应的IP地址", clientId); - } - } - } - // 更新资源状态的公共方法 - private void updateResourceStatus(String ipAddress, String status) { - RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); - rmResourceRegistrationRemote.setOnlineStatus(status); - rmResourceRegistrationRemote.setRegistrationStatus(status); - rmResourceRegistrationRemote.setIpAddress(ipAddress); - remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); - } - // 插入心跳日志到数据库 - private void insertHeartbeatLog(String machineId, String status, String remark) { - try { - InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog(); - listenLog.setClientId(machineId); - listenLog.setStatus(status); // 0-离线 1-在线 2-恢复 3-三次丢失 - listenLog.setRemark(remark); - listenLog.setCreateTime(new Date()); - - // 调用DAO或Service插入日志 - initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog); - log.info("已记录心跳日志,客户端ID: {}, 状态: {}", machineId, status); - } catch (Exception e) { - log.error("插入心跳日志失败", e); - } - } - - /** - * 应答信息 - * @param message - */ - private RspVo handleResponseMessage(DeviceMessage message) { - List rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class); - if (!rspVoList.isEmpty()) { - RspVo rsp = rspVoList.get(0); - log.info("应答信息:{}",rsp); - return rsp; - } - return null; - } - /** - * 注册应答处理 - * @param message - */ -// private void handleRegisterMessage(DeviceMessage message) { -// RspVo rspVo = handleResponseMessage(message); -// String clientId = message.getClientId(); -// if (rspVo != null && rspVo.getResCode() == 1) { -// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); -// rmResourceRegistrationRemote.setRegistrationStatus("1"); -// rmResourceRegistrationRemote.setHardwareSn(clientId); -// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); +//package com.ruoyi.rocketmq.consumer; +// +//import com.alibaba.fastjson.JSON; +//import com.ruoyi.common.core.constant.SecurityConstants; +//import com.ruoyi.common.core.domain.R; +//import com.ruoyi.common.core.utils.DateUtils; +//import com.ruoyi.common.core.utils.StringUtils; +//import com.ruoyi.rocketmq.domain.*; +//import com.ruoyi.rocketmq.domain.vo.RspVo; +//import com.ruoyi.rocketmq.enums.MessageCodeEnum; +//import com.ruoyi.rocketmq.handler.DeviceMessageHandler; +//import com.ruoyi.rocketmq.producer.ConsumeException; +//import com.ruoyi.rocketmq.service.*; +//import com.ruoyi.rocketmq.utils.JsonDataParser; +//import com.ruoyi.system.api.RemoteRevenueConfigService; +//import com.ruoyi.system.api.domain.AllInterfaceNameRemote; +//import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote; +//import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote; +//import com.ruoyi.system.api.domain.RmResourceRegistrationRemote; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +//import org.apache.rocketmq.common.message.MessageExt; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.util.CollectionUtils; +// +//import java.io.UnsupportedEncodingException; +//import java.math.BigDecimal; +//import java.math.RoundingMode; +//import java.util.Date; +//import java.util.List; +//import java.util.Map; +//import java.util.Set; +//import java.util.function.Function; +//import java.util.stream.Collectors; +// +///** +// * 消息监听 v1.0 +// */ +//@Slf4j +//public class RocketMsgListenerHistory implements MessageListenerConcurrently { +// // 心跳状态 +// private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:"; +// // 心跳时间 +// private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:"; +// // 心跳告警 +// private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:"; +// String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:"; +// private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时 +// +// +// @Autowired +// private RedisTemplate redisTemplate; +// private final IInitialBandwidthTrafficService initialBandwidthTrafficService; +// private final RemoteRevenueConfigService remoteRevenueConfigService; +// @Autowired +// private IInitialDockerInfoService initialDockerInfoService; +// @Autowired +// private IInitialCpuInfoService initialCpuInfoService; +// @Autowired +// private IInitialDiskInfoService initialDiskInfoService; +// @Autowired +// private IInitialMemoryInfoService initialMemoryInfoService; +// @Autowired +// private IInitialMountPointInfoService initialMountPointInfoService; +// @Autowired +// private IInitialSwitchInfoService initialSwitchInfoService; +// @Autowired +// private IInitialSystemInfoService initialSystemInfoService; +// @Autowired +// private IInitialSwitchInfoTempService initialSwitchInfoTempService; +// @Autowired +// private IInitialHeartbeatListenLogService initialHeartbeatListenLog; +// @Autowired +// public RocketMsgListenerHistory(IInitialBandwidthTrafficService initialBandwidthTrafficService, +// RemoteRevenueConfigService remoteRevenueConfigService) { +// this.initialBandwidthTrafficService = initialBandwidthTrafficService; +// this.remoteRevenueConfigService = remoteRevenueConfigService; +// } +// @Autowired +// private DeviceMessageHandler deviceMessageHandler; +// +// /** +// * 消费消息 +// * @param list msgs.size() >= 1 +// * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here +// * 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 +// * @param consumeConcurrentlyContext 上下文信息 +// * @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) +// */ +// @Override +// public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { +// try{ +// //消息不等于空情况 +// if (!CollectionUtils.isEmpty(list)) { +// //获取topic +// for (MessageExt messageExt : list) { +// // 解析消息内容 +// String body = new String(messageExt.getBody()); +// log.info("接受到的消息为:{}", body); +// String tags = messageExt.getTags(); +// String topic = messageExt.getTopic(); +// String msgId = messageExt.getMsgId(); +// String keys = messageExt.getKeys(); +// int reConsume = messageExt.getReconsumeTimes(); +// // 消息已经重试了3次,如果不需要再次消费,则返回成功 +// if (reConsume == 3) { +// // TODO 补偿信息 +// log.error("消息消费三次失败,消息内容:{}", body); +// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常 +// } +// if(MessageCodeEnum.TONGRAN_AGENT_UP.getCode().equals(topic)){ +// // 拿到信息 +// DeviceMessage message = JSON.parseObject(body, DeviceMessage.class); +// // 处理消息 +// deviceMessageHandler.handleMessage(message); +// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 +// } +// // 根据不同的topic处理不同的业务 这里以订单消息为例子 +// if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) { +// // 拿到信息 +// DeviceMessage message = JSON.parseObject(body, DeviceMessage.class); +// switch (message.getDataType()){ +// case "NET": +// handleNetMessage(message); +// break; +// case "CPU": +// handleCpuMessage(message); +// break; +// case "SYSTEM": +// handleSystemMessage(message); +// break; +// case "DISK": +// handleDiskMessage(message); +// break; +// case "POINT": +// handleMountPointMessage(message); +// break; +// case "MEMORY": +// handleMemoryMessage(message); +// break; +// case "DOCKER": +// handleDockerMessage(message); +// break; +// case "SWITCHBOARD": +// handleSwitchMessage(message); +// break; +// case "HEARTBEAT": +// handleHeartbeatMessage(message); +// break; +// default: +// log.warn("未知数据类型:{}",message.getDataType()); +// } +// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 +// } +// } +// } +// // 消息消费失败 +// //broker会根据设置的messageDelayLevel发起重试,默认16次 +// return ConsumeConcurrentlyStatus.RECONSUME_LATER; +// } catch (Exception e) { +// // 调用 handleException 方法处理异常并返回处理结果 +// return handleException(e); +// } +// } +// +// /** +// * 异常处理 +// * +// * @param e 捕获的异常 +// * @return 消息消费结果 +// */ +// private static ConsumeConcurrentlyStatus handleException(final Exception e) { +// Class exceptionClass = e.getClass(); +// if (exceptionClass.equals(UnsupportedEncodingException.class)) { +// log.error(e.getMessage()); +// } else if (exceptionClass.equals(ConsumeException.class)) { +// log.error(e.getMessage()); +// } else{ +// log.error(e.getMessage()); +// } +// return ConsumeConcurrentlyStatus.RECONSUME_LATER; +// } +// +// /** +// * 网络流量数据入库 +// * @param message +// */ +// private void handleNetMessage(DeviceMessage message) { +// List interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class); +// if(!interfaces.isEmpty()){ +// // 时间戳转换 +// long timestamp = interfaces.get(0).getTimestamp(); +// long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; +// Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 +// String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); +// InitialBandwidthTraffic data = new InitialBandwidthTraffic(); +// interfaces.forEach(iface -> { +// iface.setClientId(message.getClientId()); +// iface.setCreateTime(createTime); +// }); +// // 批量入库集合 +// data.setList(interfaces); +// // 初始流量数据入库 +// initialBandwidthTrafficService.batchInsert(data); +// EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote(); +// epsInitialTrafficDataRemote.setStartTime(timeStr); +// epsInitialTrafficDataRemote.setEndTime(timeStr); +// // 复制到业务初始库 +// remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER); // }else{ -// if(rspVo == null){ -// log.error("注册失败:应答信息为null"); +// throw new RuntimeException("NET流量data数据为空"); +// } +// } +// /** +// * docker数据入库 +// * @param message +// */ +// private void handleDockerMessage(DeviceMessage message) { +// List dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class); +// if(!dockers.isEmpty()){ +// // 时间戳转换 +// long timestamp = dockers.get(0).getTimestamp(); +// long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; +// Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 +// dockers.forEach(iface -> { +// iface.setClientId(message.getClientId()); +// iface.setCreateTime(createTime); +// }); +// // 初始容器数据入库 +// initialDockerInfoService.batchInsertInitialDockerInfo(dockers); +// }else{ +// throw new RuntimeException("DOCKER容器data数据为空"); +// } +// } +// /** +// * cpu数据入库 +// * @param message +// */ +// private void handleCpuMessage(DeviceMessage message) { +// List cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class); +// // 时间戳转换 +// long timestamp = cpus.get(0).getTimestamp(); +// long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; +// Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 +// if(!cpus.isEmpty()){ +// cpus.forEach(iface -> { +// iface.setClientId(message.getClientId()); +// iface.setCreateTime(createTime); +// }); +// // 初始CPU数据入库 +// initialCpuInfoService.batchInsertInitialCpuInfo(cpus); +// }else{ +// throw new RuntimeException("CPUdata数据为空"); +// } +// } +// /** +// * 磁盘数据入库 +// * @param message +// */ +// private void handleDiskMessage(DeviceMessage message) { +// List disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class); +// // 时间戳转换 +// long timestamp = disks.get(0).getTimestamp(); +// long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; +// Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 +// if(!disks.isEmpty()){ +// disks.forEach(iface -> { +// iface.setClientId(message.getClientId()); +// iface.setCreateTime(createTime); +// }); +// // 初始磁盘数据入库 +// initialDiskInfoService.batchInsertInitialDiskInfo(disks); +// }else{ +// throw new RuntimeException("磁盘data数据为空"); +// } +// } +// /** +// * 内存数据入库 +// * @param message +// */ +// private void handleMemoryMessage(DeviceMessage message) { +// List memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class); +// if(!memorys.isEmpty()){ +// // 时间戳转换 +// long timestamp = memorys.get(0).getTimestamp(); +// long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; +// Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 +// memorys.forEach(iface -> { +// iface.setClientId(message.getClientId()); +// iface.setCreateTime(createTime); +// }); +// // 初始内存数据入库 +// initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys); +// }else{ +// throw new RuntimeException("内存data数据为空"); +// } +// } +// /** +// * 挂载点数据入库 +// * @param message +// */ +// private void handleMountPointMessage(DeviceMessage message) { +// List mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class); +// if(!mountPointInfos.isEmpty()){ +// // 时间戳转换 +// long timestamp = mountPointInfos.get(0).getTimestamp(); +// long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; +// Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 +// mountPointInfos.forEach(iface -> { +// iface.setClientId(message.getClientId()); +// iface.setCreateTime(createTime); +// }); +// // 初始挂载点数据入库 +// initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos); +// }else{ +// throw new RuntimeException("挂载点data数据为空"); +// } +// } +// /** +// * 交换机数据入库 +// * @param message +// */ +// private void handleSwitchMessage(DeviceMessage message) { +// List switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class); +// if(!switchInfos.isEmpty()){ +// // 时间戳转换 +// long timestamp = switchInfos.get(0).getTimestamp(); +// long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; +// Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 +// String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); +// // 查询临时表信息,计算实际流量值 +// InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp(); +// temp.setClientId(message.getClientId()); +// List tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp); +// if(!tempList.isEmpty()){ +// // 1. 构建快速查找的Map +// Map tempMap = tempList.stream() +// .collect(Collectors.toMap( +// InitialSwitchInfoTemp::getName, +// Function.identity(), +// (existing, replacement) -> existing +// )); +// +// // 2. 预计算除数(避免重复创建对象) +// BigDecimal divisor = new BigDecimal(300); +// +// // 3. 计算速度 +// switchInfos.forEach(switchInfo -> { +// switchInfo.setClientId(message.getClientId()); +// switchInfo.setCreateTime(createTime); +// InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName()); +// if (tempInfo != null) { +// // 计算inSpeed +// if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) { +// BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes()); +// switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP)); +// } +// +// // 计算outSpeed +// if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) { +// BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes()); +// switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP)); +// } +// } +// }); // }else{ -// log.error("注册失败:{}",rspVo.getResMsg()); +// switchInfos.forEach(switchInfo -> { +// switchInfo.setClientId(message.getClientId()); +// switchInfo.setCreateTime(createTime); +// }); +// } +// // 清空临时表对应switch信息 +// initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId()); +// // 临时表 用来计算inSpeed outSeppd +// initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos); +// // 初始交换机数据入库 +// initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos); +// // 业务表入库 +// InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote(); +// detailsRemote.setClientId(message.getClientId()); +// detailsRemote.setStartTime(timeStr); +// detailsRemote.setEndTime(timeStr); +// remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER); +// }else{ +// throw new RuntimeException("交换机data数据为空"); +// } +// } +// /** +// * 系统数据入库 +// * @param message +// */ +// private void handleSystemMessage(DeviceMessage message) { +// List systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class); +// if(!systemInfos.isEmpty()){ +// // 时间戳转换 +// long timestamp = systemInfos.get(0).getTimestamp(); +// long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; +// Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 +// systemInfos.forEach(iface -> { +// iface.setClientId(message.getClientId()); +// iface.setCreateTime(createTime); +// }); +// // 初始系统数据入库 +// initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos); +// }else{ +// throw new RuntimeException("系统data数据为空"); +// } +// } +// /** +// * 监听心跳 +// * @param message +// */ +// private void handleHeartbeatMessage(DeviceMessage message) { +// List heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class); +// if(!heartbeats.isEmpty()){ +// InitialHeartbeatListen heartbeat = heartbeats.get(0); +// String clientId = message.getClientId(); +// log.info("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp()); +// // 使用Redis存储状态 +// String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; +// String timeKey = HEARTBEAT_TIME_PREFIX + clientId; +// String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器 +// try { +// // 重置丢失计数为0,设置最后心跳时间 +// redisTemplate.opsForValue().set(statusKey, "0"); +// redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis())); +// +// // 检查是否之前有告警状态 +// if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) { +// // 获取当前恢复次数 +// String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey); +// int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1; +// +// if (recoveryCount == 2) { +// // 达到2次恢复,执行状态修改 +// log.warn("客户端ID: {} 心跳恢复达到2次,修改设备状态为在线", clientId); +// insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线"); +// redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId); +// redisTemplate.delete(recoveryCountKey); // 清除恢复计数器 +// // 修改资源状态 +// getResourceMsg(clientId, "1"); +// } else { +// // 未达到2次,只记录恢复次数 +// log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount); +// redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount)); +// } +// } +// } catch (Exception e) { +// log.error("处理心跳消息异常, clientId: {}", clientId, e); // } // } // } -} +// +// // 添加一个定时任务方法,定期检查心跳状态 +// @Scheduled(fixedRate = 60000) // 每分钟检查一次 +// public void checkHeartbeatStatus() { +// long currentTime = System.currentTimeMillis(); +// // 获取所有客户端时间键 +// Set timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*"); +// if (timeKeys == null) return; +// +// for (String timeKey : timeKeys) { +// String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length()); +// String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; +// String alertKey = HEARTBEAT_ALERT_PREFIX + clientId; +// +// try { +// // 检查是否已经存在告警 +// String existingAlert = redisTemplate.opsForValue().get(alertKey); +// if ("1".equals(existingAlert)) { +// continue; // 如果已有告警,跳过处理 +// } +// String lastTimeStr = redisTemplate.opsForValue().get(timeKey); +// if (lastTimeStr == null) continue; +// +// long lastHeartbeatTime = Long.parseLong(lastTimeStr); +// +// if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) { +// // 心跳超时处理 +// String lostCountStr = redisTemplate.opsForValue().get(statusKey); +// int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1; +// redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount)); +// +// log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount); +// +// if (lostCount >= 3) { +// insertHeartbeatLog(clientId, "3", "连续三次心跳丢失"); +// redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1"); +// // 设置告警后删除timeKey和statusKey +// redisTemplate.delete(timeKey); +// redisTemplate.delete(statusKey); +// +// log.info("客户端ID: {} 已设置告警并清理心跳记录", clientId); +// // 修改资源状态 +// getResourceMsg(clientId, "0"); +// } +// }else { +// // 如果心跳正常,重置丢失次数 +// redisTemplate.opsForValue().set(statusKey, "0"); +// log.debug("客户端ID: {} 心跳正常,重置丢失次数", clientId); +// } +// } catch (Exception e) { +// log.error("检查心跳状态异常, clientId: {}", clientId, e); +// } +// } +// } +// +// /** +// * 修改资源在线状态 +// * @param clientId +// * @param status +// */ +// private void getResourceMsg(String clientId, String status){ +// String ipAddress = null; +// AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote(); +// interfaceNameRemote.setClientId(clientId); +// +// // 1. 先获取交换机IP +// interfaceNameRemote.setResourceType("2"); +// R switchResult = remoteRevenueConfigService.getMsgByClientId( +// interfaceNameRemote, SecurityConstants.INNER); +// +// if (switchResult != null && switchResult.getData() != null && +// StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) { +// // 更新交换机状态 +// ipAddress = switchResult.getData().getSwitchIp(); +// updateResourceStatus(ipAddress, status); +// +// // 2. 再获取服务器IP +// interfaceNameRemote.setResourceType("1"); +// R serverResult = remoteRevenueConfigService.getMsgByClientId( +// interfaceNameRemote, SecurityConstants.INNER); +// +// if (serverResult != null && serverResult.getData() != null && +// StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { +// // 更新服务器状态 +// updateResourceStatus(serverResult.getData().getServerIp(), status); +// } +// } else { +// // 3. 如果没有交换机IP,只获取服务器IP +// interfaceNameRemote.setResourceType("1"); +// R serverResult = remoteRevenueConfigService.getMsgByClientId( +// interfaceNameRemote, SecurityConstants.INNER); +// +// if (serverResult != null && serverResult.getData() != null && +// StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { +// // 更新服务器状态 +// updateResourceStatus(serverResult.getData().getServerIp(), status); +// } else { +// log.warn("未找到客户端ID: {} 对应的IP地址", clientId); +// } +// } +// } +// // 更新资源状态的公共方法 +// private void updateResourceStatus(String ipAddress, String status) { +// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); +// rmResourceRegistrationRemote.setOnlineStatus(status); +// rmResourceRegistrationRemote.setRegistrationStatus(status); +// rmResourceRegistrationRemote.setIpAddress(ipAddress); +// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); +// } +// // 插入心跳日志到数据库 +// private void insertHeartbeatLog(String machineId, String status, String remark) { +// try { +// InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog(); +// listenLog.setClientId(machineId); +// listenLog.setStatus(status); // 0-离线 1-在线 2-恢复 3-三次丢失 +// listenLog.setRemark(remark); +// listenLog.setCreateTime(new Date()); +// +// // 调用DAO或Service插入日志 +// initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog); +// log.info("已记录心跳日志,客户端ID: {}, 状态: {}", machineId, status); +// } catch (Exception e) { +// log.error("插入心跳日志失败", e); +// } +// } +// +// /** +// * 应答信息 +// * @param message +// */ +// private RspVo handleResponseMessage(DeviceMessage message) { +// List rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class); +// if (!rspVoList.isEmpty()) { +// RspVo rsp = rspVoList.get(0); +// log.info("应答信息:{}",rsp); +// return rsp; +// } +// return null; +// } +// /** +// * 注册应答处理 +// * @param message +// */ +//// private void handleRegisterMessage(DeviceMessage message) { +//// RspVo rspVo = handleResponseMessage(message); +//// String clientId = message.getClientId(); +//// if (rspVo != null && rspVo.getResCode() == 1) { +//// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); +//// rmResourceRegistrationRemote.setRegistrationStatus("1"); +//// rmResourceRegistrationRemote.setHardwareSn(clientId); +//// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); +//// }else{ +//// if(rspVo == null){ +//// log.error("注册失败:应答信息为null"); +//// }else{ +//// log.error("注册失败:{}",rspVo.getResMsg()); +//// } +//// } +//// } +//} diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/handler/DeviceMessageHandler.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/handler/DeviceMessageHandler.java index 4d083f1..e3d2dcb 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/handler/DeviceMessageHandler.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/handler/DeviceMessageHandler.java @@ -18,7 +18,10 @@ import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote; import com.ruoyi.system.api.domain.RmResourceRegistrationRemote; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.SessionCallback; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -570,14 +573,31 @@ public class DeviceMessageHandler { InitialHeartbeatListen heartbeat = heartbeats.get(0); String clientId = message.getClientId(); log.debug("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp()); + // 使用Redis存储状态 String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; String timeKey = HEARTBEAT_TIME_PREFIX + clientId; - String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器 + String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; + try { - // 重置丢失计数为0,设置最后心跳时间 - redisTemplate.opsForValue().set(statusKey, "0"); - redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis())); + // 记录处理前状态(调试用) + String prevStatus = redisTemplate.opsForValue().get(statusKey); + String prevTime = redisTemplate.opsForValue().get(timeKey); + log.debug("客户端ID: {} 处理前状态 - status: {}, time: {}", clientId, prevStatus, prevTime); + + // 使用事务确保原子性操作 + redisTemplate.execute(new SessionCallback() { + @Override + public Object execute(RedisOperations operations) throws DataAccessException { + operations.multi(); + // 重置丢失计数为0,设置最后心跳时间 + operations.opsForValue().set(statusKey, "0"); + operations.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis())); + return operations.exec(); + } + }); + + log.debug("客户端ID: {} 心跳处理完成,重置状态和时间", clientId); // 检查是否之前有告警状态 if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) { @@ -606,12 +626,19 @@ public class DeviceMessageHandler { } // 添加一个定时任务方法,定期检查心跳状态 - @Scheduled(fixedRate = 30000) // 每30s检查一次 + @Scheduled(fixedRate = 60000) // 每60s检查一次 public void checkHeartbeatStatus() { long currentTime = System.currentTimeMillis(); + log.debug("开始心跳状态检查,当前时间: {}", currentTime); + // 获取所有客户端时间键 Set timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*"); - if (timeKeys == null) return; + if (timeKeys == null) { + log.debug("未找到任何心跳时间键"); + return; + } + + log.debug("找到 {} 个客户端需要检查", timeKeys.size()); for (String timeKey : timeKeys) { String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length()); @@ -622,22 +649,36 @@ public class DeviceMessageHandler { // 检查是否已经存在告警 String existingAlert = redisTemplate.opsForValue().get(alertKey); if ("1".equals(existingAlert)) { + log.debug("客户端ID: {} 已有告警,跳过检查", clientId); continue; // 如果已有告警,跳过处理 } + String lastTimeStr = redisTemplate.opsForValue().get(timeKey); - if (lastTimeStr == null) continue; + if (lastTimeStr == null) { + log.debug("客户端ID: {} 时间键为空,跳过", clientId); + continue; + } long lastHeartbeatTime = Long.parseLong(lastTimeStr); + long timeDiff = currentTime - lastHeartbeatTime; - if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) { - // 心跳超时处理 - String lostCountStr = redisTemplate.opsForValue().get(statusKey); - int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1; - redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount)); + log.debug("客户端ID: {} 最后心跳: {}, 时间差: {}ms, 超时阈值: {}ms", + clientId, lastHeartbeatTime, timeDiff, HEARTBEAT_TIMEOUT); - log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount); + if (timeDiff > HEARTBEAT_TIMEOUT) { + // 心跳超时处理 - 使用原子操作增加计数 + Long lostCount = redisTemplate.opsForValue().increment(statusKey); + if (lostCount == 1) { + // 确保第一次增加时值为1 + redisTemplate.opsForValue().set(statusKey, "1"); + lostCount = 1L; + } + + log.warn("客户端ID: {} 心跳超时,连续次数: {}, 时间差: {}ms", + clientId, lostCount, timeDiff); if (lostCount >= 3) { + log.warn("客户端ID: {} 连续三次心跳丢失,触发告警", clientId); insertHeartbeatLog(clientId, "3", "连续三次心跳丢失"); redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1"); // 设置告警后删除timeKey和statusKey @@ -648,15 +689,22 @@ public class DeviceMessageHandler { // 修改资源状态 updateResourceStatus(clientId, "0"); } - }else { + } else { // 如果心跳正常,重置丢失次数 - redisTemplate.opsForValue().set(statusKey, "0"); - log.debug("客户端ID: {} 心跳正常,重置丢失次数", clientId); + String currentStatus = redisTemplate.opsForValue().get(statusKey); + if (!"0".equals(currentStatus)) { + redisTemplate.opsForValue().set(statusKey, "0"); + log.debug("客户端ID: {} 心跳正常,重置丢失次数从 {} 到 0", clientId, currentStatus); + } else { + log.debug("客户端ID: {} 心跳正常,状态已是0", clientId); + } } } catch (Exception e) { log.error("检查心跳状态异常, clientId: {}", clientId, e); } } + + log.debug("心跳状态检查完成"); } // 更新资源状态的公共方法 diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmMonitorPolicyServiceImpl.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmMonitorPolicyServiceImpl.java index 86fa660..bc80b8b 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmMonitorPolicyServiceImpl.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmMonitorPolicyServiceImpl.java @@ -300,10 +300,10 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService sendConfigurationsToDevices(devices, uniqueList); } // 更新策略状态为已下发 -// RmMonitorPolicy policyUpdate = new RmMonitorPolicy(); -// policyUpdate.setId(id); -// policyUpdate.setStatus("1"); -// rmMonitorPolicyMapper.updateRmMonitorPolicy(policyUpdate); + RmMonitorPolicy policyUpdate = new RmMonitorPolicy(); + policyUpdate.setId(id); + policyUpdate.setStatus("1"); + rmMonitorPolicyMapper.updateRmMonitorPolicy(policyUpdate); return 1; } catch (Exception e) {