RocketMQ高级特性四-消息过滤

目录

前言

Broker端过滤

定义与概述

消息过滤分类

原理机制

使用场景

优缺点

Java代码示例 - Tag过滤

Java代码示例 - SQL92过滤 

客户端过滤

定义与概述

原理机制

使用场景

优缺点

Java代码示例

总结


前言

消息过滤是RocketMQ的一项高级特性,它允许消费者根据特定的条件来筛选感兴趣的消息,从而避免无关消息的处理,提升消费效率和性能。RocketMQ支持两种主要的消息过滤方式:Broker端过滤客户端过滤。注:文章中部分内容来源于Apache RocketMQ官网

Broker端过滤

定义与概述

Broker端过滤是指在Broker接收到消息后,基于消息的标签(Tag)或用户自定义属性进行过滤。只有满足过滤条件的消息才会被推送给消费者,从而减少消费者端的处理压力。

消息过滤分类
对比项Tag标签过滤SQL属性过滤
过滤目标消息的Tag标签。消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。
过滤能力精准匹配。SQL语法匹配。
适用场景简单过滤场景、计算逻辑简单轻量。复杂过滤场景、计算逻辑较复杂。
原理机制
  • Tag过滤:每条消息可以带有一个或多个标签(Tag),消费者在订阅时可以指定感兴趣的标签,Broker只会将带有匹配标签的消息发送给消费者。这种方式过滤效率高,因为过滤逻辑是在Broker端实现的。

    例如,如果某条消息的标签是"TagA",消费者在订阅时指定只接收"TagA"的消息,那么只有这些消息会被推送到消费者。

  • SQL92过滤:RocketMQ支持使用SQL92标准的语法对消息属性进行过滤。开发者可以在消息发送时设置自定义属性,消费者在订阅时使用SQL92表达式进行筛选。此方式更灵活,可以基于多种条件进行复杂的过滤。

    例如,假设消息包含一个名为"age"的属性,消费者可以使用age > 30这样的SQL语句进行过滤。

使用场景
  • 高效消息消费:适用于有大量不同类型消息的场景,消费者只需处理特定类型的消息。例如,在订单系统中,不同的消费者可能只关注某些特定类型的订单消息。
  • 复杂过滤需求:在需要基于多条件组合进行消息筛选时,SQL92过滤提供了很大的灵活性。
优缺点
  • 优点

    • 减少网络流量:通过Broker端过滤,可以减少不必要的消息传输,降低网络带宽消耗。
    • 提高消费效率:消费者只需处理满足过滤条件的消息,减少了处理无关消息的开销。
  • 缺点

    • 处理复杂性增加:SQL92过滤需要消息带有自定义属性,增加了消息发送时的复杂性。
    • 配置管理复杂:需要在消费者订阅时配置过滤规则,且过滤规则复杂度较高时可能增加运维管理的难度。
Java代码示例 - Tag过滤
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class TagFilterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "TagA || TagB");  // 只订阅TagA或TagB的消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("Received Message: %s%n", new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}
Java代码示例 - SQL92过滤 
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class SQLFilterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", MessageSelector.bySql("age > 30"));consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("Received Message: %s%n", new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

客户端过滤

定义与概述

客户端过滤是指消息被发送到消费者后,由消费者在本地进行过滤。与Broker端过滤相比,客户端过滤的灵活性更高,因为消费者可以根据实际业务需求实现自定义的过滤逻辑。

原理机制
  • 消息接收后过滤:消费者接收到消息后,在处理逻辑中根据业务需求进行过滤。此种方式不依赖于Broker的过滤机制,而是在消费者端实现特定逻辑。

    例如,消费者可以在接收到消息后检查消息体的内容,决定是否处理该消息。

使用场景
  • 个性化过滤需求:适用于需要根据复杂业务逻辑进行过滤的场景。例如,根据消息体内容进行复杂的判断,而不是简单的标签或属性匹配。
  • 实时调整过滤规则:消费者可以在运行时动态调整过滤逻辑,适应变化的业务需求。
