目录
一、引入依赖
二、配置文件
三、生产者
四、消费者
五、结果
一、引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>
二、配置文件
#Rocketmq配置
rocketmq.name-server=192.168.11.99:9876
# 必须指定生产者组
rocketmq.producer.group=group01
# 消息发送超时时长,默认3s
rocketmq.producer.send-message-timeout=3000
# 同步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-failed=3
# 异步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-async-failed=3
三、生产者
package com.beiyou.rocket.provider;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class Provider1 {/*** 生产者*/@Autowiredprivate RocketMQTemplate rocketTemplate;public void send(String msg) {Message<String> build = MessageBuilder.withPayload(msg).build();// 发送消息rocketTemplate.convertAndSend("topic_01", build);}
}
四、消费者
package com.beiyou.rocket.consumer;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "topic_01", consumerGroup = "group_205")
public class Consumer1 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("消费者收到了消息: " + message);}
}
五、结果
六、.延迟消息
RocketMQ支持指定级别的延迟消息,即只能设置预设的几个时间等级的延迟,而不是任意时间延迟。目前RocketMQ社区版并不支持任意时间的精确延迟,RocketMQ在4.x版本只能够支持18种内置的延迟消息(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),具体实现方式是在发送消息时设置消息的延迟等级。
@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {Message<String> msg = MessageBuilder.withPayload(message).build();rocketMQTemplate.syncSend(topic,msg,3000,1);log.info("发送成功");}
七、死信队列
@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "topic_01")
public class Consumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);log.info("Received message: " + message);;throw new RuntimeException("test");}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 设置最大重试次数consumer.setMaxReconsumeTimes(2);// 如下,设置其它consumer相关属性consumer.setPullBatchSize(16);}
}