【项目实践04】【RocketMQ消息收发拦截器】

文章目录

  • 一、前言
  • 二、项目背景
  • 三、实现方案
    • 1. 关键接口
    • 2. 消息发送方
    • 3. 消息消费方
    • 4. 配置引入类
    • 5. 使用示例
  • 四、思路扩展
    • 1. 消费流程简介


一、前言

本系列用来记录一些在实际项目中的小东西,并记录在过程中想到一些小东西,因为是随笔记录,所以内容不会过于详细。

二、项目背景

原本的目的是想实现 MQ 消息之间的 TraceId 追踪。如下两个拦截器可以实现 MQ 消息之间 TraceId 的传递,不过项目后面转用 TLog 。(但是写都写了,不能浪费

三、实现方案

1. 关键接口

  1. 消息发送方拦截器,当RocketMQ 消息发送的时候会调用对应的方法进行拦截,实现该接口可进行消息发送前后的扩展。

    public interface MqSendInterceptor extends Ordered, Comparable<MqSendInterceptor> {/*** 消息发送前处理** @param message*/default void preHandle(Message message) {}/*** 消息发送后处理** @param destination* @param payload* @param e*/default void afterHandle(String destination, org.springframework.messaging.Message<?> payload, Exception e) {}/*** 拦截器优先级** @return*/@Overridedefault int getOrder() {return Integer.MIN_VALUE;}/*** 拦截器排序** @param o* @return*/@Overridedefault int compareTo(MqSendInterceptor o) {return getOrder() - o.getOrder();}
    }
    
  2. 消息消费方拦截器,当RocketMQ 消息接收消费的时候会调用对应的方法进行拦截,实现该接口可进行消息消费前后的扩展。

    public interface MqReceiveInterceptor extends Ordered, Comparable<MqReceiveInterceptor> {/*** 消息预处理** @param conusmerGroup* @param messageExt* @return*/void preHandle(String conusmerGroup, MessageExt messageExt);/*** 消息后处理** @param consumerGroup* @param messageExt*/void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception);/*** 拦截器优先级** @return*/@Overridedefault int getOrder() {return Integer.MIN_VALUE;}/*** 拦截器排序** @param o* @return*/@Overridedefault int compareTo(MqReceiveInterceptor o) {return getOrder() - o.getOrder();}
    }
    

2. 消息发送方

消息发送的扩展逻辑实现其实很简单,直接继承的了基本的 RocketMQTemplate ,对一些方法进行了重写。重写的逻辑是在消息发送前后会调用 MqSendInterceptor 的对应方法即可。具体代码如下

