引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>
1. 定义RocketMQTemplateService工具类
@Service
public class RocketMQTemplateService {private static Logger logger = LoggerFactory.getLogger(RocketMQTemplateService.class);@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** @Description: 功能描述* 当发送的消息不重要时,采用OneWay方式,以提升吞吐量,一般用户日志存储* * @param: 参数描述* @param destination* @param payload* @throws: 异常描述*/public void sendOneWay(String destination, Object payload) {rocketMQTemplate.sendOneWay(destination, payload);}/*** @Description: 功能描述* 默认使用同步发送syncSend, 但拿不到回执;convertAndSend和send等价* * @param: 参数描述* @param destination* @param payload* @throws: 异常描述**/public void convertAndSend(String destination, Object payload) {rocketMQTemplate.convertAndSend(destination, payload);}/*** @Description: 功能描述* 同步发送,需设置延迟等级* * @param: 参数描述* @param destination* @param message* @param timeout* @param delayLevel* @return* @throws: 异常描述**/public SendResult syncSend(String destination, Object message, long timeout, int delayLevel) {return rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload(message).build(), timeout, delayLevel);}/*** @Description: 功能描述* 异步发送,需设置延迟等级* * @param: 参数描述* @param destination* @param message* @param timeout* @param delayLevel* @throws: 异常描述**/public void asyncSend(String destination, Object message, long timeout, int delayLevel) {rocketMQTemplate.asyncSend(destination, MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {}@Overridepublic void onException(Throwable e) {logger.error("rocketmq发送异步消息异常:destination = " + destination + ";timeout = " + timeout + ";delayLevel = " + delayLevel + ";message = " + message + ";error = " + e.getMessage());}}, timeout, delayLevel);}/*** @Description: 功能描述* 异步发送,需设置延迟等级* * @param: 参数描述* @param destination* @param message* @param sendCallback* @param timeout* @param delayLevel* @throws: 异常描述**/public void asyncSend(String destination, Object message, SendCallback sendCallback, long timeout, int delayLevel) {rocketMQTemplate.asyncSend(destination, MessageBuilder.withPayload(message).build(), sendCallback, timeout, delayLevel);}
部分适用场景
当发送的消息不重要时,采用one-way方式,以提升吞吐量
当发送的消息很重要时,且对响应时间不敏感的时候采用sync方式
当发送的消息很重要时,且对响应时间很是敏感的时候采用async方式
2. 使用
@Autowiredprivate RocketMQTemplateService rocketMQTemplateService;public void 方法名(参数 参数) {// 以异步消息为例rocketMQTemplateService.asyncSend("TopicDetailId", 参数, 300000, RocketMqDelayLevel.getDelayLevel(30));}
监听类
@Component
@RocketMQMessageListener(topic = BizRocketMq.TopicDetailId,consumeMode = ConsumeMode.CONCURRENTLY,//分为:有序模式和无序模式,设置为无序模式(并发模式)messageModel = MessageModel.CLUSTERING,//分为:集群模式和广播模式;设置为集群模式;广播模式只能与无序模式匹配设置,并且广播模式只执行一次,切记切记!consumerGroup = BizRocketMq.GroupDetailId)
public class JiFenExChangeServiceListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(MessageExt messageExt) {你的业务逻辑}
}
消息中两id必须一致