-
在 pom.xml 中导入 RocketMQ 依赖
<!-- RocketMQ Spring Boot starter, pinned to version 2.2.2 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
-
application.yml中添加配置
# RocketMQ client settings (Spring Boot application.yml).
# The access/secret keys below are placeholder values from the example;
# replace them and keep real credentials out of version control.
rocketmq:
  # Address of the name server the client connects to.
  name-server: 127.0.0.1:9876
  producer:
    # Producer group name ("x" is the example's placeholder).
    group: x
    access-key: myaccesskey
    secret-key: mysecretKey
    # Send timeout; presumably milliseconds - confirm against the starter docs.
    send-message-timeout: 10000
    tls-enable: true
  consumer:
    # Consumer group name ("x" is the example's placeholder).
    group: x
    access-key: myaccesskey
    secret-key: mysecretKey
    tls-enable: true
-
创建MQ工具类
/**
 * Small helper that forwards message-sending calls to a {@link RocketMQTemplate}.
 */
public class MqUtil {

    /** Template every send operation is delegated to. */
    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Creates the helper on top of the given template.
     *
     * @param rocketMQTemplate template used for all send calls
     */
    public MqUtil(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * Sends a single notification.
     *
     * @param topic   topic to send to
     * @param message message payload
     */
    public void convertAndSend(String topic, Object message) {
        this.rocketMQTemplate.convertAndSend(topic, message);
    }

    /**
     * Sends a batch of notifications via the template's {@code syncSend}.
     *
     * @param topic    topic to send to
     * @param messages collection of messages to send
     * @param <T>      concrete message type
     * @return the {@link SendResult} returned by the template
     */
    public <T extends Message<?>> SendResult syncSend(String topic, Collection<T> messages) {
        final SendResult sendResult = this.rocketMQTemplate.syncSend(topic, messages);
        return sendResult;
    }

    /**
     * Sends a batch of notifications via the template's {@code asyncSend}.
     *
     * @param topic        topic to send to
     * @param messages     collection of messages to send
     * @param sendCallback callback handed through to the template
     * @param <T>          concrete message type
     */
    public <T extends Message<?>> void asyncSend(
            String topic, Collection<T> messages, SendCallback sendCallback) {
        this.rocketMQTemplate.asyncSend(topic, messages, sendCallback);
    }
}
-
注入工具类Bean
/**
 * Exposes an {@link MqUtil} bean built from the supplied {@link RocketMQTemplate}.
 *
 * @param rocketMQTemplate template passed to the {@link MqUtil} constructor
 * @return a new {@link MqUtil} instance
 */
@Bean
public MqUtil mqUtil(RocketMQTemplate rocketMQTemplate) {
    final MqUtil util = new MqUtil(rocketMQTemplate);
    return util;
}
-
测试发送消息
/** Helper under test, injected through {@code @Resource}. */
@Resource
private MqUtil mqUtil;

/**
 * Sends the string "123456" to {@code TopicConstant.TOPIC_B} through
 * {@link MqUtil#convertAndSend(String, Object)}.
 */
@Test
public void test() {
    final String payload = "123456";
    mqUtil.convertAndSend(TopicConstant.TOPIC_B, payload);
}
-
订阅并接收消息
@Slf4j @Service @RequiredArgsConstructor(onConstructor = @__({@Autowired})) @RocketMQMessageListener(consumerGroup = GroupConstant.GROUP_A, topic = TopicConstant.TOPIC_B) public class TopicAConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {log.info(JsonUtil.toJsonStr(message));} }