RocketMQ高级特性四-消息过滤

目录

前言

Broker端过滤

定义与概述

消息过滤分类

原理机制

使用场景

优缺点

Java代码示例 - Tag过滤

Java代码示例 - SQL92过滤 

客户端过滤

定义与概述

原理机制

使用场景

优缺点

Java代码示例

总结


前言

消息过滤是RocketMQ的一项高级特性,它允许消费者根据特定的条件来筛选感兴趣的消息,从而避免无关消息的处理,提升消费效率和性能。RocketMQ支持两种主要的消息过滤方式:Broker端过滤客户端过滤。注:文章中部分内容来源于Apache RocketMQ官网

Broker端过滤

定义与概述

Broker端过滤是指在Broker接收到消息后,基于消息的标签(Tag)或用户自定义属性进行过滤。只有满足过滤条件的消息才会被推送给消费者,从而减少消费者端的处理压力。

消息过滤分类
对比项Tag标签过滤SQL属性过滤
过滤目标消息的Tag标签。消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。
过滤能力精准匹配。SQL语法匹配。
适用场景简单过滤场景、计算逻辑简单轻量。复杂过滤场景、计算逻辑较复杂。
原理机制
  • Tag过滤:每条消息可以带有一个或多个标签(Tag),消费者在订阅时可以指定感兴趣的标签,Broker只会将带有匹配标签的消息发送给消费者。这种方式过滤效率高,因为过滤逻辑是在Broker端实现的。

    例如,如果某条消息的标签是"TagA",消费者在订阅时指定只接收"TagA"的消息,那么只有这些消息会被推送到消费者。

  • SQL92过滤:RocketMQ支持使用SQL92标准的语法对消息属性进行过滤。开发者可以在消息发送时设置自定义属性,消费者在订阅时使用SQL92表达式进行筛选。此方式更灵活,可以基于多种条件进行复杂的过滤。

    例如,假设消息包含一个名为"age"的属性,消费者可以使用age > 30这样的SQL语句进行过滤。

使用场景
  • 高效消息消费:适用于有大量不同类型消息的场景,消费者只需处理特定类型的消息。例如,在订单系统中,不同的消费者可能只关注某些特定类型的订单消息。
  • 复杂过滤需求:在需要基于多条件组合进行消息筛选时,SQL92过滤提供了很大的灵活性。
优缺点
  • 优点

    • 减少网络流量:通过Broker端过滤,可以减少不必要的消息传输,降低网络带宽消耗。
    • 提高消费效率:消费者只需处理满足过滤条件的消息,减少了处理无关消息的开销。
  • 缺点

    • 处理复杂性增加:SQL92过滤需要消息带有自定义属性,增加了消息发送时的复杂性。
    • 配置管理复杂:需要在消费者订阅时配置过滤规则,且过滤规则复杂度较高时可能增加运维管理的难度。
Java代码示例 - Tag过滤
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class TagFilterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "TagA || TagB");  // 只订阅TagA或TagB的消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("Received Message: %s%n", new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}
Java代码示例 - SQL92过滤 
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class SQLFilterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", MessageSelector.bySql("age > 30"));consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("Received Message: %s%n", new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

客户端过滤

定义与概述

客户端过滤是指消息被发送到消费者后,由消费者在本地进行过滤。与Broker端过滤相比,客户端过滤的灵活性更高,因为消费者可以根据实际业务需求实现自定义的过滤逻辑。

原理机制
  • 消息接收后过滤:消费者接收到消息后,在处理逻辑中根据业务需求进行过滤。此种方式不依赖于Broker的过滤机制,而是在消费者端实现特定逻辑。

    例如,消费者可以在接收到消息后检查消息体的内容,决定是否处理该消息。

使用场景
  • 个性化过滤需求:适用于需要根据复杂业务逻辑进行过滤的场景。例如,根据消息体内容进行复杂的判断,而不是简单的标签或属性匹配。
  • 实时调整过滤规则:消费者可以在运行时动态调整过滤逻辑,适应变化的业务需求。
优缺点
  • 优点

    • 高度灵活:可以实现任何复杂的过滤逻辑,完全由消费者自行控制。
    • 动态调整:无需修改Broker配置,消费者可以随时根据业务需要调整过滤逻辑。
  • 缺点

    • 增加网络负载:所有消息都会被传输到消费者,增加了网络带宽的占用。
    • 效率较低:需要在客户端进行二次过滤,可能导致性能下降。
