本文目的是:教会你使用Spring Boot集成RocketMQ。
- pom.xml文件引入rocketMQ依赖
<!-- rocketmq 依赖--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version></dependency><!-- 还有其它需要的jar包自由引入(注:fastjson不要使用低于1.2.60版本,会有安全漏洞) --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
- application.yaml 配置文件
#rocketmq的配置
rocketmq:name-server: 127.0.0.1:9876 # rocketmq访问地址producer:group: cxccccccc # 生产者组send-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
- 先去RocketMQ-Dashboard 创建Topic。
- Controller层代码,自行补全外壳
@Autowiredprivate MQProducerService producerService;@PostMapping("/test")@ResponseBodypublic Object test(@RequestBody LinkedHashMap<String,Object> params) {int type = Integer.parseInt(params.get("type").toString());switch (type) {case 1:producerService.send(new AjaxResult());break;case 2:// syncSend() 同步发送消息。return producerService.sendMsg("{\"code\": 200,\"msg\": \"操作成功\",\"data\":{}}");case 3:// asyncSend() 发送成功后会执行回调函数,执行响应的代码。producerService.sendAsyncMsg("你好啊,这里是消息内容");break;case 4:// 会延迟xx s后才会到MQ里面。SendResult sendResult = producerService.sendDelayMsg("hello,this is the message waiting to consume.", 2);return sendResult;case 5:// 发送单向消息,发完就发完了,没有返回值。producerService.sendOneWayMsg("今天是美好的一天");break;case 6:// 发送的消息带有tag 标签、带有KEY业务标识。SendResult sendResult1 = producerService.sendTagMsg("今天是蛋糕的一天");return sendResult1;default:throw new IllegalStateException("Unexpected value: " + type);}return null;}
- Service 层代码,MQProducerService
@Slf4j
@Component
public class MQProducerService {/*** UCS请求日志的rocketmq的主题*/private static final String topic = "MSG_DingAttendance";/*** 直接注入使用,用于发送消息到broker服务器*/@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 普通发送(这里的参数对象ajaxResult可以随意定义,可以发送个对象,也可以是字符串等)** @param ajaxResult*/public void send(AjaxResult ajaxResult) {rocketMQTemplate.convertAndSend(topic + ":tag1", ajaxResult);}/*** 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失信息)* msgBody也可以是对象,SendResult为返回的发送结果)** @param msgBody* @return*/public SendResult sendMsg(String msgBody) {SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));return sendResult;}/*** 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)* (适合对响应时间敏感的业务场景)** @param msgBody*/public void sendAsyncMsg(String msgBody) {rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理消息发送成功逻辑System.out.println("sendAsyncMsg发送成功");}@Overridepublic void onException(Throwable throwable) {// 处理消息发送异常逻辑System.out.println("sendAsyncMsg发送失败");}});}/*** 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h** @param msgBody 消息体* @param delayLevel 延时等级*/public SendResult sendDelayMsg(String msgBody, int delayLevel) {return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), 3000, delayLevel);}/*** 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)** @param msgBody*/public void sendOneWayMsg(String msgBody) {rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());}/*** 发送带tag的消息,直接在topic后面加上“:tag”,key在Header里面设置KEYS** @param msgBody*/public SendResult sendTagMsg(String msgBody) {String key = String.valueOf(IdUtils.createSnowflake());Message message = new Message(topic, key, msgBody.getBytes());return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(message).setHeader("KEYS",key).build());}}
- 发送请求,去RocketMQ-Dashboard里面查看消息,都是成功的。