diff --git a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/mapper/EpsServerRevenueConfigMapper.java b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/mapper/EpsServerRevenueConfigMapper.java index 56a8d23..e69306d 100644 --- a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/mapper/EpsServerRevenueConfigMapper.java +++ b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/mapper/EpsServerRevenueConfigMapper.java @@ -77,6 +77,13 @@ public interface EpsServerRevenueConfigMapper * @return 服务器收益方式配置 */ public Map getNodeMsgByIp(@Param("ipAddress") String ipAddress); + /** + * 根据sn查询服务器信息 + * + * @param hardwareSn 硬件SN + * @return 服务器收益方式配置 + */ + public Map getNodeMsgBySn(@Param("hardwareSn") String hardwareSn); int updateEpsServerRevenueConfigByServerSn(EpsServerRevenueConfig epsServerRevenueConfig); diff --git a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/EpsServerRevenueConfigServiceImpl.java b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/EpsServerRevenueConfigServiceImpl.java index bea852c..88c0ca6 100644 --- a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/EpsServerRevenueConfigServiceImpl.java +++ b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/EpsServerRevenueConfigServiceImpl.java @@ -173,13 +173,8 @@ public class EpsServerRevenueConfigServiceImpl implements IEpsServerRevenueConfi List dataList = epsInitialTrafficDataService.getAllTraficMsg(epsInitialTrafficData); List batchList = new ArrayList<>(); for (EpsInitialTrafficData initialTrafficData : dataList) { - // 根据ip查询节点名称 - String ip = initialTrafficData.getIpV4() == null ? "" : initialTrafficData.getIpV4(); - String mac = initialTrafficData.getMac() == null ? "" : initialTrafficData.getMac(); - Map nodeMsg = epsServerRevenueConfigMapper.getNodeMsgByIp(ip); - if(nodeMsg == null){ - nodeMsg = epsServerRevenueConfigMapper.getNodeMsgByIp(mac); - } + // 根据clientId查询节点名称 + Map nodeMsg = epsServerRevenueConfigMapper.getNodeMsgBySn(initialTrafficData.getClientId()); // 赋值 if(nodeMsg != null){ initialTrafficData.setServiceSn(getNullableString(nodeMsg, "hardwareSn")); diff --git a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/InitialSwitchInfoDetailsServiceImpl.java b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/InitialSwitchInfoDetailsServiceImpl.java index 4bf000c..743ab35 100644 --- a/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/InitialSwitchInfoDetailsServiceImpl.java +++ b/ruoyi-modules/ruoyi-system/src/main/java/com/ruoyi/system/service/impl/InitialSwitchInfoDetailsServiceImpl.java @@ -134,10 +134,10 @@ public class InitialSwitchInfoDetailsServiceImpl implements IInitialSwitchInfoDe details.setId(null); // 根据接口名称查询交换机信息 String interfaceName = details.getName(); - String switchIp = details.getSwitchIp(); + String switchSn = details.getSwitchSn(); RmEpsTopologyManagement rmEpsTopologyManagement = new RmEpsTopologyManagement(); rmEpsTopologyManagement.setInterfaceName(interfaceName); - rmEpsTopologyManagement.setSwitchIpAddress(switchIp); + rmEpsTopologyManagement.setSwitchSn(switchSn); List managements = rmEpsTopologyManagementMapper.selectRmEpsTopologyManagementList(rmEpsTopologyManagement); // 赋值 if(!managements.isEmpty()){ diff --git a/ruoyi-modules/ruoyi-system/src/main/resources/mapper/system/AllInterfaceNameMapper.xml b/ruoyi-modules/ruoyi-system/src/main/resources/mapper/system/AllInterfaceNameMapper.xml index 5ef0e0f..f5ecc61 100644 --- a/ruoyi-modules/ruoyi-system/src/main/resources/mapper/system/AllInterfaceNameMapper.xml +++ b/ruoyi-modules/ruoyi-system/src/main/resources/mapper/system/AllInterfaceNameMapper.xml @@ -54,7 +54,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" - insert into all_interface_name + insert IGNORE into all_interface_name client_id, interface_name, @@ -166,7 +166,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" - INSERT INTO all_interface_name + INSERT IGNORE INTO all_interface_name ( interface_name, client_id, diff --git a/ruoyi-modules/ruoyi-system/src/main/resources/mapper/system/EpsServerRevenueConfigMapper.xml b/ruoyi-modules/ruoyi-system/src/main/resources/mapper/system/EpsServerRevenueConfigMapper.xml index a639d60..8e01076 100644 --- a/ruoyi-modules/ruoyi-system/src/main/resources/mapper/system/EpsServerRevenueConfigMapper.xml +++ b/ruoyi-modules/ruoyi-system/src/main/resources/mapper/system/EpsServerRevenueConfigMapper.xml @@ -148,6 +148,27 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" and rrr.ip_address = #{ipAddress} + + + diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java index 8d1247e..bada857 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListener.java @@ -1,42 +1,21 @@ package com.ruoyi.rocketmq.consumer; import com.alibaba.fastjson.JSON; -import com.ruoyi.common.core.constant.SecurityConstants; -import com.ruoyi.common.core.domain.R; -import com.ruoyi.common.core.utils.DateUtils; -import com.ruoyi.common.core.utils.StringUtils; -import com.ruoyi.rocketmq.domain.*; -import com.ruoyi.rocketmq.domain.vo.RspVo; +import com.ruoyi.rocketmq.domain.DeviceMessage; import com.ruoyi.rocketmq.enums.MessageCodeEnum; import com.ruoyi.rocketmq.handler.DeviceMessageHandler; import com.ruoyi.rocketmq.producer.ConsumeException; -import com.ruoyi.rocketmq.service.*; -import com.ruoyi.rocketmq.utils.JsonDataParser; -import com.ruoyi.system.api.RemoteRevenueConfigService; -import com.ruoyi.system.api.domain.AllInterfaceNameRemote; -import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote; -import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote; -import com.ruoyi.system.api.domain.RmResourceRegistrationRemote; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.util.Date; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; /** * 消息监听 @@ -44,44 +23,6 @@ import java.util.stream.Collectors; @Slf4j @Component public class RocketMsgListener implements MessageListenerConcurrently { - // 心跳状态 - private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:"; - // 心跳时间 - private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:"; - // 心跳告警 - private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:"; - String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:"; - private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时 - - - @Autowired - private RedisTemplate redisTemplate; - private final IInitialBandwidthTrafficService initialBandwidthTrafficService; - private final RemoteRevenueConfigService remoteRevenueConfigService; - @Autowired - private IInitialDockerInfoService initialDockerInfoService; - @Autowired - private IInitialCpuInfoService initialCpuInfoService; - @Autowired - private IInitialDiskInfoService initialDiskInfoService; - @Autowired - private IInitialMemoryInfoService initialMemoryInfoService; - @Autowired - private IInitialMountPointInfoService initialMountPointInfoService; - @Autowired - private IInitialSwitchInfoService initialSwitchInfoService; - @Autowired - private IInitialSystemInfoService initialSystemInfoService; - @Autowired - private IInitialSwitchInfoTempService initialSwitchInfoTempService; - @Autowired - private IInitialHeartbeatListenLogService initialHeartbeatListenLog; - @Autowired - public RocketMsgListener(IInitialBandwidthTrafficService initialBandwidthTrafficService, - RemoteRevenueConfigService remoteRevenueConfigService) { - this.initialBandwidthTrafficService = initialBandwidthTrafficService; - this.remoteRevenueConfigService = remoteRevenueConfigService; - } @Autowired private DeviceMessageHandler deviceMessageHandler; @@ -122,42 +63,6 @@ public class RocketMsgListener implements MessageListenerConcurrently { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 } // 根据不同的topic处理不同的业务 这里以订单消息为例子 - if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) { - // 拿到信息 - DeviceMessage message = JSON.parseObject(body, DeviceMessage.class); - switch (message.getDataType()){ - case "NET": - handleNetMessage(message); - break; - case "CPU": - handleCpuMessage(message); - break; - case "SYSTEM": - handleSystemMessage(message); - break; - case "DISK": - handleDiskMessage(message); - break; - case "POINT": - handleMountPointMessage(message); - break; - case "MEMORY": - handleMemoryMessage(message); - break; - case "DOCKER": - handleDockerMessage(message); - break; - case "SWITCHBOARD": - handleSwitchMessage(message); - break; - case "HEARTBEAT": - handleHeartbeatMessage(message); - break; - default: - log.warn("未知数据类型:{}",message.getDataType()); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 - } } } // 消息消费失败 @@ -186,417 +91,4 @@ public class RocketMsgListener implements MessageListenerConcurrently { } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } - - /** - * 网络流量数据入库 - * @param message - */ - private void handleNetMessage(DeviceMessage message) { - List interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class); - if(!interfaces.isEmpty()){ - // 时间戳转换 - long timestamp = interfaces.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); - InitialBandwidthTraffic data = new InitialBandwidthTraffic(); - interfaces.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 批量入库集合 - data.setList(interfaces); - // 初始流量数据入库 - initialBandwidthTrafficService.batchInsert(data); - EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote(); - epsInitialTrafficDataRemote.setStartTime(timeStr); - epsInitialTrafficDataRemote.setEndTime(timeStr); - // 复制到业务初始库 - remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER); - }else{ - throw new RuntimeException("NET流量data数据为空"); - } - } - /** - * docker数据入库 - * @param message - */ - private void handleDockerMessage(DeviceMessage message) { - List dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class); - if(!dockers.isEmpty()){ - // 时间戳转换 - long timestamp = dockers.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - dockers.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始容器数据入库 - initialDockerInfoService.batchInsertInitialDockerInfo(dockers); - }else{ - throw new RuntimeException("DOCKER容器data数据为空"); - } - } - /** - * cpu数据入库 - * @param message - */ - private void handleCpuMessage(DeviceMessage message) { - List cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class); - // 时间戳转换 - long timestamp = cpus.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - if(!cpus.isEmpty()){ - cpus.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始CPU数据入库 - initialCpuInfoService.batchInsertInitialCpuInfo(cpus); - }else{ - throw new RuntimeException("CPUdata数据为空"); - } - } - /** - * 磁盘数据入库 - * @param message - */ - private void handleDiskMessage(DeviceMessage message) { - List disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class); - // 时间戳转换 - long timestamp = disks.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - if(!disks.isEmpty()){ - disks.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始磁盘数据入库 - initialDiskInfoService.batchInsertInitialDiskInfo(disks); - }else{ - throw new RuntimeException("磁盘data数据为空"); - } - } - /** - * 内存数据入库 - * @param message - */ - private void handleMemoryMessage(DeviceMessage message) { - List memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class); - if(!memorys.isEmpty()){ - // 时间戳转换 - long timestamp = memorys.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - memorys.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始内存数据入库 - initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys); - }else{ - throw new RuntimeException("内存data数据为空"); - } - } - /** - * 挂载点数据入库 - * @param message - */ - private void handleMountPointMessage(DeviceMessage message) { - List mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class); - if(!mountPointInfos.isEmpty()){ - // 时间戳转换 - long timestamp = mountPointInfos.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - mountPointInfos.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始挂载点数据入库 - initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos); - }else{ - throw new RuntimeException("挂载点data数据为空"); - } - } - /** - * 交换机数据入库 - * @param message - */ - private void handleSwitchMessage(DeviceMessage message) { - List switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class); - if(!switchInfos.isEmpty()){ - // 时间戳转换 - long timestamp = switchInfos.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); - // 查询临时表信息,计算实际流量值 - InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp(); - temp.setClientId(message.getClientId()); - List tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp); - if(!tempList.isEmpty()){ - // 1. 构建快速查找的Map - Map tempMap = tempList.stream() - .collect(Collectors.toMap( - InitialSwitchInfoTemp::getName, - Function.identity(), - (existing, replacement) -> existing - )); - - // 2. 预计算除数(避免重复创建对象) - BigDecimal divisor = new BigDecimal(300); - - // 3. 计算速度 - switchInfos.forEach(switchInfo -> { - switchInfo.setClientId(message.getClientId()); - switchInfo.setCreateTime(createTime); - InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName()); - if (tempInfo != null) { - // 计算inSpeed - if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) { - BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes()); - switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP)); - } - - // 计算outSpeed - if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) { - BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes()); - switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP)); - } - } - }); - }else{ - switchInfos.forEach(switchInfo -> { - switchInfo.setClientId(message.getClientId()); - switchInfo.setCreateTime(createTime); - }); - } - // 清空临时表对应switch信息 - initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId()); - // 临时表 用来计算inSpeed outSeppd - initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos); - // 初始交换机数据入库 - initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos); - // 业务表入库 - InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote(); - detailsRemote.setClientId(message.getClientId()); - detailsRemote.setStartTime(timeStr); - detailsRemote.setEndTime(timeStr); - remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER); - }else{ - throw new RuntimeException("交换机data数据为空"); - } - } - /** - * 系统数据入库 - * @param message - */ - private void handleSystemMessage(DeviceMessage message) { - List systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class); - if(!systemInfos.isEmpty()){ - // 时间戳转换 - long timestamp = systemInfos.get(0).getTimestamp(); - long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - systemInfos.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始系统数据入库 - initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos); - }else{ - throw new RuntimeException("系统data数据为空"); - } - } - /** - * 监听心跳 - * @param message - */ - private void handleHeartbeatMessage(DeviceMessage message) { - List heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class); - if(!heartbeats.isEmpty()){ - InitialHeartbeatListen heartbeat = heartbeats.get(0); - String clientId = message.getClientId(); - log.info("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp()); - // 使用Redis存储状态 - String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; - String timeKey = HEARTBEAT_TIME_PREFIX + clientId; - String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器 - try { - // 重置丢失计数为0,设置最后心跳时间 - redisTemplate.opsForValue().set(statusKey, "0"); - redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis())); - - // 检查是否之前有告警状态 - if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) { - // 获取当前恢复次数 - String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey); - int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1; - - if (recoveryCount == 2) { - // 达到2次恢复,执行状态修改 - log.warn("客户端ID: {} 心跳恢复达到2次,修改设备状态为在线", clientId); - insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线"); - redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId); - redisTemplate.delete(recoveryCountKey); // 清除恢复计数器 - // 修改资源状态 - getResourceMsg(clientId, "1"); - } else { - // 未达到2次,只记录恢复次数 - log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount); - redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount)); - } - } - } catch (Exception e) { - log.error("处理心跳消息异常, clientId: {}", clientId, e); - } - } - } - - // 添加一个定时任务方法,定期检查心跳状态 - @Scheduled(fixedRate = 60000) // 每分钟检查一次 - public void checkHeartbeatStatus() { - long currentTime = System.currentTimeMillis(); - // 获取所有客户端时间键 - Set timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*"); - if (timeKeys == null) return; - - for (String timeKey : timeKeys) { - String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length()); - String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; - - try { - String lastTimeStr = redisTemplate.opsForValue().get(timeKey); - if (lastTimeStr == null) continue; - - long lastHeartbeatTime = Long.parseLong(lastTimeStr); - - if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) { - // 心跳超时处理 - String lostCountStr = redisTemplate.opsForValue().get(statusKey); - int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1; - redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount)); - - log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount); - - if (lostCount == 3) { - insertHeartbeatLog(clientId, "3", "连续三次心跳丢失"); - redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1"); - // 修改资源状态 - getResourceMsg(clientId, "0"); - } - } - } catch (Exception e) { - log.error("检查心跳状态异常, clientId: {}", clientId, e); - } - } - } - - /** - * 修改资源在线状态 - * @param clientId - * @param status - */ - private void getResourceMsg(String clientId, String status){ - String ipAddress = null; - AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote(); - interfaceNameRemote.setClientId(clientId); - - // 1. 先获取交换机IP - interfaceNameRemote.setResourceType("2"); - R switchResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (switchResult != null && switchResult.getData() != null && - StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) { - // 更新交换机状态 - ipAddress = switchResult.getData().getSwitchIp(); - updateResourceStatus(ipAddress, status); - - // 2. 再获取服务器IP - interfaceNameRemote.setResourceType("1"); - R serverResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (serverResult != null && serverResult.getData() != null && - StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { - // 更新服务器状态 - updateResourceStatus(serverResult.getData().getServerIp(), status); - } - } else { - // 3. 如果没有交换机IP,只获取服务器IP - interfaceNameRemote.setResourceType("1"); - R serverResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (serverResult != null && serverResult.getData() != null && - StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { - // 更新服务器状态 - updateResourceStatus(serverResult.getData().getServerIp(), status); - } else { - log.warn("未找到客户端ID: {} 对应的IP地址", clientId); - } - } - } - // 更新资源状态的公共方法 - private void updateResourceStatus(String ipAddress, String status) { - RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); - rmResourceRegistrationRemote.setOnlineStatus(status); - rmResourceRegistrationRemote.setIpAddress(ipAddress); - remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); - } - // 插入心跳日志到数据库 - private void insertHeartbeatLog(String machineId, String status, String remark) { - try { - InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog(); - listenLog.setClientId(machineId); - listenLog.setStatus(status); // 0-离线 1-在线 2-恢复 3-三次丢失 - listenLog.setRemark(remark); - listenLog.setCreateTime(new Date()); - - // 调用DAO或Service插入日志 - initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog); - log.info("已记录心跳日志,客户端ID: {}, 状态: {}", machineId, status); - } catch (Exception e) { - log.error("插入心跳日志失败", e); - } - } - - /** - * 应答信息 - * @param message - */ - private RspVo handleResponseMessage(DeviceMessage message) { - List rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class); - if (!rspVoList.isEmpty()) { - RspVo rsp = rspVoList.get(0); - log.info("应答信息:{}",rsp); - return rsp; - } - return null; - } - /** - * 注册应答处理 - * @param message - */ -// private void handleRegisterMessage(DeviceMessage message) { -// RspVo rspVo = handleResponseMessage(message); -// String clientId = message.getClientId(); -// if (rspVo != null && rspVo.getResCode() == 1) { -// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); -// rmResourceRegistrationRemote.setRegistrationStatus("1"); -// rmResourceRegistrationRemote.setHardwareSn(clientId); -// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); -// }else{ -// if(rspVo == null){ -// log.error("注册失败:应答信息为null"); -// }else{ -// log.error("注册失败:{}",rspVo.getResMsg()); -// } -// } -// } } diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListenerHistory.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListenerHistory.java new file mode 100644 index 0000000..23a6e3e --- /dev/null +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/consumer/RocketMsgListenerHistory.java @@ -0,0 +1,616 @@ +package com.ruoyi.rocketmq.consumer; + +import com.alibaba.fastjson.JSON; +import com.ruoyi.common.core.constant.SecurityConstants; +import com.ruoyi.common.core.domain.R; +import com.ruoyi.common.core.utils.DateUtils; +import com.ruoyi.common.core.utils.StringUtils; +import com.ruoyi.rocketmq.domain.*; +import com.ruoyi.rocketmq.domain.vo.RspVo; +import com.ruoyi.rocketmq.enums.MessageCodeEnum; +import com.ruoyi.rocketmq.handler.DeviceMessageHandler; +import com.ruoyi.rocketmq.producer.ConsumeException; +import com.ruoyi.rocketmq.service.*; +import com.ruoyi.rocketmq.utils.JsonDataParser; +import com.ruoyi.system.api.RemoteRevenueConfigService; +import com.ruoyi.system.api.domain.AllInterfaceNameRemote; +import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote; +import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote; +import com.ruoyi.system.api.domain.RmResourceRegistrationRemote; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.util.CollectionUtils; + +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * 消息监听 v1.0 + */ +@Slf4j +public class RocketMsgListenerHistory implements MessageListenerConcurrently { + // 心跳状态 + private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:"; + // 心跳时间 + private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:"; + // 心跳告警 + private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:"; + String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:"; + private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时 + + + @Autowired + private RedisTemplate redisTemplate; + private final IInitialBandwidthTrafficService initialBandwidthTrafficService; + private final RemoteRevenueConfigService remoteRevenueConfigService; + @Autowired + private IInitialDockerInfoService initialDockerInfoService; + @Autowired + private IInitialCpuInfoService initialCpuInfoService; + @Autowired + private IInitialDiskInfoService initialDiskInfoService; + @Autowired + private IInitialMemoryInfoService initialMemoryInfoService; + @Autowired + private IInitialMountPointInfoService initialMountPointInfoService; + @Autowired + private IInitialSwitchInfoService initialSwitchInfoService; + @Autowired + private IInitialSystemInfoService initialSystemInfoService; + @Autowired + private IInitialSwitchInfoTempService initialSwitchInfoTempService; + @Autowired + private IInitialHeartbeatListenLogService initialHeartbeatListenLog; + @Autowired + public RocketMsgListenerHistory(IInitialBandwidthTrafficService initialBandwidthTrafficService, + RemoteRevenueConfigService remoteRevenueConfigService) { + this.initialBandwidthTrafficService = initialBandwidthTrafficService; + this.remoteRevenueConfigService = remoteRevenueConfigService; + } + @Autowired + private DeviceMessageHandler deviceMessageHandler; + + /** + * 消费消息 + * @param list msgs.size() >= 1 + * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here + * 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 + * @param consumeConcurrentlyContext 上下文信息 + * @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) + */ + @Override + public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { + try{ + //消息不等于空情况 + if (!CollectionUtils.isEmpty(list)) { + //获取topic + for (MessageExt messageExt : list) { + // 解析消息内容 + String body = new String(messageExt.getBody()); + log.info("接受到的消息为:{}", body); + String tags = messageExt.getTags(); + String topic = messageExt.getTopic(); + String msgId = messageExt.getMsgId(); + String keys = messageExt.getKeys(); + int reConsume = messageExt.getReconsumeTimes(); + // 消息已经重试了3次,如果不需要再次消费,则返回成功 + if (reConsume == 3) { + // TODO 补偿信息 + log.error("消息消费三次失败,消息内容:{}", body); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常 + } + if(MessageCodeEnum.TONGRAN_AGENT_UP.getCode().equals(topic)){ + // 拿到信息 + DeviceMessage message = JSON.parseObject(body, DeviceMessage.class); + // 处理消息 + deviceMessageHandler.handleMessage(message); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 + } + // 根据不同的topic处理不同的业务 这里以订单消息为例子 + if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) { + // 拿到信息 + DeviceMessage message = JSON.parseObject(body, DeviceMessage.class); + switch (message.getDataType()){ + case "NET": + handleNetMessage(message); + break; + case "CPU": + handleCpuMessage(message); + break; + case "SYSTEM": + handleSystemMessage(message); + break; + case "DISK": + handleDiskMessage(message); + break; + case "POINT": + handleMountPointMessage(message); + break; + case "MEMORY": + handleMemoryMessage(message); + break; + case "DOCKER": + handleDockerMessage(message); + break; + case "SWITCHBOARD": + handleSwitchMessage(message); + break; + case "HEARTBEAT": + handleHeartbeatMessage(message); + break; + default: + log.warn("未知数据类型:{}",message.getDataType()); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功 + } + } + } + // 消息消费失败 + //broker会根据设置的messageDelayLevel发起重试,默认16次 + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } catch (Exception e) { + // 调用 handleException 方法处理异常并返回处理结果 + return handleException(e); + } + } + + /** + * 异常处理 + * + * @param e 捕获的异常 + * @return 消息消费结果 + */ + private static ConsumeConcurrentlyStatus handleException(final Exception e) { + Class exceptionClass = e.getClass(); + if (exceptionClass.equals(UnsupportedEncodingException.class)) { + log.error(e.getMessage()); + } else if (exceptionClass.equals(ConsumeException.class)) { + log.error(e.getMessage()); + } else{ + log.error(e.getMessage()); + } + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + + /** + * 网络流量数据入库 + * @param message + */ + private void handleNetMessage(DeviceMessage message) { + List interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class); + if(!interfaces.isEmpty()){ + // 时间戳转换 + long timestamp = interfaces.get(0).getTimestamp(); + long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; + Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 + String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); + InitialBandwidthTraffic data = new InitialBandwidthTraffic(); + interfaces.forEach(iface -> { + iface.setClientId(message.getClientId()); + iface.setCreateTime(createTime); + }); + // 批量入库集合 + data.setList(interfaces); + // 初始流量数据入库 + initialBandwidthTrafficService.batchInsert(data); + EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote(); + epsInitialTrafficDataRemote.setStartTime(timeStr); + epsInitialTrafficDataRemote.setEndTime(timeStr); + // 复制到业务初始库 + remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER); + }else{ + throw new RuntimeException("NET流量data数据为空"); + } + } + /** + * docker数据入库 + * @param message + */ + private void handleDockerMessage(DeviceMessage message) { + List dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class); + if(!dockers.isEmpty()){ + // 时间戳转换 + long timestamp = dockers.get(0).getTimestamp(); + long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; + Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 + dockers.forEach(iface -> { + iface.setClientId(message.getClientId()); + iface.setCreateTime(createTime); + }); + // 初始容器数据入库 + initialDockerInfoService.batchInsertInitialDockerInfo(dockers); + }else{ + throw new RuntimeException("DOCKER容器data数据为空"); + } + } + /** + * cpu数据入库 + * @param message + */ + private void handleCpuMessage(DeviceMessage message) { + List cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class); + // 时间戳转换 + long timestamp = cpus.get(0).getTimestamp(); + long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; + Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 + if(!cpus.isEmpty()){ + cpus.forEach(iface -> { + iface.setClientId(message.getClientId()); + iface.setCreateTime(createTime); + }); + // 初始CPU数据入库 + initialCpuInfoService.batchInsertInitialCpuInfo(cpus); + }else{ + throw new RuntimeException("CPUdata数据为空"); + } + } + /** + * 磁盘数据入库 + * @param message + */ + private void handleDiskMessage(DeviceMessage message) { + List disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class); + // 时间戳转换 + long timestamp = disks.get(0).getTimestamp(); + long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; + Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 + if(!disks.isEmpty()){ + disks.forEach(iface -> { + iface.setClientId(message.getClientId()); + iface.setCreateTime(createTime); + }); + // 初始磁盘数据入库 + initialDiskInfoService.batchInsertInitialDiskInfo(disks); + }else{ + throw new RuntimeException("磁盘data数据为空"); + } + } + /** + * 内存数据入库 + * @param message + */ + private void handleMemoryMessage(DeviceMessage message) { + List memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class); + if(!memorys.isEmpty()){ + // 时间戳转换 + long timestamp = memorys.get(0).getTimestamp(); + long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; + Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 + memorys.forEach(iface -> { + iface.setClientId(message.getClientId()); + iface.setCreateTime(createTime); + }); + // 初始内存数据入库 + initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys); + }else{ + throw new RuntimeException("内存data数据为空"); + } + } + /** + * 挂载点数据入库 + * @param message + */ + private void handleMountPointMessage(DeviceMessage message) { + List mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class); + if(!mountPointInfos.isEmpty()){ + // 时间戳转换 + long timestamp = mountPointInfos.get(0).getTimestamp(); + long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; + Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 + mountPointInfos.forEach(iface -> { + iface.setClientId(message.getClientId()); + iface.setCreateTime(createTime); + }); + // 初始挂载点数据入库 + initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos); + }else{ + throw new RuntimeException("挂载点data数据为空"); + } + } + /** + * 交换机数据入库 + * @param message + */ + private void handleSwitchMessage(DeviceMessage message) { + List switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class); + if(!switchInfos.isEmpty()){ + // 时间戳转换 + long timestamp = switchInfos.get(0).getTimestamp(); + long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; + Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 + String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); + // 查询临时表信息,计算实际流量值 + InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp(); + temp.setClientId(message.getClientId()); + List tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp); + if(!tempList.isEmpty()){ + // 1. 构建快速查找的Map + Map tempMap = tempList.stream() + .collect(Collectors.toMap( + InitialSwitchInfoTemp::getName, + Function.identity(), + (existing, replacement) -> existing + )); + + // 2. 预计算除数(避免重复创建对象) + BigDecimal divisor = new BigDecimal(300); + + // 3. 计算速度 + switchInfos.forEach(switchInfo -> { + switchInfo.setClientId(message.getClientId()); + switchInfo.setCreateTime(createTime); + InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName()); + if (tempInfo != null) { + // 计算inSpeed + if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) { + BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes()); + switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP)); + } + + // 计算outSpeed + if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) { + BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes()); + switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP)); + } + } + }); + }else{ + switchInfos.forEach(switchInfo -> { + switchInfo.setClientId(message.getClientId()); + switchInfo.setCreateTime(createTime); + }); + } + // 清空临时表对应switch信息 + initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId()); + // 临时表 用来计算inSpeed outSeppd + initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos); + // 初始交换机数据入库 + initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos); + // 业务表入库 + InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote(); + detailsRemote.setClientId(message.getClientId()); + detailsRemote.setStartTime(timeStr); + detailsRemote.setEndTime(timeStr); + remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER); + }else{ + throw new RuntimeException("交换机data数据为空"); + } + } + /** + * 系统数据入库 + * @param message + */ + private void handleSystemMessage(DeviceMessage message) { + List systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class); + if(!systemInfos.isEmpty()){ + // 时间戳转换 + long timestamp = systemInfos.get(0).getTimestamp(); + long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp; + Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 + systemInfos.forEach(iface -> { + iface.setClientId(message.getClientId()); + iface.setCreateTime(createTime); + }); + // 初始系统数据入库 + initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos); + }else{ + throw new RuntimeException("系统data数据为空"); + } + } + /** + * 监听心跳 + * @param message + */ + private void handleHeartbeatMessage(DeviceMessage message) { + List heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class); + if(!heartbeats.isEmpty()){ + InitialHeartbeatListen heartbeat = heartbeats.get(0); + String clientId = message.getClientId(); + log.info("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp()); + // 使用Redis存储状态 + String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; + String timeKey = HEARTBEAT_TIME_PREFIX + clientId; + String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器 + try { + // 重置丢失计数为0,设置最后心跳时间 + redisTemplate.opsForValue().set(statusKey, "0"); + redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis())); + + // 检查是否之前有告警状态 + if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) { + // 获取当前恢复次数 + String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey); + int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1; + + if (recoveryCount == 2) { + // 达到2次恢复,执行状态修改 + log.warn("客户端ID: {} 心跳恢复达到2次,修改设备状态为在线", clientId); + insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线"); + redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId); + redisTemplate.delete(recoveryCountKey); // 清除恢复计数器 + // 修改资源状态 + getResourceMsg(clientId, "1"); + } else { + // 未达到2次,只记录恢复次数 + log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount); + redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount)); + } + } + } catch (Exception e) { + log.error("处理心跳消息异常, clientId: {}", clientId, e); + } + } + } + + // 添加一个定时任务方法,定期检查心跳状态 + @Scheduled(fixedRate = 60000) // 每分钟检查一次 + public void checkHeartbeatStatus() { + long currentTime = System.currentTimeMillis(); + // 获取所有客户端时间键 + Set timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*"); + if (timeKeys == null) return; + + for (String timeKey : timeKeys) { + String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length()); + String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; + String alertKey = HEARTBEAT_ALERT_PREFIX + clientId; + + try { + // 检查是否已经存在告警 + String existingAlert = redisTemplate.opsForValue().get(alertKey); + if ("1".equals(existingAlert)) { + continue; // 如果已有告警,跳过处理 + } + String lastTimeStr = redisTemplate.opsForValue().get(timeKey); + if (lastTimeStr == null) continue; + + long lastHeartbeatTime = Long.parseLong(lastTimeStr); + + if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) { + // 心跳超时处理 + String lostCountStr = redisTemplate.opsForValue().get(statusKey); + int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1; + redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount)); + + log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount); + + if (lostCount >= 3) { + insertHeartbeatLog(clientId, "3", "连续三次心跳丢失"); + redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1"); + // 设置告警后删除timeKey和statusKey + redisTemplate.delete(timeKey); + redisTemplate.delete(statusKey); + + log.info("客户端ID: {} 已设置告警并清理心跳记录", clientId); + // 修改资源状态 + getResourceMsg(clientId, "0"); + } + }else { + // 如果心跳正常,重置丢失次数 + redisTemplate.opsForValue().set(statusKey, "0"); + log.debug("客户端ID: {} 心跳正常,重置丢失次数", clientId); + } + } catch (Exception e) { + log.error("检查心跳状态异常, clientId: {}", clientId, e); + } + } + } + + /** + * 修改资源在线状态 + * @param clientId + * @param status + */ + private void getResourceMsg(String clientId, String status){ + String ipAddress = null; + AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote(); + interfaceNameRemote.setClientId(clientId); + + // 1. 先获取交换机IP + interfaceNameRemote.setResourceType("2"); + R switchResult = remoteRevenueConfigService.getMsgByClientId( + interfaceNameRemote, SecurityConstants.INNER); + + if (switchResult != null && switchResult.getData() != null && + StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) { + // 更新交换机状态 + ipAddress = switchResult.getData().getSwitchIp(); + updateResourceStatus(ipAddress, status); + + // 2. 再获取服务器IP + interfaceNameRemote.setResourceType("1"); + R serverResult = remoteRevenueConfigService.getMsgByClientId( + interfaceNameRemote, SecurityConstants.INNER); + + if (serverResult != null && serverResult.getData() != null && + StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { + // 更新服务器状态 + updateResourceStatus(serverResult.getData().getServerIp(), status); + } + } else { + // 3. 如果没有交换机IP,只获取服务器IP + interfaceNameRemote.setResourceType("1"); + R serverResult = remoteRevenueConfigService.getMsgByClientId( + interfaceNameRemote, SecurityConstants.INNER); + + if (serverResult != null && serverResult.getData() != null && + StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { + // 更新服务器状态 + updateResourceStatus(serverResult.getData().getServerIp(), status); + } else { + log.warn("未找到客户端ID: {} 对应的IP地址", clientId); + } + } + } + // 更新资源状态的公共方法 + private void updateResourceStatus(String ipAddress, String status) { + RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); + rmResourceRegistrationRemote.setOnlineStatus(status); + rmResourceRegistrationRemote.setRegistrationStatus(status); + rmResourceRegistrationRemote.setIpAddress(ipAddress); + remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); + } + // 插入心跳日志到数据库 + private void insertHeartbeatLog(String machineId, String status, String remark) { + try { + InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog(); + listenLog.setClientId(machineId); + listenLog.setStatus(status); // 0-离线 1-在线 2-恢复 3-三次丢失 + listenLog.setRemark(remark); + listenLog.setCreateTime(new Date()); + + // 调用DAO或Service插入日志 + initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog); + log.info("已记录心跳日志,客户端ID: {}, 状态: {}", machineId, status); + } catch (Exception e) { + log.error("插入心跳日志失败", e); + } + } + + /** + * 应答信息 + * @param message + */ + private RspVo handleResponseMessage(DeviceMessage message) { + List rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class); + if (!rspVoList.isEmpty()) { + RspVo rsp = rspVoList.get(0); + log.info("应答信息:{}",rsp); + return rsp; + } + return null; + } + /** + * 注册应答处理 + * @param message + */ +// private void handleRegisterMessage(DeviceMessage message) { +// RspVo rspVo = handleResponseMessage(message); +// String clientId = message.getClientId(); +// if (rspVo != null && rspVo.getResCode() == 1) { +// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); +// rmResourceRegistrationRemote.setRegistrationStatus("1"); +// rmResourceRegistrationRemote.setHardwareSn(clientId); +// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); +// }else{ +// if(rspVo == null){ +// log.error("注册失败:应答信息为null"); +// }else{ +// log.error("注册失败:{}",rspVo.getResMsg()); +// } +// } +// } +} diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmAgentManagementController.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmAgentManagementController.java new file mode 100644 index 0000000..f83f8f9 --- /dev/null +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmAgentManagementController.java @@ -0,0 +1,102 @@ +package com.ruoyi.rocketmq.controller; + +import com.ruoyi.common.core.utils.poi.ExcelUtil; +import com.ruoyi.common.core.web.controller.BaseController; +import com.ruoyi.common.core.web.domain.AjaxResult; +import com.ruoyi.common.core.web.page.PageDomain; +import com.ruoyi.common.core.web.page.TableDataInfo; +import com.ruoyi.common.log.annotation.Log; +import com.ruoyi.common.log.enums.BusinessType; +import com.ruoyi.common.security.annotation.RequiresPermissions; +import com.ruoyi.rocketmq.domain.RmAgentManagement; +import com.ruoyi.rocketmq.service.IRmAgentManagementService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import javax.servlet.http.HttpServletResponse; +import java.util.List; + +/** + * Agent管理Controller + * + * @author gyt + * @date 2025-09-15 + */ +@RestController +@RequestMapping("/agentManagement") +public class RmAgentManagementController extends BaseController +{ + @Autowired + private IRmAgentManagementService rmAgentManagementService; + + /** + * 查询Agent管理列表 + */ + @RequiresPermissions("rocketmq:management:list") + @GetMapping("/list") + public TableDataInfo list(RmAgentManagement rmAgentManagement) + { + PageDomain pageDomain = new PageDomain(); + pageDomain.setPageNum(rmAgentManagement.getPageNum()); + pageDomain.setPageSize(rmAgentManagement.getPageSize()); + startPage(pageDomain); + List list = rmAgentManagementService.selectRmAgentManagementList(rmAgentManagement); + return getDataTable(list); + } + + /** + * 导出Agent管理列表 + */ + @RequiresPermissions("rocketmq:management:export") + @Log(title = "Agent管理", businessType = BusinessType.EXPORT) + @PostMapping("/export") + public void export(HttpServletResponse response, RmAgentManagement rmAgentManagement) + { + List list = rmAgentManagementService.selectRmAgentManagementList(rmAgentManagement); + ExcelUtil util = new ExcelUtil(RmAgentManagement.class); + util.exportExcel(response, list, "Agent管理数据"); + } + + /** + * 获取Agent管理详细信息 + */ + @RequiresPermissions("rocketmq:management:query") + @GetMapping(value = "/{id}") + public AjaxResult getInfo(@PathVariable("id") Long id) + { + return success(rmAgentManagementService.selectRmAgentManagementById(id)); + } + + /** + * 新增Agent管理 + */ + @RequiresPermissions("rocketmq:management:add") + @Log(title = "Agent管理", businessType = BusinessType.INSERT) + @PostMapping + public AjaxResult add(@RequestBody RmAgentManagement rmAgentManagement) + { + return toAjax(rmAgentManagementService.addRmAgentManagement(rmAgentManagement)); + } + + /** + * 修改Agent管理 + */ + @RequiresPermissions("rocketmq:management:edit") + @Log(title = "Agent管理", businessType = BusinessType.UPDATE) + @PutMapping + public AjaxResult edit(@RequestBody RmAgentManagement rmAgentManagement) + { + return toAjax(rmAgentManagementService.updateRmAgentManagement(rmAgentManagement)); + } + + /** + * 删除Agent管理 + */ + @RequiresPermissions("rocketmq:management:remove") + @Log(title = "Agent管理", businessType = BusinessType.DELETE) + @DeleteMapping("/{ids}") + public AjaxResult remove(@PathVariable Long[] ids) + { + return toAjax(rmAgentManagementService.deleteRmAgentManagementByIds(ids)); + } +} diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmMonitorPolicyController.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmMonitorPolicyController.java index 417a909..f8908bb 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmMonitorPolicyController.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmMonitorPolicyController.java @@ -50,10 +50,11 @@ public class RmMonitorPolicyController extends BaseController @RequiresPermissions("rocketmq:policy:export") @Log(title = "资源监控策略", businessType = BusinessType.EXPORT) @PostMapping("/export") - public void export(HttpServletResponse response, RmMonitorPolicy rmMonitorPolicy) + public void export(HttpServletResponse response, @RequestBody RmMonitorPolicy rmMonitorPolicy) { List list = rmMonitorPolicyService.selectRmMonitorPolicyList(rmMonitorPolicy); ExcelUtil util = new ExcelUtil(RmMonitorPolicy.class); + util.showColumn(rmMonitorPolicy.getProperties()); util.exportExcel(response, list, "资源监控策略数据"); } @@ -90,7 +91,11 @@ public class RmMonitorPolicyController extends BaseController @PutMapping public AjaxResult edit(@RequestBody RmMonitorPolicy rmMonitorPolicy) { - return toAjax(rmMonitorPolicyService.updateRmMonitorPolicy(rmMonitorPolicy)); + int rows = rmMonitorPolicyService.updateRmMonitorPolicy(rmMonitorPolicy); + if(rows == -1){ + return AjaxResult.error("资源监控策略新增失败,该资源组已绑定其他策略"); + } + return toAjax(rows); } /** * 资源监控策略下发 diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmMonitorTemplateController.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmMonitorTemplateController.java index 9d01a7d..8807b01 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmMonitorTemplateController.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/controller/RmMonitorTemplateController.java @@ -52,10 +52,11 @@ public class RmMonitorTemplateController extends BaseController @RequiresPermissions("rocketmq:template:export") @Log(title = "监控模板", businessType = BusinessType.EXPORT) @PostMapping("/export") - public void export(HttpServletResponse response, RmMonitorTemplate rmMonitorTemplate) + public void export(HttpServletResponse response, @RequestBody RmMonitorTemplate rmMonitorTemplate) { List list = rmMonitorTemplateService.selectRmMonitorTemplateList(rmMonitorTemplate); ExcelUtil util = new ExcelUtil(RmMonitorTemplate.class); + util.showColumn(rmMonitorTemplate.getProperties()); util.exportExcel(response, list, "监控模板数据"); } diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/InitialHeartbeatListenLog.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/InitialHeartbeatListenLog.java index 0e6c06d..68dc6be 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/InitialHeartbeatListenLog.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/InitialHeartbeatListenLog.java @@ -1,9 +1,8 @@ package com.ruoyi.rocketmq.domain; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; import com.ruoyi.common.core.annotation.Excel; import com.ruoyi.common.core.web.domain.BaseEntity; +import lombok.Data; /** * 心跳信息日志对象 initial_heartbeat_listen_log @@ -11,47 +10,16 @@ import com.ruoyi.common.core.web.domain.BaseEntity; * @author gyt * @date 2025-09-08 */ +@Data public class InitialHeartbeatListenLog extends BaseEntity { private static final long serialVersionUID = 1L; + private Long id; /** 客户端ID */ private String clientId; /** 状态0-正常 1-恢复 2-两次丢失 3-三次丢失 */ @Excel(name = "状态0-正常 1-恢复 2-两次丢失 3-三次丢失") private String status; - - public void setClientId(String clientId) - { - this.clientId = clientId; - } - - public String getClientId() - { - return clientId; - } - - public void setStatus(String status) - { - this.status = status; - } - - public String getStatus() - { - return status; - } - - @Override - public String toString() { - return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE) - .append("clientId", getClientId()) - .append("status", getStatus()) - .append("remark", getRemark()) - .append("createTime", getCreateTime()) - .append("updateTime", getUpdateTime()) - .append("createBy", getCreateBy()) - .append("updateBy", getUpdateBy()) - .toString(); - } } diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmAgentManagement.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmAgentManagement.java new file mode 100644 index 0000000..45a38a6 --- /dev/null +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmAgentManagement.java @@ -0,0 +1,77 @@ +package com.ruoyi.rocketmq.domain; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.ruoyi.common.core.annotation.Excel; +import com.ruoyi.common.core.web.domain.BaseEntity; +import lombok.Data; + +import java.time.LocalDateTime; +import java.util.Date; + +/** + * Agent管理对象 rm_agent_management + * + * @author gyt + * @date 2025-09-15 + */ +@Data +public class RmAgentManagement extends BaseEntity +{ + private static final long serialVersionUID = 1L; + + /** 主键ID */ + private Long id; + + /** 硬件SN码 */ + @Excel(name = "硬件SN码") + private String hardwareSn; + + /** 资源名称 */ + @Excel(name = "资源名称") + private String resourceName; + + /** 内网IP地址 */ + @Excel(name = "内网IP地址") + private String internalIp; + + /** Agent状态:0-离线,1-在线,2-异常 */ + @Excel(name = "Agent状态:0-离线,1-在线,2-异常") + private String status; + + /** Agent版本号 */ + @Excel(name = "Agent版本号") + private String agentVersion; + + /** 执行方式 */ + @Excel(name = "执行方式") + private Integer method; + + /** 定时更新时间(cron表达式) */ + @Excel(name = "定时更新时间") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime scheduledUpdateTime; + /** 文件地址格式 */ + @Excel(name = "文件地址格式") + private Long fileUrlType; + + /** 文件地址 */ + @Excel(name = "文件地址") + private String fileUrl; + /** 文件目录 */ + @Excel(name = "文件目录") + private String fileDirectory; + + /** 最后一次更新结果(success/failure) */ + @Excel(name = "最后一次更新结果", readConverterExp = "s=uccess/failure") + private String lastUpdateResult; + + /** 最后一次更新时间 */ + @JsonFormat(pattern = "yyyy-MM-dd") + @Excel(name = "最后一次更新时间", width = 30, dateFormat = "yyyy-MM-dd") + private Date lastUpdateTime; + /** 生效服务器id */ + private String includeIds; + /** 生效服务器名称 */ + private String includeNames; + +} diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmMonitorPolicy.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmMonitorPolicy.java index 5239730..84a99f3 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmMonitorPolicy.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmMonitorPolicy.java @@ -23,17 +23,6 @@ public class RmMonitorPolicy extends BaseEntity /** 主键ID */ private Long id; - /** 模板ID */ - private Long templateId; - /** 模板名称 */ - @Excel(name = "关联监控模板") - private String templateName; - - /** 资源组ID */ - @Excel(name = "资源组ID") - private Long resourceGroupId; - /** 资源组名称 */ - private String resourceGroupName; /** 策略名称 */ @Excel(name = "策略名称") @@ -43,17 +32,38 @@ public class RmMonitorPolicy extends BaseEntity @Excel(name = "描述") private String description; + /** 资源组ID */ + private Long resourceGroupId; + /** 资源组名称 */ + @Excel(name = "关联资源组") + private String resourceGroupName; + /** 模板ID */ + private Long templateId; + /** 模板名称 */ + @Excel(name = "关联监控模板") + private String templateName; + /** 状态:0-待下发,1-已下发 */ - @Excel(name = "状态:0-待下发,1-已下发") + @Excel(name = "策略状态", readConverterExp = "0=待下发,1=已下发") private String status; /** 下发策略时间 */ - @JsonFormat(pattern = "yyyy-MM-dd") - @Excel(name = "下发策略时间", width = 30, dateFormat = "yyyy-MM-dd") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @Excel(name = "下发策略时间", width = 30, dateFormat = "yyyy-MM-dd HH:mm:ss") private Date deployTime; /** 采集周期及id集合 */ private List collectionAndIdList; /** 资源类型,linux switch */ private String resourceType; + /** 查询条件名称 */ + private String queryName; + /** 创建时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @Excel(name = "创建时间") + private Date createTime; + /** 修改时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @Excel(name = "修改时间") + private Date updateTime; } diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmMonitorTemplate.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmMonitorTemplate.java index 62d202f..71a4883 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmMonitorTemplate.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/RmMonitorTemplate.java @@ -1,9 +1,12 @@ package com.ruoyi.rocketmq.domain; +import com.fasterxml.jackson.annotation.JsonFormat; import com.ruoyi.common.core.annotation.Excel; import com.ruoyi.common.core.web.domain.BaseEntity; import lombok.Data; +import java.util.Date; + /** * 监控模板对象 rm_monitor_template * @@ -23,7 +26,7 @@ public class RmMonitorTemplate extends BaseEntity private String templateName; /** 模板描述 */ - @Excel(name = "模板描述") + @Excel(name = "描述") private String description; /** 监控项 */ @@ -35,11 +38,19 @@ public class RmMonitorTemplate extends BaseEntity private String discoveryRules; /** 资源组ID */ - @Excel(name = "资源组ID") private Long resourceGroupId; /** 资源组名称 */ + @Excel(name = "关联资源组") private String resourceGroupName; /** 资源类型(linux,switch) */ private String resourcyType; + /** 创建时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @Excel(name = "创建时间") + private Date createTime; + /** 修改时间 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + @Excel(name = "修改时间") + private Date updateTime; } diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/vo/AgentUpdateVo.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/vo/AgentUpdateVo.java new file mode 100644 index 0000000..7db17f9 --- /dev/null +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/domain/vo/AgentUpdateVo.java @@ -0,0 +1,24 @@ +package com.ruoyi.rocketmq.domain.vo; + +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.time.Instant; +import java.time.LocalDateTime; + +@Data +public class AgentUpdateVo { + /**文件地址,外网HTTP(S)地址 */ + private String fileUrl; + /** 保存文件地址 */ + private String filePath; + /** 执行命令,List格式Json字符串 */ + private String commands; + /** 执行方式:0、立即执行;1、定时执行; */ + private Integer method; + /** 定时时间,执行方式为1、定时执行时该字段必传 */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private LocalDateTime policyTime; + /** 时间戳 */ + private long timestamp = Instant.now().getEpochSecond(); +} diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/handler/DeviceMessageHandler.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/handler/DeviceMessageHandler.java index 3c72eb8..ef20060 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/handler/DeviceMessageHandler.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/handler/DeviceMessageHandler.java @@ -1,26 +1,24 @@ package com.ruoyi.rocketmq.handler; -import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.ruoyi.common.core.constant.SecurityConstants; import com.ruoyi.common.core.domain.R; import com.ruoyi.common.core.enums.MsgEnum; import com.ruoyi.common.core.utils.DateUtils; -import com.ruoyi.common.core.utils.StringUtils; import com.ruoyi.rocketmq.domain.*; import com.ruoyi.rocketmq.domain.vo.CollectDataVo; import com.ruoyi.rocketmq.domain.vo.RspVo; import com.ruoyi.rocketmq.service.*; import com.ruoyi.rocketmq.utils.JsonDataParser; import com.ruoyi.system.api.RemoteRevenueConfigService; -import com.ruoyi.system.api.domain.AllInterfaceNameRemote; import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote; import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote; import com.ruoyi.system.api.domain.RmResourceRegistrationRemote; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -37,6 +35,7 @@ import java.util.stream.Collectors; */ @Slf4j @Component +@EnableScheduling public class DeviceMessageHandler { private final Map> messageHandlers = new HashMap<>(); @@ -47,7 +46,7 @@ public class DeviceMessageHandler { // 心跳告警 private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:"; String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:"; - private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时 + private static final long HEARTBEAT_TIMEOUT = 30000; // 3分钟超时 @Autowired @@ -507,75 +506,6 @@ public class DeviceMessageHandler { throw new RuntimeException("交换机data数据为空"); } } - /** - * 交换机数据入库 - * @param message - */ - private void handleSwitchMessage(DeviceMessage message) { - List switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class); - if(!switchInfos.isEmpty()){ - // 时间戳转换 - long timestamp = switchInfos.get(0).getTimestamp(); - long millis = timestamp * 1000; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); - // 查询临时表信息,计算实际流量值 - InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp(); - temp.setClientId(message.getClientId()); - List tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp); - if(!tempList.isEmpty()){ - // 1. 构建快速查找的Map - Map tempMap = tempList.stream() - .collect(Collectors.toMap( - InitialSwitchInfoTemp::getName, - Function.identity(), - (existing, replacement) -> existing - )); - - // 2. 预计算除数(避免重复创建对象) - BigDecimal divisor = new BigDecimal(300); - - // 3. 计算速度 - switchInfos.forEach(switchInfo -> { - switchInfo.setClientId(message.getClientId()); - switchInfo.setCreateTime(createTime); - InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName()); - if (tempInfo != null) { - // 计算inSpeed - if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) { - BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes()); - switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP)); - } - - // 计算outSpeed - if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) { - BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes()); - switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP)); - } - } - }); - }else{ - switchInfos.forEach(switchInfo -> { - switchInfo.setClientId(message.getClientId()); - switchInfo.setCreateTime(createTime); - }); - } - // 清空临时表对应switch信息 - initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId()); - // 临时表 用来计算inSpeed outSeppd - initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos); - // 初始交换机数据入库 - initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos); - // 业务表入库 - InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote(); - detailsRemote.setClientId(message.getClientId()); - detailsRemote.setStartTime(timeStr); - detailsRemote.setEndTime(timeStr); - remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER); - }else{ - throw new RuntimeException("交换机data数据为空"); - } - } /** * 系统其他信息 @@ -609,27 +539,6 @@ public class DeviceMessageHandler { } } } - /** - * 系统数据入库 - * @param message - */ - private void handleSystemMessage(DeviceMessage message) { - List systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class); - if(!systemInfos.isEmpty()){ - // 时间戳转换 - long timestamp = systemInfos.get(0).getTimestamp(); - long millis = timestamp * 1000; - Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 - systemInfos.forEach(iface -> { - iface.setClientId(message.getClientId()); - iface.setCreateTime(createTime); - }); - // 初始系统数据入库 - initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos); - }else{ - throw new RuntimeException("系统data数据为空"); - } - } /** * 监听心跳 * @param message @@ -662,7 +571,7 @@ public class DeviceMessageHandler { redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId); redisTemplate.delete(recoveryCountKey); // 清除恢复计数器 // 修改资源状态 - getResourceMsg(clientId, "1"); + updateResourceStatus(clientId, "1"); } else { // 未达到2次,只记录恢复次数 log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount); @@ -676,7 +585,7 @@ public class DeviceMessageHandler { } // 添加一个定时任务方法,定期检查心跳状态 - @Scheduled(fixedRate = 60000) // 每分钟检查一次 + @Scheduled(fixedRate = 30000) // 每30s检查一次 public void checkHeartbeatStatus() { long currentTime = System.currentTimeMillis(); // 获取所有客户端时间键 @@ -686,8 +595,14 @@ public class DeviceMessageHandler { for (String timeKey : timeKeys) { String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length()); String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; + String alertKey = HEARTBEAT_ALERT_PREFIX + clientId; try { + // 检查是否已经存在告警 + String existingAlert = redisTemplate.opsForValue().get(alertKey); + if ("1".equals(existingAlert)) { + continue; // 如果已有告警,跳过处理 + } String lastTimeStr = redisTemplate.opsForValue().get(timeKey); if (lastTimeStr == null) continue; @@ -701,12 +616,21 @@ public class DeviceMessageHandler { log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount); - if (lostCount == 3) { + if (lostCount >= 3) { insertHeartbeatLog(clientId, "3", "连续三次心跳丢失"); redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1"); + // 设置告警后删除timeKey和statusKey + redisTemplate.delete(timeKey); + redisTemplate.delete(statusKey); + + log.info("客户端ID: {} 已设置告警并清理心跳记录", clientId); // 修改资源状态 - getResourceMsg(clientId, "0"); + updateResourceStatus(clientId, "0"); } + }else { + // 如果心跳正常,重置丢失次数 + redisTemplate.opsForValue().set(statusKey, "0"); + log.debug("客户端ID: {} 心跳正常,重置丢失次数", clientId); } } catch (Exception e) { log.error("检查心跳状态异常, clientId: {}", clientId, e); @@ -714,57 +638,15 @@ public class DeviceMessageHandler { } } - /** - * 修改资源在线状态 - * @param clientId - * @param status - */ - private void getResourceMsg(String clientId, String status){ - String ipAddress = null; - AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote(); - interfaceNameRemote.setClientId(clientId); - - // 1. 先获取交换机IP - interfaceNameRemote.setResourceType("2"); - R switchResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (switchResult != null && switchResult.getData() != null && - StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) { - // 更新交换机状态 - ipAddress = switchResult.getData().getSwitchIp(); - updateResourceStatus(ipAddress, status); - - // 2. 再获取服务器IP - interfaceNameRemote.setResourceType("1"); - R serverResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (serverResult != null && serverResult.getData() != null && - StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { - // 更新服务器状态 - updateResourceStatus(serverResult.getData().getServerIp(), status); - } - } else { - // 3. 如果没有交换机IP,只获取服务器IP - interfaceNameRemote.setResourceType("1"); - R serverResult = remoteRevenueConfigService.getMsgByClientId( - interfaceNameRemote, SecurityConstants.INNER); - - if (serverResult != null && serverResult.getData() != null && - StringUtils.isNotEmpty(serverResult.getData().getServerIp())) { - // 更新服务器状态 - updateResourceStatus(serverResult.getData().getServerIp(), status); - } else { - log.warn("未找到客户端ID: {} 对应的IP地址", clientId); - } - } - } // 更新资源状态的公共方法 - private void updateResourceStatus(String ipAddress, String status) { + private void updateResourceStatus(String clientId, String status) { + log.info("开启更新资源状态========"); RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); rmResourceRegistrationRemote.setOnlineStatus(status); - rmResourceRegistrationRemote.setIpAddress(ipAddress); + if("0".equals(status)){ + rmResourceRegistrationRemote.setRegistrationStatus(status); + } + rmResourceRegistrationRemote.setHardwareSn(clientId); remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); } // 插入心跳日志到数据库 @@ -835,24 +717,4 @@ public class DeviceMessageHandler { } } } - /** - * 注册应答处理 - * @param message - */ -// private void handleRegisterMessage(DeviceMessage message) { -// RspVo rspVo = handleResponseMessage(message); -// String clientId = message.getClientId(); -// if (rspVo != null && rspVo.getResCode() == 1) { -// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote(); -// rmResourceRegistrationRemote.setRegistrationStatus("1"); -// rmResourceRegistrationRemote.setHardwareSn(clientId); -// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER); -// }else{ -// if(rspVo == null){ -// log.error("注册失败:应答信息为null"); -// }else{ -// log.error("注册失败:{}",rspVo.getResMsg()); -// } -// } -// } } \ No newline at end of file diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmAgentManagementMapper.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmAgentManagementMapper.java new file mode 100644 index 0000000..87ab37c --- /dev/null +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmAgentManagementMapper.java @@ -0,0 +1,64 @@ +package com.ruoyi.rocketmq.mapper; + +import com.ruoyi.rocketmq.domain.RmAgentManagement; + +import java.util.List; + +/** + * Agent管理Mapper接口 + * + * @author gyt + * @date 2025-09-15 + */ +public interface RmAgentManagementMapper +{ + /** + * 查询Agent管理 + * + * @param id Agent管理主键 + * @return Agent管理 + */ + public RmAgentManagement selectRmAgentManagementById(Long id); + + /** + * 查询Agent管理列表 + * + * @param rmAgentManagement Agent管理 + * @return Agent管理集合 + */ + public List selectRmAgentManagementList(RmAgentManagement rmAgentManagement); + + /** + * 新增Agent管理 + * + * @param rmAgentManagement Agent管理 + * @return 结果 + */ + public int insertRmAgentManagement(RmAgentManagement rmAgentManagement); + + /** + * 修改Agent管理 + * + * @param rmAgentManagement Agent管理 + * @return 结果 + */ + public int updateRmAgentManagement(RmAgentManagement rmAgentManagement); + + /** + * 删除Agent管理 + * + * @param id Agent管理主键 + * @return 结果 + */ + public int deleteRmAgentManagementById(Long id); + + /** + * 批量删除Agent管理 + * + * @param ids 需要删除的数据主键集合 + * @return 结果 + */ + public int deleteRmAgentManagementByIds(Long[] ids); + + void updateRmAgentManagementBySn(RmAgentManagement rmAgentManagement); +} diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmTemplateLinuxMapper.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmTemplateLinuxMapper.java index 1cbf4f5..97dab9b 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmTemplateLinuxMapper.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmTemplateLinuxMapper.java @@ -64,4 +64,5 @@ public interface RmTemplateLinuxMapper void batchInsertRmTemplateLinux(@Param("list") List linuxItems); int deleteByTemplateId(Long templateId); + int updateByTemplateId(Long templateId); } diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmTemplateSwitchMapper.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmTemplateSwitchMapper.java index ec43e62..5e4e804 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmTemplateSwitchMapper.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/mapper/RmTemplateSwitchMapper.java @@ -68,4 +68,6 @@ public interface RmTemplateSwitchMapper void batchInsertRmTemplateSwitch(@Param("list") List switchItems); int deleteByTemplateId(Long templateId); + + int updateByTemplateId(Long templateId); } diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/IRmAgentManagementService.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/IRmAgentManagementService.java new file mode 100644 index 0000000..5b6f56f --- /dev/null +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/IRmAgentManagementService.java @@ -0,0 +1,69 @@ +package com.ruoyi.rocketmq.service; + +import com.ruoyi.rocketmq.domain.RmAgentManagement; + +import java.util.List; + +/** + * Agent管理Service接口 + * + * @author gyt + * @date 2025-09-15 + */ +public interface IRmAgentManagementService +{ + /** + * 查询Agent管理 + * + * @param id Agent管理主键 + * @return Agent管理 + */ + public RmAgentManagement selectRmAgentManagementById(Long id); + + /** + * 查询Agent管理列表 + * + * @param rmAgentManagement Agent管理 + * @return Agent管理集合 + */ + public List selectRmAgentManagementList(RmAgentManagement rmAgentManagement); + + /** + * 新增Agent管理 + * + * @param rmAgentManagement Agent管理 + * @return 结果 + */ + public int insertRmAgentManagement(RmAgentManagement rmAgentManagement); + + /** + * 修改Agent管理 + * + * @param rmAgentManagement Agent管理 + * @return 结果 + */ + public int updateRmAgentManagement(RmAgentManagement rmAgentManagement); + + /** + * 批量删除Agent管理 + * + * @param ids 需要删除的Agent管理主键集合 + * @return 结果 + */ + public int deleteRmAgentManagementByIds(Long[] ids); + + /** + * 删除Agent管理信息 + * + * @param id Agent管理主键 + * @return 结果 + */ + public int deleteRmAgentManagementById(Long id); + + /** + * 配置更新策略 + * @param rmAgentManagement + * @return + */ + int addRmAgentManagement(RmAgentManagement rmAgentManagement); +} diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmAgentManagementServiceImpl.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmAgentManagementServiceImpl.java new file mode 100644 index 0000000..7c87995 --- /dev/null +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmAgentManagementServiceImpl.java @@ -0,0 +1,177 @@ +package com.ruoyi.rocketmq.service.impl; + +import com.alibaba.fastjson.JSONObject; +import com.ruoyi.common.core.constant.SecurityConstants; +import com.ruoyi.common.core.enums.MsgEnum; +import com.ruoyi.common.core.utils.DateUtils; +import com.ruoyi.rocketmq.domain.DeviceMessage; +import com.ruoyi.rocketmq.domain.RmAgentManagement; +import com.ruoyi.rocketmq.domain.vo.AgentUpdateVo; +import com.ruoyi.rocketmq.mapper.RmAgentManagementMapper; +import com.ruoyi.rocketmq.model.ProducerMode; +import com.ruoyi.rocketmq.producer.MessageProducer; +import com.ruoyi.rocketmq.service.IRmAgentManagementService; +import com.ruoyi.system.api.RemoteRevenueConfigService; +import com.ruoyi.system.api.domain.RmResourceRegistrationRemote; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; + +/** + * Agent管理Service业务层处理 + * + * @author gyt + * @date 2025-09-15 + */ +@Service +@Slf4j +public class RmAgentManagementServiceImpl implements IRmAgentManagementService +{ + @Autowired + private RmAgentManagementMapper rmAgentManagementMapper; + @Value("${fileDictory.filePath}") + private String filePath; + +// private static String COMMAND = "/usr/local/tongran/deploy.sh restart"; + private static String COMMAND = "/data/saas-income/houduan/tr-client-test/deploy.sh restart"; + private static String COMMAND2 = "cat /data/saas-income/houduan/tr-client-test/deploy.sh"; + @Autowired + private ProducerMode producerMode; + @Autowired + private RemoteRevenueConfigService remoteRevenueConfigService; + + /** + * 查询Agent管理 + * + * @param id Agent管理主键 + * @return Agent管理 + */ + @Override + public RmAgentManagement selectRmAgentManagementById(Long id) + { + return rmAgentManagementMapper.selectRmAgentManagementById(id); + } + + /** + * 查询Agent管理列表 + * + * @param rmAgentManagement Agent管理 + * @return Agent管理 + */ + @Override + public List selectRmAgentManagementList(RmAgentManagement rmAgentManagement) + { + return rmAgentManagementMapper.selectRmAgentManagementList(rmAgentManagement); + } + + /** + * 新增Agent管理 + * + * @param rmAgentManagement Agent管理 + * @return 结果 + */ + @Override + public int insertRmAgentManagement(RmAgentManagement rmAgentManagement) + { + rmAgentManagement.setCreateTime(DateUtils.getNowDate()); + return rmAgentManagementMapper.insertRmAgentManagement(rmAgentManagement); + } + + /** + * 修改Agent管理 + * + * @param rmAgentManagement Agent管理 + * @return 结果 + */ + @Override + public int updateRmAgentManagement(RmAgentManagement rmAgentManagement) + { + rmAgentManagement.setUpdateTime(DateUtils.getNowDate()); + return rmAgentManagementMapper.updateRmAgentManagement(rmAgentManagement); + } + + /** + * 批量删除Agent管理 + * + * @param ids 需要删除的Agent管理主键 + * @return 结果 + */ + @Override + public int deleteRmAgentManagementByIds(Long[] ids) + { + return rmAgentManagementMapper.deleteRmAgentManagementByIds(ids); + } + + /** + * 删除Agent管理信息 + * + * @param id Agent管理主键 + * @return 结果 + */ + @Override + public int deleteRmAgentManagementById(Long id) + { + return rmAgentManagementMapper.deleteRmAgentManagementById(id); + } + /** + * 配置更新策略 + * @param rmAgentManagement + * @return + */ + @Override + public int addRmAgentManagement(RmAgentManagement rmAgentManagement) { + // 设备 + String idStr = rmAgentManagement.getIncludeIds(); + String[] ids = idStr.split(","); + List registrationRemotes = remoteRevenueConfigService.getRegistrationByIds(ids, SecurityConstants.INNER).getData(); + for (RmResourceRegistrationRemote resourceMsg : registrationRemotes) { + rmAgentManagement.setHardwareSn(resourceMsg.getHardwareSn()); + rmAgentManagement.setResourceName(resourceMsg.getResourceName()); + rmAgentManagement.setInternalIp(resourceMsg.getIpAddress()); + // 查询该资源是否已经配置 + RmAgentManagement agentQueryParam = new RmAgentManagement(); + agentQueryParam.setHardwareSn(resourceMsg.getHardwareSn()); + List agentManagements = rmAgentManagementMapper.selectRmAgentManagementList(agentQueryParam); + if(!agentManagements.isEmpty()){ + // 如果存在,修改 + rmAgentManagementMapper.updateRmAgentManagementBySn(rmAgentManagement); + }else{ + // 如果不存在,添加 + rmAgentManagementMapper.insertRmAgentManagement(rmAgentManagement); + } + // 构建更新策略 + AgentUpdateVo agentUpdateVo = new AgentUpdateVo(); + agentUpdateVo.setFileUrl(rmAgentManagement.getFileUrl()); + agentUpdateVo.setFilePath("/data/saas-income/houduan/tr-client-test/temp"); + List commandList = new ArrayList<>(); + commandList.add(COMMAND); + commandList.add(COMMAND2); + agentUpdateVo.setCommands(JSONObject.toJSONString(commandList)); + agentUpdateVo.setMethod(rmAgentManagement.getMethod()); + if(rmAgentManagement.getMethod() == 1){ + agentUpdateVo.setPolicyTime(rmAgentManagement.getScheduledUpdateTime()); + } + try { + DeviceMessage deviceMessage = new DeviceMessage(); + deviceMessage.setClientId(resourceMsg.getHardwareSn()); + deviceMessage.setDataType(MsgEnum.Agent版本更新.getValue()); + deviceMessage.setData(JSONObject.toJSONString(agentUpdateVo)); + MessageProducer messageProducer = new MessageProducer(); + + messageProducer.sendAsyncProducerMessage( + producerMode.getAgentTopic(), + "", + "", + JSONObject.toJSONString(deviceMessage) + ); + } catch (Exception e) { + log.error("发送设备配置失败,deviceId: {}", resourceMsg.getHardwareSn(), e); + } + } + return 1; + } +} diff --git a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmMonitorPolicyServiceImpl.java b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmMonitorPolicyServiceImpl.java index 2785225..7c71863 100644 --- a/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmMonitorPolicyServiceImpl.java +++ b/ruoyi-rocketmq/src/main/java/com/ruoyi/rocketmq/service/impl/RmMonitorPolicyServiceImpl.java @@ -119,9 +119,13 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService queryParam.setResourceGroupId(resourceGroupId); List exits = rmMonitorPolicyMapper.selectRmMonitorPolicyList(queryParam); if(!exits.isEmpty()){ - return -1; + if(exits.get(0).getId() != rmMonitorPolicy.getId()){ + return -1; + } } rmMonitorPolicy.setUpdateTime(DateUtils.getNowDate()); + // 策略状态改为待下发 + rmMonitorPolicy.setStatus("0"); rmMonitorPolicyMapper.updateRmMonitorPolicy(rmMonitorPolicy); // 模板绑定资源组信息 RmMonitorTemplate rmMonitorTemplate = new RmMonitorTemplate(); @@ -132,6 +136,8 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService List collectionAndIdList = rmMonitorPolicy.getCollectionAndIdList(); if(!collectionAndIdList.isEmpty()){ if("linux".equals(rmMonitorPolicy.getResourceType())){ + // 修改之前先置空所有采集周期 + rmTemplateLinuxMapper.updateByTemplateId(rmMonitorPolicy.getTemplateId()); for (RmMonitorPolicyVo rmMonitorPolicyVo : collectionAndIdList) { // 添加采集周期 RmTemplateLinux rmTemplateLinux = new RmTemplateLinux(); @@ -140,6 +146,8 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService rmTemplateLinuxMapper.updateRmTemplateLinux(rmTemplateLinux); } }else if("switch".equals(rmMonitorPolicy.getResourceType())){ + // 修改之前先置空所有采集周期 + rmTemplateSwitchMapper.updateByTemplateId(rmMonitorPolicy.getTemplateId()); for (RmMonitorPolicyVo rmMonitorPolicyVo : collectionAndIdList) { // 添加采集周期 RmTemplateSwitch rmTemplateSwitch = new RmTemplateSwitch(); @@ -163,6 +171,14 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService @Override public int deleteRmMonitorPolicyByIds(Long[] ids) { + for (Long id : ids) { + // 根据id查询策略详情 + RmMonitorPolicy rmMonitorPolicy = rmMonitorPolicyMapper.selectRmMonitorPolicyById(id); + Long templateId = rmMonitorPolicy.getTemplateId(); + // 删除策略的时候,把该模板的采集周期置空 + rmTemplateLinuxMapper.updateByTemplateId(templateId); + rmTemplateSwitchMapper.updateByTemplateId(templateId); + } return rmMonitorPolicyMapper.deleteRmMonitorPolicyByIds(ids); } diff --git a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/InitialHeartbeatListenLogMapper.xml b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/InitialHeartbeatListenLogMapper.xml index d9c42f1..01f4898 100644 --- a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/InitialHeartbeatListenLogMapper.xml +++ b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/InitialHeartbeatListenLogMapper.xml @@ -1,10 +1,11 @@ + PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> - + + @@ -15,22 +16,23 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" - select client_id, status, remark, create_time, update_time, create_by, update_by from initial_heartbeat_listen_log + select id, client_id, status, remark, create_time, update_time, create_by, update_by from initial_heartbeat_listen_log - - - where client_id = #{clientId} + where id = #{id} - + insert into initial_heartbeat_listen_log client_id, @@ -40,7 +42,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" update_time, create_by, update_by, - + #{clientId}, #{status}, @@ -49,12 +51,13 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" #{updateTime}, #{createBy}, #{updateBy}, - + update initial_heartbeat_listen_log + client_id = #{clientId}, status = #{status}, remark = #{remark}, create_time = #{createTime}, @@ -62,17 +65,17 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" create_by = #{createBy}, update_by = #{updateBy}, - where client_id = #{clientId} + where id = #{id} - - delete from initial_heartbeat_listen_log where client_id = #{clientId} + + delete from initial_heartbeat_listen_log where id = #{id} - - delete from initial_heartbeat_listen_log where client_id in - - #{clientId} + + delete from initial_heartbeat_listen_log where id in + + #{id} \ No newline at end of file diff --git a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmAgentManagementMapper.xml b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmAgentManagementMapper.xml new file mode 100644 index 0000000..c99a7b6 --- /dev/null +++ b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmAgentManagementMapper.xml @@ -0,0 +1,148 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + select id, hardware_sn, resource_name, internal_ip, status, agent_version, method, scheduled_update_time, file_url_type, file_url, file_directory, last_update_result, last_update_time, create_time, update_time, create_by, update_by from rm_agent_management + + + + + + + + insert into rm_agent_management + + hardware_sn, + resource_name, + internal_ip, + status, + agent_version, + method, + scheduled_update_time, + file_url_type, + file_url, + file_directory, + last_update_result, + last_update_time, + create_time, + update_time, + create_by, + update_by, + + + #{hardwareSn}, + #{resourceName}, + #{internalIp}, + #{status}, + #{agentVersion}, + #{method}, + #{scheduledUpdateTime}, + #{fileUrlType}, + #{fileUrl}, + #{fileDirectory}, + #{lastUpdateResult}, + #{lastUpdateTime}, + #{createTime}, + #{updateTime}, + #{createBy}, + #{updateBy}, + + + + + update rm_agent_management + + hardware_sn = #{hardwareSn}, + resource_name = #{resourceName}, + internal_ip = #{internalIp}, + status = #{status}, + agent_version = #{agentVersion}, + method = #{method}, + scheduled_update_time = #{scheduledUpdateTime}, + file_url_type = #{fileUrlType}, + file_url = #{fileUrl}, + file_directory = #{fileDirectory}, + last_update_result = #{lastUpdateResult}, + last_update_time = #{lastUpdateTime}, + create_time = #{createTime}, + update_time = #{updateTime}, + create_by = #{createBy}, + update_by = #{updateBy}, + + where id = #{id} + + + update rm_agent_management + + resource_name = #{resourceName}, + internal_ip = #{internalIp}, + status = #{status}, + agent_version = #{agentVersion}, + method = #{method}, + scheduled_update_time = #{scheduledUpdateTime}, + file_url_type = #{fileUrlType}, + file_url = #{fileUrl}, + file_directory = #{fileDirectory}, + last_update_result = #{lastUpdateResult}, + last_update_time = #{lastUpdateTime}, + create_time = #{createTime}, + update_time = #{updateTime}, + create_by = #{createBy}, + update_by = #{updateBy}, + + where hardware_sn = #{hardwareSn} + + + + delete from rm_agent_management where id = #{id} + + + + delete from rm_agent_management where id in + + #{id} + + + \ No newline at end of file diff --git a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmMonitorPolicyMapper.xml b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmMonitorPolicyMapper.xml index 557c450..f69772e 100644 --- a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmMonitorPolicyMapper.xml +++ b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmMonitorPolicyMapper.xml @@ -24,13 +24,19 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" diff --git a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmTemplateLinuxMapper.xml b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmTemplateLinuxMapper.xml index 587cda6..1ceaeab 100644 --- a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmTemplateLinuxMapper.xml +++ b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmTemplateLinuxMapper.xml @@ -129,4 +129,9 @@ delete from rm_template_linux where template_id = #{templateId} + + + update rm_template_linux set collection_cycle = null + where template_id = #{templateId} + \ No newline at end of file diff --git a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmTemplateSwitchMapper.xml b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmTemplateSwitchMapper.xml index c7bed8b..eed73c2 100644 --- a/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmTemplateSwitchMapper.xml +++ b/ruoyi-rocketmq/src/main/resources/mapper/rocketmq/RmTemplateSwitchMapper.xml @@ -161,4 +161,8 @@ delete from rm_template_switch where template_id = #{templateId} + + update rm_template_switch set collection_cycle = null + where template_id = #{templateId} + \ No newline at end of file