在Broker端进行消息过滤

        在Broker端进行消息过滤,可以减少无效消息发送到Consumer,少占用网络带宽从而提高吞吐量。Broker端有三种方式进行消息过滤。

1.消息的Tag和Key

        对一个应用来说,尽可能只用一个Topic,不同的消息子类型用Tag来标识(每条消息只能有一个Tag),服务器端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag以后,消费方在订阅消息时,才可以利用Tag在Broker端做消息过滤。其次是消息的Key。对发送的消息设置好Key,以后可以根据这个Key来查找消息。所以这个Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker会创建专门的索引文件,来存储Key到消息的映射,由于是哈希索引,应尽量使Key唯一,避免潜在的哈希冲突。Tag和Key的主要差别是使用场景不同,Tag用在Consumer的代码中,用来进行服务端消息过滤,Key主要用于通过命令行查询消息。

2.通过Tag进行过滤

        用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效,Broker端可以在ConsumeQueue中做这种过滤,只从CommitLog里读取过滤后被命中的消息。看一下ConsumerQueue的存储格式,如图7-1所示。

图7-1 ConsumerQueue的存储格式

Consume Queue的第三部分存储的是Tag对应的hashcode,是一个定长的字符串,通过Tag过滤的过程就是对比定长的hashcode。经过hashcode对比,符合要求的消息被从CommitLog读取出来,不用担心Hash冲突问题,消息在被消费前,会对比完整的Message Tag字符串,消除Hash冲突造成的误读。

3.用SQL表达式的方式进行过滤

        使用Tag方式过滤虽然高效,但是支持的逻辑比较简单,在构造Message的时候,还可以通过putUserProperty函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑,如代码清单7-1所示。

代码清单7-1 在消息中增加自定义属性

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
msg.putUserProperty("b",  “hello”);

 

代码中这个消息就有了两个特殊的属性值a和b,我们用类似SQL表达式的方式对消息进行过滤,用法如下(目前只支持在PushConsumer中实现这种过滤):

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");  // only subsribe messages have property a, also a >=0 and a <= 3

consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently()
{    

@Override    

public ConsumeConcurrentlyStatus consumeMessage
    (List<MessageExt> msgs, ConsumeConcurrentlyContext context)
{        

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    

}

});

consumer.start();
 

类似SQL的过滤表达式,支持如下语法:

·数字对比,比如>、>=、<、<=、BETWEEN、=;

·字符串对比,比如=、<>、IN;

·IS NULL or IS NOT NULL;

·逻辑符号AND、OR、NOT。

支持的数据类型:

·数字型,比如123、3.1415;

·字符型,比如'abc'、注意必须用单引号;

·NULL,这个特殊字符;

·布尔型,TRUEorFALSE。

SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增大磁盘压力,没有Tag方式高效。

4.Filter Server方式过滤

        Filter Server是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。要使用Filter Server,首先要在启动Broker前在配置文件里加上filterServer-Nums=3这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。实现过滤逻辑的示例如代码清单7-2所示。

代码清单7-2 实现过滤逻辑的代码示例

public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }
        return false;
    }
}

 

上面代码实现了过滤逻辑,它是根据消息的“SequenceId”这个属性来过滤的,其实不一定要根据消息属性来过滤,也可以根据消息体的内容或其他特征过滤,如代码清单7-3所示。

代码清单7-3 使用FilterServer的Consumer示例

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-GroupNamecc4");
    // 使用Java代码,在服务器做消息过滤
    String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
        consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

 

在使用Filter Server的Consumer例子中,主要是把实现过滤逻辑的类作为参数传到Broker端,Broker端的Filter Server会解析这个类,然后根据match函数里的逻辑进行过滤。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/151869.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot引入redisson分布式锁及原理

