心跳监测内容修改,ip对应数据改为clientId对应硬件SN,监控模板,策略bug修改

This commit is contained in:
gaoyutao
2025-09-25 19:12:52 +08:00
parent 939d8d0286
commit cc9593b4bf
28 changed files with 1447 additions and 762 deletions

View File

@@ -77,6 +77,13 @@ public interface EpsServerRevenueConfigMapper
* @return 服务器收益方式配置
*/
public Map getNodeMsgByIp(@Param("ipAddress") String ipAddress);
/**
* 根据sn查询服务器信息
*
* @param hardwareSn 硬件SN
* @return 服务器收益方式配置
*/
public Map getNodeMsgBySn(@Param("hardwareSn") String hardwareSn);
int updateEpsServerRevenueConfigByServerSn(EpsServerRevenueConfig epsServerRevenueConfig);

View File

@@ -173,13 +173,8 @@ public class EpsServerRevenueConfigServiceImpl implements IEpsServerRevenueConfi
List<EpsInitialTrafficData> dataList = epsInitialTrafficDataService.getAllTraficMsg(epsInitialTrafficData);
List<EpsInitialTrafficData> batchList = new ArrayList<>();
for (EpsInitialTrafficData initialTrafficData : dataList) {
// 根据ip查询节点名称
String ip = initialTrafficData.getIpV4() == null ? "" : initialTrafficData.getIpV4();
String mac = initialTrafficData.getMac() == null ? "" : initialTrafficData.getMac();
Map nodeMsg = epsServerRevenueConfigMapper.getNodeMsgByIp(ip);
if(nodeMsg == null){
nodeMsg = epsServerRevenueConfigMapper.getNodeMsgByIp(mac);
}
// 根据clientId查询节点名称
Map nodeMsg = epsServerRevenueConfigMapper.getNodeMsgBySn(initialTrafficData.getClientId());
// 赋值
if(nodeMsg != null){
initialTrafficData.setServiceSn(getNullableString(nodeMsg, "hardwareSn"));

View File

@@ -134,10 +134,10 @@ public class InitialSwitchInfoDetailsServiceImpl implements IInitialSwitchInfoDe
details.setId(null);
// 根据接口名称查询交换机信息
String interfaceName = details.getName();
String switchIp = details.getSwitchIp();
String switchSn = details.getSwitchSn();
RmEpsTopologyManagement rmEpsTopologyManagement = new RmEpsTopologyManagement();
rmEpsTopologyManagement.setInterfaceName(interfaceName);
rmEpsTopologyManagement.setSwitchIpAddress(switchIp);
rmEpsTopologyManagement.setSwitchSn(switchSn);
List<RmEpsTopologyManagement> managements = rmEpsTopologyManagementMapper.selectRmEpsTopologyManagementList(rmEpsTopologyManagement);
// 赋值
if(!managements.isEmpty()){

View File

@@ -54,7 +54,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</select>
<insert id="insertAllInterfaceName" parameterType="AllInterfaceName" useGeneratedKeys="true" keyProperty="id">
insert into all_interface_name
insert IGNORE into all_interface_name
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="clientId != null and clientId != ''">client_id,</if>
<if test="interfaceName != null and interfaceName != ''">interface_name,</if>
@@ -166,7 +166,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<!-- 批量插入接口名称 -->
<insert id="batchInsert" parameterType="java.util.List">
INSERT INTO all_interface_name
INSERT IGNORE INTO all_interface_name
(
interface_name,
client_id,

View File

@@ -148,6 +148,27 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
and rrr.ip_address = #{ipAddress}
</where>
</select>
<select id="getNodeMsgBySn" parameterType="String" resultType="java.util.Map">
SELECT
rrr.hardware_sn AS hardwareSn,
rrr.resource_name AS resourceName,
esrc.revenue_method AS revenueMethod,
esrc.business_code AS businessCode,
esrc.business_name AS businessName,
esrc.package_bandwidth AS packageBandwidth
FROM
rm_resource_registration rrr
LEFT JOIN
eps_server_revenue_config esrc
ON
rrr.hardware_sn = esrc.hardware_sn
<where>
and rrr.registration_status = '1'
and rrr.hardware_sn = #{hardwareSn}
</where>
</select>
<select id="countBySn" parameterType="String">
<include refid="selectEpsServerRevenueConfigVo"/>
where hardware_sn = #{hardwareSn}

View File

@@ -248,7 +248,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<update id="updateStatusByResource" parameterType="RmResourceRegistration">
update rm_resource_registration
<trim prefix="SET" suffixOverrides=",">
<if test="hardwareSn != null">hardware_sn = #{hardwareSn},</if>
<if test="resourceType != null">resource_type = #{resourceType},</if>
<if test="resourceName != null">resource_name = #{resourceName},</if>
<if test="ipAddress != null">ip_address = #{ipAddress},</if>
@@ -280,7 +279,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="updaterId != null">updater_id = #{updaterId},</if>
<if test="updaterName != null">updater_name = #{updaterName},</if>
</trim>
where ip_address = #{ipAddress}
where hardware_sn = #{hardwareSn}
</update>
<select id="getRegistrationByIds" parameterType="String" resultMap="RmResourceRegistrationResult">
<include refid="selectRmResourceRegistrationVo"/>

View File

@@ -1,42 +1,21 @@
package com.ruoyi.rocketmq.consumer;
import com.alibaba.fastjson.JSON;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.rocketmq.domain.*;
import com.ruoyi.rocketmq.domain.vo.RspVo;
import com.ruoyi.rocketmq.domain.DeviceMessage;
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
import com.ruoyi.rocketmq.handler.DeviceMessageHandler;
import com.ruoyi.rocketmq.producer.ConsumeException;
import com.ruoyi.rocketmq.service.*;
import com.ruoyi.rocketmq.utils.JsonDataParser;
import com.ruoyi.system.api.RemoteRevenueConfigService;
import com.ruoyi.system.api.domain.AllInterfaceNameRemote;
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote;
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 消息监听
@@ -44,44 +23,6 @@ import java.util.stream.Collectors;
@Slf4j
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
// 心跳状态
private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:";
// 心跳时间
private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:";
// 心跳告警
private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:";
String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:";
private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时
@Autowired
private RedisTemplate<String, String> redisTemplate;
private final IInitialBandwidthTrafficService initialBandwidthTrafficService;
private final RemoteRevenueConfigService remoteRevenueConfigService;
@Autowired
private IInitialDockerInfoService initialDockerInfoService;
@Autowired
private IInitialCpuInfoService initialCpuInfoService;
@Autowired
private IInitialDiskInfoService initialDiskInfoService;
@Autowired
private IInitialMemoryInfoService initialMemoryInfoService;
@Autowired
private IInitialMountPointInfoService initialMountPointInfoService;
@Autowired
private IInitialSwitchInfoService initialSwitchInfoService;
@Autowired
private IInitialSystemInfoService initialSystemInfoService;
@Autowired
private IInitialSwitchInfoTempService initialSwitchInfoTempService;
@Autowired
private IInitialHeartbeatListenLogService initialHeartbeatListenLog;
@Autowired
public RocketMsgListener(IInitialBandwidthTrafficService initialBandwidthTrafficService,
RemoteRevenueConfigService remoteRevenueConfigService) {
this.initialBandwidthTrafficService = initialBandwidthTrafficService;
this.remoteRevenueConfigService = remoteRevenueConfigService;
}
@Autowired
private DeviceMessageHandler deviceMessageHandler;
@@ -122,42 +63,6 @@ public class RocketMsgListener implements MessageListenerConcurrently {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
}
// 根据不同的topic处理不同的业务 这里以订单消息为例子
if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) {
// 拿到信息
DeviceMessage message = JSON.parseObject(body, DeviceMessage.class);
switch (message.getDataType()){
case "NET":
handleNetMessage(message);
break;
case "CPU":
handleCpuMessage(message);
break;
case "SYSTEM":
handleSystemMessage(message);
break;
case "DISK":
handleDiskMessage(message);
break;
case "POINT":
handleMountPointMessage(message);
break;
case "MEMORY":
handleMemoryMessage(message);
break;
case "DOCKER":
handleDockerMessage(message);
break;
case "SWITCHBOARD":
handleSwitchMessage(message);
break;
case "HEARTBEAT":
handleHeartbeatMessage(message);
break;
default:
log.warn("未知数据类型:{}",message.getDataType());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
}
}
}
// 消息消费失败
@@ -186,417 +91,4 @@ public class RocketMsgListener implements MessageListenerConcurrently {
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
/**
* 网络流量数据入库
* @param message
*/
private void handleNetMessage(DeviceMessage message) {
List<InitialBandwidthTraffic> interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class);
if(!interfaces.isEmpty()){
// 时间戳转换
long timestamp = interfaces.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
InitialBandwidthTraffic data = new InitialBandwidthTraffic();
interfaces.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 批量入库集合
data.setList(interfaces);
// 初始流量数据入库
initialBandwidthTrafficService.batchInsert(data);
EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote();
epsInitialTrafficDataRemote.setStartTime(timeStr);
epsInitialTrafficDataRemote.setEndTime(timeStr);
// 复制到业务初始库
remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER);
}else{
throw new RuntimeException("NET流量data数据为空");
}
}
/**
* docker数据入库
* @param message
*/
private void handleDockerMessage(DeviceMessage message) {
List<InitialDockerInfo> dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class);
if(!dockers.isEmpty()){
// 时间戳转换
long timestamp = dockers.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
dockers.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始容器数据入库
initialDockerInfoService.batchInsertInitialDockerInfo(dockers);
}else{
throw new RuntimeException("DOCKER容器data数据为空");
}
}
/**
* cpu数据入库
* @param message
*/
private void handleCpuMessage(DeviceMessage message) {
List<InitialCpuInfo> cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class);
// 时间戳转换
long timestamp = cpus.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
if(!cpus.isEmpty()){
cpus.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始CPU数据入库
initialCpuInfoService.batchInsertInitialCpuInfo(cpus);
}else{
throw new RuntimeException("CPUdata数据为空");
}
}
/**
* 磁盘数据入库
* @param message
*/
private void handleDiskMessage(DeviceMessage message) {
List<InitialDiskInfo> disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class);
// 时间戳转换
long timestamp = disks.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
if(!disks.isEmpty()){
disks.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始磁盘数据入库
initialDiskInfoService.batchInsertInitialDiskInfo(disks);
}else{
throw new RuntimeException("磁盘data数据为空");
}
}
/**
* 内存数据入库
* @param message
*/
private void handleMemoryMessage(DeviceMessage message) {
List<InitialMemoryInfo> memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class);
if(!memorys.isEmpty()){
// 时间戳转换
long timestamp = memorys.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
memorys.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始内存数据入库
initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys);
}else{
throw new RuntimeException("内存data数据为空");
}
}
/**
* 挂载点数据入库
* @param message
*/
private void handleMountPointMessage(DeviceMessage message) {
List<InitialMountPointInfo> mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class);
if(!mountPointInfos.isEmpty()){
// 时间戳转换
long timestamp = mountPointInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
mountPointInfos.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始挂载点数据入库
initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos);
}else{
throw new RuntimeException("挂载点data数据为空");
}
}
/**
* 交换机数据入库
* @param message
*/
private void handleSwitchMessage(DeviceMessage message) {
List<InitialSwitchInfo> switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class);
if(!switchInfos.isEmpty()){
// 时间戳转换
long timestamp = switchInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
// 查询临时表信息,计算实际流量值
InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp();
temp.setClientId(message.getClientId());
List<InitialSwitchInfoTemp> tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp);
if(!tempList.isEmpty()){
// 1. 构建快速查找的Map
Map<String, InitialSwitchInfoTemp> tempMap = tempList.stream()
.collect(Collectors.toMap(
InitialSwitchInfoTemp::getName,
Function.identity(),
(existing, replacement) -> existing
));
// 2. 预计算除数(避免重复创建对象)
BigDecimal divisor = new BigDecimal(300);
// 3. 计算速度
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(message.getClientId());
switchInfo.setCreateTime(createTime);
InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName());
if (tempInfo != null) {
// 计算inSpeed
if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) {
BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes());
switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP));
}
// 计算outSpeed
if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) {
BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes());
switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP));
}
}
});
}else{
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(message.getClientId());
switchInfo.setCreateTime(createTime);
});
}
// 清空临时表对应switch信息
initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId());
// 临时表 用来计算inSpeed outSeppd
initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos);
// 初始交换机数据入库
initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos);
// 业务表入库
InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote();
detailsRemote.setClientId(message.getClientId());
detailsRemote.setStartTime(timeStr);
detailsRemote.setEndTime(timeStr);
remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER);
}else{
throw new RuntimeException("交换机data数据为空");
}
}
/**
* 系统数据入库
* @param message
*/
private void handleSystemMessage(DeviceMessage message) {
List<InitialSystemInfo> systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class);
if(!systemInfos.isEmpty()){
// 时间戳转换
long timestamp = systemInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
systemInfos.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始系统数据入库
initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos);
}else{
throw new RuntimeException("系统data数据为空");
}
}
/**
* 监听心跳
* @param message
*/
private void handleHeartbeatMessage(DeviceMessage message) {
List<InitialHeartbeatListen> heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class);
if(!heartbeats.isEmpty()){
InitialHeartbeatListen heartbeat = heartbeats.get(0);
String clientId = message.getClientId();
log.info("处理心跳消息客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());
// 使用Redis存储状态
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
String timeKey = HEARTBEAT_TIME_PREFIX + clientId;
String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器
try {
// 重置丢失计数为0设置最后心跳时间
redisTemplate.opsForValue().set(statusKey, "0");
redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis()));
// 检查是否之前有告警状态
if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) {
// 获取当前恢复次数
String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey);
int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1;
if (recoveryCount == 2) {
// 达到2次恢复执行状态修改
log.warn("客户端ID: {} 心跳恢复达到2次修改设备状态为在线", clientId);
insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线");
redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId);
redisTemplate.delete(recoveryCountKey); // 清除恢复计数器
// 修改资源状态
getResourceMsg(clientId, "1");
} else {
// 未达到2次只记录恢复次数
log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount);
redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount));
}
}
} catch (Exception e) {
log.error("处理心跳消息异常, clientId: {}", clientId, e);
}
}
}
// 添加一个定时任务方法,定期检查心跳状态
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkHeartbeatStatus() {
long currentTime = System.currentTimeMillis();
// 获取所有客户端时间键
Set<String> timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*");
if (timeKeys == null) return;
for (String timeKey : timeKeys) {
String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
try {
String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
if (lastTimeStr == null) continue;
long lastHeartbeatTime = Long.parseLong(lastTimeStr);
if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
// 心跳超时处理
String lostCountStr = redisTemplate.opsForValue().get(statusKey);
int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1;
redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount));
log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount);
if (lostCount == 3) {
insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1");
// 修改资源状态
getResourceMsg(clientId, "0");
}
}
} catch (Exception e) {
log.error("检查心跳状态异常, clientId: {}", clientId, e);
}
}
}
/**
* 修改资源在线状态
* @param clientId
* @param status
*/
private void getResourceMsg(String clientId, String status){
String ipAddress = null;
AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote();
interfaceNameRemote.setClientId(clientId);
// 1. 先获取交换机IP
interfaceNameRemote.setResourceType("2");
R<AllInterfaceNameRemote> switchResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (switchResult != null && switchResult.getData() != null &&
StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) {
// 更新交换机状态
ipAddress = switchResult.getData().getSwitchIp();
updateResourceStatus(ipAddress, status);
// 2. 再获取服务器IP
interfaceNameRemote.setResourceType("1");
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (serverResult != null && serverResult.getData() != null &&
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
// 更新服务器状态
updateResourceStatus(serverResult.getData().getServerIp(), status);
}
} else {
// 3. 如果没有交换机IP只获取服务器IP
interfaceNameRemote.setResourceType("1");
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (serverResult != null && serverResult.getData() != null &&
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
// 更新服务器状态
updateResourceStatus(serverResult.getData().getServerIp(), status);
} else {
log.warn("未找到客户端ID: {} 对应的IP地址", clientId);
}
}
}
// 更新资源状态的公共方法
private void updateResourceStatus(String ipAddress, String status) {
RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
rmResourceRegistrationRemote.setOnlineStatus(status);
rmResourceRegistrationRemote.setIpAddress(ipAddress);
remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
}
// 插入心跳日志到数据库
private void insertHeartbeatLog(String machineId, String status, String remark) {
try {
InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog();
listenLog.setClientId(machineId);
listenLog.setStatus(status); // 0-离线 1-在线 2-恢复 3-三次丢失
listenLog.setRemark(remark);
listenLog.setCreateTime(new Date());
// 调用DAO或Service插入日志
initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog);
log.info("已记录心跳日志客户端ID: {}, 状态: {}", machineId, status);
} catch (Exception e) {
log.error("插入心跳日志失败", e);
}
}
/**
* 应答信息
* @param message
*/
private RspVo handleResponseMessage(DeviceMessage message) {
List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
if (!rspVoList.isEmpty()) {
RspVo rsp = rspVoList.get(0);
log.info("应答信息:{}",rsp);
return rsp;
}
return null;
}
/**
* 注册应答处理
* @param message
*/
// private void handleRegisterMessage(DeviceMessage message) {
// RspVo rspVo = handleResponseMessage(message);
// String clientId = message.getClientId();
// if (rspVo != null && rspVo.getResCode() == 1) {
// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
// rmResourceRegistrationRemote.setRegistrationStatus("1");
// rmResourceRegistrationRemote.setHardwareSn(clientId);
// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
// }else{
// if(rspVo == null){
// log.error("注册失败:应答信息为null");
// }else{
// log.error("注册失败:{}",rspVo.getResMsg());
// }
// }
// }
}

