Spring Boot 整合 RocketMQ 之普通消息

前言:

在消息中间件领域中 RocketMQ 是一种非常常见的消息中间件了,并且是由阿⾥巴巴开源的消息中间件 ,本篇简单分享一下 Spring Boot 项目集成 RocketMQ 的过程。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 集成 RocketMQ 可以分为三大步,如下:

  • 在 proerties 或者 yml 文件中添加 RocketMQ 配置。
  • 项目 pom.xml 文件中引入 rocketmq-spring-boot-starter 依赖。
  • 注入 RocketMQTemplate 开始使用 RocketMQ ,其实这步以及算是使用了,不能算作集成了,但是集成了总归是要使用的,我把这里也算作一步了。

在 proerties 或者 yml 文件中添加 RabbitMQ 配置如下:

#RocketMQ 地址
rocketmq.name-server= dev-ztn-rocketmq.eminxing.com:19876
#消费组
rocketmq.consumer.group= consumer-group
#一次拉取消息的最大数量 默认 10 条
rocketmq.consumer.pull-batch-size=10
#发送消息的组 同一类消息发送到同一个组中
rocketmq.producer.group= producer-group
#发送消息的超时时间 默认 3000 毫秒
rocketmq.producer.send-message-timeout=3000
#同步发送消息失败重试次数 默认2
rocketmq.producer.retry-times-when-send-failed=2
#异步发送消息失败重试次数 默认2
rocketmq.producer.retry-times-when-send-async-failed=2
#消息的大小 默认 4M
rocketmq.producer.max-message-size=4096

项目 pom.xml 文件中引入 rocketmq-spring-boot-starter 依赖如下:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

RocketMQ 使用

前文我们在分享 RocketMQ 核心概念的时候,我们知道了 RocketMQ 有同步消息、异步消息、顺序消息、延迟消息等,下面我们就根据消息的发送类型来演示 RocketMQ 的使用。

@RocketMQMessageListener 注解详解

我们在使用 RocketMQ 的时候有一个非常重要的注解 @RocketMQMessageListener,使用这个注解我们就可以轻松的完成 RocketMQ 消息的消费,这里对该注解的的属性进行解析,如下:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";//消费者分组String consumerGroup();//主题 String topic();//消息选择器类型//SelectorType.TAG:默认值 根据TAG选择 仅支持表达式格式如:“tag1 || tag2 || tag3” 如果表达式为 null 或者 “*”  表示可以订阅所有消息//SelectorType.SQL92:根据SQL92表达式选择SelectorType selectorType() default SelectorType.TAG;//selectorType 对应的表达式String selectorExpression() default "*";//consumeMode:消费模式 默认值 ConsumeMode.CONCURRENTLY 并行处理  ConsumeMode.ORDERLY  按顺序处理ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;//messageMode:消息模型  默认值 MessageModel.CLUSTERING 集群模式   MessageModel.BROADCASTING 广播模式MessageModel messageModel() default MessageModel.CLUSTERING;//消费者最大线程数 默认值 64int consumeThreadMax() default 64;//消息可能阻止使用线程的最长时间 单位:分钟long consumeTimeout() default 15L;//String accessKey() default "${rocketmq.consumer.access-key:}";String secretKey() default "${rocketmq.consumer.secret-key:}";//启用消息轨迹 默认不启用 falseboolean enableMsgTrace() default true;//自定义消息轨迹主题 String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";//服务地址String nameServer() default "${rocketmq.name-server:}";String accessChannel() default "${rocketmq.access-channel:}";
}

RocketMQ 发送单向消息

单向消息是指生产者 Producer 向 Broker 发送消息,执行发送消息的 API 后直接返回,不关注 Broker 的结果,简单说就负责发送消息不关注消息是否发送成功,这种模式的优点是发生消息耗时非常低,一般在微妙级别,通常用在消息可靠性要求不高的场景,例如记录日志等场景,下面我们来演示一下 RocketMQ 单向消息的发送。

单向消息生产者代码如下:

package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class OneWayMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;//单向消息发送public void sendOneWayMessage(String message){rocketMqTemplate.sendOneWay("one-way-topic", MessageBuilder.withPayload(message).build());}}

单向消息消费者代码如下:

package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "one-way-group", topic = "one-way-topic")
public class OneWayMessageCousumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("单向消息消费成功:{}", message);}
}

触发单向消息发送代码如下:

