package com.ruoyi.rocketmq.handler; import com.ruoyi.common.core.constant.SecurityConstants; import com.ruoyi.common.core.domain.R; import com.ruoyi.common.core.enums.MsgEnum; import com.ruoyi.common.core.utils.DateUtils; import com.ruoyi.common.core.utils.StringUtils; import com.ruoyi.rocketmq.domain.*; import com.ruoyi.rocketmq.domain.vo.CollectDataVo; import com.ruoyi.rocketmq.domain.vo.RegisterMsgVo; import com.ruoyi.rocketmq.domain.vo.RspResultVo; import com.ruoyi.rocketmq.domain.vo.RspVo; import com.ruoyi.rocketmq.service.*; import com.ruoyi.rocketmq.utils.DataProcessUtil; import com.ruoyi.rocketmq.utils.JsonDataParser; import com.ruoyi.system.api.RemoteRevenueConfigService; import com.ruoyi.system.api.domain.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.SessionCallback; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.*; import java.util.function.Consumer; /** * 设备消息处理器 */ @Slf4j @Component @EnableScheduling public class MessageHandler { private final Map> messageHandlers = new HashMap<>(); // 心跳状态 private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:"; // 心跳时间 private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:"; // 心跳告警 private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:"; String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:"; private static final long HEARTBEAT_TIMEOUT = 30000; // 3分钟超时 @Autowired private RedisTemplate redisTemplate; @Autowired private IInitialBandwidthTrafficService 
initialBandwidthTrafficService; @Autowired private RemoteRevenueConfigService remoteRevenueConfigService; @Autowired private IInitialDockerInfoService initialDockerInfoService; @Autowired private IInitialCpuInfoService initialCpuInfoService; @Autowired private IInitialDiskInfoService initialDiskInfoService; @Autowired private IInitialMemoryInfoService initialMemoryInfoService; @Autowired private IInitialMountPointInfoService initialMountPointInfoService; @Autowired private IInitialHeartbeatListenLogService initialHeartbeatListenLog; @Autowired private IInitialSystemOtherCollectDataService iInitialSystemOtherCollectDataService; @Autowired private IRmResourceRemoteService rmResourceRemoteService; @Autowired private IRmAgentManagementService rmAgentManagementService; @Autowired private DataProcessUtil dataProcessUtil; @Autowired private IRmNetworkInterfaceService rmNetworkInterfaceService; @Autowired private IRmMonitorPolicyService rmMonitorPolicyService; @Autowired private IRmDeploymentPolicyService rmDeploymentPolicyService; /** * 初始化处理器映射 */ @PostConstruct public void init() { registerHandler(MsgEnum.执行脚本策略应答.getValue(), this::handleScriptRspMessage); registerHandler(MsgEnum.Agent版本更新应答.getValue(), this::handleAgentUpdateRspMessage); // 其他类型消息可以单独注册处理器 registerHandler(MsgEnum.注册.getValue(), this::handleRegisterMessage); registerHandler(MsgEnum.获取最新策略.getValue(), this::handleNewPolicyMessage); registerHandler(MsgEnum.CPU上报.getValue(), this::handleCpuMessage); registerHandler(MsgEnum.磁盘上报.getValue(), this::handleDiskMessage); registerHandler(MsgEnum.容器上报.getValue(), this::handleDockerMessage); registerHandler(MsgEnum.内存上报.getValue(), this::handleMemoryMessage); registerHandler(MsgEnum.网络上报.getValue(), this::handleNetMessage); registerHandler(MsgEnum.挂载上报.getValue(), this::handleMountPointMessage); registerHandler(MsgEnum.系统其他上报.getValue(), this::handleOtherSystemMessage); registerHandler(MsgEnum.心跳上报.getValue(), this::handleHeartbeatMessage); 
registerHandler(MsgEnum.多公网IP探测.getValue(), this::handleNetWorkDelectMessage); } /** * 保存网卡信息 * @param message */ private void handleNetWorkDelectMessage(DeviceMessage message) { List interfaces = JsonDataParser.parseJsonData(message.getData(), RegisterMsgVo.class); if(!interfaces.isEmpty()) { String clientId = message.getClientId(); RegisterMsgVo registerMsg = interfaces.get(0); // 时间戳转换 long timestamp = registerMsg.getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 List networkInfoList = registerMsg.getNetworkInfo(); if(!networkInfoList.isEmpty()){ if(networkInfoList.size()==1){ NetworkInfo networkInfo = networkInfoList.get(0); // 查询该网卡信息是否存在 RmNetworkInterface queryParam = new RmNetworkInterface(); queryParam.setClientId(clientId); queryParam.setMacAddress(networkInfo.getMac()); queryParam.setNewFlag(1); List exits = rmNetworkInterfaceService.selectRmNetworkInterfaceList(queryParam); if(exits.isEmpty()){ // 保存网卡信息 RmNetworkInterface insertData = new RmNetworkInterface(); // 业务ip和管理网ip insertData.setBindIp("3"); insertData.setClientId(clientId); insertData.setIsp(networkInfo.getCarrier()); insertData.setCity(networkInfo.getCity()); insertData.setGateway(networkInfo.getGateway()); insertData.setInterfaceName(networkInfo.getName()); insertData.setIpv4Address(networkInfo.getIpv4()); insertData.setMacAddress(networkInfo.getMac()); insertData.setProvince(networkInfo.getProvince()); insertData.setPublicIp(networkInfo.getPublicIp()); insertData.setInterfaceType(networkInfo.getType()); insertData.setCreateTime(createTime); rmNetworkInterfaceService.insertRmNetworkInterface(insertData); }else{ RmNetworkInterface oldInterfaceMsg = exits.get(0); if(!StringUtils.equals(networkInfo.getName(),oldInterfaceMsg.getInterfaceName()) || !StringUtils.equals(networkInfo.getGateway(),oldInterfaceMsg.getGateway())){ // 查询该网卡信息是否旧数据 RmNetworkInterface query = new RmNetworkInterface(); 
query.setMacAddress(oldInterfaceMsg.getMacAddress()); query.setClientId(clientId); query.setNewFlag(999); List oldExits = rmNetworkInterfaceService.selectRmNetworkInterfaceList(query); if(!oldExits.isEmpty()){ // 先删除旧数据 oldExits.forEach(oldMsg ->{ rmNetworkInterfaceService.deleteRmNetworkInterfaceById(oldMsg.getId()); }); } // 先将已存在的改为旧数据 RmNetworkInterface oldData = new RmNetworkInterface(); oldData.setNewFlag(0); oldData.setClientId(clientId); oldData.setMacAddress(oldInterfaceMsg.getMacAddress()); rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(oldData); RmNetworkInterface insertData = new RmNetworkInterface(); insertData.setClientId(clientId); insertData.setIsp(networkInfo.getCarrier()); insertData.setCity(networkInfo.getCity()); insertData.setGateway(networkInfo.getGateway()); insertData.setInterfaceName(networkInfo.getName()); insertData.setIpv4Address(networkInfo.getIpv4()); insertData.setMacAddress(networkInfo.getMac()); insertData.setProvince(networkInfo.getProvince()); insertData.setPublicIp(networkInfo.getPublicIp()); insertData.setInterfaceType(networkInfo.getType()); rmNetworkInterfaceService.insertRmNetworkInterface(insertData); }else { RmNetworkInterface updateData = new RmNetworkInterface(); // 检查其他字段是否需要更新 boolean needUpdate = false; // 逐个字段比较是否需要更新 if (!StringUtils.equals(networkInfo.getCity(), oldInterfaceMsg.getCity())) { updateData.setCity(networkInfo.getCity()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getIpv4(), oldInterfaceMsg.getIpv4Address())) { updateData.setIpv4Address(networkInfo.getIpv4()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getProvince(), oldInterfaceMsg.getProvince())) { updateData.setProvince(networkInfo.getProvince()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getPublicIp(), oldInterfaceMsg.getPublicIp())) { updateData.setPublicIp(networkInfo.getPublicIp()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getCarrier(), oldInterfaceMsg.getIsp())) { 
updateData.setIsp(networkInfo.getCarrier()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getType(), oldInterfaceMsg.getInterfaceType())) { updateData.setInterfaceType(networkInfo.getType()); needUpdate = true; } // 只有有字段变化时才执行更新 if (needUpdate) { updateData.setClientId(clientId); updateData.setMacAddress(oldInterfaceMsg.getMacAddress()); rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(updateData); } } } }else{ for (NetworkInfo networkInfo : networkInfoList) { // 查询该网卡信息是否存在 RmNetworkInterface queryParam = new RmNetworkInterface(); queryParam.setClientId(clientId); queryParam.setMacAddress(networkInfo.getMac()); List exits = rmNetworkInterfaceService.selectRmNetworkInterfaceList(queryParam); if(exits.isEmpty()){ // 保存网卡信息 RmNetworkInterface insertData = new RmNetworkInterface(); // 业务ip和管理网ip insertData.setClientId(clientId); insertData.setIsp(networkInfo.getCarrier()); insertData.setCity(networkInfo.getCity()); insertData.setGateway(networkInfo.getGateway()); insertData.setInterfaceName(networkInfo.getName()); insertData.setIpv4Address(networkInfo.getIpv4()); insertData.setMacAddress(networkInfo.getMac()); insertData.setProvince(networkInfo.getProvince()); insertData.setPublicIp(networkInfo.getPublicIp()); insertData.setInterfaceType(networkInfo.getType()); insertData.setCreateTime(createTime); rmNetworkInterfaceService.insertRmNetworkInterface(insertData); }else{ RmNetworkInterface oldInterfaceMsg = exits.get(0); if(!StringUtils.equals(networkInfo.getName(),oldInterfaceMsg.getInterfaceName()) || !StringUtils.equals(networkInfo.getGateway(),oldInterfaceMsg.getGateway())){ // 查询该网卡信息是否旧数据 RmNetworkInterface query = new RmNetworkInterface(); query.setClientId(clientId); query.setMacAddress(networkInfo.getMac()); query.setNewFlag(999); List oldExits = rmNetworkInterfaceService.selectRmNetworkInterfaceList(query); if(!oldExits.isEmpty()){ // 先删除旧数据 oldExits.forEach(oldMsg ->{ rmNetworkInterfaceService.deleteRmNetworkInterfaceById(oldMsg.getId()); 
}); } // 先将已存在的改为旧数据 RmNetworkInterface oldData = new RmNetworkInterface(); oldData.setNewFlag(0); oldData.setClientId(clientId); oldData.setMacAddress(oldInterfaceMsg.getMacAddress()); rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(oldData); RmNetworkInterface insertData = new RmNetworkInterface(); insertData.setClientId(clientId); insertData.setIsp(networkInfo.getCarrier()); insertData.setCity(networkInfo.getCity()); insertData.setGateway(networkInfo.getGateway()); insertData.setInterfaceName(networkInfo.getName()); insertData.setIpv4Address(networkInfo.getIpv4()); insertData.setMacAddress(networkInfo.getMac()); insertData.setProvince(networkInfo.getProvince()); insertData.setPublicIp(networkInfo.getPublicIp()); insertData.setInterfaceType(networkInfo.getType()); rmNetworkInterfaceService.insertRmNetworkInterface(insertData); }else { RmNetworkInterface updateData = new RmNetworkInterface(); // 检查其他字段是否需要更新 boolean needUpdate = false; // 逐个字段比较是否需要更新 if (!StringUtils.equals(networkInfo.getCity(), oldInterfaceMsg.getCity())) { updateData.setCity(networkInfo.getCity()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getIpv4(), oldInterfaceMsg.getIpv4Address())) { updateData.setIpv4Address(networkInfo.getIpv4()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getProvince(), oldInterfaceMsg.getProvince())) { updateData.setProvince(networkInfo.getProvince()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getPublicIp(), oldInterfaceMsg.getPublicIp())) { updateData.setPublicIp(networkInfo.getPublicIp()); needUpdate = true; updateData.setBindIp("0"); // 修改绑定公网ip状态 RmResourceRegistrationRemote updateParam = new RmResourceRegistrationRemote(); updateParam.setClientId(clientId); updateParam.setMultiPublicIpStatus("0"); remoteRevenueConfigService.updateStatusByResource(updateParam, SecurityConstants.INNER); } if (!StringUtils.equals(networkInfo.getCarrier(), oldInterfaceMsg.getIsp())) { updateData.setIsp(networkInfo.getCarrier()); 
needUpdate = true; } if (!StringUtils.equals(networkInfo.getType(), oldInterfaceMsg.getInterfaceType())) { updateData.setInterfaceType(networkInfo.getType()); needUpdate = true; } // 只有有字段变化时才执行更新 if (needUpdate) { updateData.setClientId(clientId); updateData.setMacAddress(oldInterfaceMsg.getMacAddress()); rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(updateData); } } } } } } } } /** * 获取最新策略 * @param deviceMessage */ private void handleNewPolicyMessage(DeviceMessage deviceMessage) { List interfaces = JsonDataParser.parseJsonData(deviceMessage.getData(), RegisterMsgVo.class); if(!interfaces.isEmpty()) { RegisterMsgVo registerMsgVo = interfaces.get(0); String clientId = registerMsgVo.getClientId(); // 如果未下发监控策略,下发 rmMonitorPolicyService.issuePolicyMsgByClientId(clientId); // 如果未下发服务器脚本策略,下发 rmDeploymentPolicyService.issueDeployPolicyMsgByClientId(clientId); // 如果路由有变化,更新路由信息 rmNetworkInterfaceService.updateRouteMsg(clientId); } } /** * 服务器注册 * @param message */ private void handleRegisterMessage(DeviceMessage message) { List interfaces = JsonDataParser.parseJsonData(message.getData(), RegisterMsgVo.class); if(!interfaces.isEmpty()) { String clientId = message.getClientId(); RegisterMsgVo registerMsg = interfaces.get(0); // 自动注册服务器信息 RmRegisterMsgRemote rmRegisterMsgRemote = new RmRegisterMsgRemote(); BeanUtils.copyProperties(registerMsg, rmRegisterMsgRemote); int rows = remoteRevenueConfigService.innerAddRegist(rmRegisterMsgRemote, SecurityConstants.INNER).getData(); if(rows == 2){ // 注册成功,下发优先级为0的策略 rmMonitorPolicyService.issueDefaultPolicyByClientId(message.getClientId()); } // 时间戳转换 long timestamp = registerMsg.getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss", createTime); List networkInfoList = registerMsg.getNetworkInfo(); if(!networkInfoList.isEmpty()){ if(networkInfoList.size()==1){ NetworkInfo networkInfo = 
networkInfoList.get(0); // 查询该网卡信息是否存在 RmNetworkInterface queryParam = new RmNetworkInterface(); queryParam.setClientId(clientId); queryParam.setMacAddress(networkInfo.getMac()); queryParam.setNewFlag(1); List exits = rmNetworkInterfaceService.selectRmNetworkInterfaceList(queryParam); if(exits.isEmpty()){ // 保存网卡信息 RmNetworkInterface insertData = new RmNetworkInterface(); // 业务ip和管理网ip insertData.setBindIp("3"); insertData.setClientId(clientId); insertData.setIsp(networkInfo.getCarrier()); insertData.setCity(networkInfo.getCity()); insertData.setGateway(networkInfo.getGateway()); insertData.setInterfaceName(networkInfo.getName()); insertData.setIpv4Address(networkInfo.getIpv4()); insertData.setMacAddress(networkInfo.getMac()); insertData.setProvince(networkInfo.getProvince()); insertData.setPublicIp(networkInfo.getPublicIp()); insertData.setInterfaceType(networkInfo.getType()); insertData.setCreateTime(createTime); rmNetworkInterfaceService.insertRmNetworkInterface(insertData); }else{ RmNetworkInterface oldInterfaceMsg = exits.get(0); if(!StringUtils.equals(networkInfo.getName(),oldInterfaceMsg.getInterfaceName()) || !StringUtils.equals(networkInfo.getGateway(),oldInterfaceMsg.getGateway())){ // 查询该网卡信息是否旧数据 RmNetworkInterface query = new RmNetworkInterface(); query.setClientId(clientId); query.setMacAddress(oldInterfaceMsg.getMacAddress()); query.setNewFlag(999); List oldExits = rmNetworkInterfaceService.selectRmNetworkInterfaceList(query); if(!oldExits.isEmpty()){ // 先删除旧数据 oldExits.forEach(oldMsg ->{ rmNetworkInterfaceService.deleteRmNetworkInterfaceById(oldMsg.getId()); }); } // 先将已存在的改为旧数据 RmNetworkInterface oldData = new RmNetworkInterface(); oldData.setNewFlag(0); oldData.setMacAddress(oldInterfaceMsg.getMacAddress()); oldData.setClientId(clientId); rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(oldData); RmNetworkInterface insertData = new RmNetworkInterface(); insertData.setClientId(clientId); insertData.setIsp(networkInfo.getCarrier()); 
insertData.setCity(networkInfo.getCity()); insertData.setGateway(networkInfo.getGateway()); insertData.setInterfaceName(networkInfo.getName()); insertData.setIpv4Address(networkInfo.getIpv4()); insertData.setMacAddress(networkInfo.getMac()); insertData.setProvince(networkInfo.getProvince()); insertData.setPublicIp(networkInfo.getPublicIp()); insertData.setInterfaceType(networkInfo.getType()); rmNetworkInterfaceService.insertRmNetworkInterface(insertData); }else { RmNetworkInterface updateData = new RmNetworkInterface(); // 检查其他字段是否需要更新 boolean needUpdate = false; // 逐个字段比较是否需要更新 if (!StringUtils.equals(networkInfo.getCity(), oldInterfaceMsg.getCity())) { updateData.setCity(networkInfo.getCity()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getIpv4(), oldInterfaceMsg.getIpv4Address())) { updateData.setIpv4Address(networkInfo.getIpv4()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getProvince(), oldInterfaceMsg.getProvince())) { updateData.setProvince(networkInfo.getProvince()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getPublicIp(), oldInterfaceMsg.getPublicIp())) { updateData.setPublicIp(networkInfo.getPublicIp()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getCarrier(), oldInterfaceMsg.getIsp())) { updateData.setIsp(networkInfo.getCarrier()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getType(), oldInterfaceMsg.getInterfaceType())) { updateData.setInterfaceType(networkInfo.getType()); needUpdate = true; } // 只有有字段变化时才执行更新 if (needUpdate) { updateData.setClientId(clientId); updateData.setMacAddress(oldInterfaceMsg.getMacAddress()); rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(updateData); } } } }else{ for (NetworkInfo networkInfo : networkInfoList) { // 查询该网卡信息是否存在 RmNetworkInterface queryParam = new RmNetworkInterface(); queryParam.setMacAddress(networkInfo.getMac()); queryParam.setClientId(clientId); List exits = rmNetworkInterfaceService.selectRmNetworkInterfaceList(queryParam); 
if(exits.isEmpty()){ // 保存网卡信息 RmNetworkInterface insertData = new RmNetworkInterface(); // 业务ip和管理网ip insertData.setClientId(clientId); insertData.setIsp(networkInfo.getCarrier()); insertData.setCity(networkInfo.getCity()); insertData.setGateway(networkInfo.getGateway()); insertData.setInterfaceName(networkInfo.getName()); insertData.setIpv4Address(networkInfo.getIpv4()); insertData.setMacAddress(networkInfo.getMac()); insertData.setProvince(networkInfo.getProvince()); insertData.setPublicIp(networkInfo.getPublicIp()); insertData.setInterfaceType(networkInfo.getType()); insertData.setCreateTime(createTime); rmNetworkInterfaceService.insertRmNetworkInterface(insertData); }else{ RmNetworkInterface oldInterfaceMsg = exits.get(0); if(!StringUtils.equals(networkInfo.getName(),oldInterfaceMsg.getInterfaceName()) || !StringUtils.equals(networkInfo.getGateway(),oldInterfaceMsg.getGateway())){ // 查询该网卡信息是否旧数据 RmNetworkInterface query = new RmNetworkInterface(); query.setClientId(clientId); query.setMacAddress(oldInterfaceMsg.getMacAddress()); query.setNewFlag(999); List oldExits = rmNetworkInterfaceService.selectRmNetworkInterfaceList(query); if(!oldExits.isEmpty()){ // 先删除旧数据 oldExits.forEach(oldMsg ->{ rmNetworkInterfaceService.deleteRmNetworkInterfaceById(oldMsg.getId()); }); } // 先将已存在的改为旧数据 RmNetworkInterface oldData = new RmNetworkInterface(); oldData.setNewFlag(0); oldData.setClientId(clientId); oldData.setMacAddress(oldInterfaceMsg.getMacAddress()); rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(oldData); RmNetworkInterface insertData = new RmNetworkInterface(); insertData.setClientId(clientId); insertData.setIsp(networkInfo.getCarrier()); insertData.setCity(networkInfo.getCity()); insertData.setGateway(networkInfo.getGateway()); insertData.setInterfaceName(networkInfo.getName()); insertData.setIpv4Address(networkInfo.getIpv4()); insertData.setMacAddress(networkInfo.getMac()); insertData.setProvince(networkInfo.getProvince()); 
insertData.setPublicIp(networkInfo.getPublicIp()); insertData.setInterfaceType(networkInfo.getType()); rmNetworkInterfaceService.insertRmNetworkInterface(insertData); }else { RmNetworkInterface updateData = new RmNetworkInterface(); // 检查其他字段是否需要更新 boolean needUpdate = false; // 逐个字段比较是否需要更新 if (!StringUtils.equals(networkInfo.getCity(), oldInterfaceMsg.getCity())) { updateData.setCity(networkInfo.getCity()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getIpv4(), oldInterfaceMsg.getIpv4Address())) { updateData.setIpv4Address(networkInfo.getIpv4()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getProvince(), oldInterfaceMsg.getProvince())) { updateData.setProvince(networkInfo.getProvince()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getPublicIp(), oldInterfaceMsg.getPublicIp())) { updateData.setPublicIp(networkInfo.getPublicIp()); needUpdate = true; updateData.setBindIp("0"); // 修改绑定公网ip状态 RmResourceRegistrationRemote updateParam = new RmResourceRegistrationRemote(); updateParam.setClientId(clientId); updateParam.setMultiPublicIpStatus("0"); remoteRevenueConfigService.updateStatusByResource(updateParam, SecurityConstants.INNER); } if (!StringUtils.equals(networkInfo.getCarrier(), oldInterfaceMsg.getIsp())) { updateData.setIsp(networkInfo.getCarrier()); needUpdate = true; } if (!StringUtils.equals(networkInfo.getType(), oldInterfaceMsg.getInterfaceType())) { updateData.setInterfaceType(networkInfo.getType()); needUpdate = true; } // 只有有字段变化时才执行更新 if (needUpdate) { updateData.setClientId(clientId); updateData.setMacAddress(oldInterfaceMsg.getMacAddress()); rmNetworkInterfaceService.updateRmNetworkInterfaceByMac(updateData); } } } } } } } } /** * agent更新响应 * @param message */ private void handleAgentUpdateRspMessage(DeviceMessage message) { List rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class); if (!rspVoList.isEmpty()) { RspVo rsp = rspVoList.get(0); if(rsp.getResCode() == 1){ RmAgentManagement 
rmAgentManagement = new RmAgentManagement(); rmAgentManagement.setClientId(message.getClientId()); rmAgentManagement.setLastUpdateResult("1"); rmAgentManagement.setLastUpdateTime(DateUtils.getNowDate()); rmAgentManagementService.updateRmAgentManagementByHardwareSn(rmAgentManagement); }else{ RmAgentManagement rmAgentManagement = new RmAgentManagement(); rmAgentManagement.setClientId(message.getClientId()); rmAgentManagement.setLastUpdateResult("0"); rmAgentManagement.setLastUpdateTime(DateUtils.getNowDate()); rmAgentManagementService.updateRmAgentManagementByHardwareSn(rmAgentManagement); } } } /** * 注册消息处理器 */ private void registerHandler(String dataType, Consumer handler) { messageHandlers.put(dataType, handler); } /** * 处理设备消息(对外暴露的主方法) */ public void handleMessage(DeviceMessage message) { String dataType = message.getDataType(); Consumer handler = messageHandlers.get(dataType); if (handler != null) { handler.accept(message); } else { log.warn("未知数据类型:{}", dataType); } } // ========== 具体的消息处理方法 ========== /** * 网络流量数据入库 * @param message */ private void handleNetMessage(DeviceMessage message) { List interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class); if(!interfaces.isEmpty()){ // 时间戳转换 long timestamp = interfaces.get(0).getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime); InitialBandwidthTraffic data = new InitialBandwidthTraffic(); interfaces.forEach(iface -> { iface.setClientId(message.getClientId()); iface.setCreateTime(createTime); // 发送流量 iface.setOutSpeed(dataProcessUtil.bytesToBits(iface.getOutSpeed())); // 接收流量 iface.setInSpeed(dataProcessUtil.bytesToBits(iface.getInSpeed())); }); // 批量入库集合 data.setList(interfaces); // 初始流量数据入库 initialBandwidthTrafficService.batchInsert(data); EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote(); 
epsInitialTrafficDataRemote.setStartTime(timeStr); epsInitialTrafficDataRemote.setEndTime(timeStr); // 复制到业务初始库 remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER); }else{ throw new RuntimeException("NET流量data数据为空"); } } /** * docker数据入库 * @param message */ private void handleDockerMessage(DeviceMessage message) { List dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class); if(!dockers.isEmpty()){ // 时间戳转换 long timestamp = dockers.get(0).getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 dockers.forEach(iface -> { iface.setClientId(message.getClientId()); iface.setCreateTime(createTime); }); // 初始容器数据入库 initialDockerInfoService.batchInsertInitialDockerInfo(dockers); }else{ throw new RuntimeException("DOCKER容器data数据为空"); } } /** * cpu数据入库 * @param message */ private void handleCpuMessage(DeviceMessage message) { List cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class); // 时间戳转换 long timestamp = cpus.get(0).getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 if(!cpus.isEmpty()){ cpus.forEach(iface -> { iface.setClientId(message.getClientId()); iface.setCreateTime(createTime); }); // 初始CPU数据入库 initialCpuInfoService.batchInsertInitialCpuInfo(cpus); }else{ throw new RuntimeException("CPUdata数据为空"); } } /** * 磁盘数据入库 * @param message */ private void handleDiskMessage(DeviceMessage message) { List disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class); // 时间戳转换 long timestamp = disks.get(0).getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 if(!disks.isEmpty()){ disks.forEach(iface -> { iface.setClientId(message.getClientId()); iface.setCreateTime(createTime); }); // 初始磁盘数据入库 initialDiskInfoService.batchInsertInitialDiskInfo(disks); }else{ throw new RuntimeException("磁盘data数据为空"); } 
} /** * 内存数据入库 * @param message */ private void handleMemoryMessage(DeviceMessage message) { List memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class); if(!memorys.isEmpty()){ // 时间戳转换 long timestamp = memorys.get(0).getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 memorys.forEach(iface -> { iface.setClientId(message.getClientId()); iface.setCreateTime(createTime); }); // 初始内存数据入库 initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys); }else{ throw new RuntimeException("内存data数据为空"); } } /** * 挂载点数据入库 * @param message */ private void handleMountPointMessage(DeviceMessage message) { List mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class); if(!mountPointInfos.isEmpty()){ // 时间戳转换 long timestamp = mountPointInfos.get(0).getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 mountPointInfos.forEach(iface -> { iface.setClientId(message.getClientId()); iface.setCreateTime(createTime); }); // 初始挂载点数据入库 initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos); }else{ throw new RuntimeException("挂载点data数据为空"); } } /** * 系统其他信息 * @param message */ private void handleOtherSystemMessage(DeviceMessage message) { List otherData = JsonDataParser.parseJsonData(message.getData(), CollectDataVo.class); if(!otherData.isEmpty()){ CollectDataVo systemDataVo = otherData.get(0); if (systemDataVo != null){ try { InitialSystemOtherCollectData insertData = new InitialSystemOtherCollectData(); // 时间戳转换 long timestamp = systemDataVo.getTimestamp(); long millis = timestamp * 1000; Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒 insertData.setClientId(message.getClientId()); insertData.setCreateTime(createTime); String collectType = systemDataVo.getType(); String collectValue = systemDataVo.getValue(); insertData.setCollectType(collectType); 
insertData.setCollectValue(collectValue); iInitialSystemOtherCollectDataService.insertInitialSystemOtherCollectData(insertData); } catch (Exception e) { log.error("解析JSON数据失败: {}, value: {}", e.getMessage(), systemDataVo.getValue()); // 可以选择保存原始数据或进行其他错误处理 } } } } /** * 监听心跳 * @param message */ private void handleHeartbeatMessage(DeviceMessage message) { List heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class); if(!heartbeats.isEmpty()){ InitialHeartbeatListen heartbeat = heartbeats.get(0); String clientId = message.getClientId(); String version = heartbeat.getVersion(); log.debug("处理心跳消息,客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp()); // 添加逻辑节点标识 RmResourceRegistrationRemote updateData = new RmResourceRegistrationRemote(); updateData.setClientId(message.getClientId()); updateData.setLogicalNodeId(heartbeat.getLogicalNode()); updateData.setAgentVersion(version); updateData.setOnboardTime(DateUtils.getNowDate()); remoteRevenueConfigService.innerUpdateRegist(updateData, SecurityConstants.INNER); // agent更新结果存储 RmAgentManagement query = new RmAgentManagement(); query.setClientId(clientId); List agentManagements = rmAgentManagementService.selectRmAgentManagementList(query); if(!agentManagements.isEmpty()){ RmAgentManagement rmAgentManagement = agentManagements.get(0); if(!StringUtils.equals(rmAgentManagement.getAgentVersion(), version)){ // 存储更新结果 RmAgentManagement updateResultData = new RmAgentManagement(); updateResultData.setId(rmAgentManagement.getId()); updateResultData.setAgentVersion(version); updateResultData.setLastUpdateTime(DateUtils.getNowDate()); updateResultData.setLastUpdateResult("1"); rmAgentManagementService.updateRmAgentManagementByHardwareSn(updateResultData); } } // 使用Redis存储状态 String statusKey = HEARTBEAT_STATUS_PREFIX + clientId; String timeKey = HEARTBEAT_TIME_PREFIX + clientId; String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; try { // 记录处理前状态(调试用) String prevStatus = 
redisTemplate.opsForValue().get(statusKey);
                String prevTime = redisTemplate.opsForValue().get(timeKey);
                log.debug("客户端ID: {} 处理前状态 - status: {}, time: {}", clientId, prevStatus, prevTime);

                // Use a MULTI/EXEC session so the status reset and the timestamp update are applied together.
                redisTemplate.execute(new SessionCallback() {
                    @Override
                    public Object execute(RedisOperations operations) throws DataAccessException {
                        operations.multi();
                        // Reset the lost-heartbeat counter to 0 and record the time of this heartbeat.
                        operations.opsForValue().set(statusKey, "0");
                        operations.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis()));
                        return operations.exec();
                    }
                });
                log.debug("客户端ID: {} 心跳处理完成,重置状态和时间", clientId);

                // If an alert was raised earlier for this client, count this heartbeat towards recovery.
                if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) {
                    // Read the current recovery count (absent means this is the first recovery heartbeat).
                    String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey);
                    int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1;

                    if (recoveryCount == 2) {
                        // Second recovery heartbeat: clear the alert and mark the device online.
                        log.warn("客户端ID: {} 心跳恢复达到2次,修改设备状态为在线", clientId);
                        insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线");
                        redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId);
                        redisTemplate.delete(recoveryCountKey); // clear the recovery counter
                        // Update the resource status.
                        updateResourceStatus(clientId, "1");
                    } else {
                        // Not yet at two recoveries: only persist the recovery count.
                        log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount);
                        redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount));
                    }
                }
            } catch (Exception e) {
                log.error("处理心跳消息异常, clientId: {}", clientId, e);
            }
        }
    }

    /**
     * Scheduled watchdog that looks for clients whose heartbeat has gone silent.
     *
     * <p>Runs every 60 seconds. For every {@code heartbeat:time:*} key it compares the stored
     * timestamp with the current time. When the gap exceeds {@code HEARTBEAT_TIMEOUT} the
     * per-client lost counter is incremented; once the counter reaches 3 an alert key is written,
     * a heartbeat log row is inserted, the time/status keys are removed and the resource is marked
     * offline ({@code "0"}). Clients that already have an alert value of {@code "1"} are skipped.
     * When the heartbeat is within the timeout the lost counter is reset to {@code "0"}.
     *
     * <p>Failures for one client are logged and do not stop the checks for the remaining clients.
     */
    @Scheduled(fixedRate = 60000) // run every 60 s
    public void checkHeartbeatStatus() {
        long currentTime = System.currentTimeMillis();
        log.debug("开始心跳状态检查,当前时间: {}", currentTime);

        // Fetch every client time key.
        // NOTE(review): KEYS walks the whole keyspace and blocks Redis while doing so; consider a
        // SCAN-based iteration if the number of keys grows large.
        Set<String> timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*");
        if (timeKeys == null) {
            log.debug("未找到任何心跳时间键");
            return;
        }

        log.debug("找到 {} 个客户端需要检查", timeKeys.size());

        for (String timeKey : timeKeys) {
            String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
            String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
            String alertKey = HEARTBEAT_ALERT_PREFIX + clientId;

            try {
                // Skip clients that already have an active alert.
                String existingAlert = redisTemplate.opsForValue().get(alertKey);
                if ("1".equals(existingAlert)) {
                    log.debug("客户端ID: {} 已有告警,跳过检查", clientId);
                    continue;
                }

                // The key may have been deleted between KEYS and GET.
                String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
                if (lastTimeStr == null) {
                    log.debug("客户端ID: {} 时间键为空,跳过", clientId);
                    continue;
                }

                long lastHeartbeatTime = Long.parseLong(lastTimeStr);
                long timeDiff = currentTime - lastHeartbeatTime;

                log.debug("客户端ID: {} 最后心跳: {}, 时间差: {}ms, 超时阈值: {}ms",
                        clientId, lastHeartbeatTime, timeDiff, HEARTBEAT_TIMEOUT);

                if (timeDiff > HEARTBEAT_TIMEOUT) {
                    // Heartbeat timed out: INCR is atomic and creates the key with value 1 when it
                    // is missing, so no follow-up SET is needed.
                    Long lostCount = redisTemplate.opsForValue().increment(statusKey);
                    if (lostCount == null) {
                        // increment() returns null when executed inside a pipeline/transaction;
                        // avoid unboxing null and retry on the next scheduled run.
                        log.warn("客户端ID: {} 心跳丢失计数自增未返回结果,跳过本次检查", clientId);
                        continue;
                    }

                    log.warn("客户端ID: {} 心跳超时,连续次数: {}, 时间差: {}ms",
                            clientId, lostCount, timeDiff);

                    if (lostCount >= 3) {
                        log.warn("客户端ID: {} 连续三次心跳丢失,触发告警", clientId);
                        insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
                        redisTemplate.opsForValue().set(alertKey, "1");

                        // After raising the alert, drop the time and status keys.
                        redisTemplate.delete(timeKey);
                        redisTemplate.delete(statusKey);
                        log.info("客户端ID: {} 已设置告警并清理心跳记录", clientId);

                        // Update the resource status.
                        updateResourceStatus(clientId, "0");
                    }
                } else {
                    // Heartbeat is healthy: reset the lost counter if it is not already 0.
                    String currentStatus = redisTemplate.opsForValue().get(statusKey);
                    if (!"0".equals(currentStatus)) {
                        redisTemplate.opsForValue().set(statusKey, "0");
                        log.debug("客户端ID: {} 心跳正常,重置丢失次数从 {} 到 0", clientId, currentStatus);
                    } else {
                        log.debug("客户端ID: {} 心跳正常,状态已是0", clientId);
                    }
                }
            } catch (Exception e) {
                log.error("检查心跳状态异常, clientId: {}", clientId, e);
            }
        }
        log.debug("心跳状态检查完成");
    }

    /**
     * Pushes a new online status for the given client to the remote revenue-config service.
     *
     * <p>The resource registration is updated only when a lookup by client id returns data, and
     * the switch record is updated only when the switch lookup returns a non-empty result.
     *
     * @param clientId client identifier used for both lookups and updates
     * @param status   online status value to store (callers in this class pass "1" after recovery
     *                 and "0" after an alert)
     */
    private void updateResourceStatus(String clientId, String status) {
        log.info("开启更新资源状态========");

        RmResourceRegistrationRemote query = new RmResourceRegistrationRemote();
        query.setClientId(clientId);
        R<RmResourceRegistrationRemote> registerMsgR =
                remoteRevenueConfigService.getListByHardwareSn(query, SecurityConstants.INNER);
        if (registerMsgR != null && registerMsgR.getData() != null) {
            RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
            rmResourceRegistrationRemote.setOnlineStatus(status);
            rmResourceRegistrationRemote.setClientId(clientId);
            remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
        }

        RmSwitchManagementRemote rmSwitchManagementRemote = new RmSwitchManagementRemote();
        rmSwitchManagementRemote.setClientId(clientId);
        R<List<RmSwitchManagementRemote>> rmSwitchManagementRemoteListR =
                remoteRevenueConfigService.getSwitchNameByClientId(rmSwitchManagementRemote, SecurityConstants.INNER);
        if (rmSwitchManagementRemoteListR != null
                && rmSwitchManagementRemoteListR.getData() != null
                && !rmSwitchManagementRemoteListR.getData().isEmpty()) {
            RmSwitchManagementRemote switchUpdate = new RmSwitchManagementRemote();
            switchUpdate.setClientId(clientId);
            switchUpdate.setOnlineStatus(status);
            remoteRevenueConfigService.updateSwitchMsgByClientId(switchUpdate, SecurityConstants.INNER);
        }
    }

    /**
     * Inserts a heartbeat log row. Failures are logged and never propagated, so heartbeat
     * processing continues even when the log cannot be written.
     *
     * @param machineId client identifier stored on the log row
     * @param status    status code: 0 = offline, 1 = online, 2 = recovered, 3 = three heartbeats lost
     * @param remark    free-text remark stored with the row
     */
    private void insertHeartbeatLog(String machineId, String status, String remark) {
        try {
            InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog();
            listenLog.setClientId(machineId);
            listenLog.setStatus(status); // 0-offline 1-online 2-recovered 3-three heartbeats lost
            listenLog.setRemark(remark);
            listenLog.setCreateTime(new Date());

            // Persist through the heartbeat log service.
            initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog);
            log.info("已记录心跳日志,客户端ID: {}, 状态: {}", machineId, status);
        } catch (Exception e) {
            log.error("插入心跳日志失败", e);
        }
    }

    /**
     * Parses a response message and returns its first entry.
     *
     * @param message device message whose data holds the response payload
     * @return the first parsed {@link RspVo}, or {@code null} when the payload yields no entries
     */
    private RspVo handleResponseMessage(DeviceMessage message) {
        List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
        if (!rspVoList.isEmpty()) {
            RspVo rsp = rspVoList.get(0);
            log.info("应答信息:{}", rsp);
            return rsp;
        }
        return null;
    }

    /**
     * Handles a script-policy execution response and stores the outcome.
     *
     * <p>Only the first parsed response entry is processed. A response with {@code resCode == 1}
     * is stored with result flag 1, and only when it carries a result; any other response is
     * stored with result flag 0 whether or not a result is present. When the result cannot be
     * parsed into a script id, the record is still stored, without a script id, instead of
     * aborting with an exception.
     *
     * @param message device message whose data holds the script response payload
     */
    private void handleScriptRspMessage(DeviceMessage message) {
        List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
        if (rspVoList == null || rspVoList.isEmpty()) {
            return;
        }
        RspVo rsp = rspVoList.get(0);

        // The reported timestamp is multiplied by 1000 to obtain epoch milliseconds, so the
        // resulting Date has no sub-second part.
        long timestamp = rsp.getTimestamp();
        Date createTime = new Date(timestamp * 1000);

        boolean succeeded = rsp.getResCode() == 1;
        boolean hasResult = rsp.getResult() != null;
        if (succeeded && !hasResult) {
            // A success response without a result carries nothing to store.
            return;
        }

        // Build the script execution result entity.
        RmResourceRemote insertData = new RmResourceRemote();
        insertData.setClientId(message.getClientId());
        insertData.setResultFlag(succeeded ? 1 : 0);
        if (hasResult) {
            insertData.setDescription(rsp.getResult());
            Long scriptId = extractScriptIdFromResult(rsp);
            if (scriptId != null) {
                insertData.setScriptId(scriptId);
            }
        }
        insertData.setCreateTime(createTime);

        // Persist the record.
        rmResourceRemoteService.insertRmResourceRemote(insertData);
        if (succeeded) {
            log.info("脚本执行结果入库成功:{}", rsp);
        } else {
            log.error("脚本执行失败:{}", rsp);
        }
    }

    /**
     * Extracts the script id from the first parsed entry of a response's result payload.
     *
     * @param rsp response whose result is non-null
     * @return the script id, or {@code null} when the result yields no entries or the id is not numeric
     */
    private Long extractScriptIdFromResult(RspVo rsp) {
        List<RspResultVo> resultVos = JsonDataParser.parseJsonData(rsp.getResult(), RspResultVo.class);
        if (resultVos == null || resultVos.isEmpty()) {
            log.warn("脚本应答结果解析为空,无法获取脚本ID:{}", rsp);
            return null;
        }
        try {
            return Long.valueOf(resultVos.get(0).getScriptId());
        } catch (NumberFormatException e) {
            log.warn("脚本应答中的脚本ID无法解析:{}", rsp, e);
            return null;
        }
    }
}