View File

@@ -0,0 +1,616 @@
package com.ruoyi.rocketmq.consumer;
import com.alibaba.fastjson.JSON;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.rocketmq.domain.*;
import com.ruoyi.rocketmq.domain.vo.RspVo;
import com.ruoyi.rocketmq.enums.MessageCodeEnum;
import com.ruoyi.rocketmq.handler.DeviceMessageHandler;
import com.ruoyi.rocketmq.producer.ConsumeException;
import com.ruoyi.rocketmq.service.*;
import com.ruoyi.rocketmq.utils.JsonDataParser;
import com.ruoyi.system.api.RemoteRevenueConfigService;
import com.ruoyi.system.api.domain.AllInterfaceNameRemote;
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote;
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.CollectionUtils;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 消息监听 v1.0
*/
@Slf4j
public class RocketMsgListenerHistory implements MessageListenerConcurrently {
// 心跳状态
private static final String HEARTBEAT_STATUS_PREFIX = "heartbeat:status:";
// 心跳时间
private static final String HEARTBEAT_TIME_PREFIX = "heartbeat:time:";
// 心跳告警
private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:";
String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:";
private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时
@Autowired
private RedisTemplate<String, String> redisTemplate;
private final IInitialBandwidthTrafficService initialBandwidthTrafficService;
private final RemoteRevenueConfigService remoteRevenueConfigService;
@Autowired
private IInitialDockerInfoService initialDockerInfoService;
@Autowired
private IInitialCpuInfoService initialCpuInfoService;
@Autowired
private IInitialDiskInfoService initialDiskInfoService;
@Autowired
private IInitialMemoryInfoService initialMemoryInfoService;
@Autowired
private IInitialMountPointInfoService initialMountPointInfoService;
@Autowired
private IInitialSwitchInfoService initialSwitchInfoService;
@Autowired
private IInitialSystemInfoService initialSystemInfoService;
@Autowired
private IInitialSwitchInfoTempService initialSwitchInfoTempService;
@Autowired
private IInitialHeartbeatListenLogService initialHeartbeatListenLog;
@Autowired
public RocketMsgListenerHistory(IInitialBandwidthTrafficService initialBandwidthTrafficService,
RemoteRevenueConfigService remoteRevenueConfigService) {
this.initialBandwidthTrafficService = initialBandwidthTrafficService;
this.remoteRevenueConfigService = remoteRevenueConfigService;
}
@Autowired
private DeviceMessageHandler deviceMessageHandler;
/**
* 消费消息
* @param list msgs.size() >= 1
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1you can modify here
* 这里只设置为1当设置为多个时list中只要有一条消息消费失败就会整体重试
* @param consumeConcurrentlyContext 上下文信息
* @return 消费状态 成功CONSUME_SUCCESS或者 重试 (RECONSUME_LATER)
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try{
//消息不等于空情况
if (!CollectionUtils.isEmpty(list)) {
//获取topic
for (MessageExt messageExt : list) {
// 解析消息内容
String body = new String(messageExt.getBody());
log.info("接受到的消息为:{}", body);
String tags = messageExt.getTags();
String topic = messageExt.getTopic();
String msgId = messageExt.getMsgId();
String keys = messageExt.getKeys();
int reConsume = messageExt.getReconsumeTimes();
// 消息已经重试了3次如果不需要再次消费则返回成功
if (reConsume == 3) {
// TODO 补偿信息
log.error("消息消费三次失败,消息内容:{}", body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//根据业务返回是否正常
}
if(MessageCodeEnum.TONGRAN_AGENT_UP.getCode().equals(topic)){
// 拿到信息
DeviceMessage message = JSON.parseObject(body, DeviceMessage.class);
// 处理消息
deviceMessageHandler.handleMessage(message);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
}
// 根据不同的topic处理不同的业务 这里以订单消息为例子
if (MessageCodeEnum.AGENT_MESSAGE_TOPIC.getCode().equals(topic)) {
// 拿到信息
DeviceMessage message = JSON.parseObject(body, DeviceMessage.class);
switch (message.getDataType()){
case "NET":
handleNetMessage(message);
break;
case "CPU":
handleCpuMessage(message);
break;
case "SYSTEM":
handleSystemMessage(message);
break;
case "DISK":
handleDiskMessage(message);
break;
case "POINT":
handleMountPointMessage(message);
break;
case "MEMORY":
handleMemoryMessage(message);
break;
case "DOCKER":
handleDockerMessage(message);
break;
case "SWITCHBOARD":
handleSwitchMessage(message);
break;
case "HEARTBEAT":
handleHeartbeatMessage(message);
break;
default:
log.warn("未知数据类型:{}",message.getDataType());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//业务处理成功
}
}
}
// 消息消费失败
//broker会根据设置的messageDelayLevel发起重试默认16次
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} catch (Exception e) {
// 调用 handleException 方法处理异常并返回处理结果
return handleException(e);
}
}
/**
* 异常处理
*
* @param e 捕获的异常
* @return 消息消费结果
*/
private static ConsumeConcurrentlyStatus handleException(final Exception e) {
Class exceptionClass = e.getClass();
if (exceptionClass.equals(UnsupportedEncodingException.class)) {
log.error(e.getMessage());
} else if (exceptionClass.equals(ConsumeException.class)) {
log.error(e.getMessage());
} else{
log.error(e.getMessage());
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
/**
* 网络流量数据入库
* @param message
*/
private void handleNetMessage(DeviceMessage message) {
List<InitialBandwidthTraffic> interfaces = JsonDataParser.parseJsonData(message.getData(), InitialBandwidthTraffic.class);
if(!interfaces.isEmpty()){
// 时间戳转换
long timestamp = interfaces.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
InitialBandwidthTraffic data = new InitialBandwidthTraffic();
interfaces.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 批量入库集合
data.setList(interfaces);
// 初始流量数据入库
initialBandwidthTrafficService.batchInsert(data);
EpsInitialTrafficDataRemote epsInitialTrafficDataRemote = new EpsInitialTrafficDataRemote();
epsInitialTrafficDataRemote.setStartTime(timeStr);
epsInitialTrafficDataRemote.setEndTime(timeStr);
// 复制到业务初始库
remoteRevenueConfigService.autoSaveServiceTrafficData(epsInitialTrafficDataRemote, SecurityConstants.INNER);
}else{
throw new RuntimeException("NET流量data数据为空");
}
}
/**
* docker数据入库
* @param message
*/
private void handleDockerMessage(DeviceMessage message) {
List<InitialDockerInfo> dockers = JsonDataParser.parseJsonData(message.getData(), InitialDockerInfo.class);
if(!dockers.isEmpty()){
// 时间戳转换
long timestamp = dockers.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
dockers.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始容器数据入库
initialDockerInfoService.batchInsertInitialDockerInfo(dockers);
}else{
throw new RuntimeException("DOCKER容器data数据为空");
}
}
/**
* cpu数据入库
* @param message
*/
private void handleCpuMessage(DeviceMessage message) {
List<InitialCpuInfo> cpus = JsonDataParser.parseJsonData(message.getData(),InitialCpuInfo.class);
// 时间戳转换
long timestamp = cpus.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
if(!cpus.isEmpty()){
cpus.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始CPU数据入库
initialCpuInfoService.batchInsertInitialCpuInfo(cpus);
}else{
throw new RuntimeException("CPUdata数据为空");
}
}
/**
* 磁盘数据入库
* @param message
*/
private void handleDiskMessage(DeviceMessage message) {
List<InitialDiskInfo> disks = JsonDataParser.parseJsonData(message.getData(), InitialDiskInfo.class);
// 时间戳转换
long timestamp = disks.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
if(!disks.isEmpty()){
disks.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始磁盘数据入库
initialDiskInfoService.batchInsertInitialDiskInfo(disks);
}else{
throw new RuntimeException("磁盘data数据为空");
}
}
/**
* 内存数据入库
* @param message
*/
private void handleMemoryMessage(DeviceMessage message) {
List<InitialMemoryInfo> memorys = JsonDataParser.parseJsonData(message.getData(), InitialMemoryInfo.class);
if(!memorys.isEmpty()){
// 时间戳转换
long timestamp = memorys.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
memorys.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始内存数据入库
initialMemoryInfoService.batchInsertInitialMemoryInfo(memorys);
}else{
throw new RuntimeException("内存data数据为空");
}
}
/**
* 挂载点数据入库
* @param message
*/
private void handleMountPointMessage(DeviceMessage message) {
List<InitialMountPointInfo> mountPointInfos = JsonDataParser.parseJsonData(message.getData(), InitialMountPointInfo.class);
if(!mountPointInfos.isEmpty()){
// 时间戳转换
long timestamp = mountPointInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
mountPointInfos.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始挂载点数据入库
initialMountPointInfoService.batchInsertInitialMountPointInfo(mountPointInfos);
}else{
throw new RuntimeException("挂载点data数据为空");
}
}
/**
* 交换机数据入库
* @param message
*/
private void handleSwitchMessage(DeviceMessage message) {
List<InitialSwitchInfo> switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class);
if(!switchInfos.isEmpty()){
// 时间戳转换
long timestamp = switchInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
// 查询临时表信息,计算实际流量值
InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp();
temp.setClientId(message.getClientId());
List<InitialSwitchInfoTemp> tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp);
if(!tempList.isEmpty()){
// 1. 构建快速查找的Map
Map<String, InitialSwitchInfoTemp> tempMap = tempList.stream()
.collect(Collectors.toMap(
InitialSwitchInfoTemp::getName,
Function.identity(),
(existing, replacement) -> existing
));
// 2. 预计算除数(避免重复创建对象)
BigDecimal divisor = new BigDecimal(300);
// 3. 计算速度
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(message.getClientId());
switchInfo.setCreateTime(createTime);
InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName());
if (tempInfo != null) {
// 计算inSpeed
if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) {
BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes());
switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP));
}
// 计算outSpeed
if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) {
BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes());
switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP));
}
}
});
}else{
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(message.getClientId());
switchInfo.setCreateTime(createTime);
});
}
// 清空临时表对应switch信息
initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId());
// 临时表 用来计算inSpeed outSeppd
initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos);
// 初始交换机数据入库
initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos);
// 业务表入库
InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote();
detailsRemote.setClientId(message.getClientId());
detailsRemote.setStartTime(timeStr);
detailsRemote.setEndTime(timeStr);
remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER);
}else{
throw new RuntimeException("交换机data数据为空");
}
}
/**
* 系统数据入库
* @param message
*/
private void handleSystemMessage(DeviceMessage message) {
List<InitialSystemInfo> systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class);
if(!systemInfos.isEmpty()){
// 时间戳转换
long timestamp = systemInfos.get(0).getTimestamp();
long millis = timestamp < 1_000_000_000L ? timestamp * 1000 : timestamp;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
systemInfos.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始系统数据入库
initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos);
}else{
throw new RuntimeException("系统data数据为空");
}
}
/**
* 监听心跳
* @param message
*/
private void handleHeartbeatMessage(DeviceMessage message) {
List<InitialHeartbeatListen> heartbeats = JsonDataParser.parseJsonData(message.getData(), InitialHeartbeatListen.class);
if(!heartbeats.isEmpty()){
InitialHeartbeatListen heartbeat = heartbeats.get(0);
String clientId = message.getClientId();
log.info("处理心跳消息客户端ID: {}, 时间: {}", clientId, heartbeat.getTimestamp());
// 使用Redis存储状态
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
String timeKey = HEARTBEAT_TIME_PREFIX + clientId;
String recoveryCountKey = HEARTBEAT_RECOVERY_COUNT_PREFIX + clientId; // 恢复次数计数器
try {
// 重置丢失计数为0设置最后心跳时间
redisTemplate.opsForValue().set(statusKey, "0");
redisTemplate.opsForValue().set(timeKey, String.valueOf(System.currentTimeMillis()));
// 检查是否之前有告警状态
if (Boolean.TRUE.equals(redisTemplate.hasKey(HEARTBEAT_ALERT_PREFIX + clientId))) {
// 获取当前恢复次数
String recoveryCountStr = redisTemplate.opsForValue().get(recoveryCountKey);
int recoveryCount = (recoveryCountStr == null) ? 1 : Integer.parseInt(recoveryCountStr) + 1;
if (recoveryCount == 2) {
// 达到2次恢复执行状态修改
log.warn("客户端ID: {} 心跳恢复达到2次修改设备状态为在线", clientId);
insertHeartbeatLog(clientId, "2", "心跳恢复,设备在线状态改为在线");
redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId);
redisTemplate.delete(recoveryCountKey); // 清除恢复计数器
// 修改资源状态
getResourceMsg(clientId, "1");
} else {
// 未达到2次只记录恢复次数
log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount);
redisTemplate.opsForValue().set(recoveryCountKey, String.valueOf(recoveryCount));
}
}
} catch (Exception e) {
log.error("处理心跳消息异常, clientId: {}", clientId, e);
}
}
}
// 添加一个定时任务方法,定期检查心跳状态
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkHeartbeatStatus() {
long currentTime = System.currentTimeMillis();
// 获取所有客户端时间键
Set<String> timeKeys = redisTemplate.keys(HEARTBEAT_TIME_PREFIX + "*");
if (timeKeys == null) return;
for (String timeKey : timeKeys) {
String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
String alertKey = HEARTBEAT_ALERT_PREFIX + clientId;
try {
// 检查是否已经存在告警
String existingAlert = redisTemplate.opsForValue().get(alertKey);
if ("1".equals(existingAlert)) {
continue; // 如果已有告警,跳过处理
}
String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
if (lastTimeStr == null) continue;
long lastHeartbeatTime = Long.parseLong(lastTimeStr);
if (currentTime - lastHeartbeatTime > HEARTBEAT_TIMEOUT) {
// 心跳超时处理
String lostCountStr = redisTemplate.opsForValue().get(statusKey);
int lostCount = (lostCountStr == null ? 0 : Integer.parseInt(lostCountStr)) + 1;
redisTemplate.opsForValue().set(statusKey, String.valueOf(lostCount));
log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount);
if (lostCount >= 3) {
insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1");
// 设置告警后删除timeKey和statusKey
redisTemplate.delete(timeKey);
redisTemplate.delete(statusKey);
log.info("客户端ID: {} 已设置告警并清理心跳记录", clientId);
// 修改资源状态
getResourceMsg(clientId, "0");
}
}else {
// 如果心跳正常,重置丢失次数
redisTemplate.opsForValue().set(statusKey, "0");
log.debug("客户端ID: {} 心跳正常,重置丢失次数", clientId);
}
} catch (Exception e) {
log.error("检查心跳状态异常, clientId: {}", clientId, e);
}
}
}
/**
* 修改资源在线状态
* @param clientId
* @param status
*/
private void getResourceMsg(String clientId, String status){
String ipAddress = null;
AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote();
interfaceNameRemote.setClientId(clientId);
// 1. 先获取交换机IP
interfaceNameRemote.setResourceType("2");
R<AllInterfaceNameRemote> switchResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (switchResult != null && switchResult.getData() != null &&
StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) {
// 更新交换机状态
ipAddress = switchResult.getData().getSwitchIp();
updateResourceStatus(ipAddress, status);
// 2. 再获取服务器IP
interfaceNameRemote.setResourceType("1");
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (serverResult != null && serverResult.getData() != null &&
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
// 更新服务器状态
updateResourceStatus(serverResult.getData().getServerIp(), status);
}
} else {
// 3. 如果没有交换机IP只获取服务器IP
interfaceNameRemote.setResourceType("1");
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (serverResult != null && serverResult.getData() != null &&
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
// 更新服务器状态
updateResourceStatus(serverResult.getData().getServerIp(), status);
} else {
log.warn("未找到客户端ID: {} 对应的IP地址", clientId);
}
}
}
// 更新资源状态的公共方法
private void updateResourceStatus(String ipAddress, String status) {
RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
rmResourceRegistrationRemote.setOnlineStatus(status);
rmResourceRegistrationRemote.setRegistrationStatus(status);
rmResourceRegistrationRemote.setIpAddress(ipAddress);
remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
}
// 插入心跳日志到数据库
private void insertHeartbeatLog(String machineId, String status, String remark) {
try {
InitialHeartbeatListenLog listenLog = new InitialHeartbeatListenLog();
listenLog.setClientId(machineId);
listenLog.setStatus(status); // 0-离线 1-在线 2-恢复 3-三次丢失
listenLog.setRemark(remark);
listenLog.setCreateTime(new Date());
// 调用DAO或Service插入日志
initialHeartbeatListenLog.insertInitialHeartbeatListenLog(listenLog);
log.info("已记录心跳日志客户端ID: {}, 状态: {}", machineId, status);
} catch (Exception e) {
log.error("插入心跳日志失败", e);
}
}
/**
* 应答信息
* @param message
*/
private RspVo handleResponseMessage(DeviceMessage message) {
List<RspVo> rspVoList = JsonDataParser.parseJsonData(message.getData(), RspVo.class);
if (!rspVoList.isEmpty()) {
RspVo rsp = rspVoList.get(0);
log.info("应答信息:{}",rsp);
return rsp;
}
return null;
}
/**
* 注册应答处理
* @param message
*/
// private void handleRegisterMessage(DeviceMessage message) {
// RspVo rspVo = handleResponseMessage(message);
// String clientId = message.getClientId();
// if (rspVo != null && rspVo.getResCode() == 1) {
// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
// rmResourceRegistrationRemote.setRegistrationStatus("1");
// rmResourceRegistrationRemote.setHardwareSn(clientId);
// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
// }else{
// if(rspVo == null){
// log.error("注册失败:应答信息为null");
// }else{
// log.error("注册失败:{}",rspVo.getResMsg());
// }
// }
// }
}

