与agent交互方式更改,生产者,消费者更改,监控模板策略微调,拓扑管理新增接口,资源组新增查重接口。

This commit is contained in:
gaoyutao
2025-09-22 21:24:06 +08:00
parent a797147d95
commit a9270027f4
27 changed files with 948 additions and 23 deletions

View File

@@ -31,7 +31,7 @@ public class ConsumerConfig {
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() {
//构建客户端连接
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getAgentGroup());
//
consumer.setNamesrvAddr(consumerMode.getNamesrvAddr());
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin());

View File

@@ -30,7 +30,7 @@ public class ProducerConfig {
@Bean
public DefaultMQProducer getRocketMQProducer() {
producer = new DefaultMQProducer(producerMode.getGroupName());
producer = new DefaultMQProducer(producerMode.getAgentGroup());
producer.setNamesrvAddr(producerMode.getNamesrvAddr());
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息需要设置不同的instanceName
if(producerMode.getMaxMessageSize()!=null){

View File

@@ -6,7 +6,9 @@ import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.rocketmq.domain.*;
import com.ruoyi.rocketmq.domain.vo.RspVo;
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
import com.ruoyi.rocketmq.handler.DeviceMessageHandler;
import com.ruoyi.rocketmq.producer.ConsumeException;
import com.ruoyi.rocketmq.service.*;
import com.ruoyi.rocketmq.utils.JsonDataParser;
@@ -42,9 +44,11 @@ import java.util.stream.Collectors;
@Slf4j
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
// 心跳状态
private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:";
// 心跳时间
private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:";
// 心跳告警
private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:";
String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:";
private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时
@@ -78,6 +82,8 @@ public class RocketMsgListener implements MessageListenerConcurrently {
this.initialBandwidthTrafficService = initialBandwidthTrafficService;
this.remoteRevenueConfigService = remoteRevenueConfigService;
}
@Autowired
private DeviceMessageHandler deviceMessageHandler;
/**
* 消费消息
@@ -108,6 +114,13 @@ public class RocketMsgListener implements MessageListenerConcurrently {
log.error("消息消费三次失败,消息内容:{}", body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常
}
if(MessageCodeEnum.TONGRAN_AGENT_UP.getCode().equals(topic)){
// 拿到信息
DeviceMessage message = JSON.parseObject(body, DeviceMessage.class);
// 处理消息
deviceMessageHandler.handleMessage(message);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
}
// 根据不同的topic处理不同的业务 这里以订单消息为例子
if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) {
// 拿到信息
@@ -143,7 +156,6 @@ public class RocketMsgListener implements MessageListenerConcurrently {
default:
log.warn("未知数据类型:{}",message.getDataType());
}
//处理你的业务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
}
}
@@ -553,4 +565,38 @@ public class RocketMsgListener implements MessageListenerConcurrently {
log.error("插入心跳日志失败", e);
}
}
/**
* 应答信息
* @param message
*/
private RspVo handleResponseMessage(DeviceMessage message) {
List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
if (!rspVoList.isEmpty()) {
RspVo rsp = rspVoList.get(0);
log.info("应答信息:{}",rsp);
return rsp;
}
return null;
}
/**
* 注册应答处理
* @param message
*/
// private void handleRegisterMessage(DeviceMessage message) {
// RspVo rspVo = handleResponseMessage(message);
// String clientId = message.getClientId();
// if (rspVo != null && rspVo.getResCode() == 1) {
// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
// rmResourceRegistrationRemote.setRegistrationStatus("1");
// rmResourceRegistrationRemote.setHardwareSn(clientId);
// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
// }else{
// if(rspVo == null){
// log.error("注册失败:应答信息为null");
// }else{
// log.error("注册失败:{}",rspVo.getResMsg());
// }
// }
// }
}

View File

@@ -21,5 +21,11 @@ public class InitialHeartbeatListen extends BaseEntity
/** 强度值 */
@Excel(name = "强度值")
private Long strength;
/** 服务名称 */
private String name;
/** 版本 */
private String version;
/** 服务启动时间 */
private Long startupTime;
}

View File

@@ -51,5 +51,7 @@ public class RmMonitorPolicy extends BaseEntity
private Date deployTime;
/** 采集周期及id集合 */
private List<RmMonitorPolicyVo> collectionAndIdList;
/** 资源类型linux switch */
private String resourceType;
}