Java代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class ClientFilterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");  // 订阅所有消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {// 自定义过滤逻辑if (new String(msg.getBody()).contains("specificWord")) {System.out.printf("Processing Message: %s%n", new String(msg.getBody()));// 处理消息}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

总结

RocketMQ的消息过滤机制为开发者提供了多种选择:

  • Broker端过滤适合需要高效过滤消息的场景,通过Tag或SQL92进行过滤,减少无关消息的传输和处理。
  • 客户端过滤适用于需要灵活、自定义过滤逻辑的场景,虽然增加了网络负载,但提供了更大的灵活性。

选择合适的过滤方式取决于具体的业务需求和系统架构,在实际应用中可以结合使用,以达到最佳的性能和功能平衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/878625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

常见HTTP状态码、APUD响应状态字及含义

目录 一、HTTP状态码 二、APDU指令码 一、HTTP状态码 HTTP状态(HTTP Status Code)是用以表示网页服务器超文本传输协议响应状态的3位数字代码。 关于HTTP状态码更加详细介绍推荐阅读: http://t.csdnimg.cn/qSJv6http://t.csdnimg.cn/qSJv…

光敏电阻传感器详解(STM32)

目录 一、介绍 二、传感器原理 1.光敏电阻传感器介绍 2.原理图 三、程序设计 main.c文件 ldr.h文件 ldr.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 光敏电阻器是利用半导体的光电导效应制成的一种电阻值随入射光的强弱而改变的电阻器,又称为光…

基于树莓派的儿童音频播发器—Yoto

Raspberry Pi 的开发可能性使吸引人的、以儿童为中心的音频播放器得以成型 Yoto Player 为孩子们提供了拥有和控制的绝佳体验,同时不会增加屏幕时间。得益于 Raspberry Pi 以及我们认可的经销商提供的支持和专业知识,Yoto Player 在英国取得了成功。 Yo…

七款最佳的渗透测试工具(非常详细)零基础入门到精通,收藏这一篇就够了

渗透测试工具是模拟对计算机系统、网络或 Web 应用程序的网络攻击的软件应用程序,它们的作用是在实际攻击者之前发现安全漏洞。它们可以作为系统的压力测试,揭示哪些区域可能会受到真正的威胁。 本文我将介绍七款最佳的渗透测试工具。 1 Kali Linux K…

Maven入门:自动化构建工具的基本概念与配置