@GetMapping("/send-one-way-message")
public String sendOneWayMessage(@RequestParam String message){oneWayMessageProducer.sendOneWayMessage(message);return "success";
}

单向消息发送测试结果如下:

2024-10-10 19:51:47.144  INFO 15172 --- [MessageThread_1] c.o.s.r.consumer.OneWayMessageCousumer   : 单向消息消费成功:send-one-way-message

RocketMQ 发送同步消息

发送同步消息是指生产者 Producer 向 Broker 发送消息,执行发送消息的 API 后同步等待, 直到 Broker 返回发送结
果,因为有等待动作,很明显发送同步消息会阻塞线程,因此性能相对会差一些,但是同步消息的可靠性高,因此这种方式得到广泛使用,例如短信通知,邮件通知,站内消息等场景。

同步消息生产者代码如下:

package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
public class SyncMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;/*** @param message:* @date 2024/10/10 17:47* @description 同步消息发送*/public void sendSyncMessage(String message) {rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());}/*** @param message:* @date 2024/10/10 17:47* @description 批量发送同步消息*/public void sendSyncMessageBatch(String message) {Message<String> build = MessageBuilder.withPayload(message).build();List<Message<String>> msgList = new ArrayList<>();msgList.add(build);msgList.add(build);msgList.add(build);msgList.add(build);rocketMqTemplate.syncSend("sync-topic", msgList);}/*** @param message:* @date 2024/10/10 17:47* @description 发送同步消息设置超时时间 超时时间 1毫秒*/public void sendSyncMessageTimeout(String message) {//超时时间为 1 毫秒rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build(), 200);}/*** @param message:* @date 2024/10/10 17:47* @description 批量发送同步消息 超时时间 1毫秒*/public void sendSyncMessageBatchTimeout(String message) {Message<String> build = MessageBuilder.withPayload(message).build();List<Message<String>> msgList = new ArrayList<>();msgList.add(build);msgList.add(build);msgList.add(build);msgList.add(build);rocketMqTemplate.syncSend("sync-topic", msgList, 200);}}

在上面的同步消息发送代码中一共有四个方法,分别实现了同步消息发送、同步消息批量发送、带超时时间的同步消息发送、带超时时间的同步消息批量发送。

同步消息消息消费者代码如下:

package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "sync-group", topic = "sync-topic")
public class SyncMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("同步消息消费成功:{}", message);}
}

触发单向消息发送代码如下:

@GetMapping("/send-sync-message")
public String sendSyncMessage(@RequestParam String message){syncMessageProducer.sendSyncMessage(message);return "success";
}@GetMapping("/send-sync-message-batch")
public String sendSyncMessageBatch(@RequestParam String message){syncMessageProducer.sendSyncMessageBatch(message);return "success";
}@GetMapping("/send-sync-message-timeout")
public String sendSyncMessageTimeout(@RequestParam String message){syncMessageProducer.sendSyncMessageTimeout(message);return "success";
}@GetMapping("/send-sync-message-batch-timeout")
public String sendSyncMessageBatchTimeout(@RequestParam String message){syncMessageProducer.sendSyncMessageBatchTimeout(message);return "success";
}

同步消息发送测试结果如下:

2024-10-14 14:37:22.346  INFO 26640 --- [MessageThread_1] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message

结果验证符合预期。

同步消息批量发送测试结果如下:

2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_4] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_3] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.120  INFO 26640 --- [MessageThread_5] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch
2024-10-14 14:38:04.122  INFO 26640 --- [MessageThread_6] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch

结果验证符合预期。

带超时时间同步消息发送测试结果如下:

2024-10-14 14:46:07.889  INFO 16760 --- [MessageThread_1] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-timeout

结果验证符合预期,如果想要验证超时效果,直接把超时时间设置的小一点即可,后面我会统一演示超时效果。

带超时时间同步消息批量发送测试结果如下:

2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_3] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_4] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_5] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout
2024-10-14 14:47:05.539  INFO 16760 --- [MessageThread_2] c.o.s.r.consumer.SyncMessageConsumer     : 同步消息消费成功:send-sync-message-batch-timeout

结果验证符合预期,如果想要验证超时效果,直接把超时时间设置的小一点即可,后面我会统一演示超时效果。