@Slf4j
public class RocketMQPlusTemplate extends RocketMQTemplate implements InitializingBean {@Autowired(required = false)private List<MqSendInterceptor> mqSendInterceptors;@Overridepublic void afterPropertiesSet() throws Exception {if (mqSendInterceptors == null) {mqSendInterceptors = Lists.newArrayListWithCapacity(0);}Collections.sort(mqSendInterceptors);super.afterPropertiesSet();}@Overridepublic <T> T sendAndReceive(String destination, Message<?> message, Type type,String hashKey, long timeout, int delayLevel) {return handleMsgSupplier(() -> {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {log.error("send request message failed. destination:{}, message is null ", destination);throw new IllegalArgumentException("`message` and `message.payload` cannot be null");}try {org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);if (delayLevel > 0) {rocketMsg.setDelayTimeLevel(delayLevel);}MessageExt replyMessage;if (Objects.isNull(hashKey) || hashKey.isEmpty()) {replyMessage = (MessageExt) getProducer().request(rocketMsg, timeout);} else {replyMessage = (MessageExt) getProducer().request(rocketMsg, getMessageQueueSelector(), hashKey, timeout);}return replyMessage != null ? (T) doConvertMessage(replyMessage, type) : null;} catch (Exception e) {log.error("send request message failed. destination:{}, message:{} ", destination, message);throw new MessagingException(e.getMessage(), e);}}, destination, message);}@Overridepublic void sendAndReceive(String destination, Message<?> message,RocketMQLocalRequestCallback rocketMQLocalRequestCallback, String hashKey, long t, int delayLevel) {handleMsgRunnable(() -> {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {log.error("send request message failed. destination:{}, message is null ", destination);throw new IllegalArgumentException("`message` and `message.payload` cannot be null");}try {long timeout = t;org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);if (delayLevel > 0) {rocketMsg.setDelayTimeLevel(delayLevel);}if (timeout <= 0) {timeout = getProducer().getSendMsgTimeout();}RequestCallback requestCallback = null;if (rocketMQLocalRequestCallback != null) {requestCallback = new RequestCallback() {@Overridepublic void onSuccess(org.apache.rocketmq.common.message.Message message) {rocketMQLocalRequestCallback.onSuccess(doConvertMessage((MessageExt) message, getMessageType(rocketMQLocalRequestCallback)));}@Overridepublic void onException(Throwable e) {rocketMQLocalRequestCallback.onException(e);}};}if (Objects.isNull(hashKey) || hashKey.isEmpty()) {getProducer().request(rocketMsg, requestCallback, timeout);} else {getProducer().request(rocketMsg, getMessageQueueSelector(), hashKey, requestCallback, timeout);}} catch (Exception e) {log.error("send request message failed. destination:{}, message:{} ", destination, message);throw new MessagingException(e.getMessage(), e);}}, destination, message);}@Overridepublic <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {return handleMsgsSupplier(() -> {if (Objects.isNull(messages) || messages.size() == 0) {log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);throw new IllegalArgumentException("`messages` can not be empty");}try {long now = System.currentTimeMillis();Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();for (Message msg : messages) {if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {log.warn("Found a message empty in the batch, skip it");continue;}rmqMsgs.add(this.createRocketMqMessage(destination, msg));}SendResult sendResult = getProducer().send(rmqMsgs, timeout);long costTime = System.currentTimeMillis() - now;if (log.isDebugEnabled()) {log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());}return sendResult;} catch (Exception e) {log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());throw new MessagingException(e.getMessage(), e);}}, destination, messages);}@Overridepublic SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {return handleMsgSupplier(() -> {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {log.error("syncSendOrderly failed. destination:{}, message is null ", destination);throw new IllegalArgumentException("`message` and `message.payload` cannot be null");}try {long now = System.currentTimeMillis();org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);SendResult sendResult = getProducer().send(rocketMsg, getMessageQueueSelector(), hashKey, timeout);long costTime = System.currentTimeMillis() - now;if (log.isDebugEnabled()) {log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());}return sendResult;} catch (Exception e) {log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);throw new MessagingException(e.getMessage(), e);}}, destination, message);}@Overridepublic SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {return handleMsgSupplier(() -> {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {log.error("syncSend failed. destination:{}, message is null ", destination);throw new IllegalArgumentException("`message` and `message.payload` cannot be null");}try {long now = System.currentTimeMillis();org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);if (delayLevel > 0) {rocketMsg.setDelayTimeLevel(delayLevel);}SendResult sendResult = getProducer().send(rocketMsg, timeout);long costTime = System.currentTimeMillis() - now;if (log.isDebugEnabled()) {log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());}return sendResult;} catch (Exception e) {log.error("syncSend failed. destination:{}, message:{} ", destination, message);throw new MessagingException(e.getMessage(), e);}}, destination, message);}@Overridepublic void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,int delayLevel) {handleMsgRunnable(() -> {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {log.error("asyncSend failed. destination:{}, message is null ", destination);throw new IllegalArgumentException("`message` and `message.payload` cannot be null");}try {org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);if (delayLevel > 0) {rocketMsg.setDelayTimeLevel(delayLevel);}getProducer().send(rocketMsg, sendCallback, timeout);} catch (Exception e) {log.info("asyncSend failed. destination:{}, message:{} ", destination, message);throw new MessagingException(e.getMessage(), e);}}, destination, message);}@Overridepublic void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,long timeout) {handleMsgRunnable(() -> {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);throw new IllegalArgumentException("`message` and `message.payload` cannot be null");}try {org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);getProducer().send(rocketMsg, getMessageQueueSelector(), hashKey, sendCallback, timeout);} catch (Exception e) {log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);throw new MessagingException(e.getMessage(), e);}}, destination, message);}@Overridepublic void sendOneWay(String destination, Message<?> message) {handleMsgRunnable(() -> {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {log.error("sendOneWay failed. destination:{}, message is null ", destination);throw new IllegalArgumentException("`message` and `message.payload` cannot be null");}try {org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);getProducer().sendOneway(rocketMsg);} catch (Exception e) {log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);throw new MessagingException(e.getMessage(), e);}}, destination, message);}@Overridepublic void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {handleMsgRunnable(() -> {if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);throw new IllegalArgumentException("`message` and `message.payload` cannot be null");}try {org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);getProducer().sendOneway(rocketMsg, getMessageQueueSelector(), hashKey);} catch (Exception e) {log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);throw new MessagingException(e.getMessage(), e);}}, destination, message);}@Overridepublic TransactionSendResult sendMessageInTransaction(final String destination,final Message<?> message, final Object arg) throws MessagingException {return handleMsgSupplier(() -> {try {if (((TransactionMQProducer) getProducer()).getTransactionListener() == null) {throw new IllegalStateException("The rocketMQTemplate does not exist TransactionListener");}org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);return getProducer().sendMessageInTransaction(rocketMsg, arg);} catch (MQClientException e) {throw RocketMQUtil.convert(e);}}, destination, message);}protected org.apache.rocketmq.common.message.Message createRocketMqMessage(String destination, Message<?> message) {Message<?> msg = this.doConvert(message.getPayload(), message.getHeaders(), null);final org.apache.rocketmq.common.message.Message rocketMessage =RocketMQUtil.convertToRocketMessage(getMessageConverter(), getCharset(),destination, msg);preHandle(rocketMessage);return rocketMessage;}private Object doConvertMessage(MessageExt messageExt, Type type) {if (Objects.equals(type, MessageExt.class)) {return messageExt;} else if (Objects.equals(type, byte[].class)) {return messageExt.getBody();} else {String str = new String(messageExt.getBody(), Charset.forName(getCharset()));if (Objects.equals(type, String.class)) {return str;} else {// If msgType not string, use objectMapper change it.try {if (type instanceof Class) {//if the messageType has not Generic Parameterreturn this.getMessageConverter().fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) type);} else {//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) type).getRawType(), null);}} catch (Exception e) {log.error("convert failed. str:{}, msgType:{}", str, type);throw new RuntimeException("cannot convert message to " + type, e);}}}}private Type getMessageType(RocketMQLocalRequestCallback rocketMQLocalRequestCallback) {Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQLocalRequestCallback);Type matchedGenericInterface = null;while (Objects.nonNull(targetClass)) {Type[] interfaces = targetClass.getGenericInterfaces();if (Objects.nonNull(interfaces)) {for (Type type : interfaces) {if (type instanceof ParameterizedType && (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQLocalRequestCallback.class))) {matchedGenericInterface = type;break;}}}targetClass = targetClass.getSuperclass();}if (Objects.isNull(matchedGenericInterface)) {return Object.class;}Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments();if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {return actualTypeArguments[0];}return Object.class;}/*** 消息** @param runnable* @param destination* @param message*/private void handleMsgRunnable(Runnable runnable, String destination, Message<?> message) {Exception exception = null;try {runnable.run();} catch (Exception e) {exception = e;throw e;} finally {afterHandle(destination, message, exception);}}/*** 消息处理** @param supplier* @param destination* @param message* @return*/private <R> R handleMsgSupplier(Supplier<R> supplier, String destination, Message<?> message) {Exception exception = null;try {return supplier.get();} catch (Exception e) {exception = e;throw e;} finally {afterHandle(destination, message, exception);}}/*** 消息** @param supplier* @param destination* @param messages* @param <T>* @param <P>* @return*/private <T, P extends Message<?>> T handleMsgsSupplier(Supplier<T> supplier, String destination, Collection<P> messages) {Exception exception = null;try {return supplier.get();} catch (Exception e) {exception = e;throw e;} finally {for (Message<?> message : messages) {afterHandle(destination, message, exception);}}}/*** 消息前置处理** @param message*/protected void preHandle(org.apache.rocketmq.common.message.Message message) {if (!Boolean.TRUE.toString().equalsIgnoreCase(message.getUserProperty(MqContants.NOT_INTERCEPT))) {for (MqSendInterceptor mqSendInterceptor : mqSendInterceptors) {mqSendInterceptor.preHandle(message);}}}/*** 消息后置处理** @param destination* @param message* @param exception*/protected void afterHandle(String destination, Message<?> message, Exception exception) {if (!Boolean.TRUE.equals(message.getHeaders().get(MqContants.NOT_INTERCEPT))) {for (MqSendInterceptor mqSendInterceptor : mqSendInterceptors) {mqSendInterceptor.afterHandle(destination, message, exception);}}log.warn("[mq发送][topic = {}, payload = {}, exception = {}]", destination, message.getPayload(), exception);}
}

3. 消息消费方

消息消费方的实现会更加复杂一点,通过 BeanPostProcessor 接口在 DefaultRocketMQListenerContainer 对象创建的时候为其创建代理对象。如下:

public class RocketMqBeanPostProcessor implements InstantiationAwareBeanPostProcessor, InitializingBean {@Autowired(required = false)private List<MqReceiveInterceptor> mqReceiveInterceptors;/*** 获取消费者组** @param container* @return*/private static String getConusmerGroup(DefaultRocketMQListenerContainer container) {String consumerGroup = null;RocketMQListener<?> rocketMQListener = container.getRocketMQListener();if (rocketMQListener != null) {final RocketMQMessageListener annotation =rocketMQListener.getClass().getAnnotation(RocketMQMessageListener.class);if (annotation != null) {consumerGroup = annotation.consumerGroup();}} else {RocketMQReplyListener<?, ?> rocketMQReplyListener = container.getRocketMQReplyListener();if (rocketMQReplyListener != null) {final RocketMQMessageListener annotation =rocketMQReplyListener.getClass().getAnnotation(RocketMQMessageListener.class);if (annotation != null) {consumerGroup = annotation.consumerGroup();}}}return consumerGroup;}@Overridepublic void afterPropertiesSet() {if (mqReceiveInterceptors == null) {mqReceiveInterceptors = Lists.newArrayListWithCapacity(0);}Collections.sort(mqReceiveInterceptors);}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 对 DefaultRocketMQListenerContainer 处理,代理 messageListener 对象// 引入RocketMq 对消息的处理是交由 DefaultRocketMQListenerContainer 中的 messageListener 对象的if (bean instanceof DefaultRocketMQListenerContainer) {final DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;final DefaultMQPushConsumer consumer = container.getConsumer();if (consumer != null) {final MessageListener messageListener = consumer.getMessageListener();if (messageListener instanceof MessageListenerOrderly|| messageListener instanceof MessageListenerConcurrently) {// 重新设置代理后的对象consumer.setMessageListener((MessageListener)createProxy(getConusmerGroup(container), messageListener));}}}return bean;}/*** 为 MessageListener 创建代理对象** @param consumerGroup* @param messageListener* @return*/private Object createProxy(String consumerGroup, MessageListener messageListener) {return Proxy.newProxyInstance(MessageListener.class.getClassLoader(), messageListener.getClass().getInterfaces(),(proxy, method, args) -> {// 代理对象判断如果调用的是 consumeMessage 方法则进行增强,调用前后调用消息拦截器。if ("consumeMessage".equals(method.getName())) {Exception exception = null;try {for (MessageExt message : (List<MessageExt>) args[0]) {if (!Boolean.TRUE.toString().equalsIgnoreCase(message.getUserProperty(MqContants.NOT_INTERCEPT))) {for (MqReceiveInterceptor mqReceiveInterceptor : mqReceiveInterceptors) {mqReceiveInterceptor.preHandle(consumerGroup, message);}}}return method.invoke(messageListener, args);} catch (Exception e) {exception = e;throw e;} finally {for (MessageExt message : (List<MessageExt>) args[0]) {if (!Boolean.TRUE.toString().equalsIgnoreCase(message.getUserProperty(MqContants.NOT_INTERCEPT))) {for (MqReceiveInterceptor mqReceiveInterceptor : mqReceiveInterceptors) {mqReceiveInterceptor.afterHandle(consumerGroup, message, exception);}}}}}return method.invoke(messageListener, args);});}}

4. 配置引入类

项目实际上是做了一个 spring-boot-starter 的jar 包供其他服务引用的因此,这里是通过自动装配将 RocketMQAutoConfig 引入,然后 RocketMQAutoConfig 通过 @Import 引入了 RocketmqConfig 类。

@Import(RocketmqConfig.class)
public class RocketMQAutoConfig {
}

其中 RocketmqConfig 的实现如下:

@Configuration
@Configuration
public class RocketmqConfig implements ApplicationContextAware {public static final String PRODUCER_BEAN_NAME = "defaultMQProducer";public static final String CONSUMER_BEAN_NAME = "defaultLitePullConsumer";private ApplicationContext applicationContext;@Value("${rocketmq.name-server}")private String nameServer;@Value("${rocketmq.producer.group:default}")private String producerGroup;@Value("${rocketmq.producer.sendMessageTimeout}")private int sendMessageTimeout;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}/*** DefaultMQProducer 注入* @return*/@Bean("defaultMQProducer")public DefaultMQProducer defaultMQProducer() {DefaultMQProducer defaultMQProducer = new DefaultMQProducer();defaultMQProducer.setProducerGroup(producerGroup);defaultMQProducer.setNamesrvAddr(nameServer);defaultMQProducer.setVipChannelEnabled(false);defaultMQProducer.setSendMsgTimeout(sendMessageTimeout);return defaultMQProducer;}/*** MQ 消息转换器* @return*/@Bean@ConditionalOnProperty(prefix = "oms", name = "rocket.bak", havingValue = "true")@ConditionalOnMissingBean(RocketMQMessageConverter.class)public RocketMQMessageConverter rocketmqMessageConverter() {return new RocketMQMessageConverter();}/*** mq 扩展,支持发送前后拦截* 这里直接参考 org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#rocketMQTemplate(org.apache.rocketmq.spring.support.RocketMQMessageConverter) 的实现** @param rocketMQMessageConverter* @return*/@Bean(destroyMethod = "destroy")@ConditionalOnProperty(prefix = "oms", name = "rocket.plus", havingValue = "true")public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {RocketMQTemplate rocketMQTemplate = new RocketMQPlusTemplate();if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) {rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME));}if (applicationContext.containsBean(CONSUMER_BEAN_NAME)) {rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME));}rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());return rocketMQTemplate;}/*** mq 扩展,支持接收前后拦截** @return*/@Bean@ConditionalOnProperty(prefix = "oms", name = "rocket.plus", havingValue = "true")public RocketMqBeanPostProcessor rocketMqBeanPostProcessor() {return new RocketMqBeanPostProcessor();}}

