RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及批量发送消息、消息过滤实战

文章目录

      • 批量发送消息
      • 消息过滤

批量发送消息

批量发送消息可以减少网络的 IO 开销,让多个消息通过 1 次网络开销就可以发送,提升数据发送的吞吐量

在这里插入图片描述

虽然批量发送消息可以减少网络 IO 开销,但是一次也不能发送太多消息

批量消息直接将多个消息放入集合中发送即可,生产者代码如下:

public class Producer {public static void main(String[] args) throws Exception {// 1、创建生产者对象DefaultMQProducer producer = new DefaultMQProducer("producer_group");// 2、为生产者对象设置 NameServer 地址producer.setNamesrvAddr("127.0.0.1:9876");// 3、把我们的生产者直接启动起来producer.start();// 4、创建消息、并发送消息List<Message> reqList = new ArrayList<>(12);for (int i = 0; i < 12; i++) {// public Message(String topic, String tags, String keys, byte[] body) {Message message = new Message("custom-batch-topic","batchTag","CUSTOM_BATCH",("("+i+")Hello Message From BATCH Producer, " +"date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes());reqList.add(message);}// 利用生产者对象,将消息直接批量发送出去producer.send(reqList);System.out.println("Send Finished.");}
}

消费者代码如下:

public class Consumer {public static void main(String[] args) throws Exception {// 1、创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");// 2、为消费者对象设置 NameServer 地址consumer.setNamesrvAddr("127.0.0.1:9876");// 3、订阅主题consumer.subscribe("custom-batch-topic", "*");// 4、注册监听消息,并打印消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String printMsg = new String(msg.getBody()) + ", recvTime: "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());System.out.println(printMsg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、把消费者直接启动起来consumer.start();System.out.println("Consumer Started Finished.");}
}

消息过滤

消费者组中还可以有过滤操作,对同一个 Topic 下的消息的 Tag 标签进行过滤

但是使用消息过滤时需要 保证同一个消费组中消费的消息的 Tag 相同 ,如果同一个消费者组中的两个消费者订阅了不同的 Tag,比如消费者 A 订阅了 Tag1,消费者 B 订阅了 Tag2,那么可能 B 收到了 Tag1 的数据,发现不是自己想要的,于是将 Tag1 的数据过滤掉了,那么就导致了 A 也收不到 Tag1 的数据,造成数据消失的现象

消息过滤流程图如下:

在这里插入图片描述

消息过滤生产者如下:

public class FilterProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("producer_group",true);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();List<Order> list = new ArrayList<>();for (int i = 0; i < 12; i ++) {Order order = new Order();order.orderId = i;order.desc = "desc:" + i;order.tag = "tag" + i % 3;list.add(order);}for (Order order : list) {Message msg = new Message("Filter-Test-Topic",order.tag,(order.toString()).getBytes());msg.setKeys("Filter_Tag");msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));// 直接将 msg 发送出去producer.send(msg);}System.out.println("Send Finished.");}public static class Order {int orderId;String desc;String tag;@Overridepublic String toString() {return "orderId="+orderId+", desc="+desc+", tag="+tag;}}
}

过滤 tag 的几种用法:

过滤消息的 tag 主要修改一行代码:consumer.subscribe("Filter-Test-Topic", "tag1");,过滤也分几种情况:

  1. 过滤所有 tag

    consumer.subscribe("Filter-Test-Topic", "*");

  2. 过滤单个 tag

    consumer.subscribe("Filter-Test-Topic", "tag1");

  3. 过滤多个 tag

    consumer.subscribe("Filter-Test-Topic", "TG2 || TG3");

  4. 订阅 SQL92 方式(需要修改 custom.conf 文件,添加一行配置:enablePropertyFilter=true)

    consumer.subscribe("Filter-Test-Topic", MessageSelector.bySql("idx > 10"));

    这里的 idx > 10 的 idx 是在生产者中通过下边这行代码放入的:

    msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
    

消息过滤消费者代码如下(只过滤出 tag = tag1 的消息):

