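I. Preface
This post is about MQTT. In a production environment you may need to subscribe to and unsubscribe from certain topics at any time. My own project needed this, so I am adding that part here.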
II. Adjusting the MQTT Configuration
1. Modify MqttConfig.java
Add two new methods to MqttConfig.java:
// Subscribe the inbound adapter to an additional topic at runtime.
public void addTopic(String subscribeTopic) {
    inbound().addTopic(subscribeTopic);
    System.out.println("Subscribed to new topic: " + subscribeTopic);
}

// Unsubscribe the inbound adapter from a topic at runtime.
public void removeTopic(String unsubscribeTopic) {
    inbound().removeTopic(unsubscribeTopic);
    System.out.println("Unsubscribed from topic: " + unsubscribeTopic);
}
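One thing to watch for: as far as I can tell, the Spring Integration adapter's addTopic throws an exception when the topic is already subscribed, so a repeated subscribe request for the same topic would surface as an error. Below is a minimal sketch of one way to make both operations idempotent. TopicOperations and TopicRegistry are hypothetical names that are not part of the original project; TopicOperations only stands in for the two adapter calls used above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the two adapter calls used in MqttConfig.
interface TopicOperations {
    void addTopic(String topic);
    void removeTopic(String topic);
}

// Hypothetical helper: remembers which topics are subscribed so that
// repeated subscribe/unsubscribe requests become harmless no-ops.
class TopicRegistry {
    private final Set<String> subscribed = new HashSet<>();
    private final TopicOperations delegate;

    TopicRegistry(TopicOperations delegate, String... initialTopics) {
        this.delegate = delegate;
        for (String topic : initialTopics) {
            subscribed.add(topic);
        }
    }

    // Returns true only if the topic was newly subscribed.
    synchronized boolean subscribe(String topic) {
        if (!subscribed.add(topic)) {
            return false; // already subscribed, skip the adapter call
        }
        try {
            delegate.addTopic(topic);
            return true;
        } catch (RuntimeException e) {
            subscribed.remove(topic); // roll back so a retry is possible
            throw e;
        }
    }

    // Returns true only if the topic was subscribed and is now removed.
    synchronized boolean unsubscribe(String topic) {
        if (!subscribed.remove(topic)) {
            return false; // not subscribed, nothing to do
        }
        delegate.removeTopic(topic);
        return true;
    }
}
```

In the real project the delegate could simply forward to MqttConfig#addTopic and MqttConfig#removeTopic; whether the extra bookkeeping is worth it depends on how often duplicate requests are expected.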
2. Add MqttDynamicSubscriberService.java
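import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
// Use jakarta.annotation.Resource instead on Spring Boot 3+.
import javax.annotation.Resource;

@Slf4j
@Service
public class MqttDynamicSubscriberService {

    @Resource
    private MqttConfig mqttConfig;

    // Delegates to MqttConfig to subscribe to a topic at runtime.
    public void subscribeToNewTopic(String topic) {
        mqttConfig.addTopic(topic);
    }

    // Delegates to MqttConfig to cancel a subscription at runtime.
    public void unsubscribeFromTopic(String topic) {
        mqttConfig.removeTopic(topic);
    }
}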
3. Add MqttTopicDynamicController.java
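import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
// Use jakarta.annotation.Resource instead on Spring Boot 3+.
import javax.annotation.Resource;

@RestController
@RequestMapping("/mqtt/dynamic/topic")
public class MqttTopicDynamicController {

    @Resource
    private MqttDynamicSubscriberService mqttDynamicSubscriberService;

    // POST /mqtt/dynamic/topic/subscribe with a "topic" request parameter
    @PostMapping("/subscribe")
    public void subscribe(String topic) {
        mqttDynamicSubscriberService.subscribeToNewTopic(topic);
    }

    // POST /mqtt/dynamic/topic/unsubscribe with a "topic" request parameter
    @PostMapping("/unsubscribe")
    public void unsubscribe(String topic) {
        mqttDynamicSubscriberService.unsubscribeFromTopic(topic);
    }
}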
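The controller passes the topic parameter straight to the adapter, so a malformed topic filter is only rejected later, if at all. Below is a minimal sketch of a check that could run before subscribing, based on the MQTT wildcard rules: "#" must be the last level and stand alone in it, and "+" must occupy a whole level. TopicFilters is a hypothetical helper, not something from the original project, and it deliberately ignores other limits such as maximum length.

```java
// Hypothetical helper that checks an MQTT topic filter against the
// wildcard placement rules before it is handed to the adapter.
final class TopicFilters {
    private TopicFilters() {
    }

    static boolean isValid(String filter) {
        // A filter must contain at least one character and no null character.
        if (filter == null || filter.isEmpty() || filter.indexOf('\u0000') >= 0) {
            return false;
        }
        // A limit of -1 keeps trailing empty levels, e.g. "a/" -> ["a", ""].
        String[] levels = filter.split("/", -1);
        for (int i = 0; i < levels.length; i++) {
            String level = levels[i];
            boolean isLast = (i == levels.length - 1);
            // "#" is only allowed as the entire last level.
            if (level.contains("#") && !(level.equals("#") && isLast)) {
                return false;
            }
            // "+" is only allowed as an entire level.
            if (level.contains("+") && !level.equals("+")) {
                return false;
            }
        }
        return true;
    }
}
```

For example, TopicFilters.isValid("sensors/+/temp") and TopicFilters.isValid("sensors/#") return true, while TopicFilters.isValid("sensors/#/temp") returns false. When calling the endpoints, wildcard characters in the topic parameter should be URL-encoded ("%23" for "#", "%2B" for "+"), since a raw "#" starts a URL fragment and a raw "+" is typically decoded as a space.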
4. A Note on Initializing Multiple Topics
To subscribe to several topics at startup, you only need to modify the MqttConfig#inbound method:
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
    // The trailing arguments are the topics subscribed to at startup.
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            MqttClient.generateClientId(),
            mqttClientFactory(),
            "default-topic-one", "default-topic-two", "default-topic-three");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(0);
    // Incoming messages are forwarded to this channel.
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
}
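If the default topics should not be hard-coded, one option is to keep them as a comma-separated string in configuration and split it at startup. Below is a minimal sketch of the parsing step only. TopicListParser is a hypothetical helper, and where the raw string comes from (for example an application property) is an assumption, not something the original project defines.

```java
import java.util.Arrays;

// Hypothetical helper: turns "topic-a, topic-b" into a clean String[]
// that can be passed as the trailing varargs of the adapter constructor.
final class TopicListParser {
    private TopicListParser() {
    }

    static String[] parse(String raw) {
        if (raw == null) {
            return new String[0];
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)              // drop surrounding whitespace
                .filter(s -> !s.isEmpty())      // skip empty entries such as ",,"
                .distinct()                     // avoid subscribing to a topic twice
                .toArray(String[]::new);
    }
}
```

Because the adapter constructor takes the topics as String varargs, the resulting array can be passed in place of the three literal topic names.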