View File

@@ -0,0 +1,18 @@
package com.ruoyi.rocketmq.domain.vo;
import lombok.Data;
import java.time.Instant;
@Data
public class RegisterSwitchVo {
/** 服务器ip */
private String clientIp;
/** 服务器端口 */
private Integer clientPort;
/** 交换机信息 */
private String switchBoard;
/** 时间戳(秒) */
private long timestamp = Instant.now().getEpochSecond();
}

View File

@@ -0,0 +1,19 @@
package com.ruoyi.rocketmq.domain.vo;
import lombok.Data;
@Data
public class RspVo {
/**
* 状态码0、失败1、成功
*/
private Integer resCode;
/**
* 描述
*/
private String resMag;
/**
* 时间戳
*/
private Long timestamp;
}

View File

@@ -0,0 +1,13 @@
package com.ruoyi.rocketmq.domain.vo;
import lombok.Data;
@Data
public class SwitchDataVo {
/** 类型 */
private String type;
/** 数据 */
private String value;
/** 时间戳 */
private long timestamp;
}

View File

@@ -0,0 +1,19 @@
package com.ruoyi.rocketmq.domain.vo;
import lombok.Data;
@Data
public class SwitchOidVo {
private String netOID;
private String moduleOID;
private String mpuOID;
private String pwrOID;
private String fanOID;
private String otherOID;
/** 团体名 */
private String community;
/** ip地址 */
private String ip;
/** 端口 */
private Integer port;
}

View File