优缺点
  • 优点

    • 高度灵活:可以实现任何复杂的过滤逻辑,完全由消费者自行控制。
    • 动态调整:无需修改Broker配置,消费者可以随时根据业务需要调整过滤逻辑。
  • 缺点

    • 增加网络负载:所有消息都会被传输到消费者,增加了网络带宽的占用。
    • 效率较低:需要在客户端进行二次过滤,可能导致性能下降。
Java代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class ClientFilterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");  // 订阅所有消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {// 自定义过滤逻辑if (new String(msg.getBody()).contains("specificWord")) {System.out.printf("Processing Message: %s%n", new String(msg.getBody()));// 处理消息}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

总结

RocketMQ的消息过滤机制为开发者提供了多种选择:

  • Broker端过滤适合需要高效过滤消息的场景,通过Tag或SQL92进行过滤,减少无关消息的传输和处理。
  • 客户端过滤适用于需要灵活、自定义过滤逻辑的场景,虽然增加了网络负载,但提供了更大的灵活性。

选择合适的过滤方式取决于具体的业务需求和系统架构,在实际应用中可以结合使用,以达到最佳的性能和功能平衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/878625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

常见HTTP状态码、APUD响应状态字及含义

目录 一、HTTP状态码 二、APDU指令码 一、HTTP状态码 HTTP状态(HTTP Status Code)是用以表示网页服务器超文本传输协议响应状态的3位数字代码。 关于HTTP状态码更加详细介绍推荐阅读: http://t.csdnimg.cn/qSJv6http://t.csdnimg.cn/qSJv…

光敏电阻传感器详解(STM32)

目录 一、介绍 二、传感器原理 1.光敏电阻传感器介绍 2.原理图 三、程序设计 main.c文件 ldr.h文件 ldr.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 光敏电阻器是利用半导体的光电导效应制成的一种电阻值随入射光的强弱而改变的电阻器,又称为光…

基于树莓派的儿童音频播发器—Yoto

Raspberry Pi 的开发可能性使吸引人的、以儿童为中心的音频播放器得以成型 Yoto Player 为孩子们提供了拥有和控制的绝佳体验,同时不会增加屏幕时间。得益于 Raspberry Pi 以及我们认可的经销商提供的支持和专业知识,Yoto Player 在英国取得了成功。 Yo…

streamlit示例-极简

登录注册多步骤任务和实时展示结果 封装后的 Streamlit 示例代码 访问演示地址 import streamlit as st import time# In-memory "database" for simplicity users_db {admin: {password: admin123, name: Admin, age: 30, favorite_color: blue} }def register_…

(C++ STL)list类的简单模拟实现与源码展示

list类的简单模拟实现 一、前言二、ListNode 单个节点的成员变量三、ListIterator 迭代器四、ReverseListIterator 迭代器五、list 的成员变量与初始化六、list 部分函数实现inserterase 七、list 源代码 以下代码环境为 VS2022 C。 一、前言 list类 本质上是数据结构中的双向…

JVM GC 调优

文章目录 引言I 调整JVM的默认堆内存配置1.1 java命令启动jar包时配置JVM 的内存参数1.2 基于Tomcat服务器部署的java应用,配置JVM 的内存参数II JVM GC 调优基本概念: 应用程序的响应时间(RT)和吞吐量(QPS)JVM调优原理调优思路调优方法JVM调优技巧建议引言 内存参数:ht…

七款最佳的渗透测试工具(非常详细)零基础入门到精通,收藏这一篇就够了

渗透测试工具是模拟对计算机系统、网络或 Web 应用程序的网络攻击的软件应用程序,它们的作用是在实际攻击者之前发现安全漏洞。它们可以作为系统的压力测试,揭示哪些区域可能会受到真正的威胁。 本文我将介绍七款最佳的渗透测试工具。 1 Kali Linux K…

Maven入门:自动化构建工具的基本概念与配置