View File

@@ -0,0 +1,102 @@
package com.ruoyi.rocketmq.controller;
import com.ruoyi.common.core.utils.poi.ExcelUtil;
import com.ruoyi.common.core.web.controller.BaseController;
import com.ruoyi.common.core.web.domain.AjaxResult;
import com.ruoyi.common.core.web.page.PageDomain;
import com.ruoyi.common.core.web.page.TableDataInfo;
import com.ruoyi.common.log.annotation.Log;
import com.ruoyi.common.log.enums.BusinessType;
import com.ruoyi.common.security.annotation.RequiresPermissions;
import com.ruoyi.rocketmq.domain.RmAgentManagement;
import com.ruoyi.rocketmq.service.IRmAgentManagementService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletResponse;
import java.util.List;
/**
* Agent管理Controller
*
* @author gyt
* @date 2025-09-15
*/
@RestController
@RequestMapping("/agentManagement")
public class RmAgentManagementController extends BaseController
{
@Autowired
private IRmAgentManagementService rmAgentManagementService;
/**
* 查询Agent管理列表
*/
@RequiresPermissions("rocketmq:management:list")
@GetMapping("/list")
public TableDataInfo list(RmAgentManagement rmAgentManagement)
{
PageDomain pageDomain = new PageDomain();
pageDomain.setPageNum(rmAgentManagement.getPageNum());
pageDomain.setPageSize(rmAgentManagement.getPageSize());
startPage(pageDomain);
List<RmAgentManagement> list = rmAgentManagementService.selectRmAgentManagementList(rmAgentManagement);
return getDataTable(list);
}
/**
* 导出Agent管理列表
*/
@RequiresPermissions("rocketmq:management:export")
@Log(title = "Agent管理", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, RmAgentManagement rmAgentManagement)
{
List<RmAgentManagement> list = rmAgentManagementService.selectRmAgentManagementList(rmAgentManagement);
ExcelUtil<RmAgentManagement> util = new ExcelUtil<RmAgentManagement>(RmAgentManagement.class);
util.exportExcel(response, list, "Agent管理数据");
}
/**
* 获取Agent管理详细信息
*/
@RequiresPermissions("rocketmq:management:query")
@GetMapping(value = "/{id}")
public AjaxResult getInfo(@PathVariable("id") Long id)
{
return success(rmAgentManagementService.selectRmAgentManagementById(id));
}
/**
* 新增Agent管理
*/
@RequiresPermissions("rocketmq:management:add")
@Log(title = "Agent管理", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody RmAgentManagement rmAgentManagement)
{
return toAjax(rmAgentManagementService.addRmAgentManagement(rmAgentManagement));
}
/**
* 修改Agent管理
*/
@RequiresPermissions("rocketmq:management:edit")
@Log(title = "Agent管理", businessType = BusinessType.UPDATE)
@PutMapping
public AjaxResult edit(@RequestBody RmAgentManagement rmAgentManagement)
{
return toAjax(rmAgentManagementService.updateRmAgentManagement(rmAgentManagement));
}
/**
* 删除Agent管理
*/
@RequiresPermissions("rocketmq:management:remove")
@Log(title = "Agent管理", businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
public AjaxResult remove(@PathVariable Long[] ids)
{
return toAjax(rmAgentManagementService.deleteRmAgentManagementByIds(ids));
}
}

View File

@@ -50,10 +50,11 @@ public class RmMonitorPolicyController extends BaseController
@RequiresPermissions("rocketmq:policy:export")
@Log(title = "资源监控策略", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, RmMonitorPolicy rmMonitorPolicy)
public void export(HttpServletResponse response, @RequestBody RmMonitorPolicy rmMonitorPolicy)
{
List<RmMonitorPolicy> list = rmMonitorPolicyService.selectRmMonitorPolicyList(rmMonitorPolicy);
ExcelUtil<RmMonitorPolicy> util = new ExcelUtil<RmMonitorPolicy>(RmMonitorPolicy.class);
util.showColumn(rmMonitorPolicy.getProperties());
util.exportExcel(response, list, "资源监控策略数据");
}
@@ -90,7 +91,11 @@ public class RmMonitorPolicyController extends BaseController
@PutMapping
public AjaxResult edit(@RequestBody RmMonitorPolicy rmMonitorPolicy)
{
return toAjax(rmMonitorPolicyService.updateRmMonitorPolicy(rmMonitorPolicy));
int rows = rmMonitorPolicyService.updateRmMonitorPolicy(rmMonitorPolicy);
if(rows == -1){
return AjaxResult.error("资源监控策略新增失败,该资源组已绑定其他策略");
}
return toAjax(rows);
}
/**
* 资源监控策略下发

View File

@@ -52,10 +52,11 @@ public class RmMonitorTemplateController extends BaseController
@RequiresPermissions("rocketmq:template:export")
@Log(title = "监控模板", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, RmMonitorTemplate rmMonitorTemplate)
public void export(HttpServletResponse response, @RequestBody RmMonitorTemplate rmMonitorTemplate)
{
List<RmMonitorTemplate> list = rmMonitorTemplateService.selectRmMonitorTemplateList(rmMonitorTemplate);
ExcelUtil<RmMonitorTemplate> util = new ExcelUtil<RmMonitorTemplate>(RmMonitorTemplate.class);
util.showColumn(rmMonitorTemplate.getProperties());
util.exportExcel(response, list, "监控模板数据");
}

View File

@@ -1,9 +1,8 @@
package com.ruoyi.rocketmq.domain;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.ruoyi.common.core.annotation.Excel;
import com.ruoyi.common.core.web.domain.BaseEntity;
import lombok.Data;
/**
* 心跳信息日志对象 initial_heartbeat_listen_log
@@ -11,47 +10,16 @@ import com.ruoyi.common.core.web.domain.BaseEntity;
* @author gyt
* @date 2025-09-08
*/
@Data
public class InitialHeartbeatListenLog extends BaseEntity
{
private static final long serialVersionUID = 1L;
private Long id;
/** 客户端ID */
private String clientId;
/** 状态0-正常 1-恢复 2-两次丢失 3-三次丢失 */
@Excel(name = "状态0-正常 1-恢复 2-两次丢失 3-三次丢失")
private String status;
public void setClientId(String clientId)
{
this.clientId = clientId;
}
public String getClientId()
{
return clientId;
}
public void setStatus(String status)
{
this.status = status;
}
public String getStatus()
{
return status;
}
@Override
public String toString() {
return new ToStringBuilder(this,ToStringStyle.MULTI_LINE_STYLE)
.append("clientId", getClientId())
.append("status", getStatus())
.append("remark", getRemark())
.append("createTime", getCreateTime())
.append("updateTime", getUpdateTime())
.append("createBy", getCreateBy())
.append("updateBy", getUpdateBy())
.toString();
}
}

View File

@@ -0,0 +1,77 @@
package com.ruoyi.rocketmq.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.ruoyi.common.core.annotation.Excel;
import com.ruoyi.common.core.web.domain.BaseEntity;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Date;
/**
* Agent管理对象 rm_agent_management
*
* @author gyt
* @date 2025-09-15
*/
@Data
public class RmAgentManagement extends BaseEntity
{
private static final long serialVersionUID = 1L;
/** 主键ID */
private Long id;
/** 硬件SN码 */
@Excel(name = "硬件SN码")
private String hardwareSn;
/** 资源名称 */
@Excel(name = "资源名称")
private String resourceName;
/** 内网IP地址 */
@Excel(name = "内网IP地址")
private String internalIp;
/** Agent状态0-离线1-在线2-异常 */
@Excel(name = "Agent状态0-离线1-在线2-异常")
private String status;
/** Agent版本号 */
@Excel(name = "Agent版本号")
private String agentVersion;
/** 执行方式 */
@Excel(name = "执行方式")
private Integer method;
/** 定时更新时间cron表达式 */
@Excel(name = "定时更新时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime scheduledUpdateTime;
/** 文件地址格式 */
@Excel(name = "文件地址格式")
private Long fileUrlType;
/** 文件地址 */
@Excel(name = "文件地址")
private String fileUrl;
/** 文件目录 */
@Excel(name = "文件目录")
private String fileDirectory;
/** 最后一次更新结果success/failure */
@Excel(name = "最后一次更新结果", readConverterExp = "s=uccess/failure")
private String lastUpdateResult;
/** 最后一次更新时间 */
@JsonFormat(pattern = "yyyy-MM-dd")
@Excel(name = "最后一次更新时间", width = 30, dateFormat = "yyyy-MM-dd")
private Date lastUpdateTime;
/** 生效服务器id */
private String includeIds;
/** 生效服务器名称 */
private String includeNames;
}

View File

@@ -23,17 +23,6 @@ public class RmMonitorPolicy extends BaseEntity
/** 主键ID */
private Long id;
/** 模板ID */
private Long templateId;
/** 模板名称 */
@Excel(name = "关联监控模板")
private String templateName;
/** 资源组ID */
@Excel(name = "资源组ID")
private Long resourceGroupId;
/** 资源组名称 */
private String resourceGroupName;
/** 策略名称 */
@Excel(name = "策略名称")
@@ -43,17 +32,38 @@ public class RmMonitorPolicy extends BaseEntity
@Excel(name = "描述")
private String description;
/** 资源组ID */
private Long resourceGroupId;
/** 资源组名称 */
@Excel(name = "关联资源组")
private String resourceGroupName;
/** 模板ID */
private Long templateId;
/** 模板名称 */
@Excel(name = "关联监控模板")
private String templateName;
/** 状态0-待下发1-已下发 */
@Excel(name = "状态0-待下发1-已下发")
@Excel(name = "策略状态", readConverterExp = "0=待下发,1=已下发")
private String status;
/** 下发策略时间 */
@JsonFormat(pattern = "yyyy-MM-dd")
@Excel(name = "下发策略时间", width = 30, dateFormat = "yyyy-MM-dd")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Excel(name = "下发策略时间", width = 30, dateFormat = "yyyy-MM-dd HH:mm:ss")
private Date deployTime;
/** 采集周期及id集合 */
private List<RmMonitorPolicyVo> collectionAndIdList;
/** 资源类型linux switch */
private String resourceType;
/** 查询条件名称 */
private String queryName;
/** 创建时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Excel(name = "创建时间")
private Date createTime;
/** 修改时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Excel(name = "修改时间")
private Date updateTime;
}

View File

@@ -1,9 +1,12 @@
package com.ruoyi.rocketmq.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.ruoyi.common.core.annotation.Excel;
import com.ruoyi.common.core.web.domain.BaseEntity;
import lombok.Data;
import java.util.Date;
/**
* 监控模板对象 rm_monitor_template
*
@@ -23,7 +26,7 @@ public class RmMonitorTemplate extends BaseEntity
private String templateName;
/** 模板描述 */
@Excel(name = "模板描述")
@Excel(name = "描述")
private String description;
/** 监控项 */
@@ -35,11 +38,19 @@ public class RmMonitorTemplate extends BaseEntity
private String discoveryRules;
/** 资源组ID */
@Excel(name = "资源组ID")
private Long resourceGroupId;
/** 资源组名称 */
@Excel(name = "关联资源组")
private String resourceGroupName;
/** 资源类型(linux,switch) */
private String resourcyType;
/** 创建时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Excel(name = "创建时间")
private Date createTime;
/** 修改时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Excel(name = "修改时间")
private Date updateTime;
}

View File

@@ -0,0 +1,24 @@
package com.ruoyi.rocketmq.domain.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.time.Instant;
import java.time.LocalDateTime;
@Data
public class AgentUpdateVo {
/**文件地址外网HTTPS地址 */
private String fileUrl;
/** 保存文件地址 */
private String filePath;
/** 执行命令List<String>格式Json字符串 */
private String commands;
/** 执行方式0、立即执行1、定时执行 */
private Integer method;
/** 定时时间执行方式为1、定时执行时该字段必传 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime policyTime;
/** 时间戳 */
private long timestamp = Instant.now().getEpochSecond();
}

View File

@@ -1,26 +1,24 @@
package com.ruoyi.rocketmq.handler;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.enums.MsgEnum;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.rocketmq.domain.*;
import com.ruoyi.rocketmq.domain.vo.CollectDataVo;
import com.ruoyi.rocketmq.domain.vo.RspVo;
import com.ruoyi.rocketmq.service.*;
import com.ruoyi.rocketmq.utils.JsonDataParser;
import com.ruoyi.system.api.RemoteRevenueConfigService;
import com.ruoyi.system.api.domain.AllInterfaceNameRemote;
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
import com.ruoyi.system.api.domain.InitialSwitchInfoDetailsRemote;
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -37,6 +35,7 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Component
@EnableScheduling
public class DeviceMessageHandler {
private final Map<String, Consumer<DeviceMessage>> messageHandlers = new HashMap<>();
@@ -47,7 +46,7 @@ public class DeviceMessageHandler {
// 心跳告警
private static final String HEARTBEAT_ALERT_PREFIX = "heartbeat:alert:";
String HEARTBEAT_RECOVERY_COUNT_PREFIX = "heartbeat:recovery:count:";
private static final long HEARTBEAT_TIMEOUT = 180000; // 3分钟超时
private static final long HEARTBEAT_TIMEOUT = 30000; // 3分钟超时
@Autowired
@@ -507,75 +506,6 @@ public class DeviceMessageHandler {
throw new RuntimeException("交换机data数据为空");
}
}
/**
* 交换机数据入库
* @param message
*/
private void handleSwitchMessage(DeviceMessage message) {
List<InitialSwitchInfo> switchInfos = JsonDataParser.parseJsonData(message.getData(), InitialSwitchInfo.class);
if(!switchInfos.isEmpty()){
// 时间戳转换
long timestamp = switchInfos.get(0).getTimestamp();
long millis = timestamp * 1000;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
String timeStr = DateUtils.parseDateToStr("yyyy-MM-dd HH:mm:ss",createTime);
// 查询临时表信息,计算实际流量值
InitialSwitchInfoTemp temp = new InitialSwitchInfoTemp();
temp.setClientId(message.getClientId());
List<InitialSwitchInfoTemp> tempList = initialSwitchInfoTempService.selectInitialSwitchInfoTempList(temp);
if(!tempList.isEmpty()){
// 1. 构建快速查找的Map
Map<String, InitialSwitchInfoTemp> tempMap = tempList.stream()
.collect(Collectors.toMap(
InitialSwitchInfoTemp::getName,
Function.identity(),
(existing, replacement) -> existing
));
// 2. 预计算除数(避免重复创建对象)
BigDecimal divisor = new BigDecimal(300);
// 3. 计算速度
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(message.getClientId());
switchInfo.setCreateTime(createTime);
InitialSwitchInfoTemp tempInfo = tempMap.get(switchInfo.getName());
if (tempInfo != null) {
// 计算inSpeed
if (switchInfo.getInBytes() != null && tempInfo.getInBytes() != null) {
BigDecimal inDiff = switchInfo.getInBytes().subtract(tempInfo.getInBytes());
switchInfo.setInSpeed(inDiff.divide(divisor, 2, RoundingMode.HALF_UP));
}
// 计算outSpeed
if (switchInfo.getOutBytes() != null && tempInfo.getOutBytes() != null) {
BigDecimal outDiff = switchInfo.getOutBytes().subtract(tempInfo.getOutBytes());
switchInfo.setOutSpeed(outDiff.divide(divisor, 2, RoundingMode.HALF_UP));
}
}
});
}else{
switchInfos.forEach(switchInfo -> {
switchInfo.setClientId(message.getClientId());
switchInfo.setCreateTime(createTime);
});
}
// 清空临时表对应switch信息
initialSwitchInfoTempService.truncateSwitchInfoTemp(message.getClientId());
// 临时表 用来计算inSpeed outSeppd
initialSwitchInfoTempService.batchInsertInitialSwitchInfoTemp(switchInfos);
// 初始交换机数据入库
initialSwitchInfoService.batchInsertInitialSwitchInfo(switchInfos);
// 业务表入库
InitialSwitchInfoDetailsRemote detailsRemote = new InitialSwitchInfoDetailsRemote();
detailsRemote.setClientId(message.getClientId());
detailsRemote.setStartTime(timeStr);
detailsRemote.setEndTime(timeStr);
remoteRevenueConfigService.autoSaveSwitchTraffic(detailsRemote, SecurityConstants.INNER);
}else{
throw new RuntimeException("交换机data数据为空");
}
}
/**
* 系统其他信息
@@ -609,27 +539,6 @@ public class DeviceMessageHandler {
}
}
}
/**
* 系统数据入库
* @param message
*/
private void handleSystemMessage(DeviceMessage message) {
List<InitialSystemInfo> systemInfos = JsonDataParser.parseJsonData(message.getData(), InitialSystemInfo.class);
if(!systemInfos.isEmpty()){
// 时间戳转换
long timestamp = systemInfos.get(0).getTimestamp();
long millis = timestamp * 1000;
Date createTime = new Date(millis / 1000 * 1000); // 去除毫秒
systemInfos.forEach(iface -> {
iface.setClientId(message.getClientId());
iface.setCreateTime(createTime);
});
// 初始系统数据入库
initialSystemInfoService.batchInsertInitialSystemInfo(systemInfos);
}else{
throw new RuntimeException("系统data数据为空");
}
}
/**
* 监听心跳
* @param message
@@ -662,7 +571,7 @@ public class DeviceMessageHandler {
redisTemplate.delete(HEARTBEAT_ALERT_PREFIX + clientId);
redisTemplate.delete(recoveryCountKey); // 清除恢复计数器
// 修改资源状态
getResourceMsg(clientId, "1");
updateResourceStatus(clientId, "1");
} else {
// 未达到2次只记录恢复次数
log.info("客户端ID: {} 心跳恢复第{}次", clientId, recoveryCount);
@@ -676,7 +585,7 @@ public class DeviceMessageHandler {
}
// 添加一个定时任务方法,定期检查心跳状态
@Scheduled(fixedRate = 60000) // 每分钟检查一次
@Scheduled(fixedRate = 30000) // 每30s检查一次
public void checkHeartbeatStatus() {
long currentTime = System.currentTimeMillis();
// 获取所有客户端时间键
@@ -686,8 +595,14 @@ public class DeviceMessageHandler {
for (String timeKey : timeKeys) {
String clientId = timeKey.substring(HEARTBEAT_TIME_PREFIX.length());
String statusKey = HEARTBEAT_STATUS_PREFIX + clientId;
String alertKey = HEARTBEAT_ALERT_PREFIX + clientId;
try {
// 检查是否已经存在告警
String existingAlert = redisTemplate.opsForValue().get(alertKey);
if ("1".equals(existingAlert)) {
continue; // 如果已有告警,跳过处理
}
String lastTimeStr = redisTemplate.opsForValue().get(timeKey);
if (lastTimeStr == null) continue;
@@ -701,12 +616,21 @@ public class DeviceMessageHandler {
log.warn("客户端ID: {} 心跳丢失,连续次数: {}", clientId, lostCount);
if (lostCount == 3) {
if (lostCount >= 3) {
insertHeartbeatLog(clientId, "3", "连续三次心跳丢失");
redisTemplate.opsForValue().set(HEARTBEAT_ALERT_PREFIX + clientId, "1");
// 设置告警后删除timeKey和statusKey
redisTemplate.delete(timeKey);
redisTemplate.delete(statusKey);
log.info("客户端ID: {} 已设置告警并清理心跳记录", clientId);
// 修改资源状态
getResourceMsg(clientId, "0");
updateResourceStatus(clientId, "0");
}
}else {
// 如果心跳正常,重置丢失次数
redisTemplate.opsForValue().set(statusKey, "0");
log.debug("客户端ID: {} 心跳正常,重置丢失次数", clientId);
}
} catch (Exception e) {
log.error("检查心跳状态异常, clientId: {}", clientId, e);
@@ -714,57 +638,15 @@ public class DeviceMessageHandler {
}
}
/**
* 修改资源在线状态
* @param clientId
* @param status
*/
private void getResourceMsg(String clientId, String status){
String ipAddress = null;
AllInterfaceNameRemote interfaceNameRemote = new AllInterfaceNameRemote();
interfaceNameRemote.setClientId(clientId);
// 1. 先获取交换机IP
interfaceNameRemote.setResourceType("2");
R<AllInterfaceNameRemote> switchResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (switchResult != null && switchResult.getData() != null &&
StringUtils.isNotEmpty(switchResult.getData().getSwitchIp())) {
// 更新交换机状态
ipAddress = switchResult.getData().getSwitchIp();
updateResourceStatus(ipAddress, status);
// 2. 再获取服务器IP
interfaceNameRemote.setResourceType("1");
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (serverResult != null && serverResult.getData() != null &&
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
// 更新服务器状态
updateResourceStatus(serverResult.getData().getServerIp(), status);
}
} else {
// 3. 如果没有交换机IP只获取服务器IP
interfaceNameRemote.setResourceType("1");
R<AllInterfaceNameRemote> serverResult = remoteRevenueConfigService.getMsgByClientId(
interfaceNameRemote, SecurityConstants.INNER);
if (serverResult != null && serverResult.getData() != null &&
StringUtils.isNotEmpty(serverResult.getData().getServerIp())) {
// 更新服务器状态
updateResourceStatus(serverResult.getData().getServerIp(), status);
} else {
log.warn("未找到客户端ID: {} 对应的IP地址", clientId);
}
}
}
// 更新资源状态的公共方法
private void updateResourceStatus(String ipAddress, String status) {
private void updateResourceStatus(String clientId, String status) {
log.info("开启更新资源状态========");
RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
rmResourceRegistrationRemote.setOnlineStatus(status);
rmResourceRegistrationRemote.setIpAddress(ipAddress);
if("0".equals(status)){
rmResourceRegistrationRemote.setRegistrationStatus(status);
}
rmResourceRegistrationRemote.setHardwareSn(clientId);
remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
}
// 插入心跳日志到数据库
@@ -835,24 +717,4 @@ public class DeviceMessageHandler {
}
}
}
/**
* 注册应答处理
* @param message
*/
// private void handleRegisterMessage(DeviceMessage message) {
// RspVo rspVo = handleResponseMessage(message);
// String clientId = message.getClientId();
// if (rspVo != null && rspVo.getResCode() == 1) {
// RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
// rmResourceRegistrationRemote.setRegistrationStatus("1");
// rmResourceRegistrationRemote.setHardwareSn(clientId);
// remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
// }else{
// if(rspVo == null){
// log.error("注册失败:应答信息为null");
// }else{
// log.error("注册失败:{}",rspVo.getResMsg());
// }
// }
// }
}