@@ -15,6 +15,8 @@ public enum MessageCodeEnum {
*/
AGENT_MESSAGE_TOPIC("agent_up","agent数据采集的信息topic"),
TONGRAN_AGENT_UP("tongran_agent_up","agent数据采集的信息topic"),
/**
* 系统消息
*/

View File

@@ -13,7 +13,8 @@ public class MessageTopic {
public List<String> RocketMQTopicList(){
List<String> getTopicLists=new ArrayList<>();
// agent采集消息
getTopicLists.add("agent_up");
// getTopicLists.add("agent_up");
getTopicLists.add("tongran_agent_up");
return getTopicLists;
}

View File

@@ -0,0 +1,567 @@
package com.ruoyi.rocketmq.handler;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.enums.MsgEnum;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.rocketmq.domain.*;
import com.ruoyi.rocketmq.domain.vo.RspVo;
import com.ruoyi.rocketmq.domain.vo.SwitchDataVo;
import com.ruoyi.rocketmq.service.*;
import com.ruoyi.rocketmq.utils.JsonDataParser;
import com.ruoyi.system.api.RemoteRevenueConfigService;
import com.ruoyi.system.api.domain.AllInterfaceNameRemote;
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote;
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 设备消息处理器
*/
@Slf4j
@Component
public class DeviceMessageHandler {
private final Map<String, Consumer<DeviceMessage>> messageHandlers = new HashMap<>();
// 心跳状态
private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:";
// 心跳时间
private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:";
// 心跳告警
private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:";
String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:";
private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private IInitialBandwidthTrafficService initialBandwidthTrafficService;
@Autowired
private RemoteRevenueConfigService remoteRevenueConfigService;
@Autowired
private IInitialDockerInfoService initialDockerInfoService;
@Autowired
private IInitialCpuInfoService initialCpuInfoService;
@Autowired
private IInitialDiskInfoService initialDiskInfoService;
@Autowired
private IInitialMemoryInfoService initialMemoryInfoService;
@Autowired
private IInitialMountPointInfoService initialMountPointInfoService;
@Autowired
private IInitialSwitchInfoService initialSwitchInfoService;
@Autowired
private IInitialSystemInfoService initialSystemInfoService;
@Autowired
private IInitialSwitchInfoTempService initialSwitchInfoTempService;
@Autowired
private IInitialHeartbeatListenLogService initialHeartbeatListenLog;
/**
* 初始化处理器映射
*/
@PostConstruct
public void init() {
// 所有应答类消息使用同一个处理器
registerHandler(MsgEnum.注册应答.getValue(), this::handleResponseMessage);
registerHandler(MsgEnum.断开应答.getValue(), this::handleResponseMessage);
registerHandler(MsgEnum.开启系统采集应答.getValue(), this::handleResponseMessage);
registerHandler(MsgEnum.关闭系统采集应答.getValue(), this::handleResponseMessage);
registerHandler(MsgEnum.开启交换机采集应答.getValue(), this::handleResponseMessage);
registerHandler(MsgEnum.关闭交换机采集应答.getValue(), this::handleResponseMessage);
registerHandler(MsgEnum.告警设置应答.getValue(), this::handleResponseMessage);
registerHandler(MsgEnum.执行脚本策略应答.getValue(), this::handleResponseMessage);
registerHandler(MsgEnum.Agent版本更新应答.getValue(), this::handleResponseMessage);
// 其他类型消息可以单独注册处理器
// registerHandler(MsgEnum.CPU上报.getValue(), this::handleCpuMessage);
// registerHandler(MsgEnum.磁盘上报.getValue(), this::handleDiskMessage);
// registerHandler(MsgEnum.容器上报.getValue(), this::handleDockerMessage);
// registerHandler(MsgEnum.内存上报.getValue(), this::handleMemoryMessage);
// registerHandler(MsgEnum.网络上报.getValue(), this::handleNetMessage);
// registerHandler(MsgEnum.挂载上报.getValue(), this::handleMountPointMessage);
// registerHandler(MsgEnum.系统其他上报.getValue(), this::handleSystemMessage);
registerHandler(MsgEnum.交换机上报.getValue(), this::handleSwitchDataMessage);
registerHandler(MsgEnum.心跳上报.getValue(), this::handleHeartbeatMessage);
}
/**
* 注册消息处理器
*/
private void registerHandler(String dataType, Consumer<DeviceMessage> handler) {
messageHandlers.put(dataType, handler);
}
/**
* 处理设备消息(对外暴露的主方法)
*/
public void handleMessage(DeviceMessage message) {
String dataType = message.getDataType();
Consumer<DeviceMessage> handler = messageHandlers.get(dataType);
if (handler != null) {
handler.accept(message);
} else {
log.warn("未知数据类型:{}", dataType);
}
}
// ========== 具体的消息处理方法 ==========
/**
* 网络流量数据入库
* @param message
*/
private void handleNetMessage(DeviceMessage message) {
List<InitialBandwidthTraffic> interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class);
if(!interfaces.isEmpty()){
// 时间戳转换
long timestamp = interfaces.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
InitialBandwidthTraffic data = new InitialBandwidthTraffic();
interfaces.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 批量入库集合
data.setList(interfaces);
// 初始流量数据入库
initialBandwidthTrafficService.batchInsert(data);
EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote();
epsInitialTrafficDataRemote.setStartTime(timeStr);
epsInitialTrafficDataRemote.setEndTime(timeStr);
// 复制到业务初始库
remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER);
}else{
throw new RuntimeException("NET流量data数据为空");
}
}
/**
* docker数据入库
* @param message
*/
private void handleDockerMessage(DeviceMessage message) {
List<InitialDockerInfo> dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class);
if(!dockers.isEmpty()){
// 时间戳转换
long timestamp = dockers.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
dockers.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始容器数据入库
initialDockerInfoService.batchInsertInitialDockerInfo(dockers);
}else{
throw new RuntimeException("DOCKER容器data数据为空");
}
}
/**
* cpu数据入库
* @param message
*/
private void handleCpuMessage(DeviceMessage message) {
List<InitialCpuInfo> cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class);
// 时间戳转换
long timestamp = cpus.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
if(!cpus.isEmpty()){
cpus.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始CPU数据入库
initialCpuInfoService.batchInsertInitialCpuInfo(cpus);
}else{
throw new RuntimeException("CPUdata数据为空");
}
}
/**
* 磁盘数据入库
* @param message
*/
private void handleDiskMessage(DeviceMessage message) {
List<InitialDiskInfo> disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class);
// 时间戳转换
long timestamp = disks.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
if(!disks.isEmpty()){
disks.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始磁盘数据入库
initialDiskInfoService.batchInsertInitialDiskInfo(disks);
}else{
throw new RuntimeException("磁盘data数据为空");
}
}
/**
* 内存数据入库
* @param message
*/
private void handleMemoryMessage(DeviceMessage message) {
List<InitialMemoryInfo> memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class);
if(!memorys.isEmpty()){
// 时间戳转换
long timestamp = memorys.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
memorys.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始内存数据入库
initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys);
}else{
throw new RuntimeException("内存data数据为空");
}
}
/**
* 挂载点数据入库
* @param message
*/
private void handleMountPointMessage(DeviceMessage message) {
List<InitialMountPointInfo> mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class);
if(!mountPointInfos.isEmpty()){
// 时间戳转换
long timestamp = mountPointInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
mountPointInfos.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始挂载点数据入库
initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos);
}else{
throw new RuntimeException("挂载点data数据为空");
}
}
/**
* 交换机所有数据入库
* @param message
*/
private void handleSwitchDataMessage(DeviceMessage message) {
List<SwitchDataVo> switchData = JsonDataParser.parseJsonData(message.getData(), SwitchDataVo.class);
if(!switchData.isEmpty()){
SwitchDataVo switchDataVo = switchData.get(0);
List<InitialSwitchInfo> switchInfos = JsonDataParser.parseJsonData(switchDataVo.getValue(), InitialSwitchInfo.class);
switch(switchDataVo.getType()){
case "switchNetCollect":
// handleSwitchMessage(message);
break;
case "switchPwrCollect":
break;
case "switchModuleCollect":
break;
case "switchMpuCollect":
break;
case "switchFanCollect":
break;
default:
break;
}
}else{
throw new RuntimeException("交换机data数据为空");
}
}
/**
* 交换机数据入库
* @param message
*/
private void handleSwitchMessage(DeviceMessage message) {
List<InitialSwitchInfo> switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class);
if(!switchInfos.isEmpty()){
// 时间戳转换
long timestamp = switchInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
// 查询临时表信息,计算实际流量值
InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp();
temp.setClientId(message.getClientId());
List<InitialSwitchInfoTemp> tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp);
if(!tempList.isEmpty()){
// 1. 构建快速查找的Map
Map<String, InitialSwitchInfoTemp> tempMap = tempList.stream()
.collect(Collectors.toMap(
InitialSwitchInfoTemp::getName,
Function.identity(),
(existing, replacement) -> existing
));
// 2. 预计算除数(避免重复创建对象)
BigDecimal divisor = new BigDecimal(300);
// 3. 计算速度
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(message.getClientId());
switchInfo.setCreateTime(createTime);
InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName());
if (tempInfo != null) {
// 计算inSpeed
if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) {
BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes());
switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP));
}
// 计算outSpeed
if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) {
BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes());
switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP));
}
}
});
}else{
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(message.getClientId());
switchInfo.setCreateTime(createTime);
});
}
// 清空临时表对应switch信息
initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId());
// 临时表 用来计算inSpeed outSeppd
initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos);
// 初始交换机数据入库
initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos);
// 业务表入库
InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote();
detailsRemote.setClientId(message.getClientId());
detailsRemote.setStartTime(timeStr);
detailsRemote.setEndTime(timeStr);
remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER);
}else{
throw new RuntimeException("交换机data数据为空");
}
}
/**
* 系统数据入库
* @param message
*/
private void handleSystemMessage(DeviceMessage message) {
List<InitialSystemInfo> systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class);
if(!systemInfos.isEmpty()){
// 时间戳转换
long timestamp = systemInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
systemInfos.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始系统数据入库
initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos);
}else{
throw new RuntimeException("系统data数据为空");
}
}
/**
* 监听心跳
* @param message
*/
private void handleHeartbeatMessage(DeviceMessage message) {
List<InitialHeartbeatListen> heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class);
if(!heartbeats.isEmpty()){
InitialHeartbeatListen heartbeat = heartbeats.get(0);
String clientId = message.getClientId();
log.debug("处理心跳消息客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());
// 使用Redis存储状态
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
String timeKey = HEARTBEAT_TIME_PREFIX + clientId;
String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器
try {
// 重置丢失计数为0设置最后心跳时间
redisTemplate.opsForValue().set(statusKey, "0");
redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis()));
// 检查是否之前有告警状态
if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) {
// 获取当前恢复次数
String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey);
int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1;
if (recoveryCount == 2) {
// 达到2次恢复执行状态修改
log.warn("客户端ID: {} 心跳恢复达到2次修改设备状态为在线", clientId);
insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线");
redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId);
redisTemplate.delete(recoveryCountKey); // 清除恢复计数器
// 修改资源状态
getResourceMsg(clientId, "1");
} else {
// 未达到2次只记录恢复次数
log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount);
redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount));
}
}
} catch (Exception e) {
log.error("处理心跳消息异常, clientId: {}", clientId, e);
}
}
}
// 添加一个定时任务方法,定期检查心跳状态
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkHeartbeatStatus() {
long currentTime = System.currentTimeMillis();
// 获取所有客户端时间键
Set<String> timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*");
if (timeKeys == null) return;
for (String timeKey : timeKeys) {
String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
try {
String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
if (lastTimeStr == null) continue;
long lastHeartbeatTime = Long.parseLong(lastTimeStr);
if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
// 心跳超时处理
String lostCountStr = redisTemplate.opsForValue().get(statusKey);
int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1;
redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount));
log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount);
if (lostCount == 3) {
insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1");
// 修改资源状态
getResourceMsg(clientId, "0");
}
}
} catch (Exception e) {
log.error("检查心跳状态异常, clientId: {}", clientId, e);
}
}
}
/**
* 修改资源在线状态
* @param clientId
* @param status
*/
private void getResourceMsg(String clientId, String status){
String ipAddress = null;
AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote();
interfaceNameRemote.setClientId(clientId);
// 1. 先获取交换机IP
interfaceNameRemote.setResourceType("2");
R<AllInterfaceNameRemote> switchResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (switchResult != null && switchResult.getData() != null &&
StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) {
// 更新交换机状态
ipAddress = switchResult.getData().getSwitchIp();
updateResourceStatus(ipAddress, status);
// 2. 再获取服务器IP
interfaceNameRemote.setResourceType("1");
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (serverResult != null && serverResult.getData() != null &&
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
// 更新服务器状态
updateResourceStatus(serverResult.getData().getServerIp(), status);
}
} else {
// 3. 如果没有交换机IP只获取服务器IP
interfaceNameRemote.setResourceType("1");
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (serverResult != null && serverResult.getData() != null &&
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
// 更新服务器状态
updateResourceStatus(serverResult.getData().getServerIp(), status);
} else {
log.warn("未找到客户端ID: {} 对应的IP地址", clientId);
}
}
}
// 更新资源状态的公共方法
private void updateResourceStatus(String ipAddress, String status) {
RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
rmResourceRegistrationRemote.setOnlineStatus(status);
rmResourceRegistrationRemote.setIpAddress(ipAddress);
remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
}
// 插入心跳日志到数据库
private void insertHeartbeatLog(String machineId, String status, String remark) {
try {
InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog();
listenLog.setClientId(machineId);
listenLog.setStatus(status); // 0-离线 1-在线 2-恢复 3-三次丢失
listenLog.setRemark(remark);
listenLog.setCreateTime(new Date());
// 调用DAO或Service插入日志
initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog);
log.info("已记录心跳日志客户端ID: {}, 状态: {}", machineId, status);
} catch (Exception e) {
log.error("插入心跳日志失败", e);
}
}
/**
* 应答信息
* @param message
*/
private RspVo handleResponseMessage(DeviceMessage message) {
List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
if (!rspVoList.isEmpty()) {
RspVo rsp = rspVoList.get(0);
log.info("应答信息:{}",rsp);
return rsp;
}
return null;
}
/**
* 注册应答处理
* @param message
*/
// private void handleRegisterMessage(DeviceMessage message) {
// RspVo rspVo = handleResponseMessage(message);
// String clientId = message.getClientId();
// if (rspVo != null && rspVo.getResCode() == 1) {
// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
// rmResourceRegistrationRemote.setRegistrationStatus("1");
// rmResourceRegistrationRemote.setHardwareSn(clientId);
// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
// }else{
// if(rspVo == null){
// log.error("注册失败:应答信息为null");
// }else{
// log.error("注册失败:{}",rspVo.getResMsg());
// }
// }
// }
}

