增加首页接口,心跳监控信息改为redis存储
This commit is contained in:
@@ -2,21 +2,26 @@ package com.ruoyi.rocketmq.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.ruoyi.common.core.constant.SecurityConstants;
|
||||
import com.ruoyi.common.core.domain.R;
|
||||
import com.ruoyi.common.core.utils.DateUtils;
|
||||
import com.ruoyi.common.core.utils.StringUtils;
|
||||
import com.ruoyi.rocketmq.domain.*;
|
||||
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
|
||||
import com.ruoyi.rocketmq.producer.ConsumeException;
|
||||
import com.ruoyi.rocketmq.service.*;
|
||||
import com.ruoyi.rocketmq.utils.JsonDataParser;
|
||||
import com.ruoyi.system.api.RemoteRevenueConfigService;
|
||||
import com.ruoyi.system.api.domain.AllInterfaceNameRemote;
|
||||
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
|
||||
import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote;
|
||||
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
@@ -27,7 +32,7 @@ import java.math.RoundingMode;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -38,11 +43,15 @@ import java.util.stream.Collectors;
|
||||
@Component
|
||||
public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
|
||||
// 在类中添加以下成员变量来记录心跳状态
|
||||
private final Map<String, Integer> heartbeatStatusMap = new ConcurrentHashMap<>(); // 客户端ID -> 连续丢失心跳次数
|
||||
private final Map<String, Long> lastHeartbeatTimeMap = new ConcurrentHashMap<>(); // 客户端ID -> 最后心跳时间
|
||||
private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:";
|
||||
private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:";
|
||||
private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:";
|
||||
String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:";
|
||||
private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时
|
||||
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, String> redisTemplate;
|
||||
private final IInitialBandwidthTrafficService initialBandwidthTrafficService;
|
||||
private final RemoteRevenueConfigService remoteRevenueConfigService;
|
||||
@Autowired
|
||||
@@ -396,26 +405,43 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
* @param message
|
||||
*/
|
||||
private void handleHeartbeatMessage(DeviceMessage message) {
|
||||
try {
|
||||
List<InitialHeartbeatListen> heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class);
|
||||
if(!heartbeats.isEmpty()){
|
||||
InitialHeartbeatListen heartbeat = heartbeats.get(0);
|
||||
String clientId = message.getClientId();
|
||||
log.info("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());
|
||||
List<InitialHeartbeatListen> heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class);
|
||||
if(!heartbeats.isEmpty()){
|
||||
InitialHeartbeatListen heartbeat = heartbeats.get(0);
|
||||
String clientId = message.getClientId();
|
||||
log.info("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());
|
||||
// 使用Redis存储状态
|
||||
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
|
||||
String timeKey = HEARTBEAT_TIME_PREFIX + clientId;
|
||||
String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器
|
||||
try {
|
||||
// 重置丢失计数为0,设置最后心跳时间
|
||||
redisTemplate.opsForValue().set(statusKey, "0");
|
||||
redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis()));
|
||||
|
||||
// 更新心跳状态
|
||||
heartbeatStatusMap.put(clientId, 0); // 重置为0表示收到心跳
|
||||
lastHeartbeatTimeMap.put(clientId, System.currentTimeMillis());
|
||||
// 检查是否之前有告警状态
|
||||
if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) {
|
||||
// 获取当前恢复次数
|
||||
String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey);
|
||||
int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1;
|
||||
|
||||
// 检查是否之前有丢失心跳的情况
|
||||
if (heartbeatStatusMap.getOrDefault(clientId, 0) > 0) {
|
||||
// 之前有丢失心跳,现在恢复了,记录恢复日志
|
||||
log.warn("客户端ID: {} 心跳恢复", clientId);
|
||||
insertHeartbeatLog(clientId, "2", "心跳恢复,服务器在线"); // 1表示恢复
|
||||
if (recoveryCount == 2) {
|
||||
// 达到2次恢复,执行状态修改
|
||||
log.warn("客户端ID: {} 心跳恢复达到2次,修改设备状态为在线", clientId);
|
||||
insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线");
|
||||
redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId);
|
||||
redisTemplate.delete(recoveryCountKey); // 清除恢复计数器
|
||||
// 修改资源状态
|
||||
getResourceMsg(clientId, "1");
|
||||
} else {
|
||||
// 未达到2次,只记录恢复次数
|
||||
log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount);
|
||||
redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("处理心跳消息异常, clientId: {}", clientId, e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("处理心跳消息异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -423,31 +449,94 @@ public class RocketMsgListener implements MessageListenerConcurrently {
|
||||
@Scheduled(fixedRate = 60000) // 每分钟检查一次
|
||||
public void checkHeartbeatStatus() {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
long heartbeatTimeout = 180000; // 3分钟无心跳认为丢失
|
||||
// 获取所有客户端时间键
|
||||
Set<String> timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*");
|
||||
if (timeKeys == null) return;
|
||||
|
||||
for (Map.Entry<String, Long> entry : lastHeartbeatTimeMap.entrySet()) {
|
||||
String clientId = entry.getKey();
|
||||
long lastHeartbeatTime = entry.getValue();
|
||||
for (String timeKey : timeKeys) {
|
||||
String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
|
||||
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
|
||||
|
||||
if (currentTime - lastHeartbeatTime > heartbeatTimeout) {
|
||||
// 心跳超时
|
||||
int lostCount = heartbeatStatusMap.getOrDefault(clientId, 0) + 1;
|
||||
heartbeatStatusMap.put(clientId, lostCount);
|
||||
try {
|
||||
String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
|
||||
if (lastTimeStr == null) continue;
|
||||
|
||||
log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount);
|
||||
long lastHeartbeatTime = Long.parseLong(lastTimeStr);
|
||||
|
||||
if (lostCount == 3) {
|
||||
// 两次获取不到心跳
|
||||
insertHeartbeatLog(clientId, "3", "连续三次心跳丢失,服务器离线");
|
||||
// 把资源注册表资源信息改为离线
|
||||
// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
|
||||
// rmResourceRegistrationRemote.setOnlineStatus("0");
|
||||
// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
|
||||
if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
|
||||
// 心跳超时处理
|
||||
String lostCountStr = redisTemplate.opsForValue().get(statusKey);
|
||||
int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1;
|
||||
redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount));
|
||||
|
||||
log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount);
|
||||
|
||||
if (lostCount == 3) {
|
||||
insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
|
||||
redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1");
|
||||
// 修改资源状态
|
||||
getResourceMsg(clientId, "0");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("检查心跳状态异常, clientId: {}", clientId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改资源在线状态
|
||||
* @param clientId
|
||||
* @param status
|
||||
*/
|
||||
private void getResourceMsg(String clientId, String status){
|
||||
String ipAddress = null;
|
||||
AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote();
|
||||
interfaceNameRemote.setClientId(clientId);
|
||||
|
||||
// 1. 先获取交换机IP
|
||||
interfaceNameRemote.setResourceType("2");
|
||||
R<AllInterfaceNameRemote> switchResult = remoteRevenueConfigService.getMsgByClientId(
|
||||
interfaceNameRemote, SecurityConstants.INNER);
|
||||
|
||||
if (switchResult != null && switchResult.getData() != null &&
|
||||
StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) {
|
||||
// 更新交换机状态
|
||||
ipAddress = switchResult.getData().getSwitchIp();
|
||||
updateResourceStatus(ipAddress, status);
|
||||
|
||||
// 2. 再获取服务器IP
|
||||
interfaceNameRemote.setResourceType("1");
|
||||
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
|
||||
interfaceNameRemote, SecurityConstants.INNER);
|
||||
|
||||
if (serverResult != null && serverResult.getData() != null &&
|
||||
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
|
||||
// 更新服务器状态
|
||||
updateResourceStatus(serverResult.getData().getServerIp(), status);
|
||||
}
|
||||
} else {
|
||||
// 3. 如果没有交换机IP,只获取服务器IP
|
||||
interfaceNameRemote.setResourceType("1");
|
||||
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
|
||||
interfaceNameRemote, SecurityConstants.INNER);
|
||||
|
||||
if (serverResult != null && serverResult.getData() != null &&
|
||||
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
|
||||
// 更新服务器状态
|
||||
updateResourceStatus(serverResult.getData().getServerIp(), status);
|
||||
} else {
|
||||
log.warn("未找到客户端ID: {} 对应的IP地址", clientId);
|
||||
}
|
||||
}
|
||||
}
|
||||
// 更新资源状态的公共方法
|
||||
private void updateResourceStatus(String ipAddress, String status) {
|
||||
RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
|
||||
rmResourceRegistrationRemote.setOnlineStatus(status);
|
||||
rmResourceRegistrationRemote.setIpAddress(ipAddress);
|
||||
remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
|
||||
}
|
||||
// 插入心跳日志到数据库
|
||||
private void insertHeartbeatLog(String machineId, String status, String remark) {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user