Spring Boot因为方便易用,在Java中广泛使用,本章将说明如何在Spring项目中快速使用RocketMQ。
1.直接使用
在Spring Boot项目中,使用某个新的组件第一步通常是加入这个组件的依赖。下面以Maven为例,说明如何在pom.xml中加入RocketMQ的依赖,如代码清单8-1所示。
代码清单8-1 Maven方式的RocketMQ依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
有了这个依赖,就可以在Spring Boot项目中开发RocketMQ的Producer和Consumer程序了。
使用RocketMQ集群,有很多参数要设置,我们可以在application.properties文件里加入自己命名的参数,然后通过@Value注解引入。几个重要的参数是:NameServer的地址、GoupName名称和Topic名称。此外还有一些针对Producer或Consumer的参数,可以写到properties文件里,也可以写到程序里。
依赖配置都做好以后,就可以着手开发Producer和Consumer程序了。我们可以把发送消息和消费消息的功能封装成Service,供其他代码引用。Producer和Consumer的初始化比较慢,不建议每发一个消息或者消费一个消息就启动和注销对应的Object,所以适合把初始化操作代码写到@PostConstruct函数里,把关闭操作代码写到@PreDestroy函数里。Spring Boot项目中的Producer程序示例如代码清单8-2所示。
代码清单8-2 Spring Boot项目中的Producer服务
@Service
public class ProducerService {
private DefaultMQProducer producer = null;
@PostConstruct
public void initMQProducer() {
producer = new DefaultMQProducer(“producerGoupName”);
producer.setNamesrvAddr(metaqNameserver);
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public void send(String topic, String msg) {
Message msg = new Message(topic, "", "", msg.getBytes());
try {
producer.send(msg);
return;
} catch (Exception e) {
e.printStackTrace();
}
return;
}
@PreDestroy
public void shutDownProducer() {
if (producer != null) {
producer.shutdown();
}
}
}
使用Consumer的方式和使用Producer类似,但是具体设置会因为使用的具体Class不同而不同。调用shutdown函数是必要的,否则可能因为程序被强制关闭而丢消息。
2.通过Spring Messaging方式使用
直接使用的方式比较简单,也足够灵活,但不是很符合Spring风格,Spring Boot对于消息传递,有统一的接口模板,基于这个模板可以对接各种类型的消息通信组件,比如Kafka、RabbitMQ、RocketMQ等。使用这种方式,其基于不同消息队列收发消息的代码类似,方便在不同的消息队列间切换。
具体使用流程分为三个步骤:添加依赖、配置参数和引入模板。添加RocketMQ插件示例,如代码清单8-3所示。
代码清单8-3 Spring Boot的RocketMQ插件
<!--在pom.xml中添加依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
如果mvn找不到这个依赖,可以在GitHub上下载源码,本地构建。
然后是在properties文件中加入配置选项,如代码清单8-4所示。
代码清单8-4 Spring Boot的RocketMQ相关配置选项
## application.properties
spring.rocketmq.name-server=127.0.0.1:9876
spring.rocketmq.producer.group=my-group
spring.rocketmq.producer.retry-times-when-send-async-failed=0
spring.rocketmq.producer.send-msg-timeout=300000
spring.rocketmq.producer.compress-msg-body-over-howmuch=4096
spring.rocketmq.producer.max-message-size=4194304
spring.rocketmq.producer.retry-another-broker-when-not-store-ok=false
spring.rocketmq.producer.retry-times-when-send-failed=2
更多的配置选项,可以到源码中查找。由于Spring Boot项目和RocketMQ项目变化很快,具体如何以Spring Messaging的方式发送和接收消息,大家可以自行搜索相关的示例和说明。最新的文档可以参考Spring Boot文档的Messaging部分,以及GitHub中的rocketmq-externals项目。