我们来演示一下超时效果,我们把超时时间修改为 10 毫秒时候,带超时时间同步消息发送测试结果如下:

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeoutat org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:667) ~[rocketmq-client-4.8.0.jar:4.8.0]at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343) ~[rocketmq-client-4.8.0.jar:4.8.0]at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:344) ~[rocketmq-client-4.8.0.jar:4.8.0]at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:555) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:484) ~[rocketmq-spring-boot-2.2.0.jar:2.2.0]at com.order.service.rocketmq.producer.SyncMessageProducer.sendSyncMessageTimeout(SyncMessageProducer.java:57) ~[classes/:na]at com.order.service.controller.RocketMqController.sendSyncMessageTimeout(RocketMqController.java:47) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

很明显提示超时了,超时测试依赖服务器的性能,因此如果想测试到理想的超时结果,建议将超时时间往小了设置。

RocketMQ 发送异步消息

发送异步消息是指生产者 Producer 向 Broker 发送消息,发送消息时指定消息发送成功及发送异常的回调方法,执行发送消息的 API 后立即返回,Producer 发送消息线程无需等待、不阻塞,对比同步消息,很明显异步消息的性能会更高,可靠性会略差,适用于对响应时间要求高的场景。

异步消息生产者代码如下:

package com.order.service.rocketmq.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
public class AsyncMessageProducer {@Autowiredprivate RocketMQTemplate rocketMqTemplate;/*** @param message:* @date 2024/200/200 17:47* @description 异步消息发送*/public void sendAsyncMessage(String message) {rocketMqTemplate.asyncSend("async-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("普通异步消息发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("普通异步消息发送失败");}});}/*** @param message:* @date 2024/200/200 17:47* @description 批量发送异步消息*/public void sendAsyncMessageBatch(String message) {Message<String> build = MessageBuilder.withPayload(message).build();List<Message<String>> msgList = new ArrayList<>();msgList.add(build);msgList.add(build);msgList.add(build);msgList.add(build);rocketMqTemplate.asyncSend("async-topic", msgList, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("批量异步消息发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("批量异步消息发送失败");}});}/*** @param message:* @date 2024/200/200 17:47* @description 发送异步消息设置超时时间 超时时间 1毫秒*/public void sendAsyncMessageTimeout(String message) {//超时时间为 1 毫秒rocketMqTemplate.asyncSend("async-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("普通异步带超时消息发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("普通异步带超时消息发送失败");}}, 200);}/*** @param message:* @date 2024/200/200 17:47* @description 批量发送异步消息 超时时间 1毫秒*/public void sendAsyncMessageBatchTimeout(String message) {Message<String> build = MessageBuilder.withPayload(message).build();List<Message<String>> msgList = new ArrayList<>();msgList.add(build);msgList.add(build);msgList.add(build);msgList.add(build);rocketMqTemplate.asyncSend("async-topic", msgList, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("批量异步带超时消息发送成功");}@Overridepublic void onException(Throwable throwable) {log.info("批量异步带超时消息发送失败");}}, 200);}}

异步消息消费者代码如下:

package com.order.service.rocketmq.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "async-group", topic = "async-topic")
public class AsyncMessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("异步消息消费成功:{}", message);}
}

异步消息发送测试结果如下:

2024-10-14 15:15:08.215  INFO 16760 --- [ublicExecutor_1] c.o.s.r.producer.AsyncMessageProducer    : 普通异步消息发送成功
2024-10-14 15:15:08.222  INFO 16760 --- [MessageThread_1] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:send-async-message

结果验证符合预期。

批量异步消息发送测试结果如下:

2024-10-14 15:15:39.681  INFO 16760 --- [ublicExecutor_2] c.o.s.r.producer.AsyncMessageProducer    : 批量异步消息发送成功
2024-10-14 15:15:39.682  INFO 16760 --- [MessageThread_2] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:[{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}},{"payload":"send-async-message-batch","headers":{"id":"4bda5246-fbe0-ef6c-cd27-f5ae9123e01d","timestamp":1728890139672}}]

结果验证符合预期。

异步带超时消息发送测试结果如下:

2024-10-14 15:16:20.643  INFO 16760 --- [ublicExecutor_3] c.o.s.r.producer.AsyncMessageProducer    : 普通异步带超时消息发送成功
2024-10-14 15:16:20.650  INFO 16760 --- [MessageThread_3] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:send-async-message-timeout

结果验证符合预期。

批量异步带超时消息发送测试结果如下:

