增加交换机心跳、优化用户自定义列表功能
This commit is contained in:
@@ -97,6 +97,14 @@ public interface RemoteRevenueConfigService
|
|||||||
*/
|
*/
|
||||||
@PostMapping("/switchManagement/getSwitchNameByClientId")
|
@PostMapping("/switchManagement/getSwitchNameByClientId")
|
||||||
public R<List<RmSwitchManagementRemote>> getSwitchNameByClientId(@RequestBody RmSwitchManagementRemote rmSwitchManagementRemote, @RequestHeader(SecurityConstants.FROM_SOURCE) String source);
|
public R<List<RmSwitchManagementRemote>> getSwitchNameByClientId(@RequestBody RmSwitchManagementRemote rmSwitchManagementRemote, @RequestHeader(SecurityConstants.FROM_SOURCE) String source);
|
||||||
|
/**
|
||||||
|
* 根据clientId修改交换机在线状态
|
||||||
|
* @param rmSwitchManagementRemote
|
||||||
|
* @param source
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
@PostMapping("/switchManagement/updateSwitchMsgByClientId")
|
||||||
|
public R<Integer> updateSwitchMsgByClientId(@RequestBody RmSwitchManagementRemote rmSwitchManagementRemote, @RequestHeader(SecurityConstants.FROM_SOURCE) String source);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 自动注册
|
* 自动注册
|
||||||
|
|||||||
@@ -72,6 +72,11 @@ public class RemoteRevenueConfigFallbackFactory implements FallbackFactory<Remot
|
|||||||
return R.fail("根据clientId获取交换机信息失败:" + throwable.getMessage());
|
return R.fail("根据clientId获取交换机信息失败:" + throwable.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public R<Integer> updateSwitchMsgByClientId(RmSwitchManagementRemote rmSwitchManagementRemote, String source) {
|
||||||
|
return R.fail("根据clientId修改交换机信息失败:" + throwable.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public R<Integer> innerAddRegist(RmRegisterMsgRemote rmRegisterMsgRemote, String source) {
|
public R<Integer> innerAddRegist(RmRegisterMsgRemote rmRegisterMsgRemote, String source) {
|
||||||
return R.fail("自动注册失败:" + throwable.getMessage());
|
return R.fail("自动注册失败:" + throwable.getMessage());
|
||||||
|
|||||||
@@ -165,8 +165,11 @@ public class RmResourceRegistrationController extends BaseController
|
|||||||
public R<RmResourceRegistration> getListByHardwareSn(@RequestBody RmResourceRegistration rmResourceRegistration)
|
public R<RmResourceRegistration> getListByHardwareSn(@RequestBody RmResourceRegistration rmResourceRegistration)
|
||||||
{
|
{
|
||||||
List<RmResourceRegistration> list = rmResourceRegistrationService.selectRmResourceRegistrationList(rmResourceRegistration);
|
List<RmResourceRegistration> list = rmResourceRegistrationService.selectRmResourceRegistrationList(rmResourceRegistration);
|
||||||
|
if(list != null && !list.isEmpty()){
|
||||||
return R.ok(list.get(0));
|
return R.ok(list.get(0));
|
||||||
}
|
}
|
||||||
|
return R.ok(new RmResourceRegistration());
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* 查询资源注册列表
|
* 查询资源注册列表
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -132,4 +132,13 @@ public class RmSwitchManagementController extends BaseController
|
|||||||
List<RmSwitchManagement> list = rmSwitchManagementService.getAllSwitchNameTree();
|
List<RmSwitchManagement> list = rmSwitchManagementService.getAllSwitchNameTree();
|
||||||
return success(list);
|
return success(list);
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* 修改交换机管理
|
||||||
|
*/
|
||||||
|
@PostMapping("/updateSwitchMsgByClientId")
|
||||||
|
@InnerAuth
|
||||||
|
public R<Integer> updateSwitchMsgByClientId(@RequestBody RmSwitchManagement rmSwitchManagement)
|
||||||
|
{
|
||||||
|
return R.ok(rmSwitchManagementService.updateRmSwitchManagement(rmSwitchManagement));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -487,7 +487,7 @@ public class InitialSwitchInfoDetailsServiceImpl implements IInitialSwitchInfoDe
|
|||||||
if(!serverSnList.isEmpty()){
|
if(!serverSnList.isEmpty()){
|
||||||
for (RmEpsTopologyManagement rmEpsTopologyManagement : serverSnList) {
|
for (RmEpsTopologyManagement rmEpsTopologyManagement : serverSnList) {
|
||||||
queryParam.setName(rmEpsTopologyManagement.getInterfaceName());
|
queryParam.setName(rmEpsTopologyManagement.getInterfaceName());
|
||||||
if("1".equals(rmEpsTopologyManagement.getConnectedDeviceType())){
|
if("1".equals(rmEpsTopologyManagement.getConnectedDeviceType()) && rmEpsTopologyManagement.getServerClientId() != null){
|
||||||
queryParam.setServerClientId(rmEpsTopologyManagement.getServerClientId());
|
queryParam.setServerClientId(rmEpsTopologyManagement.getServerClientId());
|
||||||
// 根据业务变更情况选择计算方式
|
// 根据业务变更情况选择计算方式
|
||||||
calculateChangedSwitchBandwidth(queryParam, dailyStartTime, dailyEndTime, calculationMode);
|
calculateChangedSwitchBandwidth(queryParam, dailyStartTime, dailyEndTime, calculationMode);
|
||||||
|
|||||||
@@ -143,7 +143,19 @@
|
|||||||
<if test="createBy != null">create_by = #{createBy},</if>
|
<if test="createBy != null">create_by = #{createBy},</if>
|
||||||
<if test="updateBy != null">update_by = #{updateBy},</if>
|
<if test="updateBy != null">update_by = #{updateBy},</if>
|
||||||
</trim>
|
</trim>
|
||||||
where id = #{id}
|
<where>
|
||||||
|
<choose>
|
||||||
|
<when test="id != null">
|
||||||
|
and id = #{id}
|
||||||
|
</when>
|
||||||
|
<when test="clientId != null and clientId != ''">
|
||||||
|
and client_id = #{clientId}
|
||||||
|
</when>
|
||||||
|
<otherwise>
|
||||||
|
and 1=0
|
||||||
|
</otherwise>
|
||||||
|
</choose>
|
||||||
|
</where>
|
||||||
</update>
|
</update>
|
||||||
|
|
||||||
<delete id="deleteRmSwitchManagementById" parameterType="Long">
|
<delete id="deleteRmSwitchManagementById" parameterType="Long">
|
||||||
|
|||||||
@@ -61,7 +61,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
|||||||
<update id="updateUserTableColumnConfig" parameterType="UserTableColumnConfig">
|
<update id="updateUserTableColumnConfig" parameterType="UserTableColumnConfig">
|
||||||
update user_table_column_config
|
update user_table_column_config
|
||||||
<trim prefix="SET" suffixOverrides=",">
|
<trim prefix="SET" suffixOverrides=",">
|
||||||
<if test="pageRoute != null and pageRoute != ''">page_route = #{pageRoute},</if>
|
|
||||||
<if test="columnConfig != null and columnConfig != ''">column_config = #{columnConfig},</if>
|
<if test="columnConfig != null and columnConfig != ''">column_config = #{columnConfig},</if>
|
||||||
<if test="createBy != null">create_by = #{createBy},</if>
|
<if test="createBy != null">create_by = #{createBy},</if>
|
||||||
<if test="createTime != null">create_time = #{createTime},</if>
|
<if test="createTime != null">create_time = #{createTime},</if>
|
||||||
|
|||||||
@@ -14,10 +14,7 @@ import com.ruoyi.rocketmq.service.*;
|
|||||||
import com.ruoyi.rocketmq.utils.DataProcessUtil;
|
import com.ruoyi.rocketmq.utils.DataProcessUtil;
|
||||||
import com.ruoyi.rocketmq.utils.JsonDataParser;
|
import com.ruoyi.rocketmq.utils.JsonDataParser;
|
||||||
import com.ruoyi.system.api.RemoteRevenueConfigService;
|
import com.ruoyi.system.api.RemoteRevenueConfigService;
|
||||||
import com.ruoyi.system.api.domain.EpsInitialTrafficDataRemote;
|
import com.ruoyi.system.api.domain.*;
|
||||||
import com.ruoyi.system.api.domain.NetworkInfo;
|
|
||||||
import com.ruoyi.system.api.domain.RmRegisterMsgRemote;
|
|
||||||
import com.ruoyi.system.api.domain.RmResourceRegistrationRemote;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -961,12 +958,28 @@ public class MessageHandler {
|
|||||||
// 更新资源状态的公共方法
|
// 更新资源状态的公共方法
|
||||||
private void updateResourceStatus(String clientId, String status) {
|
private void updateResourceStatus(String clientId, String status) {
|
||||||
log.info("开启更新资源状态========");
|
log.info("开启更新资源状态========");
|
||||||
|
RmResourceRegistrationRemote query = new RmResourceRegistrationRemote();
|
||||||
|
query.setClientId(clientId);
|
||||||
|
R<RmResourceRegistrationRemote> registerMsgR = remoteRevenueConfigService.getListByHardwareSn(query, SecurityConstants.INNER);
|
||||||
|
if(registerMsgR != null && registerMsgR.getData() != null){
|
||||||
RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
|
RmResourceRegistrationRemote rmResourceRegistrationRemote = new RmResourceRegistrationRemote();
|
||||||
rmResourceRegistrationRemote.setOnlineStatus(status);
|
rmResourceRegistrationRemote.setOnlineStatus(status);
|
||||||
rmResourceRegistrationRemote.setRegistrationStatus(status);
|
rmResourceRegistrationRemote.setRegistrationStatus(status);
|
||||||
rmResourceRegistrationRemote.setClientId(clientId);
|
rmResourceRegistrationRemote.setClientId(clientId);
|
||||||
remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
|
remoteRevenueConfigService.updateStatusByResource(rmResourceRegistrationRemote, SecurityConstants.INNER);
|
||||||
}
|
}
|
||||||
|
RmSwitchManagementRemote rmSwitchManagementRemote = new RmSwitchManagementRemote();
|
||||||
|
rmSwitchManagementRemote.setClientId(clientId);
|
||||||
|
R<List<RmSwitchManagementRemote>> rmSwitchManagementRemoteListR = remoteRevenueConfigService.getSwitchNameByClientId(rmSwitchManagementRemote, SecurityConstants.INNER);
|
||||||
|
if(rmSwitchManagementRemoteListR != null &&
|
||||||
|
rmSwitchManagementRemoteListR.getData()!=null &&
|
||||||
|
!rmSwitchManagementRemoteListR.getData().isEmpty()){
|
||||||
|
RmSwitchManagementRemote switchUpdate = new RmSwitchManagementRemote();
|
||||||
|
switchUpdate.setClientId(clientId);
|
||||||
|
switchUpdate.setOnlineStatus(status);
|
||||||
|
remoteRevenueConfigService.updateSwitchMsgByClientId(switchUpdate, SecurityConstants.INNER);
|
||||||
|
}
|
||||||
|
}
|
||||||
// 插入心跳日志到数据库
|
// 插入心跳日志到数据库
|
||||||
private void insertHeartbeatLog(String machineId, String status, String remark) {
|
private void insertHeartbeatLog(String machineId, String status, String remark) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -1,7 +1,12 @@
|
|||||||
package com.ruoyi.rocketmq.snmp.scheduler;
|
package com.ruoyi.rocketmq.snmp.scheduler;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.ruoyi.common.core.enums.MsgEnum;
|
||||||
|
import com.ruoyi.rocketmq.domain.DeviceMessage;
|
||||||
|
import com.ruoyi.rocketmq.domain.InitialHeartbeatListen;
|
||||||
import com.ruoyi.rocketmq.domain.vo.CollectVo;
|
import com.ruoyi.rocketmq.domain.vo.CollectVo;
|
||||||
import com.ruoyi.rocketmq.domain.vo.SwitchOidVo;
|
import com.ruoyi.rocketmq.domain.vo.SwitchOidVo;
|
||||||
|
import com.ruoyi.rocketmq.producer.MessageProducer;
|
||||||
import com.ruoyi.rocketmq.snmp.DynamicOidCollector;
|
import com.ruoyi.rocketmq.snmp.DynamicOidCollector;
|
||||||
import com.ruoyi.rocketmq.snmp.dto.CollectionResult;
|
import com.ruoyi.rocketmq.snmp.dto.CollectionResult;
|
||||||
import com.ruoyi.rocketmq.snmp.dto.SwitchOidDto;
|
import com.ruoyi.rocketmq.snmp.dto.SwitchOidDto;
|
||||||
@@ -91,6 +96,8 @@ public class MultiSwitchCollectionScheduler {
|
|||||||
|
|
||||||
// 创建交换机配置
|
// 创建交换机配置
|
||||||
SwitchOidDto switchVo = createSwitchConfig(switchIp, community, port, collectVos, switchOidVo);
|
SwitchOidDto switchVo = createSwitchConfig(switchIp, community, port, collectVos, switchOidVo);
|
||||||
|
// 交换机心跳监测
|
||||||
|
startHeartbeatCheck(switchIp, clientId, switchVo);
|
||||||
|
|
||||||
// 初始化存储结构
|
// 初始化存储结构
|
||||||
Map<String, ScheduledFuture<?>> switchTasks = new ConcurrentHashMap<>();
|
Map<String, ScheduledFuture<?>> switchTasks = new ConcurrentHashMap<>();
|
||||||
@@ -239,7 +246,6 @@ public class MultiSwitchCollectionScheduler {
|
|||||||
private ScheduledFuture<?> createCollectionTask(String switchIp, String clientId, SwitchOidDto switchVo, CollectVo collectVo) {
|
private ScheduledFuture<?> createCollectionTask(String switchIp, String clientId, SwitchOidDto switchVo, CollectVo collectVo) {
|
||||||
String type = collectVo.getType();
|
String type = collectVo.getType();
|
||||||
long interval = collectVo.getInterval() != null ? collectVo.getInterval() : 300L;
|
long interval = collectVo.getInterval() != null ? collectVo.getInterval() : 300L;
|
||||||
|
|
||||||
if (isTrafficCollection(type)) {
|
if (isTrafficCollection(type)) {
|
||||||
// 流量采集:固定时间点(0,5,10,15...分钟)
|
// 流量采集:固定时间点(0,5,10,15...分钟)
|
||||||
return startTrafficCollection(switchIp, clientId, switchVo, collectVo);
|
return startTrafficCollection(switchIp, clientId, switchVo, collectVo);
|
||||||
@@ -249,6 +255,77 @@ public class MultiSwitchCollectionScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启动心跳检测任务(30秒一次)
|
||||||
|
*/
|
||||||
|
private ScheduledFuture<?> startHeartbeatCheck(String switchIp, String clientId, SwitchOidDto switchVo) {
|
||||||
|
MessageProducer messageProducer = new MessageProducer();
|
||||||
|
// 确保采集器已初始化
|
||||||
|
if (!ensureCollectorInitialized()) {
|
||||||
|
log.error("SNMP采集器未初始化,无法启动交换机 {} 的心跳检测任务", switchIp);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
// 创建Cron表达式:每30s执行一次
|
||||||
|
String cronExpression = "30 * * * * ?";
|
||||||
|
|
||||||
|
|
||||||
|
Runnable heartbeatTask = () -> {
|
||||||
|
try {
|
||||||
|
// 获取当前时间作为检测时间戳
|
||||||
|
long checkTimestamp = System.currentTimeMillis();
|
||||||
|
|
||||||
|
log.debug("[心跳检测] 开始检测交换机 {} 的心跳状态, 时间: {}", switchIp, checkTimestamp);
|
||||||
|
|
||||||
|
// 使用系统名称OID (1.3.6.1.2.1.1.5.0) 进行心跳检测
|
||||||
|
// 创建专门用于心跳检测的OID配置
|
||||||
|
Map<String, String> heartbeatOids = new HashMap<>();
|
||||||
|
heartbeatOids.put("1.3.6.1.2.1.1.5.0", "sysName");
|
||||||
|
|
||||||
|
// 临时设置心跳检测OID
|
||||||
|
switchVo.setOtherOID(heartbeatOids);
|
||||||
|
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
CollectionResult result = dynamicOidCollector.getInterfaceInfoByType(switchVo, "other");
|
||||||
|
long responseTime = System.currentTimeMillis() - startTime;
|
||||||
|
|
||||||
|
// 设置检测时间戳
|
||||||
|
result.setTimestamp(checkTimestamp);
|
||||||
|
if(result != null && result.isSuccess()){
|
||||||
|
// 构建心跳消息
|
||||||
|
DeviceMessage message = new DeviceMessage();
|
||||||
|
message.setClientId(clientId);
|
||||||
|
message.setDataType(MsgEnum.心跳上报.getValue());
|
||||||
|
InitialHeartbeatListen initialHeartbeatListen = new InitialHeartbeatListen();
|
||||||
|
initialHeartbeatListen.setClientId(clientId);
|
||||||
|
initialHeartbeatListen.setStrength(31L);
|
||||||
|
initialHeartbeatListen.setName("saa平台");
|
||||||
|
initialHeartbeatListen.setTimestamp(checkTimestamp);
|
||||||
|
String jsonStr = JSONObject.toJSONString(initialHeartbeatListen);
|
||||||
|
message.setData(jsonStr);
|
||||||
|
// 发送消息
|
||||||
|
messageProducer.sendAsyncProducerMessage(
|
||||||
|
"tr_agent_up",
|
||||||
|
"",
|
||||||
|
"heartbeat",
|
||||||
|
JSONObject.toJSONString(message)
|
||||||
|
);
|
||||||
|
log.debug("[心跳检测] 完成检测交换机 {} 的心跳状态, 响应时间: {}ms", switchIp, responseTime);
|
||||||
|
}else{
|
||||||
|
log.error("[心跳检测] 检测交换机 {} 心跳失败: {}", switchIp, result.getErrorMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[心跳检测] 检测交换机 {} 心跳失败: {}", switchIp, e.getMessage());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
CronTrigger trigger = new CronTrigger(cronExpression);
|
||||||
|
ScheduledFuture<?> future = taskScheduler.schedule(heartbeatTask, trigger);
|
||||||
|
|
||||||
|
log.info("交换机 {} 心跳检测任务启动,检测间隔: 30秒", switchIp);
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 判断是否为流量采集
|
* 判断是否为流量采集
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user