一、什么是Maven 目前无论使用IDEA还是Eclipse等其他IDE,使用里面 ANT 工具帮助我们进行编译,打包运行等工作。Apache基于ANT进行了升级,研发出了全新的自动化构建工具Maven。 Maven使用项目对象模型(POM-Project Object Model&…

视频合并在线工具哪个好?好用的视频合并工具推荐

当我们手握一堆零散却各有千秋的视频片段时,是否曾幻想过它们能像魔法般合并成一部完整、流畅的故事? 别担心,今天咱们就来一场“视频合并大冒险”,揭秘几款视频合并软件手机免费工具,帮助你在指尖上实现创意无限的视…

四、配置三层交换实验组网

一、实验拓扑 二、实验目的 通过配置交换机&#xff0c;令不同vlan间的主机能够互相通信 三、实验步骤 SW12 <Huawei>undo terminal monitor Info: Current terminal monitor is off. <Huawei>system-view Enter system view, return user view with CtrlZ. [H…

EDIUS X 10.34.9631 视频剪辑软件 下载 包含安装说明

下载地址(资源制作整理不易&#xff0c;下载使用需付费&#xff0c;不能接受请勿浪费时间下载) 链接&#xff1a;https://pan.baidu.com/s/1P2wKxVcSx5WzAtHXCaAp5A?pwd227i 提取码&#xff1a;227i

【Linux网络】应用层协议HTTP(1)

&#x1f389;博主首页&#xff1a; 有趣的中国人 &#x1f389;专栏首页&#xff1a; Linux网络 &#x1f389;其它专栏&#xff1a; C初阶 | C进阶 | 初阶数据结构 小伙伴们大家好&#xff0c;本片文章将会讲解 应用层协议HTTP 的相关内容。 如果看到最后您觉得这篇文章写得…

「深入理解」HTML Meta标签:网页元信息的配置

「深入理解」HTML Meta标签&#xff1a;网页元信息的配置 HTML的<meta>元素用于提供关于HTML文档的元数据&#xff08;metadata&#xff09;&#xff0c;这些信息对于浏览器和其他处理HTML文档的应用程序来说是非常有用的&#xff0c;如&#xff1a;<base>、<li…

【网络安全】服务基础第一阶段——第九节:Windows系统管理基础---- Windows_AD域

目录 一、域与活动目录 1.1 工作组 1.2 域 1.2.1 域&#xff08;Domain&#xff09; 1.2.2 域控制器&#xff08;Domain Controller&#xff0c;DC&#xff09; 1.2.3 功能和角色 1.2.4 管理和监控 1.2 5 域结构 1.3 组织单元&#xff08;Organizational Unit&#xff…

集成电路学习:什么是IP知识产权

一、IP&#xff1a;知识产权 IP是Intellectual Property的缩写&#xff0c;即知识产权。知识产权是一种无形的财产权&#xff0c;也称智力成果权&#xff0c;它指的是通过智力创造性劳动所获得的成果&#xff0c;并且是由智力劳动者对成果依法享有的专有权利。这种权利包括人身…

性能优化:自动化处理系统设计

性能优化&#xff1a;自动化处理系统设计 前言需求分析系统设计1. 调度中心2. 任务执行器3. 错误处理机制4. 通知系统5. 报表生成器6. 日志记录器 技术实现结语 前言 在当今这个信息爆炸、技术日新月异的时代&#xff0c;企业面临着前所未有的挑战和机遇。随着业务量的不断增长…

基于Yolov5_6.1、LPRNet、PySide6开发的车牌识别系统

项目概述 项目背景 随着车辆数量的不断增加&#xff0c;车牌识别系统在交通管理、停车场自动化等领域变得越来越重要。本项目利用先进的深度学习技术和现代图形用户界面框架来实现高效的车牌识别功能。 项目特点 高效识别&#xff1a;采用 YOLOv5_6.1 进行车牌定位&#xff…

差分传输与单端传输

差分与单端传输 本页讨论模拟信号传输中的两个概念&#xff1a;“单端”和“差分”。模拟信号用于将模拟仪器的输出传送到数字转换器。虽然数字信号对干扰的容忍度相对较高&#xff0c;但模拟信号却可能受到环境中电磁波的干扰和改变。本文档将解释这一问题&#xff0c;并描述…

OcrLiteNcnn:Windows环境打包及Java调用

目录结构 前言cmake安装源码下载说明Windows源码编译执行“cmake -DCMAKE_BUILD_TYPE=Release ..”执行“cmake --build . --config Release -- -m:6”编译完成识别图片命令行调用Java调用前言 Java实现OCR识别图片中的文字,小编先前整理过一篇在Linux环境中基于“ChineseOcr…

全局安装react

1、首先安装react脚手架 npm install -g create-react-app2、创建react项目 create-react-app my-app3、 PS D:\桌面\papers\subject> create-react-app my-react-appCreating a new React app in D:\桌面\papers\subject\my-react-app.Installing packages. This might …

Docker安装Neo4j图数据库和APOC插件

文章目录 一、前言二、安装Neo4j三、测试Neo4j四、安装APOC插件五、测试APOC插件 一、前言 官方文档&#xff1a;https://neo4j.com/docs/operations-manual/current/docker/introduction/ 二、安装Neo4j 我这里以 5.23.0 版的 Neo4j 为例 拉取镜像 docker pull neo4j:5.23.0…

HarmonyOS NEXT 应用运行异常记录与解决(持续整理版)

问题一 App Launch Failed to get the device apiVersion. 解决方案 进入到设备管理&#xff0c;点击对应开启的模拟器设备&#xff0c;先将模拟器关闭&#xff0c;然后点击查出掉用户数据&#xff0c;再重启。 重启之后&#xff0c;一般能解决。如果遇到还是显示拿不到apiVer…

bash反弹shell分析

目录 介绍步骤 介绍 与目标主机建立连接的原理是利用漏洞执行ShellCode。 GetShell的实质是&#xff1a;执行ShellCode&#xff0c;将目标主机的Shell重定向到攻击机。拿到Shell利于后续的渗透。 所谓的反弹Shell是指GetShell的过程由目标主机主动发起&#xff08;反向连接&a…