1.引入依赖 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version> </dependency>2.配置类创建bean /*** author qujingye* Classname RedissonConfig* Description TOD…

HDFS入门--学习笔记

1&#xff0c;大数据介绍 定义 数据指的是&#xff1a;一种可以被鉴别的、对客观事件进行记录的符号&#xff0c;除了可以是最简单的 数字外&#xff0c;也可以是各类符号、文字、图像、声音等。 通俗地说&#xff0c;数据就是对人类的行为及发生事件的一种记录。 存在的价值…

2023超详细的软件测试八股文(入门篇)

今天给大家分享软件测试面试题入门篇&#xff0c;看看大家能答对几题 一、请你说一说测试用例的边界 参考回答&#xff1a; 边界值分析法就是对输入或输出的边界值进行测试的一种黑盒测试方法。通常边界值分析法是作为对等价类划分法的补充&#xff0c;这种情况下&#xff0…

设计模式-创建型模式-单例模式

一、什么是单例模式 单例模式&#xff0c;属于创建类型的一种常用的设计模式。通过单例模式的方法创建的类在当前进程中只有一个实例&#xff08;根据需要&#xff0c;也有可能一个线程中属于单例&#xff0c;如&#xff1a;仅线程上下文内使用同一个实例&#xff09;。 对于系…

本地私域线上线下 线上和线下的小程序

私域商城是一种新型的零售模式&#xff0c;它将传统的线下实体店与线上渠道相结合&#xff0c;通过会员、营销、效率等方式&#xff0c;为消费者提供更加便利和高效的购物体验。私域商城的发展趋势表明&#xff0c;它将成为未来零售业的重要模式&#xff0c;引领零售业的创新和…

【设计模式】聊聊策略模式

策略模式的本质是为了消除if 、else代码&#xff0c;提供拓展点&#xff0c;对拓展开放&#xff0c;对修改关闭&#xff0c;也就是说我们开发一个功能的时候&#xff0c;要尽量的采用设计模式进行将不变的东西进行抽取出来&#xff0c;将变化的东西进行隔离开来&#xff0c;这样…

【开源】基于Vue.js的音乐偏好度推荐系统的设计和实现

项目编号&#xff1a; S 012 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S012&#xff0c;文末获取源码。} 项目编号&#xff1a;S012&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、系统设计2.1 功能模块设计2.1.1 音乐档案模块2.1…

【数据库】你听说过矢量数据库吗?

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️其他领域】 文章目录 前言什么是向量/矢量数据库嵌入模型使用向量数据库的优势与传统数据库的对比其他方面 AWS 如何支持您的矢量数据库需求&#xff1f;Amazon OpenSearch ServiceAmazon Aurora Pos…

[Docker]记一次使用jenkins将镜像文件推送到Harbor遇到的问题

系统版本&#xff1a; Ubuntu 18.01 私服&#xff1a; Harbor Docker版本&#xff1a; Docker version 18.09.5 首先需要明确的是&#xff0c;即在harbor里项目设置为公开&#xff0c;但是在push的时候还是需要用户验证的&#xff0c;即需要使用docker登录 docker login harbo…

CF1514 C. Product 1 Modulo N [妙妙题]

传送门:CF [前题提要]:感觉这道题是真的妙,解这道题的所有步骤都是一步一步按图索骥来的,有种玩解密游戏的感觉 题目很简单,就是求1~n中最长的子序列,使得这n个数的乘积模n为1. 乍一看很不好解决.那不妨先假设我们挑选了 k k k个数,然后这 k k k个数的乘积为 K K K, K K K模 …

【Unity】流体模拟(更新ing)

Fluid Simulation 参考于 Sebastian Lague 的项目进行分析学习 流体模拟视频链接 文章目录 Fluid Simulation2D流体Simulation2D.cs 2D流体 Simulation2D.cs 流体的边界用OnDrawGizmos设置流体的边界 void OnDrawGizmos(){Gizmos.color new Color(0, 1, 0, 0.4f);Gizmos.Dr…

vue中绑定class样式和条件渲染

绑定class样式 字符串写法&#xff1a; 适用于&#xff1a; 样式的类名不确定&#xff0c;需要动态指定 数组写法&#xff1a; 适用于&#xff1a; 要绑定的样式个数不确定&#xff0c;名字也不确定 绑定对象&#xff1a;适用于&#xff1a;要绑定的样式个数确定、名字确定、…

Python loglog()函数

常用坐标下的图像显示 import matplotlib.pyplot as plt import numpy as np import mathplt.figure() x_input np.linspace(1, 10, 50) y_input x_input**2plt.plot(x_input, y_input,r-,linewidth2) plt.show()在loglog函数尺度下的曲线 plt.loglog(x_input, y_input,r-,…

机器人走迷宫问题

题目 1.房间有XY的方格组成&#xff0c;例如下图为64的大小。每一个方格以坐标(x,y) 描述。 2.机器人固定从方格(0, 0)出发&#xff0c;只能向东或者向北前进&#xff0c;出口固定为房间的最东北角&#xff0c;如下图的 方格(5,3)。用例保证机器人可以从入口走到出口。 3.房间…

[JDK工具-2] javap 类文件解析工具-帮助理解class文件,了解Java编译器机制

文章目录 1. javap -version 版本信息2. javap -verbose 输出附加信息3. javap -l 显示行号和局部变量列表4. javap -c 对代码进行反汇编&#xff08;或叫反编译生成汇编代码&#xff0c;一般说反编译是生成java代码&#xff09;&#xff0c;分解方法代码&#xff0c;也就是显示…

想要成为CSS大师?这些技巧是你必须知道的!

前言 CSS 是网页设计中不可或缺的一部分&#xff0c;掌握一些实用的 CSS 技巧&#xff0c;可以让你在设计中展现出更多的创意和个性。本文将介绍一些 CSS 技巧&#xff0c;帮助你提升自己的技能&#xff0c;成为一个真正的 CSS 大师。 1. 改变 input 自动填充的背景颜色 这段 …

Ansible的dict的key里包含圆点.

环境 管理节点&#xff1a;CentOS Stream release 9控制节点&#xff1a;同上Ansible&#xff1a;2.15.4 从文件读取yaml数据 假设目标机器上有文件 data.yml 内容如下&#xff1a; a:b: 111c: 222d.e: 333现在要读取该文件内容&#xff0c;并转成yaml数据。 创建文件 tes…

高精度算法【Java】(待更新中~)

高进度加法 在Java中可以使用BigInteger进行高精度计算&#xff0c;除此也可以仿照竖式相加的计算原理进行计算。 BigInteger 提供所有 Java 的基本整数操作符的对应物&#xff0c;并提供 java.lang.Math 的所有相关方法。另外&#xff0c;BigInteger 还提供以下运算&#xff1…

UE5像素流送详细教程,以及解决黑边和鼠标消失问题

说明 本文是阅读UE5官方文档并且实践之后写的读书笔记,更多详细内容读者可以参考官方文档,本文只是因为在follow官方文档时发现有些操作界面不一样,有些细节遗漏了,所以才自己整理一篇笔记,以备后续使用。 读者按该文件指引操作后,可完成本地发布一个游戏,使用像素流网…

ts 联合react 实现ajax的封装,refreshtoken的功能

react ts混合双打&#xff0c;实现ajax的封装&#xff0c;以及401的特殊处理 import axios from axios import {AMDIN_EXPIRES_KEY,AMDIN_KEY,AMDIN_REFRESH_EXPIRES_KEY,AMDIN_REFRESH_KEY,COMMID_KEY,getToken,removeToken } from ../utils/user-token import { showMessage…