延迟发送消息生产者配置
如下所示:@Bean注解向Spring容器注入一个名字叫delayOrderProducerBean、类型为OrderProducerBean 的对象(下文需要用到)
@Configuration
public class DelayProducerClient {@Autowiredprivate RocketMqDelayProperties rocketMqDelayProperties;@Lazy@Bean(name = "delayOrderProducerBean",initMethod = "start", destroyMethod = "shutdown")public OrderProducerBean buildDelayOrderProducer() {OrderProducerBean orderProducerBean = new OrderProducerBean();orderProducerBean.setProperties(rocketMqDelayProperties.getProducerProperties());return orderProducerBean;}
}
@Autowiredprivate RocketMqDelayProperties rocketMqDelayProperties;@Qualifier("delayOrderProducerBean")@Autowiredprivate OrderProducerBean delayOrderProducerBean;public void sendDelayMQMessage(DelayBaseBo delayBaseBo) {String jsonBody = JsonLUtils.toJSon(delayBaseBo.getDelayMqSub());log.info("shardingKey = {},tag = {},jsonBody = {}", delayBaseBo.getShardingKey(), delayBaseBo.getTag(), jsonBody);// 发送消息,只要不抛异常就是成功try {Message msg = new Message(// Message所属的TopicrocketMqDelayProperties.getTopic(),// Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤delayBaseBo.getTag(),// msg key 一般为业务主键delayBaseBo.getShardingKey(),// Message Body 可以是任何二进制形式的数据, MQ不做任何干预// 需要Producer与Consumer协商好一致的序列化和反序列化方式jsonBody.getBytes());msg.setStartDeliverTime(LocalDateLUtils.getEpochMilli(delayBaseBo.getDelayTime()));SendResult sendResult = delayOrderProducerBean.send(msg, delayBaseBo.getShardingKey());assert sendResult != null;log.info("event消息发送成功,topic = {},key = {},messageId = {}", sendResult.getTopic(), delayBaseBo.getShardingKey(), sendResult.getMessageId());} catch (ONSClientException e) {//日志记录错误信息与入参信息log.error("event消息发送失败,shardingKey = {} ,jsonBody = {} , 异常信息:", delayBaseBo.getShardingKey(), jsonBody, e);}}
可以看到,延迟发送消息和实时发送消息的方法差不多,只不过多了一个msg.setStartDeliverTime()参数。