View File

@@ -0,0 +1,64 @@
package com.ruoyi.rocketmq.mapper;
import com.ruoyi.rocketmq.domain.RmAgentManagement;
import java.util.List;
/**
* Agent管理Mapper接口
*
* @author gyt
* @date 2025-09-15
*/
public interface RmAgentManagementMapper
{
/**
* 查询Agent管理
*
* @param id Agent管理主键
* @return Agent管理
*/
public RmAgentManagement selectRmAgentManagementById(Long id);
/**
* 查询Agent管理列表
*
* @param rmAgentManagement Agent管理
* @return Agent管理集合
*/
public List<RmAgentManagement> selectRmAgentManagementList(RmAgentManagement rmAgentManagement);
/**
* 新增Agent管理
*
* @param rmAgentManagement Agent管理
* @return 结果
*/
public int insertRmAgentManagement(RmAgentManagement rmAgentManagement);
/**
* 修改Agent管理
*
* @param rmAgentManagement Agent管理
* @return 结果
*/
public int updateRmAgentManagement(RmAgentManagement rmAgentManagement);
/**
* 删除Agent管理
*
* @param id Agent管理主键
* @return 结果
*/
public int deleteRmAgentManagementById(Long id);
/**
* 批量删除Agent管理
*
* @param ids 需要删除的数据主键集合
* @return 结果
*/
public int deleteRmAgentManagementByIds(Long[] ids);
void updateRmAgentManagementBySn(RmAgentManagement rmAgentManagement);
}