2024-10-14 15:16:43.326  INFO 16760 --- [ublicExecutor_4] c.o.s.r.producer.AsyncMessageProducer    : 批量异步带超时消息发送成功
2024-10-14 15:16:43.327  INFO 16760 --- [MessageThread_4] c.o.s.r.consumer.AsyncMessageConsumer    : 异步消息消费成功:[{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}},{"payload":"send-async-message-batch-timeout","headers":{"id":"a518ded2-c9e1-9588-58bf-5cd8ddd00024","timestamp":1728890203322}}]

结果验证符合预期。

超时场景这里不再测试了,如果想验证超时效果,只需要将超时时间设置的尽可能小一点即可。

总结:本篇简单分享了 Spring 整合 RocketMQ,并完成单向消息、同步消息、异步消息的案例演示,在实际业务中只需要对案例代码进行丰富填充业务逻辑即可,希望可以帮助到大家,后面会持续分享延时消息、顺序消息、事务消息的使用案例。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/56238.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WPF -- LiveCharts的使用和源码

LiveCharts 是一个开源的 .NET 图表库&#xff0c;特别适用于 WPF、WinForms 和其他 .NET 平台。它提供了丰富的图表类型和功能&#xff0c;使开发者能够轻松地在应用程序中创建动态和交互式图表。下面我将使用WPF平台创建一个测试实例。 一、LiveCharts的安装和使用 1.安装N…

VUE-鼠标移入到目标区域变成小手形状

工作中有个场景&#xff1a;当鼠标移动到某个区域时显示为“小手”状 其实这个东西特别简单&#xff0c;只是用的不多平常。 我们只需要给目标区域的style样式中加入【cursor: pointer;】这个鼠标就好了。 <span class"el-dropdown-link">下载 </span>…

软件测试与软件缺陷的基础知识

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

计算机视觉之可做什么

1、计算机视觉的应用 计算机视觉在我们生活中已经有了很广泛的应用&#xff0c;在我们可见、不可见&#xff1b;可感知、不可感知的地方&#xff0c;深深地影响了我们的生活、生产方式。 日常生活&#xff1a;美颜相机、火车站刷脸进站、线上办理业务的身份认证、自动驾驶等等…

供应链PC实操落地实践|得物技术

目录 一、背景 二、实操模式 三、快捷码设计和使用 1. 快捷码布局 2. 快捷码的准确识别 3. 快捷码的高亮反馈 4. 快捷码打印和黏贴建议 四、问题复盘 1. 基准体验的梳理 五、实操环境检测 1. 实操环境的安装和配置 2. 实操环境检测能力的使用流程 3. 实操检测能力的技术架构 4.…

Vue 3 数组变更详解:哪些操作会修改原数组?| 笔记

Vue 3 使用 Proxy 来侦测响应式对象的变化&#xff0c;数组作为常用数据类型&#xff0c;自然也被 Vue 3 自动侦测和管理。在处理数组时&#xff0c;了解哪些操作会修改原数组&#xff0c;哪些操作不会修改原数组&#xff0c;对高效编写 Vue 应用程序至关重要。 本文将详细介绍…

LeetCode 面试经典150题 Z字形变换

题目&#xff1a; 将一个给定字符串 s 根据给定的行数 numRows &#xff0c;以从上往下、从左到右进行 Z 字形排列。 比如输入字符串为 "PAYPALISHIRING" 行数为 3 时&#xff0c;排列如下&#xff1a; P A H N A P L S I I G Y I R 之后&#xff0c;你…

windows recvfrom错误10014

使用windows socket的udp客户端与linux udp服务端通讯&#xff0c;windows发送数据正常&#xff0c;接收偶尔不正常&#xff0c;但是通过抓包发现服务端是发送了数据给客户端的&#xff0c;网上找了很急都没解决&#xff0c;最后发现是windows与linux下 recvfrom 最后一个参数有…

CSS——文字打字机效果

CSS——文字打字机效果 本文通过纯 CSS 实现文字的打字机效果&#xff0c;然后借助 JS 实现了扩展。 typewriter 基本思路 使用伪元素覆盖原文字&#xff0c;并且使用伪元素模拟闪烁的光标效果。 具体流程 首先是一些基本的设置 <!DOCTYPE html> <html lang"…

什么是Qseven?模块电脑(核心板)规范标准简介二