View File

@@ -23,8 +23,8 @@ public class ConsumerMode {
private int consumeThreadMax;
@Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
@Value("${suning.rocketmq.producer.agentTopic}")
@Value("${suning.rocketmq.conumer.agentTopic}")
private String agentTopic;
@Value("${suning.rocketmq.producer.agentGroup}")
@Value("${suning.rocketmq.conumer.agentGroup}")
private String agentGroup;
}

View File

@@ -5,10 +5,13 @@ import com.ruoyi.common.core.enums.MsgEnum;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.rocketmq.domain.*;
import com.ruoyi.rocketmq.domain.vo.CollectVo;
import com.ruoyi.rocketmq.domain.vo.RegisterSwitchVo;
import com.ruoyi.rocketmq.domain.vo.RmMonitorPolicyVo;
import com.ruoyi.rocketmq.domain.vo.SwitchOidVo;
import com.ruoyi.rocketmq.mapper.RmMonitorPolicyMapper;
import com.ruoyi.rocketmq.mapper.RmMonitorTemplateMapper;
import com.ruoyi.rocketmq.mapper.RmTemplateLinuxMapper;
import com.ruoyi.rocketmq.mapper.RmTemplateSwitchMapper;
import com.ruoyi.rocketmq.model.ProducerMode;
import com.ruoyi.rocketmq.producer.MessageProducer;
import com.ruoyi.rocketmq.service.IRmMonitorPolicyService;
@@ -46,6 +49,8 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
@Autowired
private RmTemplateLinuxMapper rmTemplateLinuxMapper;
@Autowired
private RmTemplateSwitchMapper rmTemplateSwitchMapper;
@Autowired
private RmMonitorTemplateMapper rmMonitorTemplateMapper;
@Autowired
private DataProcessUtil dataProcessUtil;
@@ -137,12 +142,22 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
// 拿到采集周期
List<RmMonitorPolicyVo> collectionAndIdList = rmMonitorPolicy.getCollectionAndIdList();
if(!collectionAndIdList.isEmpty()){
for (RmMonitorPolicyVo rmMonitorPolicyVo : collectionAndIdList) {
// 添加采集周期
RmTemplateLinux rmTemplateLinux = new RmTemplateLinux();
rmTemplateLinux.setId(rmMonitorPolicyVo.getId());
rmTemplateLinux.setCollectionCycle(rmMonitorPolicyVo.getCollectionCycle());
rmTemplateLinuxMapper.updateRmTemplateLinux(rmTemplateLinux);
if("linux".equals(rmMonitorPolicy.getResourceType())){
for (RmMonitorPolicyVo rmMonitorPolicyVo : collectionAndIdList) {
// 添加采集周期
RmTemplateLinux rmTemplateLinux = new RmTemplateLinux();
rmTemplateLinux.setId(rmMonitorPolicyVo.getId());
rmTemplateLinux.setCollectionCycle(rmMonitorPolicyVo.getCollectionCycle());
rmTemplateLinuxMapper.updateRmTemplateLinux(rmTemplateLinux);
}
}else{
for (RmMonitorPolicyVo rmMonitorPolicyVo : collectionAndIdList) {
// 添加采集周期
RmTemplateSwitch rmTemplateSwitch = new RmTemplateSwitch();
rmTemplateSwitch.setId(rmMonitorPolicyVo.getId());
rmTemplateSwitch.setCollectionCycle(rmMonitorPolicyVo.getCollectionCycle());
rmTemplateSwitchMapper.updateRmTemplateSwitch(rmTemplateSwitch);
}
}
}
return 1;
@@ -197,10 +212,19 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
// 构建并发送采集配置
Map<String, Object> policyDetails = getRmMonitorPolicyMsgById(id);
boolean isSwitch = false;
if(policyDetails.get("switch") != null){
isSwitch = true;
}
SwitchOidVo switchOidVo = buildOids(policyDetails);
List<CollectVo> collectVos = buildCollectConfigurations(policyDetails);
// 去重
List<CollectVo> uniqueList = collectVos.stream().distinct().collect(Collectors.toList());
sendConfigurationsToDevices(devices, uniqueList);
if(isSwitch){
sendSwitchConfigurationsToDevices(devices, uniqueList, switchOidVo);
}else{
sendConfigurationsToDevices(devices, uniqueList);
}
// 更新策略状态为已下发
RmMonitorPolicy policyUpdate = new RmMonitorPolicy();
policyUpdate.setId(id);
@@ -214,6 +238,14 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
}
}
private SwitchOidVo buildOids(Map<String, Object> policyDetails) {
// 处理Switch配置
Map<String, List<RmTemplateSwitch>> switchConfigs = (Map<String, List<RmTemplateSwitch>>) policyDetails.get("switch");
// 拿到所有oid
SwitchOidVo switchOidVo = processSwitchOids(switchConfigs, new String[]{"switchOther", "switchMpu", "switchPwr", "switchNet", "switchModule", "switchFan"});
return switchOidVo;
}
/**
* 构建采集配置
@@ -269,6 +301,86 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
}
}
}
private <T> SwitchOidVo processSwitchOids(Map<String, List<T>> configs, String[] types) {
Map<String, Map<String, String>> resultMap = new HashMap<>();
// 初始化各个OID类型的Map
resultMap.put("otherOID", new HashMap<>());
resultMap.put("netOID", new HashMap<>());
resultMap.put("moduleOID", new HashMap<>());
resultMap.put("mpuOID", new HashMap<>());
resultMap.put("pwrOID", new HashMap<>());
resultMap.put("fanOID", new HashMap<>());
for (String type : types) {
if (configs.get(type) != null) {
for (T config : configs.get(type)) {
if (config instanceof RmTemplateSwitch) {
RmTemplateSwitch switchConfig = (RmTemplateSwitch) config;
String oid = switchConfig.getOid();
String metricKey = switchConfig.getMetricKey();
switch(type) {
case "switchOther":
resultMap.get("otherOID").put(oid, metricKey);
break;
case "switchMpu":
resultMap.get("mpuOID").put(oid, metricKey);
break;
case "switchPwr":
resultMap.get("pwrOID").put(oid, metricKey);
break;
case "switchNet":
if(oid != null && oid != "null"){
resultMap.get("netOID").put(oid, metricKey);
}
break;
case "switchModule":
resultMap.get("moduleOID").put(oid, metricKey);
break;
case "switchFan":
resultMap.get("fanOID").put(oid, metricKey);
break;
default:
break;
}
}
}
}
}
SwitchOidVo switchOidVo = new SwitchOidVo();
// 将Map转换为字符串格式
switchOidVo.setOtherOID(mapToString(resultMap.get("otherOID")));
switchOidVo.setNetOID(mapToString(resultMap.get("netOID")));
switchOidVo.setModuleOID(mapToString(resultMap.get("moduleOID")));
switchOidVo.setMpuOID(mapToString(resultMap.get("mpuOID")));
switchOidVo.setPwrOID(mapToString(resultMap.get("pwrOID")));
switchOidVo.setFanOID(mapToString(resultMap.get("fanOID")));
return switchOidVo;
}
// 将Map转换为字符串格式{"oid1":"metric1","oid2":"metric2"}
private String mapToString(Map<String, String> map) {
if (map == null || map.isEmpty()) {
return "{}";
}
StringBuilder sb = new StringBuilder("{");
boolean first = true;
for (Map.Entry<String, String> entry : map.entrySet()) {
if (!first) {
sb.append(",");
}
sb.append("\"").append(entry.getKey()).append("\":\"").append(entry.getValue()).append("\"");
first = false;
}
sb.append("}");
return sb.toString();
}
/**
* 发送配置到设备
@@ -287,6 +399,59 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
message.setData(configJson);
message.setDataType(MsgEnum.开启系统采集.getValue());
messageProducer.sendAsyncProducerMessage(
producerMode.getAgentTopic(),
"",
"",
JSONObject.toJSONString(message)
);
} catch (Exception e) {
log.error("发送设备配置失败deviceId: {}", device.getHardwareSn(), e);
}
}
}
/**
* 发送配置到设备
*/
private void sendSwitchConfigurationsToDevices(List<RmResourceRegistrationRemote> devices, List<CollectVo> collectVos, SwitchOidVo switchOidVo) {
MessageProducer messageProducer = new MessageProducer();
Map map = new HashMap();
map.put("collects", collectVos);
map.put("timestamp", Instant.now().getEpochSecond());
String configJson = JSONObject.toJSONString(map);
for (RmResourceRegistrationRemote device : devices) {
try {
String clientIp = device.getIpAddress();
String clientPort = device.getResourcePort();
// 交换机信息
String switchIp = device.getSnmpCollectAddr();
String switchPort = device.getSnmpCollectPort();
String community = device.getTeamName();
switchOidVo.setIp(switchIp);
switchOidVo.setCommunity(community);
switchOidVo.setPort(Integer.valueOf(switchPort));
RegisterSwitchVo registerSwitchVo = new RegisterSwitchVo();
registerSwitchVo.setClientIp(clientIp);
registerSwitchVo.setClientPort(Integer.valueOf(clientPort));
registerSwitchVo.setSwitchBoard(JSONObject.toJSONString(switchOidVo));
DeviceMessage registerMessage = new DeviceMessage();
registerMessage.setDataType(MsgEnum.注册.getValue());
registerMessage.setData(JSONObject.toJSONString(registerSwitchVo));
registerMessage.setClientId(device.getHardwareSn());
// 交换机注册
messageProducer.sendAsyncProducerMessage(
producerMode.getAgentTopic(),
"",
"",
JSONObject.toJSONString(registerMessage)
);
// 交换机采集
DeviceMessage message = new DeviceMessage();
message.setClientId(device.getHardwareSn());
message.setData(configJson);
message.setDataType(MsgEnum.开启交换机采集.getValue());
messageProducer.sendAsyncProducerMessage(
producerMode.getAgentTopic(),
"",