这里为 RocketMQ 收发增强功能做了一个配置项,如果需要开启增强功能则需要再服务的配置项中增加如下配置 :

# mq 扩展功能开启
oms.rocket.plus=true

5. 使用示例

  1. 编写两个MQ 日志打印拦截器

    // 日志打印的两个拦截器
    @Slf4j
    public class LogMqSendInterceptor implements MqSendInterceptor {@Overridepublic void afterHandle(String destination, Message<?> payload, Exception e) {try {log.info("[{} 发送消息][payLoad = {}, exception = {}]",destination, JSON.toJSONString(payload.getPayload()), e);} catch (Exception ex) {log.debug("[{} 日志打印错误]", destination, ex);}}
    }@Slf4j
    public class LogMqReceiveInterceptor implements MqReceiveInterceptor {@Overridepublic void preHandle(String conusmerGroup, MessageExt messageExt) {try {log.info("[{} 收到消息][conusmerGroup = {}, message = {}]",messageExt.getTopic(), conusmerGroup, new String(messageExt.getBody()));} catch (Exception e) {log.debug("[{} 日志打印错误]", messageExt.getTopic(), e);}}@Overridepublic void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception) {}
    }/******************************************/
    // 日志追踪的两个拦截器
    @Slf4j
    public class TraceMqSendInterceptor implements MqSendInterceptor {@Value("${spring.application.name:''}")private String applicationName;@Overridepublic void preHandle(Message message) {String traceId = MDC.get(RpcConstants.MDC_TRACE_ID);if (StringUtils.isBlank(traceId)) {traceId = UUID.randomUUID().toString();log.info("[发送到Mq 消息][traceId 为空,随机生成 traceId = {}]", traceId);}message.putUserProperty(RpcConstants.MDC_TRACE_ID, traceId);if (StringUtils.isNotBlank(applicationName)) {message.putUserProperty(RpcConstants.APPLICATION_NAME, applicationName);}}@Overridepublic void afterHandle(String destination, org.springframework.messaging.Message<?> message, Exception e) {}}@Slf4j
    public class TraceMqReceiveInterceptor implements MqReceiveInterceptor {@Overridepublic void preHandle(String conusmerGroup, MessageExt messageExt) {String traceId = messageExt.getUserProperty(RpcConstants.MDC_TRACE_ID);final String applicationName = messageExt.getUserProperty(RpcConstants.APPLICATION_NAME);log.info("[接收到Mq 消息][messageId = {}, traceId = {}, sendAppName = {}]",messageExt.getMsgId(), traceId, applicationName);if (StringUtils.isBlank(traceId)) {traceId = UUID.randomUUID().toString();log.info("[接收到Mq 消息][traceId 为空, messageId = {}, 随机生成 traceId = {}]",messageExt.getMsgId(), traceId);}MDC.put(RpcConstants.MDC_TRACE_ID, traceId);}@Overridepublic void afterHandle(String consumerGroup, MessageExt messageExt, Exception exception) {}
    }
    
  2. 创建MQ消息收发者

    // 消费者
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "topic01", consumerGroup = "cg01")
    public class DemoMQListener implements RocketMQListener<String> {@Autowiredprivate ApplicationEventPublisher applicationEventPublisher;@Overridepublic void onMessage(String message) {log.info("收到消息 {}", message);}}// 生产者
    @Component
    public class DemoRunner implements ApplicationRunner {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Overridepublic void run(ApplicationArguments args) throws Exception {rocketMQTemplate.send("topic01",  MessageBuilder.withPayload("hello world").build());}
    }
    
  3. 启用增强功能:服务配置中增加该属性

    oms.rocket.plus=true
    
  4. MQ 收发日志如下(我懒得开两个服务所以一个服务进行收发),收发日志如下:
    在这里插入图片描述
    可以看到:

    1. LogMqSendInterceptor 和 LogMqReceiveInterceptor 对MQ 收发日志进行了打印
    2. TraceMqSendInterceptor 和 TraceMqReceiveInterceptor 进行了 TracceId 传递:生产者由于没有获取到 TraceId 直接随机生成一个TraceId 传递,消费者接收到 TraceId 后存入上下文,当前线程再进行日志打印时携带了TraceId。