View File

@@ -64,4 +64,5 @@ public interface RmTemplateLinuxMapper
void batchInsertRmTemplateLinux(@Param("list") List<RmTemplateLinux> linuxItems);
int deleteByTemplateId(Long templateId);
int updateByTemplateId(Long templateId);
}

View File

@@ -68,4 +68,6 @@ public interface RmTemplateSwitchMapper
void batchInsertRmTemplateSwitch(@Param("list") List<RmTemplateSwitch> switchItems);
int deleteByTemplateId(Long templateId);
int updateByTemplateId(Long templateId);
}

View File

@@ -0,0 +1,69 @@
package com.ruoyi.rocketmq.service;
import com.ruoyi.rocketmq.domain.RmAgentManagement;
import java.util.List;
/**
* Agent管理Service接口
*
* @author gyt
* @date 2025-09-15
*/
public interface IRmAgentManagementService
{
/**
* 查询Agent管理
*
* @param id Agent管理主键
* @return Agent管理
*/
public RmAgentManagement selectRmAgentManagementById(Long id);
/**
* 查询Agent管理列表
*
* @param rmAgentManagement Agent管理
* @return Agent管理集合
*/
public List<RmAgentManagement> selectRmAgentManagementList(RmAgentManagement rmAgentManagement);
/**
* 新增Agent管理
*
* @param rmAgentManagement Agent管理
* @return 结果
*/
public int insertRmAgentManagement(RmAgentManagement rmAgentManagement);
/**
* 修改Agent管理
*
* @param rmAgentManagement Agent管理
* @return 结果
*/
public int updateRmAgentManagement(RmAgentManagement rmAgentManagement);
/**
* 批量删除Agent管理
*
* @param ids 需要删除的Agent管理主键集合
* @return 结果
*/
public int deleteRmAgentManagementByIds(Long[] ids);
/**
* 删除Agent管理信息
*
* @param id Agent管理主键
* @return 结果
*/
public int deleteRmAgentManagementById(Long id);
/**
* 配置更新策略
* @param rmAgentManagement
* @return
*/
int addRmAgentManagement(RmAgentManagement rmAgentManagement);
}