1.概念 Qseven是一种通用的、小尺寸计算机模块标准&#xff0c;适用于需要低功耗、低成本和高性能的应用。 Qseven模块电脑&#xff08;核心板&#xff09;采用230Pin金手指连接器 2.Qseven的起源 Qseven最初是由Congatec、SECO、MSC三家欧洲公司于2008年发起&#xff0c;旨在…

Python中的SQLAlchemy:解锁数据库操作的新世界

引言 SQLAlchemy是一个Python SQL工具包和ORM&#xff0c;它提供了全面的企业级持久性模式。通过SQLAlchemy&#xff0c;你可以使用Python类来定义数据库表&#xff0c;并使用面向对象的方式来进行数据库操作&#xff0c;如查询、更新等。这种ORM方法不仅使代码更加简洁易读&a…

基因科技领军企业——桐树基因完成D轮融资,创新科技引领生命科学

2024年10月8日&#xff0c;无锡桐树生物科技有限公司&#xff08;以下简称桐树基因&#xff09;正式完成过亿元人民币D轮融资。本轮融资由无锡市梁溪科创产业投资基金&#xff08;博华资本管理&#xff09;领投&#xff0c;江苏建道创业投资有限公司跟投&#xff0c;总额过亿元…

简单谈谈Spring 中Aware是什么

在spring中&#xff0c;aware是spring提供的一种扩展机制 在一般情况中&#xff0c;是不需要感知容器的存在的&#xff0c;spring 会帮我们自动装配完成。 但是在一些特定的场景下&#xff0c;比如我需要获取spring容器中的某个对象&#xff0c;那么就需要获取到 spring 上下文…

大数据学习---快速了解clickhouse数据库

ClickHouse数据库介绍 ClickHouse是一款由Yandex开发的列式数据库管理系统&#xff08;DBMS&#xff09;&#xff0c;适用于在线分析处理&#xff08;OLAP&#xff09;场景。它具有高性能、可扩展性、实时更新等特点&#xff0c;适用于处理大规模数据。 特点 列式存储&#x…

【数据结构】二叉树(一)遍历

导言 前面以及有了堆的基础&#xff0c;现在来学习二叉树。二叉树的学习和前面的数据结构很不一样&#xff0c;前面我们主要学习用数据结构储存数据&#xff0c;以及实际手搓数据结构的增删查改&#xff1b;而学习二叉树主要是为我们以后学搜索二叉树以及后面的AVL树等数据结构…

Android中的View绘制流程

Android中的View绘制流程是一个复杂而精细的过程&#xff0c;它确保了应用程序中的用户界面能够准确、高效地呈现在用户眼前。以下将详细阐述Android View的绘制流程&#xff0c;包括测量&#xff08;Measure&#xff09;、布局&#xff08;Layout&#xff09;和绘制&#xff0…

2024.10.10计算机外部设备及调试培训

授课老师&#xff1a;杨戬 1.计算机组成 cpu&#xff0c;主板&#xff0c;内存&#xff0c;硬盘&#xff0c;电源&#xff0c;显示器&#xff0c;键盘和鼠标&#xff0c;光驱和显卡&#xff0c;其他外部设备。 2.虚拟机专业版转换 由于我们在2024.10.8的培训中已经安装了wi…

GPT4o,GPTo1-preview, 拼

兄弟们GPT刚开的 需要上车的扣&#xff0c;工作用 大家一起PIN分摊点压力。 在当今数字化的时代&#xff0c;程序员这一职业已经从幕后走到了前台&#xff0c;成为推动科技进步和社会变革的关键力量。编写代码、解决问题、不断学习新技术&#xff0c;程序员们的日常充满了挑战与…

React基础知识

说明&#xff1a;react版本为 18.3.1 React是什么 React由Meta公司研发&#xff0c;是一个用于构建Web和原生交互界面的库。&#xff08;开发基于浏览器的web应用和基于mac和android的移动应用&#xff09;React的优势 1.相较于传统基于DOM开发的优势&#xff1a;组件化的开…

【物流配送中心选址问题】基于退火算法混合粒子群算法

课题名称&#xff1a; 基于退火算法混合粒子群算法的物流配送中心选址问题 改进方向&#xff1a;模拟退火算法优化粒子群算法 代码获取方式&#xff08;付费&#xff09;&#xff1a; 模型说明&#xff1a; 待补充 Matlab仿真结果&#xff1a; 1. 模型优化后的仿真结果 2…