四、思路扩展

1. 消费流程简介

消息发送方的原理很简单,这里不再赘述。主要看消息接受方的原理,RocketMQ 消费者启动消费的大体流程:

  1. 服务器启动时 ListenerContainerConfiguration#afterSingletonsInstantiated 方法中会为所有被 @RocketMQMessageListener 注解修饰的对象创建一个 DefaultRocketMQListenerContainer 实例注册到容器中,并调用 DefaultRocketMQListenerContainer#start 方法启动
  2. DefaultRocketMQListenerContainer实例创建时会调用 afterPropertiesSet 方法初始化DefaultRocketMQListenerContainer#DefaultMQPushConsumer对象。在初始化的过程中会赋值messageListener 属性为 DefaultMessageListenerOrderly 或 DefaultMessageListenerConcurrently 对象。
  3. 当消费者进行消息消费时会调用 DefaultMessageListenerOrderly 或 DefaultMessageListenerConcurrently 的 consumeMessage 方法,而在该方法中会去调用我们自己注入的被 @RocketMQMessageListener 注解修饰的类的 onMessage 方法。

其中具体代码如下:

    @Overridepublic void afterSingletonsInstantiated() {// 扫描容器中所有被 RocketMQMessageListener 注解修饰的类Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));// 对消费方进行注册beans.forEach(this::registerContainer);}private void registerContainer(String beanName, Object bean) {... if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());}// 获取并解析注解信息RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());String topic = this.environment.resolvePlaceholders(annotation.topic());... // 注解校验validate(annotation);// 获取当前对象在 容器中的beanName,其中通过 AtomicLong 的自增来确保 BeanName 的唯一性String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;// 创建 DefaultRocketMQListenerContainer 并注册到容器中。genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));// 获取容器中刚注册的 DefaultRocketMQListenerContainer 判断其如果没有启动则启动。DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);if (!container.isRunning()) {try {container.start();} catch (Exception e) {log.error("Started container failed. {}", container, e);throw new RuntimeException(e);}}log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);}// 创建 DefaultRocketMQListenerContainer private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,RocketMQMessageListener annotation) {DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();container.setRocketMQMessageListener(annotation);String nameServer = environment.resolvePlaceholders(annotation.nameServer());nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());container.setNameServer(nameServer);if (!StringUtils.isEmpty(accessChannel)) {container.setAccessChannel(AccessChannel.valueOf(accessChannel));}container.setTopic(environment.resolvePlaceholders(annotation.topic()));String tags = environment.resolvePlaceholders(annotation.selectorExpression());if (!StringUtils.isEmpty(tags)) {container.setSelectorExpression(tags);}container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));// 将实际的 Bean 保存到 DefaultRocketMQListenerContainer#rocketMQListener 或 rocketMQReplyListener 属性中if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {container.setRocketMQListener((RocketMQListener) bean);} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {container.setRocketMQReplyListener((RocketMQReplyListener) bean);}container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());container.setName(name);return container;}

DefaultRocketMQListenerContainer 实现了 InitializingBean 接口,因此在其实例创建的时候会调用 afterPropertiesSet 方法,在启动会初始化 DefaultRocketMQListenerContainer #DefaultMQPushConsumer 对象,这里我们主要关注 DefaultMessageListenerOrderly 和 DefaultMessageListenerConcurrently 两个类。

    @Overridepublic void afterPropertiesSet() throws Exception {initRocketMQPushConsumer();this.messageType = getMessageType();this.methodParameter = getMethodParameter();log.debug("RocketMQ messageType: {}", messageType);}//初始化 consumer 对象private void initRocketMQPushConsumer() throws MQClientException {...// 消费模式:顺序还是并发,根据不同的模式赋值不同的 MessageListenerswitch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}... }

而在 DefaultMessageListenerConcurrently 和 DefaultMessageListenerOrderly中消息的处理是交由 rocketMQListener#onMessage 来处理。

public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {@SuppressWarnings("unchecked")@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}public class DefaultMessageListenerOrderly implements MessageListenerOrderly {@SuppressWarnings("unchecked")@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}}private void handleMessage(MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {if (rocketMQListener != null) {rocketMQListener.onMessage(doConvertMessage(messageExt));} else if (rocketMQReplyListener != null) {Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));Message<?> message = MessageBuilder.withPayload(replyContent).build();org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {@Override public void onSuccess(SendResult sendResult) {if (sendResult.getSendStatus() != SendStatus.SEND_OK) {log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());} else {log.debug("Consumer replies message success.");}}@Override public void onException(Throwable e) {log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());}});}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/727146.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apache POI Excel的读写