View File

@@ -0,0 +1,177 @@
package com.ruoyi.rocketmq.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.common.core.enums.MsgEnum;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.rocketmq.domain.DeviceMessage;
import com.ruoyi.rocketmq.domain.RmAgentManagement;
import com.ruoyi.rocketmq.domain.vo.AgentUpdateVo;
import com.ruoyi.rocketmq.mapper.RmAgentManagementMapper;
import com.ruoyi.rocketmq.model.ProducerMode;
import com.ruoyi.rocketmq.producer.MessageProducer;
import com.ruoyi.rocketmq.service.IRmAgentManagementService;
import com.ruoyi.system.api.RemoteRevenueConfigService;
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* Agent管理Service业务层处理
*
* @author gyt
* @date 2025-09-15
*/
@Service
@Slf4j
public class RmAgentManagementServiceImpl implements IRmAgentManagementService
{
@Autowired
private RmAgentManagementMapper rmAgentManagementMapper;
@Value("${fileDictory.filePath}")
private String filePath;
// private static String COMMAND = "/usr/local/tongran/deploy.sh restart";
private static String COMMAND = "/data/saas-income/houduan/tr-client-test/deploy.sh restart";
private static String COMMAND2 = "cat /data/saas-income/houduan/tr-client-test/deploy.sh";
@Autowired
private ProducerMode producerMode;
@Autowired
private RemoteRevenueConfigService remoteRevenueConfigService;
/**
* 查询Agent管理
*
* @param id Agent管理主键
* @return Agent管理
*/
@Override
public RmAgentManagement selectRmAgentManagementById(Long id)
{
return rmAgentManagementMapper.selectRmAgentManagementById(id);
}
/**
* 查询Agent管理列表
*
* @param rmAgentManagement Agent管理
* @return Agent管理
*/
@Override
public List<RmAgentManagement> selectRmAgentManagementList(RmAgentManagement rmAgentManagement)
{
return rmAgentManagementMapper.selectRmAgentManagementList(rmAgentManagement);
}
/**
* 新增Agent管理
*
* @param rmAgentManagement Agent管理
* @return 结果
*/
@Override
public int insertRmAgentManagement(RmAgentManagement rmAgentManagement)
{
rmAgentManagement.setCreateTime(DateUtils.getNowDate());
return rmAgentManagementMapper.insertRmAgentManagement(rmAgentManagement);
}
/**
* 修改Agent管理
*
* @param rmAgentManagement Agent管理
* @return 结果
*/
@Override
public int updateRmAgentManagement(RmAgentManagement rmAgentManagement)
{
rmAgentManagement.setUpdateTime(DateUtils.getNowDate());
return rmAgentManagementMapper.updateRmAgentManagement(rmAgentManagement);
}
/**
* 批量删除Agent管理
*
* @param ids 需要删除的Agent管理主键
* @return 结果
*/
@Override
public int deleteRmAgentManagementByIds(Long[] ids)
{
return rmAgentManagementMapper.deleteRmAgentManagementByIds(ids);
}
/**
* 删除Agent管理信息
*
* @param id Agent管理主键
* @return 结果
*/
@Override
public int deleteRmAgentManagementById(Long id)
{
return rmAgentManagementMapper.deleteRmAgentManagementById(id);
}
/**
* 配置更新策略
* @param rmAgentManagement
* @return
*/
@Override
public int addRmAgentManagement(RmAgentManagement rmAgentManagement) {
// 设备
String idStr = rmAgentManagement.getIncludeIds();
String[] ids = idStr.split(",");
List<RmResourceRegistrationRemote> registrationRemotes = remoteRevenueConfigService.getRegistrationByIds(ids, SecurityConstants.INNER).getData();
for (RmResourceRegistrationRemote resourceMsg : registrationRemotes) {
rmAgentManagement.setHardwareSn(resourceMsg.getHardwareSn());
rmAgentManagement.setResourceName(resourceMsg.getResourceName());
rmAgentManagement.setInternalIp(resourceMsg.getIpAddress());
// 查询该资源是否已经配置
RmAgentManagement agentQueryParam = new RmAgentManagement();
agentQueryParam.setHardwareSn(resourceMsg.getHardwareSn());
List<RmAgentManagement> agentManagements = rmAgentManagementMapper.selectRmAgentManagementList(agentQueryParam);
if(!agentManagements.isEmpty()){
// 如果存在,修改
rmAgentManagementMapper.updateRmAgentManagementBySn(rmAgentManagement);
}else{
// 如果不存在,添加
rmAgentManagementMapper.insertRmAgentManagement(rmAgentManagement);
}
// 构建更新策略
AgentUpdateVo agentUpdateVo = new AgentUpdateVo();
agentUpdateVo.setFileUrl(rmAgentManagement.getFileUrl());
agentUpdateVo.setFilePath("/data/saas-income/houduan/tr-client-test/temp");
List<String> commandList = new ArrayList<>();
commandList.add(COMMAND);
commandList.add(COMMAND2);
agentUpdateVo.setCommands(JSONObject.toJSONString(commandList));
agentUpdateVo.setMethod(rmAgentManagement.getMethod());
if(rmAgentManagement.getMethod() == 1){
agentUpdateVo.setPolicyTime(rmAgentManagement.getScheduledUpdateTime());
}
try {
DeviceMessage deviceMessage = new DeviceMessage();
deviceMessage.setClientId(resourceMsg.getHardwareSn());
deviceMessage.setDataType(MsgEnum.Agent版本更新.getValue());
deviceMessage.setData(JSONObject.toJSONString(agentUpdateVo));
MessageProducer messageProducer = new MessageProducer();
messageProducer.sendAsyncProducerMessage(
producerMode.getAgentTopic(),
"",
"",
JSONObject.toJSONString(deviceMessage)
);
} catch (Exception e) {
log.error("发送设备配置失败deviceId: {}", resourceMsg.getHardwareSn(), e);
}
}
return 1;
}
}