一、什么是Maven 目前无论使用IDEA还是Eclipse等其他IDE,使用里面 ANT 工具帮助我们进行编译,打包运行等工作。Apache基于ANT进行了升级,研发出了全新的自动化构建工具Maven。 Maven使用项目对象模型(POM-Project Object Model&…

视频合并在线工具哪个好?好用的视频合并工具推荐

当我们手握一堆零散却各有千秋的视频片段时,是否曾幻想过它们能像魔法般合并成一部完整、流畅的故事? 别担心,今天咱们就来一场“视频合并大冒险”,揭秘几款视频合并软件手机免费工具,帮助你在指尖上实现创意无限的视…

四、配置三层交换实验组网

一、实验拓扑 二、实验目的 通过配置交换机&#xff0c;令不同vlan间的主机能够互相通信 三、实验步骤 SW12 <Huawei>undo terminal monitor Info: Current terminal monitor is off. <Huawei>system-view Enter system view, return user view with CtrlZ. [H…

3、DjangoAdmin导出excel和csv文件

一、导出Excel 1、安装openpyxl库 2、admin文件 # 导入openpyxl库中的Workbook类&#xff0c;用于创建Excel文件 from openpyxl import Workbook # 导入Django的admin模块&#xff0c;用于在Django admin后台注册和管理模型 from django.contrib import admin # 导入…

2025中国(西安)国际军民两用新材料展览会

时 间&#xff1a;2025年3月14-16日 地 点&#xff1a;西安国际会展中心 ◆展会背景Exhibition background&#xff1a; 随着科技的飞速发展&#xff0c;新材料在军事领域的应用逐渐凸显出…

EDIUS X 10.34.9631 视频剪辑软件 下载 包含安装说明

下载地址(资源制作整理不易&#xff0c;下载使用需付费&#xff0c;不能接受请勿浪费时间下载) 链接&#xff1a;https://pan.baidu.com/s/1P2wKxVcSx5WzAtHXCaAp5A?pwd227i 提取码&#xff1a;227i

ant design vue+vue3+ts实现一天内按钮只能点击2次,并置灰,且过当天0点时需复原~

1、需求&#xff1a;在主页面中点击新增按钮&#xff0c;弹出弹窗&#xff0c;此时弹窗中有一个确定按钮&#xff0c;需实现该确定按钮在当天0点前指点点击2次&#xff0c;超过2次置灰&#xff0c;过了零点复原。 思路&#xff1a;首先弹窗通过v-if显示与隐藏弹窗子组件&#…

【Linux网络】应用层协议HTTP(1)

&#x1f389;博主首页&#xff1a; 有趣的中国人 &#x1f389;专栏首页&#xff1a; Linux网络 &#x1f389;其它专栏&#xff1a; C初阶 | C进阶 | 初阶数据结构 小伙伴们大家好&#xff0c;本片文章将会讲解 应用层协议HTTP 的相关内容。 如果看到最后您觉得这篇文章写得…

OCI编程高级篇(十八) OCI连接池概念

数据库连接池已经不是新概念了&#xff0c;它以有限的连接让外部更多的客户来访问数据库&#xff0c;一般用于中间服务器中&#xff0c;OCI也有连接池的概念。OCI的连接池是由OCI自己管理的&#xff0c;不需要应用干预&#xff0c;程序通过函数从连接池中得到一个会话&#xff…

【Android】 工具篇:ProxyPin抓包详解---夜神模拟器

1️⃣ProxyPin介绍 ProxyPin是一种基于MITM(中间人攻击)的抓包工具,主要用于移动应用程序的安全测试和调试。下面是关于ProxyPin的详解。 2️⃣ 安裝和使用 安裝 下载地址 https://gitee.com/wanghongenpin/network-proxy-flutter/releases 直接拖入模拟器就可以了,打开…

github源码指引:共享内存、数据结构与算法:字符串池StringPool

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 专题&#xff1a;共享内存、数…

5. MyBatis 如何实现数据库类型和 Java 类型的转换的?

MyBatis 在处理数据库查询结果或传递参数时&#xff0c;需要将数据库类型与 Java 类型之间进行转换。MyBatis 提供了多种方式来实现这种类型转换&#xff0c;主要通过内置的 TypeHandler&#xff08;类型处理器&#xff09;机制。 1. TypeHandler 的作用 TypeHandler 是 MyBat…

C++ STL 之哈希 map unordered_map

一. 概述 在C中&#xff0c;unordered_map 是一个关联容器&#xff0c;是一种基于哈希表的键值对容器&#xff0c;它存储了键值对&#xff08;key-value&#xff09;&#xff0c;其中每个键&#xff08;key&#xff09;都是唯一的。 二. map & unordered_map的区别 map内…