RocketMQ源码阅读-九-自定义过滤规则Flitersrv
- 什么是Filtersrv
- Filtersrv注册到Broker
- 过滤类
- Consumer发起订阅设置过滤类代码
- Consumer上传过滤类代码
- Flitersrv编译过滤类代码
- 过滤消息
- Consumer 从 Filtersrv 拉取消息
- Flitersrv从Broker拉取消息
- Flitersrv的高可用
- 总结
什么是Filtersrv
Filtersrv ,负责自定义规则过滤 Consumer 从 Broker 拉取的消息。
其在系统中的位置如下图:
Filtersrv优点:
- 减少了 Broker 的负担
- 减少了 Consumer 接收无用的消息
缺点:
- 多了一层 Filtersrv 网络开销
Filtersrv注册到Broker
Flitersrv与Broker的对应关系为:
- 一个Flitersrv对应一个Broker
- 一个Broker对应多个Flitersrv
Flitersrv的高可用:
- 启动多个Flitersrv
- Flitersrv注册失败会自动退出
Flitersrv注册到Broker的核心代码在FiltersrvController类中,其初始化源码如下:
public boolean initialize() {MixAll.printObjectProperties(log, this.filtersrvConfig);this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),new ThreadFactoryImpl("RemotingExecutorThread_"));this.registerProcessor();// 固定间隔注册到Brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {FiltersrvController.this.registerFilterServerToBroker();}}, 15, 10, TimeUnit.SECONDS);this.defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis() - 1000);this.defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() - 1000);this.defaultMQPullConsumer.setNamesrvAddr(this.filtersrvConfig.getNamesrvAddr());this.defaultMQPullConsumer.setInstanceName(String.valueOf(UtilAll.getPid()));return true;
}
调用了FiltersrvController#registerFilterServerToBroker:
public void registerFilterServerToBroker() {try {RegisterFilterServerResponseHeader responseHeader =this.filterServerOuterAPI.registerFilterServerToBroker(this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper().setDefaultBrokerId(responseHeader.getBrokerId());if (null == this.brokerName) {this.brokerName = responseHeader.getBrokerName();}log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",this.localAddr(),this.filtersrvConfig.getConnectWhichBroker(),responseHeader.getBrokerName(),responseHeader.getBrokerId());} catch (Exception e) {log.warn("register filter server Exception", e);log.warn("access broker failed, kill oneself");System.exit(-1); // 异常退出}
}
此方法会去 注册Filtersrv 到 Broker,如果注册失败,会关闭Filtersrv。
其中注册Flitersrv到Broker调用的FilterServerOuterAPI的registerFilterServerToBroker方法:
public RegisterFilterServerResponseHeader registerFilterServerToBroker(final String brokerAddr,final String filterServerAddr
) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {RegisterFilterServerRequestHeader requestHeader = new RegisterFilterServerRequestHeader();requestHeader.setFilterServerAddr(filterServerAddr);RemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.REGISTER_FILTER_SERVER, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterFilterServerResponseHeader responseHeader =(RegisterFilterServerResponseHeader) response.decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class);return responseHeader;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark());
}
此方法构建注册请求,将filterServerAddr注册到Broker。
过滤类
Consumer会上传过滤类代码给Flitersrv,然后Flitersrv编译过滤类代码使用。其关系如下图:
Consumer发起订阅设置过滤类代码
Consumer在发起订阅时,可以进行过滤类代码的设置,DefaultMQPushConsumer#subscribe:
// 将主题订阅到消费订阅。
// 参数:
// topic – 要消费的主题。
// fullClassName – 全类名,必须扩展 org.apache.rocketmq.common.filter。消息过滤器
// filterClassSource – 类源代码,采用UTF-8文件编码,必须对你的代码安全负责
@Override
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
}
Consumer上传过滤类代码
在 Consumer 心跳注册到 Broker 的同时,上传 过滤类代码 到 Broker 对应的所有 Filtersrv。类MQClientInstance核心代码如下:
public void sendHeartbeatToAllBrokerWithLock() {if (this.lockHeartbeat.tryLock()) {try {// 发送心跳到Brokerthis.sendHeartbeatToAllBroker();// 上传过滤类源码到Filtersrvthis.uploadFilterClassSource();} catch (final Exception e) {log.error("sendHeartbeatToAllBroker exception", e);} finally {this.lockHeartbeat.unlock();}} else {log.warn("lock heartBeat, but failed.");}
}
上传过滤类源码到Filtersrv调用方法MQClientInstance#uploadFilterClassSource:
/*** 上传过滤类到Filtersrv*/
private void uploadFilterClassSource() {Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> next = it.next();MQConsumerInner consumer = next.getValue();if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {Set<SubscriptionData> subscriptions = consumer.subscriptions();for (SubscriptionData sub : subscriptions) {if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {final String consumerGroup = consumer.groupName();final String className = sub.getSubString();final String topic = sub.getTopic();final String filterClassSource = sub.getFilterClassSource();try {this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);} catch (Exception e) {log.error("uploadFilterClassToAllFilterServer Exception", e);}}}}}
}
Flitersrv编译过滤类代码
Filtersrv 编译使用 Consumer 上传的 过滤类代码。核心代码在FilterClassManager#registerFilterClass:
/*** 注册过滤类** @param consumerGroup 消费分组* @param topic Topic* @param className 过滤类名* @param classCRC 过滤类源码CRC* @param filterSourceBinary 过滤类源码* @return 是否注册成功*/
public boolean registerFilterClass(final String consumerGroup, final String topic,final String className, final int classCRC, final byte[] filterSourceBinary) {final String key = buildKey(consumerGroup, topic);// 判断是否要注册新的过滤类boolean registerNew = false;FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);if (null == filterClassInfoPrev) {registerNew = true;} else {if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // 类有变化registerNew = true;}}}// 注册新的过滤类if (registerNew) {synchronized (this.compileLock) {filterClassInfoPrev = this.filterClassTable.get(key);if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {return true;}try {FilterClassInfo filterClassInfoNew = new FilterClassInfo();filterClassInfoNew.setClassName(className);filterClassInfoNew.setClassCRC(0);filterClassInfoNew.setMessageFilter(null);if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);// 编译新的过滤类Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);// 创建新的过滤类对象Object newInstance = newClass.newInstance();filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);filterClassInfoNew.setClassCRC(classCRC);}this.filterClassTable.put(key, filterClassInfoNew);} catch (Throwable e) {String info = String.format("FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",consumerGroup, topic, className);log.error(info, e);return false;}}}return true;
}
此方法,将传进来的源代码,编译生成一个MessageFilter示例,保存在filterClassTable中。
过滤消息
再次回顾Flitersrv在整体中的位置:
Consumer从Broker拉取消息的时候,会经过Flitersrv过滤消息。实际上是从Flitersrv拉取消息。
Consumer 从 Filtersrv 拉取消息
Consumer 拉取 使用过滤类方式订阅 的消费消息时,从 Broker 对应的 Filtersrv 列表随机选择一个拉取消息。如果选择不到 Filtersrv,则无法拉取消息。因此,Filtersrv 一定要做高可用。
拉取消息的核心代码在类PullAPIWrapper#pullKernelImpl方法中:
/*** 拉取消息核心方法** @param mq 消息队列* @param subExpression 订阅表达式* @param subVersion 订阅版本号* @param offset 拉取队列开始位置* @param maxNums 批量拉取消息数量* @param sysFlag 拉取系统标识* @param commitOffset 提交消费进度* @param brokerSuspendMaxTimeMillis broker挂起请求最大时间* @param timeoutMillis 请求broker超时时间* @param communicationMode 通讯模式* @param pullCallback 拉取回调* @return 拉取消息结果。只有通讯模式为同步时,才返回结果,否则返回null。* @throws MQClientException 当寻找不到 broker 时,或发生其他client异常* @throws RemotingException 当远程调用发生异常时* @throws MQBrokerException 当 broker 发生异常时。只有通讯模式为同步时才会发生该异常。* @throws InterruptedException 当发生中断异常时*/
protected PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 获取Broker信息FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}// 请求拉取消息if (findBrokerResult != null) {int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);// 若订阅topic使用过滤类,使用filtersrv获取消息String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}// Broker信息不存在,则抛出异常throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
代码第68行,计算从哪个Flitersrv拉取消息,PullAPIWrapper#computPullFromWhichFilterServer:
/*** 计算filtersrv地址。如果有多个filtersrv,随机选择一个。** @param topic Topic* @param brokerAddr broker地址* @return filtersrv地址* @throws MQClientException 当filtersrv不存在时*/
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
throws MQClientException {ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();if (topicRouteTable != null) {TopicRouteData topicRouteData = topicRouteTable.get(topic);List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);if (list != null && !list.isEmpty()) {return list.get(randomNum() % list.size());}}throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "+ topic, null);
}
Flitersrv从Broker拉取消息
Flitersrv 向 Broker 拉取消息时,实际使用的 DefaultMQPullConsumer.java 的方法和逻辑:DefaultRequestProcessor#pullMessageForward:
/*** 拉取消息** @param ctx 拉取消息context* @param request 拉取消息请求* @return 响应* @throws Exception 当发生异常时*/
private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();final PullMessageRequestHeader requestHeader =(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);final FilterContext filterContext = new FilterContext();filterContext.setConsumerGroup(requestHeader.getConsumerGroup());response.setOpaque(request.getOpaque());DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();// 校验Topic过滤类是否完整final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());if (null == findFilterClass) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("Find Filter class failed, not registered");return response;}if (null == findFilterClass.getMessageFilter()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("Find Filter class failed, registered but no class");return response;}// 设置下次请求从 Broker主节点。responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);MessageQueue mq = new MessageQueue();mq.setTopic(requestHeader.getTopic());mq.setQueueId(requestHeader.getQueueId());mq.setBrokerName(this.filtersrvController.getBrokerName());long offset = requestHeader.getQueueOffset();int maxNums = requestHeader.getMaxMsgNums();final PullCallback pullCallback = new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {responseHeader.setMaxOffset(pullResult.getMaxOffset());responseHeader.setMinOffset(pullResult.getMinOffset());responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());response.setRemark(null);switch (pullResult.getPullStatus()) {case FOUND:response.setCode(ResponseCode.SUCCESS);List<MessageExt> msgListOK = new ArrayList<MessageExt>();try {for (MessageExt msg : pullResult.getMsgFoundList()) {// 使用过滤类过滤消息boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);if (match) {msgListOK.add(msg);}}if (!msgListOK.isEmpty()) {returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);return;} else {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);}} catch (Throwable e) {final String error =String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",requestHeader.getConsumerGroup(), requestHeader.getTopic());log.error(error, e);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);return;}break;case NO_MATCHED_MSG:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case NO_NEW_MSG:response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_ILLEGAL:response.setCode(ResponseCode.PULL_OFFSET_MOVED);break;default:break;}returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);}@Overridepublic void onException(Throwable e) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);return;}};// 拉取消息pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);return null;
}
此方法通过DefaultMQPullConsumer执行消息拉取,在回调方法PullCallback中(上述代码62行),执行过滤逻辑,没被过滤的,才保存。
Flitersrv的高可用
通过多个Flitersrv完成高可用,多个Flitersrv部署的示意图如下:
- 一个Consumer对应多个Flitersrv
- 一个Flitersrv对应一个Broker
- 一个Broker对应多个Flitersrv
Consumer会从所有Broker对应的Flitersrv中随即选择一个进行消息拉取。
总结
Flitersrv为了减少Broker的负担,且减少Consumer接收无用消息而生。
Flitersrv作为中间层,Consumer订阅时传过滤类给Broker,Broker将过滤类传给Flitersrv,Flitersrv处理并实例化过滤类。
消息拉取时,Consumer向Flitersrv拉取消息,Flitersrv先向Broker拉取消息,然后经过过滤类的过滤,再将满足条件的消息传给Consumer。