RocketMQ集成Springboot 三种消息发送方式
生产者
引入依赖
<!--⽗⼯程--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency></dependencies>
/***生产者的代码书写demo**/
@SpringBootTest
public class RockerMQTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void sendMsg(){Message msg = MessageBuilder.withPayload("发送同步消息1").build();rocketMQTemplate.send("helloTopicBoot",msg);}/*** 异步发送消息,成功或者失败之后进行回调*/@Testpublic void sendASYNCMsg() throws InterruptedException {System.out.println("发送前");Message msg = MessageBuilder.withPayload("boot发送异步消息").build();rocketMQTemplate.asyncSend("helloTopicBoot", msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息发送状态:"+sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送失败");}});System.out.println("发送完毕");//睡五秒,不睡的话整个方法就结束了不能进行回调了TimeUnit.SECONDS.sleep(5);}/*** 一次性消息无论消息的结果是什么,通常用于日志等丢失一小部分数据无关紧要的情况下使用*/@Testpublic void sendOnewayMsg(){Message msg = MessageBuilder.withPayload("boot发送一次性消息").build();rocketMQTemplate.sendOneWay("helloTopicBoot",msg);}
}
//application.yml配置文件
rocketmq:name-server: 10.0.0.130:9876producer:group: my-group
消费者
引入依赖
<!--⽗⼯程--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency></dependencies>
实现一个监听器对象,重写其中的消费消息的方法。使用注解@RocketMQMessageListener(consumerGroup = “htpConsumerGroup”,topic = “helloTopicBoot”)
consumerGroup组必须是唯一的,helloTopicBoot表示要监听的主题
@Component
@RocketMQMessageListener(consumerGroup = "htpConsumerGroup",topic = "helloTopicBoot")
public class HelloTopicListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("收到的消息:"+new String(messageExt.getBody(), Charset.defaultCharset()));}
}
最后生产者启动测试类发送消息,消费者运行主程序一直运行即可.