View File

@@ -119,9 +119,13 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
queryParam.setResourceGroupId(resourceGroupId);
List<RmMonitorPolicy> exits = rmMonitorPolicyMapper.selectRmMonitorPolicyList(queryParam);
if(!exits.isEmpty()){
return -1;
if(exits.get(0).getId() != rmMonitorPolicy.getId()){
return -1;
}
}
rmMonitorPolicy.setUpdateTime(DateUtils.getNowDate());
// 策略状态改为待下发
rmMonitorPolicy.setStatus("0");
rmMonitorPolicyMapper.updateRmMonitorPolicy(rmMonitorPolicy);
// 模板绑定资源组信息
RmMonitorTemplate rmMonitorTemplate = new RmMonitorTemplate();
@@ -132,6 +136,8 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
List<RmMonitorPolicyVo> collectionAndIdList = rmMonitorPolicy.getCollectionAndIdList();
if(!collectionAndIdList.isEmpty()){
if("linux".equals(rmMonitorPolicy.getResourceType())){
// 修改之前先置空所有采集周期
rmTemplateLinuxMapper.updateByTemplateId(rmMonitorPolicy.getTemplateId());
for (RmMonitorPolicyVo rmMonitorPolicyVo : collectionAndIdList) {
// 添加采集周期
RmTemplateLinux rmTemplateLinux = new RmTemplateLinux();
@@ -140,6 +146,8 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
rmTemplateLinuxMapper.updateRmTemplateLinux(rmTemplateLinux);
}
}else if("switch".equals(rmMonitorPolicy.getResourceType())){
// 修改之前先置空所有采集周期
rmTemplateSwitchMapper.updateByTemplateId(rmMonitorPolicy.getTemplateId());
for (RmMonitorPolicyVo rmMonitorPolicyVo : collectionAndIdList) {
// 添加采集周期
RmTemplateSwitch rmTemplateSwitch = new RmTemplateSwitch();
@@ -163,6 +171,14 @@ public class RmMonitorPolicyServiceImpl implements IRmMonitorPolicyService
@Override
public int deleteRmMonitorPolicyByIds(Long[] ids)
{
for (Long id : ids) {
// 根据id查询策略详情
RmMonitorPolicy rmMonitorPolicy = rmMonitorPolicyMapper.selectRmMonitorPolicyById(id);
Long templateId = rmMonitorPolicy.getTemplateId();
// 删除策略的时候,把该模板的采集周期置空
rmTemplateLinuxMapper.updateByTemplateId(templateId);
rmTemplateSwitchMapper.updateByTemplateId(templateId);
}
return rmMonitorPolicyMapper.deleteRmMonitorPolicyByIds(ids);
}

View File

@@ -1,10 +1,11 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.rocketmq.mapper.InitialHeartbeatListenLogMapper">
<resultMap type="InitialHeartbeatListenLog" id="InitialHeartbeatListenLogResult">
<result property="id" column="id" />
<result property="clientId" column="client_id" />
<result property="status" column="status" />
<result property="remark" column="remark" />
@@ -15,22 +16,23 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</resultMap>
<sql id="selectInitialHeartbeatListenLogVo">
select client_id, status, remark, create_time, update_time, create_by, update_by from initial_heartbeat_listen_log
select id, client_id, status, remark, create_time, update_time, create_by, update_by from initial_heartbeat_listen_log
</sql>
<select id="selectInitialHeartbeatListenLogList" parameterType="InitialHeartbeatListenLog" resultMap="InitialHeartbeatListenLogResult">
<include refid="selectInitialHeartbeatListenLogVo"/>
<where>
<where>
<if test="clientId != null and clientId != ''"> and client_id = #{clientId}</if>
<if test="status != null and status != ''"> and status = #{status}</if>
</where>
</select>
<select id="selectInitialHeartbeatListenLogByClientId" parameterType="String" resultMap="InitialHeartbeatListenLogResult">
<select id="selectInitialHeartbeatListenLogById" parameterType="Long" resultMap="InitialHeartbeatListenLogResult">
<include refid="selectInitialHeartbeatListenLogVo"/>
where client_id = #{clientId}
where id = #{id}
</select>
<insert id="insertInitialHeartbeatListenLog" parameterType="InitialHeartbeatListenLog">
<insert id="insertInitialHeartbeatListenLog" parameterType="InitialHeartbeatListenLog" useGeneratedKeys="true" keyProperty="id">
insert into initial_heartbeat_listen_log
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="clientId != null">client_id,</if>
@@ -40,7 +42,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="updateTime != null">update_time,</if>
<if test="createBy != null">create_by,</if>
<if test="updateBy != null">update_by,</if>
</trim>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="clientId != null">#{clientId},</if>
<if test="status != null">#{status},</if>
@@ -49,12 +51,13 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="updateTime != null">#{updateTime},</if>
<if test="createBy != null">#{createBy},</if>
<if test="updateBy != null">#{updateBy},</if>
</trim>
</trim>
</insert>
<update id="updateInitialHeartbeatListenLog" parameterType="InitialHeartbeatListenLog">
update initial_heartbeat_listen_log
<trim prefix="SET" suffixOverrides=",">
<if test="clientId != null">client_id = #{clientId},</if>
<if test="status != null">status = #{status},</if>
<if test="remark != null">remark = #{remark},</if>
<if test="createTime != null">create_time = #{createTime},</if>
@@ -62,17 +65,17 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<if test="createBy != null">create_by = #{createBy},</if>
<if test="updateBy != null">update_by = #{updateBy},</if>
</trim>
where client_id = #{clientId}
where id = #{id}
</update>
<delete id="deleteInitialHeartbeatListenLogByClientId" parameterType="String">
delete from initial_heartbeat_listen_log where client_id = #{clientId}
<delete id="deleteInitialHeartbeatListenLogById" parameterType="Long">
delete from initial_heartbeat_listen_log where id = #{id}
</delete>
<delete id="deleteInitialHeartbeatListenLogByClientIds" parameterType="String">
delete from initial_heartbeat_listen_log where client_id in
<foreach item="clientId" collection="array" open="(" separator="," close=")">
#{clientId}
<delete id="deleteInitialHeartbeatListenLogByIds" parameterType="String">
delete from initial_heartbeat_listen_log where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</delete>
</mapper>

View File

@@ -0,0 +1,148 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ruoyi.rocketmq.mapper.RmAgentManagementMapper">
<resultMap type="RmAgentManagement" id="RmAgentManagementResult">
<result property="id" column="id" />
<result property="hardwareSn" column="hardware_sn" />
<result property="resourceName" column="resource_name" />
<result property="internalIp" column="internal_ip" />
<result property="status" column="status" />
<result property="agentVersion" column="agent_version" />
<result property="method" column="method" />
<result property="scheduledUpdateTime" column="scheduled_update_time" />
<result property="fileUrlType" column="file_url_type" />
<result property="fileUrl" column="file_url" />
<result property="fileDirectory" column="file_directory" />
<result property="lastUpdateResult" column="last_update_result" />
<result property="lastUpdateTime" column="last_update_time" />
<result property="createTime" column="create_time" />
<result property="updateTime" column="update_time" />
<result property="createBy" column="create_by" />
<result property="updateBy" column="update_by" />
</resultMap>
<sql id="selectRmAgentManagementVo">
select id, hardware_sn, resource_name, internal_ip, status, agent_version, method, scheduled_update_time, file_url_type, file_url, file_directory, last_update_result, last_update_time, create_time, update_time, create_by, update_by from rm_agent_management
</sql>
<select id="selectRmAgentManagementList" parameterType="RmAgentManagement" resultMap="RmAgentManagementResult">
<include refid="selectRmAgentManagementVo"/>
<where>
<if test="hardwareSn != null and hardwareSn != ''"> and hardware_sn = #{hardwareSn}</if>
<if test="resourceName != null and resourceName != ''"> and resource_name like concat('%', #{resourceName}, '%')</if>
<if test="internalIp != null and internalIp != ''"> and internal_ip = #{internalIp}</if>
<if test="status != null and status != ''"> and status = #{status}</if>
<if test="agentVersion != null and agentVersion != ''"> and agent_version = #{agentVersion}</if>
<if test="method != null "> and method = #{method}</if>
<if test="scheduledUpdateTime != null and scheduledUpdateTime != ''"> and scheduled_update_time = #{scheduledUpdateTime}</if>
<if test="fileUrlType != null "> and file_url_type = #{fileUrlType}</if>
<if test="fileUrl != null and fileUrl != ''"> and file_url = #{fileUrl}</if>
<if test="fileDirectory != null and fileDirectory != ''"> and file_directory = #{fileDirectory}</if>
<if test="lastUpdateResult != null and lastUpdateResult != ''"> and last_update_result = #{lastUpdateResult}</if>
<if test="lastUpdateTime != null "> and last_update_time = #{lastUpdateTime}</if>
</where>
</select>
<select id="selectRmAgentManagementById" parameterType="Long" resultMap="RmAgentManagementResult">
<include refid="selectRmAgentManagementVo"/>
where id = #{id}
</select>
<insert id="insertRmAgentManagement" parameterType="RmAgentManagement" useGeneratedKeys="true" keyProperty="id">
insert into rm_agent_management
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="hardwareSn != null and hardwareSn != ''">hardware_sn,</if>
<if test="resourceName != null and resourceName != ''">resource_name,</if>
<if test="internalIp != null">internal_ip,</if>
<if test="status != null and status != ''">status,</if>
<if test="agentVersion != null">agent_version,</if>
<if test="method != null">method,</if>
<if test="scheduledUpdateTime != null">scheduled_update_time,</if>
<if test="fileUrlType != null">file_url_type,</if>
<if test="fileUrl != null">file_url,</if>
<if test="fileDirectory != null">file_directory,</if>
<if test="lastUpdateResult != null">last_update_result,</if>
<if test="lastUpdateTime != null">last_update_time,</if>
<if test="createTime != null">create_time,</if>
<if test="updateTime != null">update_time,</if>
<if test="createBy != null">create_by,</if>
<if test="updateBy != null">update_by,</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="hardwareSn != null and hardwareSn != ''">#{hardwareSn},</if>
<if test="resourceName != null and resourceName != ''">#{resourceName},</if>
<if test="internalIp != null">#{internalIp},</if>
<if test="status != null and status != ''">#{status},</if>
<if test="agentVersion != null">#{agentVersion},</if>
<if test="method != null">#{method},</if>
<if test="scheduledUpdateTime != null">#{scheduledUpdateTime},</if>
<if test="fileUrlType != null">#{fileUrlType},</if>
<if test="fileUrl != null">#{fileUrl},</if>
<if test="fileDirectory != null">#{fileDirectory},</if>
<if test="lastUpdateResult != null">#{lastUpdateResult},</if>
<if test="lastUpdateTime != null">#{lastUpdateTime},</if>
<if test="createTime != null">#{createTime},</if>
<if test="updateTime != null">#{updateTime},</if>
<if test="createBy != null">#{createBy},</if>
<if test="updateBy != null">#{updateBy},</if>
</trim>
</insert>
<update id="updateRmAgentManagement" parameterType="RmAgentManagement">
update rm_agent_management
<trim prefix="SET" suffixOverrides=",">
<if test="hardwareSn != null and hardwareSn != ''">hardware_sn = #{hardwareSn},</if>
<if test="resourceName != null and resourceName != ''">resource_name = #{resourceName},</if>
<if test="internalIp != null">internal_ip = #{internalIp},</if>
<if test="status != null and status != ''">status = #{status},</if>
<if test="agentVersion != null">agent_version = #{agentVersion},</if>
<if test="method != null">method = #{method},</if>
<if test="scheduledUpdateTime != null">scheduled_update_time = #{scheduledUpdateTime},</if>
<if test="fileUrlType != null">file_url_type = #{fileUrlType},</if>
<if test="fileUrl != null">file_url = #{fileUrl},</if>
<if test="fileDirectory != null">file_directory = #{fileDirectory},</if>
<if test="lastUpdateResult != null">last_update_result = #{lastUpdateResult},</if>
<if test="lastUpdateTime != null">last_update_time = #{lastUpdateTime},</if>
<if test="createTime != null">create_time = #{createTime},</if>
<if test="updateTime != null">update_time = #{updateTime},</if>
<if test="createBy != null">create_by = #{createBy},</if>
<if test="updateBy != null">update_by = #{updateBy},</if>
</trim>
where id = #{id}
</update>
<update id="updateRmAgentManagementBySn" parameterType="RmAgentManagement">
update rm_agent_management
<trim prefix="SET" suffixOverrides=",">
<if test="resourceName != null and resourceName != ''">resource_name = #{resourceName},</if>
<if test="internalIp != null">internal_ip = #{internalIp},</if>
<if test="status != null and status != ''">status = #{status},</if>
<if test="agentVersion != null">agent_version = #{agentVersion},</if>
<if test="method != null">method = #{method},</if>
<if test="scheduledUpdateTime != null">scheduled_update_time = #{scheduledUpdateTime},</if>
<if test="fileUrlType != null">file_url_type = #{fileUrlType},</if>
<if test="fileUrl != null">file_url = #{fileUrl},</if>
<if test="fileDirectory != null">file_directory = #{fileDirectory},</if>
<if test="lastUpdateResult != null">last_update_result = #{lastUpdateResult},</if>
<if test="lastUpdateTime != null">last_update_time = #{lastUpdateTime},</if>
<if test="createTime != null">create_time = #{createTime},</if>
<if test="updateTime != null">update_time = #{updateTime},</if>
<if test="createBy != null">create_by = #{createBy},</if>
<if test="updateBy != null">update_by = #{updateBy},</if>
</trim>
where hardware_sn = #{hardwareSn}
</update>
<delete id="deleteRmAgentManagementById" parameterType="Long">
delete from rm_agent_management where id = #{id}
</delete>
<delete id="deleteRmAgentManagementByIds" parameterType="String">
delete from rm_agent_management where id in
<foreach item="id" collection="array" open="(" separator="," close=")">
#{id}
</foreach>
</delete>
</mapper>

View File

@@ -24,13 +24,19 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<select id="selectRmMonitorPolicyList" parameterType="RmMonitorPolicy" resultMap="RmMonitorPolicyResult">
<include refid="selectRmMonitorPolicyVo"/>
<where>
<where>
<if test="templateId != null "> and template_id = #{templateId}</if>
<if test="resourceGroupId != null "> and resource_group_id = #{resourceGroupId}</if>
<if test="policyName != null and policyName != ''"> and policy_name like concat('%', #{policyName}, '%')</if>
<if test="description != null and description != ''"> and description = #{description}</if>
<if test="status != null and status != ''"> and status = #{status}</if>
<if test="deployTime != null "> and deploy_time = #{deployTime}</if>
<if test="queryName != null and queryName != ''">
and (policy_name like concat('%', #{queryName}, '%')
or template_id in (select id from rm_monitor_template where template_name like concat('%', #{queryName}, '%'))
or resource_group_id in (select id from rm_resource_group where group_name like concat('%', #{queryName}, '%'))
)
</if>
</where>
</select>

View File

@@ -129,4 +129,9 @@
<delete id="deleteByTemplateId" parameterType="Long">
delete from rm_template_linux where template_id = #{templateId}
</delete>
<update id="updateByTemplateId" parameterType="RmTemplateLinux">
update rm_template_linux set collection_cycle = null
where template_id = #{templateId}
</update>
</mapper>

View File

@@ -161,4 +161,8 @@
<delete id="deleteByTemplateId" parameterType="Long">
delete from rm_template_switch where template_id = #{templateId}
</delete>
<update id="updateByTemplateId" parameterType="RmTemplateLinux">
update rm_template_switch set collection_cycle = null
where template_id = #{templateId}
</update>
</mapper>