1、 POI介绍 Apache POI是用Java编写的免费开源的跨平台的Java API&#xff0c;Apache POI提供API给Java程 序对Microsoft Office格式档案读和写的功能&#xff0c;其中使用最多的就是使用POI操作Excel文 件。 jxl&#xff1a;专门操作Excel maven坐标&#xff1a; POI结构:…

【信息系统项目管理师】--【信息技术发展】--【新一代信息技术及应用】--【虚拟现实】

文章目录 第二章 信息技术发展2.2 新一代信息技术及应用2.2.6 虚拟现实1.技术基础2.关键技术3.应用和发展 第二章 信息技术发展 信息技术是在信息科学的基本原理和方法下&#xff0c;获取信息、处理信息、传输信息和使用信息的应用技术总称。从信息技术的发展过程来看&#xff…

封关了,不仅封掉了港漂们方便的回家之路

封关不仅堵住了港漂回国的便捷途径&#xff0c;也堵住了港漂修理内地大大小小的电器的路。 香港当然有维修工。 听邻居说修一个漏水的水龙头要港币1000元&#xff0c;检查微波炉要港币600元。 更换主板需要4000元&#xff0c;比新机还贵。 收回坏掉的洗衣机修理费是14000元&am…

专家解读:2024年十大项目管理工具综合排名与评价

