RocketMQ中的消息过滤
RocketMQ设计了消息过滤,来解决大量无意义流量的传输:即对于客户端不需要的消息,
Broker就不会传输给客户端,以免浪费宽带,RocketMQ4.2.0支持Tag过滤、SQL92过滤、Filter Server过滤
Tag过滤
- 第一步:用户发送一个带Tag的消息
- 第二步:用户订阅一个Topic的Tag,RocketMQ Broker会保存订阅关系
- 第三步:在Broker端做Tag过滤。消费者在Pull消息时,RocketMQ Broker会根据Tag的HasCode进行对比,如果不满足条件,消息不会返回给消费者,以节约带宽也许你们会问,为什么不直接用字符串进行对比和过滤呢?原因是HashCode对比存在Hash碰撞而导致过滤失败,字符串比较的速度相较HashCode慢。HashCode对比是数字比较,Java底层可以直接通过位运算进行对比,
而字符串对比需要按照字符顺序比较,相比位运算更加耗时。由于HashCode对比有Hash碰撞的危险,所以才引出第四步 - 第四步:客户端Tag过滤。Hash碰撞相信大家都有所了解,就是不同的Tag计算出来的Hash值可能是一样的,在这种情况下过滤的消息是错误的,所以RocketMQ设计了客户端字符串对比功能,用来做第二次Tag过滤
- Tag过滤为什么设计成Broker端使用Hash过滤,而客户端使用Tag字符串进行对比过滤呢?
Broker端使用Hash过滤可以快速过滤海量消息,即使偶尔有"漏网之鱼",在客户端字符串
过滤后也能被成功过滤。这种层次设计 的过滤方式在做系统时可以参考
SQL过滤流程
- 第一步:消费订阅Topic,上传过滤SQL语句,RocketMQ Broker编译SQL保存
- 第二步:消费者Pull消息
- 第一次过滤:使用Bloom过滤器的isHit()方法做第一次过滤。Bloom过滤器效率高,但是也存在
缺陷,即只能判断不需要的消息,过滤后的消息也不保证都是需要消费的。 - 第二次过滤:执行编译后的SQL方法evaluate()即可过滤出最终的结果
在使用SQL过滤前,需要在启动Broker时配置如下几个参数:
enableConsumeQueueExt=true
filterSupportRetry=true
enablePropertyFilter=true
enableCalcFilterBitMap=true
FilterServer过滤流程
这是一种不常用但是非常灵活的过滤方式,要使用Filter Server过滤必须在启动Broker时,添加如下配置:filterServerNums=大于0的数字.这样就可以启动一个或多个过滤服务器,每个过滤服务在启动时会自动注册到Namesrv中
- 第一步:用户消费者从Namesrv获取Topic路由信息,同时上传自定义的过滤器实现类源代码到FilterServer中,FilterServer编译并实例化过滤器类
- 第二步:用户发送拉取消息请求到FilterServer,FilterServer通过Pull consumer从Broker拉取消息,
执行过滤类中的过滤方法,返回过滤后的消息