public class Subscribe02_Single_Consumer {public static void main(String[] args) throws Exception {// 1、创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Subscribe02_Single_Consumer");// 2、为消费者对象设置 NameServer 地址consumer.setNamesrvAddr("127.0.0.1:9876");// 3、订阅主题consumer.subscribe("Filter-Test-Topic", "tag1");// 4、注册监听消息,并打印消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {String printMsg = new String(msg.getBody()) + ", recvTime: "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());System.out.println(printMsg);}return ConsumeOrderlyStatus.SUCCESS;}});// 5、把消费者直接启动起来consumer.start();System.out.println("Consumer Started Finished.");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/229198.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#基础——类、对象和属性

类&#xff1a;是具有相同属性和行为特征的集合 对象&#xff1a;对象是类的实例化&#xff0c;它具有类定义的所有特征和行为。 类的语法格式&#xff1a; 访问修饰符 class关键字 类名 两种创建类的方式 第一种方式就是在类的下面再创建一个类 第二种方式是在文件中添加一个…

【员工工资册】————大一期末答辩近满分作业分享

前言 大家好吖&#xff0c;欢迎来到 YY 滴项目系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C语言的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; PS&#xff1a;以下内容是部分展示&am…

springboot升级到3.2导致mybatis-plus启动报错

在springboot升级到3.2时&#xff0c;服务启动报错 java.lang.IllegalArgumentException: Invalid value type for attribute ‘factoryBeanObjectType’: java.lang.String&#xff1a; java.lang.IllegalArgumentException: Invalid value type for attribute factoryBeanOb…

55 代码审计-JAVA项目注入上传搜索或插件挖掘

目录 必备知识点演示案例:简易Demo段SQL注入及预编译IDEA审计插件FindBugs安装使用Fortify_SCA代码自动审计神器使用Ofcms后台SQL注入-全局搜索关键字Ofcms后台任意文件上传-功能点测试 涉及资源&#xff1a; 我们一般针对java项目&#xff0c;进行漏洞分析的话&#xff0c;主要…

【计算机视觉--解耦视频分割跟踪任何物体】

UIUC&Adobe开源|无需监督&#xff0c;使用解耦视频分割跟踪任何物体&#xff01;视频分割的训练数据往往昂贵且需要大量的标注工作。这限制了将端到端算法扩展到新的视频分割任务&#xff0c;特别是在大词汇量的情况下。为了在不为每个个别任务训练视频数据的情况下实现“跟…

HPM6750系列--第九篇 GPIO详解(中断操作)

一、目的 在上篇中《HPM6750系列--第九篇 GPIO详解&#xff08;基本操作&#xff09;》我们讲解了GPIO的基本操作&#xff0c;本篇继续讲解GPIO的中断处理。 二、介绍 将一个引脚设置为中断涉及到以下几个步骤&#xff08;此处我们以PZ02举例&#xff09;&#xff1a; 1.设置IO…

全球汽车行业的数字化转型:产品和后端的渐进之旅

如何管理汽车行业的数字化转型?在我们本篇文章中了解更多有关如何设定长期目标的信息。 正在改变汽车行业的26个数字化主题 最近一篇关于汽车行业数字化转型的论文确定了26个数字技术主题&#xff08;论文详情请点击阅读原文&#xff09;&#xff0c;分为三个主要集群: 1)驾驶…

社交网络分析3:社交网络隐私攻击、保护的基本概念和方法 + 去匿名化技术 + 推理攻击技术 + k-匿名 + 基于聚类的隐私保护算法

社交网络分析3&#xff1a;社交网络隐私攻击、保护的基本概念和方法 去匿名化技术 推理攻击技术 k-匿名 基于聚类的隐私保护算法 写在最前面社交网络隐私泄露用户数据暴露的途径复杂行为的隐私风险技术发展带来的隐私挑战经济利益与数据售卖防范措施 社交网络 用户数据隐私…

centOS7 安装tailscale并启用子网路由

1、在centOS7上安装Tailscale客户端 #安装命令所在官网位置&#xff1a;https://tailscale.com/download/linux #具体命令为&#xff1a; curl -fsSL https://tailscale.com/install.sh | sh #命令执行后如下图所示2、设置允许IP转发和IP伪装。 安装后&#xff0c;您可以启动…

Shell三剑客:正则表达式(元字符)

一、定义&#xff1a;元字符字符是这样一类字符&#xff0c;它们表达的是不同字面本身的含义 二、分类&#xff1a; 1、基本正则表达式元字符 # ^ 行首定位 [rootlocalhost ~]# grep root /etc/passwd root:x:0:0:root:/root:/bin/bash operator:x:11:0:operator:/root:/…

webgpu demo阅读 A-Buffer

A-Buffer 简单看看原理code 简单看看原理 这个是OIT里的链表方式&#xff0c;说的是首先把每个像素搞一个链表&#xff0c;然后把深度<opaque的存起来&#xff0c;最后排序&#xff0c;然后混合 code 这里就有这么一个depht判断 再看最后合成 可以看到&#xff0c;确实是…

六:爬虫-数据解析之BeautifulSoup4

六&#xff1a;bs4简介 基本概念&#xff1a; 简单来说&#xff0c;Beautiful Soup是python的一个库&#xff0c;最主要的功能是从网页抓取数据官方解释如下&#xff1a; Beautiful Soup提供一些简单的、python式的函数用来处理导航、搜索、修改分析树等功能。 它是一个工具箱…

hive常用SQL函数及案例

1 函数简介 Hive会将常用的逻辑封装成函数给用户进行使用&#xff0c;类似于Java中的函数。 好处&#xff1a;避免用户反复写逻辑&#xff0c;可以直接拿来使用。 重点&#xff1a;用户需要知道函数叫什么&#xff0c;能做什么。 Hive提供了大量的内置函数&#xff0c;按照其特…

React系列:嵌套路由的使用

🍁 作者:知识浅谈,CSDN博客专家,阿里云签约博主,InfoQ签约博主,华为云云享专家,51CTO明日之星 📌 擅长领域:全栈工程师、爬虫、ACM算法 💒 公众号:知识浅谈 🔥网站:vip.zsqt.cc 🤞嵌套路由的使用🤞 🎈嵌套路由是什么 在一级路由中又内嵌了其他路由,这…

企业安全建设与实践-复习资料

文章目录 二、企业安全建设与实践1、复习Windows及Linux基础命令。例如&#xff1a;用户创建、权限提升等。2、复习docker 基础命令&#xff1a;启动、关闭、导入、导入、下载等命令复习建议docker菜鸟教程。3、复习Windows策略相关知识点。4、复习基线加固部分内容。5、渗透测…

STM32启动过程

STM32启动模式&#xff08;自举模式&#xff09; M3/3/7等内核&#xff0c;复位后做的第一件事&#xff1a; 从地址0x0000 0000处取出栈指针MSP的初始值&#xff0c;该值就是栈顶地址。从地址0x0000 0004处取出程序计数器指针PC的初始值&#xff0c;该值是复位向量。 芯片厂商…

DevEco Studio IDE 创建项目时候配置环境

DevEco Studio IDE 创建项目时候配置环境 一、安装环境 操作系统: Windows 10 专业版 IDE:DevEco Studio 3.1 SDK:HarmonyOS 3.1 二、在配置向导的时候意外关闭配置界面该如何二次配置IDE环境。 打开IDE的界面是这样的。 点击Create Project进行环境配置。 点击OK后出现如…

嵌入式人工智能(钱多?好学?前景好?)

概念 嵌入式人工智能&#xff08;Embedded AI&#xff09;是指将人工智能&#xff08;AI&#xff09;技术集成到各种设备和系统中&#xff0c;使其具备智能化和自主性。与传统的中央化计算模型不同&#xff0c;嵌入式人工智能将AI能力嵌入到设备本身&#xff0c;使其能够在本地…

FPGA简易加减法计算器设计

题目要求&#xff1a; &#xff08;1&#xff09;设计10以内的加减法计算器。 &#xff08;2&#xff09;1个按键用于指定加法或减法&#xff0c;一个用于指定加数或被加数&#xff0c;还有两个分别控制加数或被加数的增加或减少。 &#xff08;3&#xff09;设置的结果和计算的…

“华为杯” 第二十届中国研究生数学建模竞赛 数模之星、华为之夜与颁奖大会

文章目录 一、前言二、主要内容三、总结 &#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 一、前言 不以物喜&#xff0c;不以己悲。见众生&#xff0c;见自己。 作为荣获一等奖的学生代表&#xff0c;我有幸参加了 “华为杯” 第二十届中国研究生数学…