2024年涌现出一批新的项目管理工具&#xff0c;各具特色的功能和设计为企业解决了诸多的管理难题。今天我们就来盘点2024年的十款项目管理工具Zoho Projects、AgileMaster、PlanItAll、CommuniQ、WorkFlowRanger、GanttGenius、RiskAssessor、TeamHarmony、BudgetBoss、CloudCo…

Qt6.6搭建WebAssembly

1.首先安装python &#xff0c; 链接&#xff1a;https://www.python.org/ 2.下载并安装qt6. 3.克隆emsdk工程 3.1 进入emsdk目录&#xff0c;然后更新emsdk代码 3.2 下载并安装最新的SDK工具。&#xff08;C:\Qt\emsdk>emsdk install --global latest&#xff09; 3.3…

【C++】C/C++内存管理详解

个人主页 &#xff1a; zxctscl 文章封面来自&#xff1a;艺术家–贤海林 如有转载请先通知 目录 1. 前言2. C/C内存分布3. C语言中动态内存管理方式4. C中动态内存管理4.1 new/delete操作内置类型4.2 new和delete操作自定义类型 5. operator new与operator delete函数5.1 oper…

算法沉淀——动态规划之其它背包问题与卡特兰数(leetcode真题剖析)

算法沉淀——动态规划之其它背包问题与卡特兰数 二维费用的背包问题01.一和零02.盈利计划 似包非包组合总和 Ⅳ 卡特兰数不同的二叉搜索树 二维费用的背包问题 01.一和零 题目链接&#xff1a;https://leetcode.cn/problems/ones-and-zeroes/ 给你一个二进制字符串数组 strs…

