增加agent联调,获取最新策略应答功能

This commit is contained in:
gaoyutao
2025-10-22 17:54:46 +08:00
parent 4fdaadee65
commit 31d8114c05
45 changed files with 1686 additions and 341 deletions

View File

@@ -1,7 +1,5 @@
package com.ruoyi.rocketmq.handler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.enums.MsgEnum;
@@ -14,9 +12,11 @@ import com.ruoyi.rocketmq.domain.vo.RspVo;
import com.ruoyi.rocketmq.service.*;
import com.ruoyi.rocketmq.utils.DataProcessUtil;
import com.ruoyi.rocketmq.utils.JsonDataParser;
import com.ruoyi.rocketmq.utils.SwitchJsonDataParser;
import com.ruoyi.system.api.RemoteRevenueConfigService;
import com.ruoyi.system.api.domain.*;
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
import com.ruoyi.system.api.domain.NetworkInfo;
import com.ruoyi.system.api.domain.RmRegisterMsgRemote;
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -29,12 +29,8 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 设备消息处理器
@@ -72,24 +68,8 @@ public class MessageHandler {
@Autowired
private IInitialMountPointInfoService initialMountPointInfoService;
@Autowired
private IInitialSwitchInfoService initialSwitchInfoService;
@Autowired
private IInitialSystemInfoService initialSystemInfoService;
@Autowired
private IInitialSwitchInfoTempService initialSwitchInfoTempService;
@Autowired
private IInitialHeartbeatListenLogService initialHeartbeatListenLog;
@Autowired
private IInitialSwitchPowerSupplyService initialSwitchPowerSupplyService;
@Autowired
private IInitialSwitchFanInfoService initialSwitchFanInfoService;
@Autowired
private IInitialSwitchMpuInfoService initialSwitchMpuInfoService;
@Autowired
private IInitialSwitchOpticalModuleService initialSwitchOpticalModuleService;
@Autowired
private IInitialSwitchOtherCollectDataService insertInitialSwitchOtherInfo;
@Autowired
private IInitialSystemOtherCollectDataService iInitialSystemOtherCollectDataService;
@Autowired
private IRmResourceRemoteService rmResourceRemoteService;
@@ -99,8 +79,10 @@ public class MessageHandler {
private DataProcessUtil dataProcessUtil;
@Autowired
private IRmNetworkInterfaceService rmNetworkInterfaceService;
// 在类中添加
private static final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private IRmMonitorPolicyService rmMonitorPolicyService;
@Autowired
private IRmDeploymentPolicyService rmDeploymentPolicyService;
/**
@@ -108,14 +90,6 @@ public class MessageHandler {
*/
@PostConstruct
public void init() {
// 所有应答类消息使用同一个处理器
// registerHandler(MsgEnum.注册应答.getValue(), this::handleResponseMessage);
// registerHandler(MsgEnum.断开应答.getValue(), this::handleResponseMessage);
// registerHandler(MsgEnum.开启系统采集应答.getValue(), this::handleResponseMessage);
// registerHandler(MsgEnum.关闭系统采集应答.getValue(), this::handleResponseMessage);
// registerHandler(MsgEnum.开启交换机采集应答.getValue(), this::handleResponseMessage);
// registerHandler(MsgEnum.关闭交换机采集应答.getValue(), this::handleResponseMessage);
// registerHandler(MsgEnum.告警设置应答.getValue(), this::handleResponseMessage);
// registerHandler(MsgEnum.执行脚本策略应答.getValue(), this::handleScriptRspMessage);
// registerHandler(MsgEnum.Agent版本更新应答.getValue(), this::handleAgentUpdateRspMessage);
@@ -129,9 +103,7 @@ public class MessageHandler {
// registerHandler(MsgEnum.网络上报.getValue(), this::handleNetMessage);
// registerHandler(MsgEnum.挂载上报.getValue(), this::handleMountPointMessage);
// registerHandler(MsgEnum.系统其他上报.getValue(), this::handleOtherSystemMessage);
// registerHandler(MsgEnum.交换机上报.getValue(), this::handleSwitchDataMessage);
// registerHandler(MsgEnum.心跳上报.getValue(), this::handleHeartbeatMessage);
// registerHandler(MsgEnum.多公网IP探测.getValue(), this::handlePublicIpDetectMessage);
registerHandler(MsgEnum.心跳上报.getValue(), this::handleHeartbeatMessage);
}
/**
@@ -141,8 +113,12 @@ public class MessageHandler {
private void handleNewPolicyMessage(DeviceMessage deviceMessage) {
List<RegisterMsgVo> interfaces = JsonDataParser.parseJsonData(deviceMessage.getData(), RegisterMsgVo.class);
if(!interfaces.isEmpty()) {
// 获取该服务器的策略信息
RegisterMsgVo registerMsgVo = interfaces.get(0);
String clientId = registerMsgVo.getClientId();
// 如果未下发监控策略,下发
rmMonitorPolicyService.issuePolicyMsgByClientId(clientId);
// 如果未下发服务器脚本策略,下发
rmDeploymentPolicyService.issueDeployPolicyMsgByClientId(clientId);
}
}
@@ -253,17 +229,6 @@ public class MessageHandler {
}
}
/**
* 多公网IP探测
* @param message
*/
private void handlePublicIpDetectMessage(DeviceMessage message) {
List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
//TODO 公网信息入库
// 查询公网信息
// 如果公网信息有变化修改公网IP状态为需绑定
}
/**
* agent更新响应
* @param message
@@ -448,248 +413,9 @@ public class MessageHandler {
throw new RuntimeException("挂载点data数据为空");
}
}
/**
* 交换机所有数据入库
* @param message
*/
private void handleSwitchDataMessage(DeviceMessage message) {
List<CollectDataVo> switchData = JsonDataParser.parseJsonData(message.getData(), CollectDataVo.class);
if(!switchData.isEmpty()){
CollectDataVo switchDataVo = switchData.get(0);
switch(switchDataVo.getType()){
case "switchNetCollect":
handleSwitchNetMessage(switchDataVo, message.getClientId());
break;
case "switchPwrCollect":
handleSwitchPwrMessage(switchDataVo, message.getClientId());
break;
case "switchModuleCollect":
handleSwitchModuleMessage(switchDataVo, message.getClientId());
break;
case "switchMpuCollect":
handleSwitchMpuMessage(switchDataVo, message.getClientId());
break;
case "switchFanCollect":
handleSwitchFanMessage(switchDataVo, message.getClientId());
break;
default:
handleSwitchOtherMessage(switchDataVo, message.getClientId());
break;
}
}else{
throw new RuntimeException("交换机data数据为空");
}
}
/**
* 电源发现数据
* @param switchDataVo
*/
private void handleSwitchPwrMessage(CollectDataVo switchDataVo, String clientId){
List<InitialSwitchPowerSupply> powerSupplyList = SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchPowerSupply.class);
if (!powerSupplyList.isEmpty()){
for (InitialSwitchPowerSupply insertData : powerSupplyList) {
// 时间戳转换
long timestamp = switchDataVo.getTimestamp();
long millis = timestamp * 1000;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
insertData.setClientId(clientId);
insertData.setCreateTime(createTime);
}
initialSwitchPowerSupplyService.insertBatchInitialSwitchPowerSupply(powerSupplyList);
}
}
/**
* 光模块发现数据
* @param switchDataVo
*/
private void handleSwitchModuleMessage(CollectDataVo switchDataVo, String clientId){
List<InitialSwitchOpticalModule> moduleList = SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchOpticalModule.class);
if (!moduleList.isEmpty()){
for (InitialSwitchOpticalModule insertData : moduleList) {
// 时间戳转换
long timestamp = switchDataVo.getTimestamp();
long millis = timestamp * 1000;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
insertData.setClientId(clientId);
insertData.setCreateTime(createTime);
}
initialSwitchOpticalModuleService.batchInitialSwitchOpticalModule(moduleList);
}
}
/**
* MPU发现数据
* @param switchDataVo
*/
private void handleSwitchMpuMessage(CollectDataVo switchDataVo, String clientId){
List<InitialSwitchMpuInfo> mpuList = SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchMpuInfo.class);
if (!mpuList.isEmpty()){
for (InitialSwitchMpuInfo insertData : mpuList) {
// 时间戳转换
long timestamp = switchDataVo.getTimestamp();
long millis = timestamp * 1000;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
insertData.setClientId(clientId);
insertData.setCreateTime(createTime);
}
initialSwitchMpuInfoService.insertBatchInitialSwitchMpuInfo(mpuList);
}
}
/**
* 风扇发现数据
* @param switchDataVo
*/
private void handleSwitchFanMessage(CollectDataVo switchDataVo, String clientId){
List<InitialSwitchFanInfo> fanList = SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchFanInfo.class);
if (!fanList.isEmpty()){
for (InitialSwitchFanInfo insertData : fanList) {
// 时间戳转换
long timestamp = switchDataVo.getTimestamp();
long millis = timestamp * 1000;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
insertData.setClientId(clientId);
insertData.setCreateTime(createTime);
}
initialSwitchFanInfoService.insertBatchInitialSwitchFanInfo(fanList);
}
}
/**
* 其他发现数据(默认处理)
* @param switchDataVo
*/
private void handleSwitchOtherMessage(CollectDataVo switchDataVo, String clientId) {
if (switchDataVo != null) {
try {
InitialSwitchOtherCollectData insertData = new InitialSwitchOtherCollectData();
// 设置基本信息
long timestamp = switchDataVo.getTimestamp();
Date createTime = new Date(timestamp * 1000 / 1000 * 1000);
insertData.setClientId(clientId);
insertData.setCreateTime(createTime);
String value = switchDataVo.getValue();
JsonNode jsonNode = objectMapper.readTree(value);
// 处理数组中的字符串JSON
if (jsonNode.isArray() && jsonNode.size() > 0) {
JsonNode firstElement = jsonNode.get(0);
if (firstElement.isTextual()) {
// 二次解析JSON字符串
JsonNode innerJsonNode = objectMapper.readTree(firstElement.asText());
if (innerJsonNode.isObject()) {
Iterator<Map.Entry<String, JsonNode>> fields = innerJsonNode.fields();
if (fields.hasNext()) {
Map.Entry<String, JsonNode> entry = fields.next();
String fieldName = entry.getKey();
String fieldValue = entry.getValue().asText();
insertData.setCollectType(fieldName);
if (!"null".equals(fieldValue)) {
insertData.setCollectValue(fieldValue);
insertInitialSwitchOtherInfo.insertInitialSwitchOtherCollectData(insertData);
}
}
}
}
}
} catch (Exception e) {
log.error("解析JSON数据失败: {}, value: {}", e.getMessage(), switchDataVo.getValue(), e);
}
}
}
/**
* 交换机网卡信息数据入库
* @param switchDataVo
*/
private void handleSwitchNetMessage(CollectDataVo switchDataVo, String clientId) {
List<InitialSwitchInfo> switchInfos = SwitchJsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchInfo.class);
if(!switchInfos.isEmpty()){
// 根据clientId查询交换机ip
RmResourceRegistrationRemote queryParam = new RmResourceRegistrationRemote();
queryParam.setHardwareSn(clientId);
R<RmResourceRegistrationRemote> registMsgR = remoteRevenueConfigService.getListByHardwareSn(queryParam, SecurityConstants.INNER);
if(registMsgR != null){
RmResourceRegistrationRemote registMsg = registMsgR.getData();
}
// 时间戳转换
long timestamp = switchDataVo.getTimestamp();
long millis = timestamp * 1000;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
// 查询临时表信息,计算实际流量值
InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp();
temp.setClientId(clientId);
List<InitialSwitchInfoTemp> tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp);
if(!tempList.isEmpty()){
// 1. 构建快速查找的Map
Map<String, InitialSwitchInfoTemp> tempMap = tempList.stream()
.collect(Collectors.toMap(
InitialSwitchInfoTemp::getName,
Function.identity(),
(existing, replacement) -> existing
));
// 2. 预计算除数(避免重复创建对象)
BigDecimal divisor = new BigDecimal(300);
// 3. 计算速度
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(clientId);
switchInfo.setCreateTime(createTime);
if(registMsgR != null && registMsgR.getData()!=null && registMsgR.getData().getSnmpCollectAddr()!=null){
switchInfo.setSwitchIp(registMsgR.getData().getSnmpCollectAddr());
}
InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName());
if (tempInfo != null) {
// 计算inSpeed
if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) {
BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes());
// 字节转为bit
BigDecimal inDiffBit = inDiff.multiply(new BigDecimal(8)).setScale(0, RoundingMode.HALF_UP);
switchInfo.setInSpeed(inDiffBit.divide(divisor, 2, RoundingMode.HALF_UP));
}
// 计算outSpeed
if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) {
BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes());
// 字节转为bit
BigDecimal outDiffBit = outDiff.multiply(new BigDecimal(8)).setScale(0, RoundingMode.HALF_UP);
switchInfo.setOutSpeed(outDiffBit.divide(divisor, 2, RoundingMode.HALF_UP));
}
}
});
}else{
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(clientId);
switchInfo.setCreateTime(createTime);
if(registMsgR != null && registMsgR.getData()!=null && registMsgR.getData().getSnmpCollectAddr()!=null){
switchInfo.setSwitchIp(registMsgR.getData().getSnmpCollectAddr());
}
});
}
// 清空临时表对应switch信息
initialSwitchInfoTempService.truncateSwitchInfoTemp(clientId);
// 临时表 用来计算inSpeed outSeppd
initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos);
// 初始交换机数据入库
initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos);
// 业务表入库
InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote();
detailsRemote.setClientId(clientId);
detailsRemote.setStartTime(timeStr);
detailsRemote.setEndTime(timeStr);
remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER);
}else{
throw new RuntimeException("交换机data数据为空");
}
}
/**
* 系统其他信息
@@ -733,7 +459,11 @@ public class MessageHandler {
InitialHeartbeatListen heartbeat = heartbeats.get(0);
String clientId = message.getClientId();
log.debug("处理心跳消息客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());
// 添加逻辑节点标识
RmResourceRegistrationRemote updateData = new RmResourceRegistrationRemote();
updateData.setClientId(message.getClientId());
updateData.setLogicalNodeId(heartbeat.getLogicalNode());
remoteRevenueConfigService.innerupdateRegist(updateData, SecurityConstants.INNER);
// 使用Redis存储状态
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
String timeKey = HEARTBEAT_TIME_PREFIX + clientId;