目录
- 一、简介
- 1.1、特点
- 1.2、场景
- 二、Maven依赖
- 三、application配置
- 四、生产者
- 4.1、测试类
- 4.2、运行结果
一、简介
RocketMQ 提供了一种单向发送消息的方法,在这种模式下,生产者只负责尽快地发送消息,而不需要关心消息是否被Broker接收,也不会收到任何消息发送结果的响应。这个方法就是RocketMQTemplate 的 sendOneWay。
1.1、特点
使用sendOneWay方法的特点如下:
- 发送速度极快 由于无需等待Broker的响应确认,生产者可以连续不断地发送消息,发送效率极高。
- 无法保证可靠性 生产者无法知道消息是否已成功发送到Broker,也无法重试发送失败的消息,可能会导致消息丢失。
- 返回值为void 该方法直接返回void,不会像同步或异步发送那样返回SendResult。
- 调用方式简单 调用方式非常简单,只需指定消息和Topic即可,无需额外参数。
- 吞吐量极高 生产者无需等待Broker响应,可以在极短时间内发送大量消息,适用于对吞吐量有极高要求的场景。
1.2、场景
一般来说,sendOneWay模式适用于以下场景:
- 允许出现少量消息丢失的场景,例如日志收集等。
- 对发送吞吐量和延迟要求极高的场景,例如需要在毫秒级别完成消息发送。
但在绝大多数正式场景中,由于无法保证消息可靠传输,通常不推荐使用这种发送模式。如果对消息可靠性有一定要求,建议使用同步或异步可靠发送模式。不管怎么样,我们还是看看怎么使用的吧。
二、Maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>rocketmq</artifactId><groupId>com.alian</groupId><version>1.0.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>03-send-one-way-message</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>com.alian</groupId><artifactId>common-rocketmq-dto</artifactId><version>1.0.0-SNAPSHOT</version></dependency></dependencies></project>
父工程已经在我上一篇文章里,通用公共包也在我上一篇文章里有说明,包括消费者。具体参考:RocketMQ笔记(一)SpringBoot整合RocketMQ发送同步消息
三、application配置
application.properties
server.port=8003# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的生产者组
rocketmq.producer.group=oneway_group
# 发送同步消息超时时间
rocketmq.producer.send-message-timeout=3000
# 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器
rocketmq.producer.retry-next-server=true
# 用于指定消息发送失败时的重试次数
rocketmq.producer.retry-times-when-send-failed=3
# 设置消息压缩的阈值,为0表示禁用消息体的压缩
rocketmq.producer.compress-message-body-threshold=0
四、生产者
由于这种单向消息不是很推荐,所有测试我们也简单测试吧。
4.1、测试类
SendOnewayMessageTest.java
@Slf4j
@SpringBootTest
public class SendOnewayMessageTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void onewaySendStringMessage() {String topic = "string_message_topic";String message = "我是一条单向消息:onewaySendStringMessage";rocketMQTemplate.sendOneWay(topic, message);}@Testpublic void onewaySendStringMessageWithBuilder() {String topic = "string_message_topic";String message = "我是一条单向消息:onewaySendStringMessageWithBuilder";Message<String> msg = MessageBuilder.withPayload(message).build();rocketMQTemplate.sendOneWay(topic, msg);}@Testpublic void onewaySendJsonMessage() {String topic = "json_message_topic";JSONObject json = new JSONObject();json.put("nick", "单向消息");json.put("name", "Alian");json.put("age", "28");json.put("hobby", "java");rocketMQTemplate.sendOneWay(topic, json);}@Testpublic void onewaySendJsonMessageWithBuilder() {String topic = "json_message_topic";JSONObject json = new JSONObject();json.put("nick", "单向消息");json.put("name", "Alian");json.put("age", "28");json.put("hobby", "java");Message<JSONObject> msg = MessageBuilder.withPayload(json).build();rocketMQTemplate.sendOneWay(topic, msg);}@Testpublic void onewaySendJavaObjectMessage() {String topic = "java_object_message_topic";Member member = new Member();member.setId(10086L);member.setMemberName("Alian");member.setAge(28);member.setBirthday(new Date());rocketMQTemplate.sendOneWay(topic, member);}@Testpublic void onewaySendJavaObjectMessageWithBuilder() {String topic = "java_object_message_topic";Member member = new Member();member.setId(10086L);member.setMemberName("Alian");member.setAge(28);member.setBirthday(new Date());Message<Member> msg = MessageBuilder.withPayload(member).build();rocketMQTemplate.sendOneWay(topic, msg);}@AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}
}
4.2、运行结果
2024-03-11 14:45:28.599 INFO 9232 --- [P_JAVA_OBJECT_1] c.a.c.JavaObjectMessageConsumer : java对象消费者接收到的消息: Member(id=10086, memberName=Alian, age=28, birthday=Mon Mar 11 14:45:28 CST 2024)
2024-03-11 14:45:31.596 INFO 9232 --- [P_JAVA_OBJECT_2] c.a.c.JavaObjectMessageConsumer : java对象消费者接收到的消息: Member(id=10086, memberName=Alian, age=28, birthday=Mon Mar 11 14:45:31 CST 2024)
2024-03-11 14:45:34.626 INFO 9232 --- [NT_GROUP_JSON_5] c.alian.concurrent.JsonMessageConsumer : json消费者接收到的消息: {"nick":"单向消息","name":"Alian","age":"28","hobby":"java"}
2024-03-11 14:45:37.636 INFO 9232 --- [_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 我是一条单向消息:onewaySendStringMessage
2024-03-11 14:45:40.656 INFO 9232 --- [NT_GROUP_JSON_6] c.alian.concurrent.JsonMessageConsumer : json消费者接收到的消息: {"nick":"单向消息","name":"Alian","age":"28","hobby":"java"}
2024-03-11 14:45:43.666 INFO 9232 --- [_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 我是一条单向消息:onewaySendStringMessageWithBuilder