【网络】主机连接 TCP 三次握手

【网络】主机连接 TCP 三次握手 一、TCP连接3次握手二、TCP连接4次挥手三、为什么tcp要三次握手&#xff0c;两次行不四、为什么TCP挥手需要4次五、Netstat命令的连接状态包括:六、练习题 一、TCP连接3次握手 1、建立连接的时候是3次握手&#xff0c;客户端向服务器端发送SYN&…

数据结构界的终极幻神----树

目录 一.数的概念和分类 种类 二.重点概念 哈希树: 二叉树的线索化 什么是线索化 为什么要线索化 特殊的查找树 完全二叉树 三.手撕完全二叉树(堆) 重点讲解 向上搜索算法 向下搜索算法 一.数的概念和分类 树&#xff08;tree&#xff09;是包含 n(n≥0) [2] 个节…

3Dmax中VR渲染太阳光渲染参数怎么设置?渲染100云渲染助力

我们用3Dmax建模时一些场景会用到太阳光&#xff0c;那么渲染参数是如何设置的呢&#xff1f; 我们一起来看看&#xff0c;直接上图 以上就是详细的参数设置&#xff0c;大家可以用做参考&#xff0c;如果本地渲染慢的朋友可以考虑使用云渲染100 机器多&#xff0c;渲染稳定不…

