优化心跳监控方法
This commit is contained in:
		
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							@@ -18,7 +18,10 @@ import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote;
 | 
				
			|||||||
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
 | 
					import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
					import org.springframework.beans.factory.annotation.Autowired;
 | 
				
			||||||
 | 
					import org.springframework.dao.DataAccessException;
 | 
				
			||||||
 | 
					import org.springframework.data.redis.core.RedisOperations;
 | 
				
			||||||
import org.springframework.data.redis.core.RedisTemplate;
 | 
					import org.springframework.data.redis.core.RedisTemplate;
 | 
				
			||||||
 | 
					import org.springframework.data.redis.core.SessionCallback;
 | 
				
			||||||
import org.springframework.scheduling.annotation.EnableScheduling;
 | 
					import org.springframework.scheduling.annotation.EnableScheduling;
 | 
				
			||||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
					import org.springframework.scheduling.annotation.Scheduled;
 | 
				
			||||||
import org.springframework.stereotype.Component;
 | 
					import org.springframework.stereotype.Component;
 | 
				
			||||||
@@ -570,14 +573,31 @@ public class DeviceMessageHandler {
 | 
				
			|||||||
            InitialHeartbeatListen heartbeat = heartbeats.get(0);
 | 
					            InitialHeartbeatListen heartbeat = heartbeats.get(0);
 | 
				
			||||||
            String clientId = message.getClientId();
 | 
					            String clientId = message.getClientId();
 | 
				
			||||||
            log.debug("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());
 | 
					            log.debug("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            // 使用Redis存储状态
 | 
					            // 使用Redis存储状态
 | 
				
			||||||
            String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
 | 
					            String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
 | 
				
			||||||
            String timeKey = HEARTBEAT_TIME_PREFIX + clientId;
 | 
					            String timeKey = HEARTBEAT_TIME_PREFIX + clientId;
 | 
				
			||||||
            String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器
 | 
					            String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            try {
 | 
					            try {
 | 
				
			||||||
                // 重置丢失计数为0,设置最后心跳时间
 | 
					                // 记录处理前状态(调试用)
 | 
				
			||||||
                redisTemplate.opsForValue().set(statusKey, "0");
 | 
					                String prevStatus = redisTemplate.opsForValue().get(statusKey);
 | 
				
			||||||
                redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis()));
 | 
					                String prevTime = redisTemplate.opsForValue().get(timeKey);
 | 
				
			||||||
 | 
					                log.debug("客户端ID: {} 处理前状态 - status: {}, time: {}", clientId, prevStatus, prevTime);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                // 使用事务确保原子性操作
 | 
				
			||||||
 | 
					                redisTemplate.execute(new SessionCallback<Object>() {
 | 
				
			||||||
 | 
					                    @Override
 | 
				
			||||||
 | 
					                    public Object execute(RedisOperations operations) throws DataAccessException {
 | 
				
			||||||
 | 
					                        operations.multi();
 | 
				
			||||||
 | 
					                        // 重置丢失计数为0,设置最后心跳时间
 | 
				
			||||||
 | 
					                        operations.opsForValue().set(statusKey, "0");
 | 
				
			||||||
 | 
					                        operations.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis()));
 | 
				
			||||||
 | 
					                        return operations.exec();
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                log.debug("客户端ID: {} 心跳处理完成,重置状态和时间", clientId);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                // 检查是否之前有告警状态
 | 
					                // 检查是否之前有告警状态
 | 
				
			||||||
                if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) {
 | 
					                if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) {
 | 
				
			||||||
@@ -606,12 +626,19 @@ public class DeviceMessageHandler {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // 添加一个定时任务方法,定期检查心跳状态
 | 
					    // 添加一个定时任务方法,定期检查心跳状态
 | 
				
			||||||
    @Scheduled(fixedRate = 30000) // 每30s检查一次
 | 
					    @Scheduled(fixedRate = 60000) // 每60s检查一次
 | 
				
			||||||
    public void checkHeartbeatStatus() {
 | 
					    public void checkHeartbeatStatus() {
 | 
				
			||||||
        long currentTime = System.currentTimeMillis();
 | 
					        long currentTime = System.currentTimeMillis();
 | 
				
			||||||
 | 
					        log.debug("开始心跳状态检查,当前时间: {}", currentTime);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // 获取所有客户端时间键
 | 
					        // 获取所有客户端时间键
 | 
				
			||||||
        Set<String> timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*");
 | 
					        Set<String> timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*");
 | 
				
			||||||
        if (timeKeys == null) return;
 | 
					        if (timeKeys == null) {
 | 
				
			||||||
 | 
					            log.debug("未找到任何心跳时间键");
 | 
				
			||||||
 | 
					            return;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        log.debug("找到 {} 个客户端需要检查", timeKeys.size());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for (String timeKey : timeKeys) {
 | 
					        for (String timeKey : timeKeys) {
 | 
				
			||||||
            String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
 | 
					            String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
 | 
				
			||||||
@@ -622,22 +649,36 @@ public class DeviceMessageHandler {
 | 
				
			|||||||
                // 检查是否已经存在告警
 | 
					                // 检查是否已经存在告警
 | 
				
			||||||
                String existingAlert = redisTemplate.opsForValue().get(alertKey);
 | 
					                String existingAlert = redisTemplate.opsForValue().get(alertKey);
 | 
				
			||||||
                if ("1".equals(existingAlert)) {
 | 
					                if ("1".equals(existingAlert)) {
 | 
				
			||||||
 | 
					                    log.debug("客户端ID: {} 已有告警,跳过检查", clientId);
 | 
				
			||||||
                    continue; // 如果已有告警,跳过处理
 | 
					                    continue; // 如果已有告警,跳过处理
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
 | 
					                String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
 | 
				
			||||||
                if (lastTimeStr == null) continue;
 | 
					                if (lastTimeStr == null) {
 | 
				
			||||||
 | 
					                    log.debug("客户端ID: {} 时间键为空,跳过", clientId);
 | 
				
			||||||
 | 
					                    continue;
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                long lastHeartbeatTime = Long.parseLong(lastTimeStr);
 | 
					                long lastHeartbeatTime = Long.parseLong(lastTimeStr);
 | 
				
			||||||
 | 
					                long timeDiff = currentTime - lastHeartbeatTime;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
 | 
					                log.debug("客户端ID: {} 最后心跳: {}, 时间差: {}ms, 超时阈值: {}ms",
 | 
				
			||||||
                    // 心跳超时处理
 | 
					                        clientId, lastHeartbeatTime, timeDiff, HEARTBEAT_TIMEOUT);
 | 
				
			||||||
                    String lostCountStr = redisTemplate.opsForValue().get(statusKey);
 | 
					 | 
				
			||||||
                    int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1;
 | 
					 | 
				
			||||||
                    redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount));
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount);
 | 
					                if (timeDiff > HEARTBEAT_TIMEOUT) {
 | 
				
			||||||
 | 
					                    // 心跳超时处理 - 使用原子操作增加计数
 | 
				
			||||||
 | 
					                    Long lostCount = redisTemplate.opsForValue().increment(statusKey);
 | 
				
			||||||
 | 
					                    if (lostCount == 1) {
 | 
				
			||||||
 | 
					                        // 确保第一次增加时值为1
 | 
				
			||||||
 | 
					                        redisTemplate.opsForValue().set(statusKey, "1");
 | 
				
			||||||
 | 
					                        lostCount = 1L;
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    log.warn("客户端ID: {} 心跳超时,连续次数: {}, 时间差: {}ms",
 | 
				
			||||||
 | 
					                            clientId, lostCount, timeDiff);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    if (lostCount >= 3) {
 | 
					                    if (lostCount >= 3) {
 | 
				
			||||||
 | 
					                        log.warn("客户端ID: {} 连续三次心跳丢失,触发告警", clientId);
 | 
				
			||||||
                        insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
 | 
					                        insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
 | 
				
			||||||
                        redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1");
 | 
					                        redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1");
 | 
				
			||||||
                        // 设置告警后删除timeKey和statusKey
 | 
					                        // 设置告警后删除timeKey和statusKey
 | 
				
			||||||
@@ -648,15 +689,22 @@ public class DeviceMessageHandler {
 | 
				
			|||||||
                        // 修改资源状态
 | 
					                        // 修改资源状态
 | 
				
			||||||
                        updateResourceStatus(clientId, "0");
 | 
					                        updateResourceStatus(clientId, "0");
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                }else {
 | 
					                } else {
 | 
				
			||||||
                    // 如果心跳正常,重置丢失次数
 | 
					                    // 如果心跳正常,重置丢失次数
 | 
				
			||||||
                    redisTemplate.opsForValue().set(statusKey, "0");
 | 
					                    String currentStatus = redisTemplate.opsForValue().get(statusKey);
 | 
				
			||||||
                    log.debug("客户端ID: {} 心跳正常,重置丢失次数", clientId);
 | 
					                    if (!"0".equals(currentStatus)) {
 | 
				
			||||||
 | 
					                        redisTemplate.opsForValue().set(statusKey, "0");
 | 
				
			||||||
 | 
					                        log.debug("客户端ID: {} 心跳正常,重置丢失次数从 {} 到 0", clientId, currentStatus);
 | 
				
			||||||
 | 
					                    } else {
 | 
				
			||||||
 | 
					                        log.debug("客户端ID: {} 心跳正常,状态已是0", clientId);
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            } catch (Exception e) {
 | 
					            } catch (Exception e) {
 | 
				
			||||||
                log.error("检查心跳状态异常, clientId: {}", clientId, e);
 | 
					                log.error("检查心跳状态异常, clientId: {}", clientId, e);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        log.debug("心跳状态检查完成");
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // 更新资源状态的公共方法
 | 
					    // 更新资源状态的公共方法
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -300,10 +300,10 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
 | 
				
			|||||||
                sendConfigurationsToDevices(devices, uniqueList);
 | 
					                sendConfigurationsToDevices(devices, uniqueList);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            // 更新策略状态为已下发
 | 
					            // 更新策略状态为已下发
 | 
				
			||||||
//            RmMonitorPolicy policyUpdate = new RmMonitorPolicy();
 | 
					            RmMonitorPolicy policyUpdate = new RmMonitorPolicy();
 | 
				
			||||||
//            policyUpdate.setId(id);
 | 
					            policyUpdate.setId(id);
 | 
				
			||||||
//            policyUpdate.setStatus("1");
 | 
					            policyUpdate.setStatus("1");
 | 
				
			||||||
//            rmMonitorPolicyMapper.updateRmMonitorPolicy(policyUpdate);
 | 
					            rmMonitorPolicyMapper.updateRmMonitorPolicy(policyUpdate);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            return 1;
 | 
					            return 1;
 | 
				
			||||||
        } catch (Exception e) {
 | 
					        } catch (Exception e) {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user