基于51单片机的公交ic卡系统设计

目 录 摘 要 I Abstract II 引 言 1 1 总体方案设计 3 1.1 方案选择 3 1.2 硬件选择 3 1.3 系统工作原理 4 1.4 总体方案确定 5 2 系统硬件电路设计 6 2.1 主控模块电路设计 6 2.2 电源电路设计 8 2.3 显示电路模块设计 8 2.4 报警模块电路设计 10 2.5 RC522刷卡模块 10 2.6 独…

下属OKR与上级OKR对齐时,有几种方法?

下属的OKR&#xff08;Objectives and Key Results&#xff0c;即目标与关键成果&#xff09;与上级的OKR对齐&#xff0c;是确保组织目标一致性和团队协同工作的关键步骤。以下是几种常用的对齐方法&#xff1a; 直接映射法&#xff1a;下属的OKR直接反映并支撑上级的OKR。例如…

【二】【SQL Server】如何运用SQL Server中查询设计器通关数据库期末查询大题

教学管理系统201703153 教学管理系统数据库展示 成绩表展示 课程表展示 学生表展示 院系表展示 一、基本操作 设置复合主键 设置其他表的主键 设置字段取值范围 二、简单操作 第一题 第二题 第三题 第四题 结尾 最后&#xff0c;感谢您阅读我的文章&#xff0c;希望这些内容能…

(黑马出品_04)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式

&#xff08;黑马出品_04&#xff09;SpringCloudRabbitMQDockerRedis搜索分布式 微服务技术异步通信 今日目标1.初识MQ1.1.同步和异步通讯1.1.1.同步通讯1.1.2.异步通讯 1.2.技术对比 2.快速入门2.1.安装RabbitMQ2.1.1.单机部署(1).下载镜像方式…

SICP解读指南:深度阅读 “计算机领域三巨头” 之一(文末送书)

&#x1f308;个人主页&#xff1a;聆风吟_ &#x1f525;系列专栏&#xff1a;Linux实践室、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 书籍介绍1.1 SICP侧重点1.2 SICP章节介绍 二. 书籍推荐2.1 书籍介绍2.2 推…

[HackMyVM]靶场 Wild

kali:192.168.56.104 主机发现 arp-scan -l # arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:d2:e0:49, IPv4: 192.168.56.104 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.56.1 0a:00:27:00:00:05 …

从安卓转战月薪6万的鸿蒙原来这么简单

近年来&#xff0c;各家大厂正在积极布局鸿蒙客户端开发&#xff0c;鸿蒙操作系统备受瞩目&#xff0c;不少安卓开发者纷纷转战鸿蒙&#xff0c;并取得了可观的经济回报。本文将为大家揭示&#xff0c;从安卓转战鸿蒙并获得月薪6万的简单之道&#xff0c;希望能给正在考虑转型的…

YOLOSHOW - YOLOv5 / YOLOv7 / YOLOv8 / YOLOv9 基于 Pyside6 的图形化界面

YOLOSHOW 是一个基于 PySide6&#xff08;Qt for Python&#xff09;开发的图形化界面应用程序&#xff0c;主要用于集成和可视化YOLO系列&#xff08;包括但不限于YOLOv5、YOLOv7、YOLOv8、YOLOv9&#xff09;的目标检测模型。YOLOSHOW 提供了一个用户友好的交互界面&#xff…

POS 之 最终确定性

Gasper Casper 是一种能将特定区块更新为 最终确定 状态的机制&#xff0c;使网络的新加入者确信他们正在同步规范链。当区块链出现多个分叉时&#xff0c;分叉选择算法使用累计投票来确保节点可以轻松选择正确的分叉。 最终确定性 最终确定性是某些区块的属性&#xff0c;意味…

Ajax、Axios、Vue、Element与其案例

目录 一.Ajax 二.Axios 三.Vue 四.Element 五.增删改查案例 一.依赖&#xff1a;数据库&#xff0c;mybatis&#xff0c;servlet&#xff0c;json-对象转换器 二.资源&#xff1a;elementvueaxios 三.pojo 四.mapper.xml与mapper接口 五.service 六.servlet 七.html页…