RocketMQ基本概念与入门

文章目录

  • MQ基本结构
  • 依赖
    • 案例:
      • product
      • Consumer
    • 核心概念
      • 1.nameserver
      • 2.broker
      • 3.主题队列
      • 4.queue队列
      • 5. 生产者
      • 6.消费者分组和生产者分组
      • 7.消费点位

MQ基本结构

在这里插入图片描述

  • message: 消息数据对象
  • product: 程序代码,生成消息,发送消息到队列
  • consumer: 程序代码,监听(绑定)队列,获取消息,执行消费代码
  • queue: Rocketmq rabbitmq kafka这些消息队列中间件软件.

依赖

<dependency><!--2.2.2底层rocketmq客户端4.9.1--><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>

案例:

product

public class MyProducer {/*** 向rocketmq发送第一条消息*/@Testpublic void sendTest01() throws Exception {//1.准备一个生产者对象,开启长链接DefaultMQProducer producer=new DefaultMQProducer();//对当前producer设置分组producer.setProducerGroup("first-producer-group");//连接nameserver localhost:9876producer.setNamesrvAddr("localhost:9876");//开启长链接producer.start();//2.封装一个消息对象,我们想要发送的内容,只是消息的一部分//创建一个消息对象Message message=new Message();//消息携带的内容 bodyString msg="当前发送的第一条消息";message.setBody(msg.getBytes(StandardCharsets.UTF_8));//设置消息主题,分类,按业务分类message.setTopic("first-topic-a");//主题标签 和key标识 //3.调用api方法将消息发送,接收返回结果,查看发送的信息比如状态//分为异步发送,同步发送,异步发送性能速度更高,但是无法保证成功.//同步发送,性能速度没有异步快,但是可以接收反馈结果SendResult send = producer.send(message);//result解析获取发送相关的信息System.out.println("发送状态:"+send.getSendStatus());System.out.println("消息到达主题,队列,broker信息:"+send.getMessageQueue());}
}

Consumer

public class MyConsumer1 {@Testpublic void consumerTest01() throws Exception {//1.构建一个消费者对象,连接nameserver创建长链接// push pull的区别 push消费端,消费的消息是队列推送给他的// pull 消费端代码执行一次pull 拉取过来一条消息// 收邮件 推的, 抢红包 拉取的DefaultMQPushConsumer consumer=new DefaultMQPushConsumer();//设置nameserver地址consumer.setNamesrvAddr("localhost:9876");//消费者分组consumer.setConsumerGroup("first-consumer-group-a");//定义监听的主题,消费端代码会根据定义的主题寻找nameserver路由信息,找到主题的队列进行绑定//topic 主题名称,subExpression 定义过滤逻辑 *表示匹配所有consumer.subscribe("first-topic-a","*");//2.执行api开始监听主题,实现队列的消费//提供给consumer一个监听器consumer.setMessageListener(new MessageListenerConcurrently() {/*** 推送过来的消息,都会调用consumerMessage执行消费逻辑* @param list 消息数据 list表示可以批量处理的消息,不是批量消息,list元素只有1个* @param consumeConcurrentlyContext* @return*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {//获取消息 由于不是批量发送只有list一个元素MessageExt messageExt = list.get(0);messageExt.getMsgId();//唯一的一个标识,每次消息组装的对象都会在发送时,生成一个msgIdbyte[] body = messageExt.getBody();//将消息转化String message=new String(body, StandardCharsets.UTF_8);System.out.println("消费端获取到消息:"+message);//context 控制返回确认信息 ackIndex顺序//返回消费状态 success 队列会将消息对应当前消费组,移动偏移量,记录消费完成return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//开启长连接consumer.start();while (true);}
}

核心概念

1.nameserver

NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
主要包括两个功能:

  • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  • 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,客户端仍然可以向其它NameServer获取路由信息。

  • 总之: nameserver作为协调器, 谁的信息被用到,就要到nameserver注册,谁要用注册信息,就要到nameserver同步抓取.
    broker要作为rocketmq容器被生产者和消费者代码使用.

2.broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

  • NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
  • 在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

3.主题队列

  • 简单理解,主题是一类消息的集合,每次我们发送消息必须指定消息绑定某一个主题.

  • 生产者发送的某一条消息,只能指向一个主题,多条消息可以指向同一个主题,同一个主题中有多个消息队列保存消息,消费端可以根据订阅的主题消费不同主题的消息.这样可以实现业务隔离.

  • 比如电商主题可以是order,主题也可以是cart,还可以是product相关的…

  • 一类消息,从数据的格式,携带body格式,都是完全一致的. 不会出现"第一条消息的"body是普通字符串,第二条消息是个对象Json,不可能第一条消息延迟消息(支付订单倒计时取消),第二条消息普通同步消息.
    在这里插入图片描述

4.queue队列

存储消息的物理实体(最小单位)。一个Topic中可以包含多个Queue(分布式体现的关键),每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区**(Partition**)。

注意:一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费(消费点位逻辑)。一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费。
在这里插入图片描述

5. 生产者

问题:通过上述概念的了解,生产者,nameserver,broker之间是如何交互的.

  • 启动 nameserver 保存broker路由信息
  • 主题一旦创建,保存broker里,同时生成队列,这些数据,作为路由信息保存nameserver
  • 生产者从nameserver拿到当前集群所注册信息(路由)
  • 发送消息的时候,连接具体的那个broker找具体的topic的具体queue实现消息发送,使用的具体信息,在返回的SendResult中体现

6.消费者分组和生产者分组

消息生产者,负责生产消息。本质上是程序中的一段代码.Producer投递消息到broker代理中.找到主题,负载均衡存放到队列中.
RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,产生同一类型的消息,这类Producer发送相同Topic类型的消息。一个生产者组可以同时向多个主题发送消息。
在这里插入图片描述
RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息,对应同一类消息数据。消费者组使得在消息消费方面,实现负载均衡(将一个Topic中的不同的Queue平均分配给同一个Consumer Group的不同的Consumer,注意,并不是将消息负载均衡)和容错(一个Consmer挂了,该Consumer Group中的其它Consumer可以接着执行消费逻辑.

由于主题中有多个队列,一组消费者,最多有和队列一样数量的消费成员,再多,无法绑定队列消费消息了.

7.消费点位

在队列中记录了所有和偏移量有关的数据比如:

  • 最小偏移量:都是0
  • 最大偏移量:当前消息的个数

在消费者中也在记录偏移量

  • 当前组对应主题队列的消费最小偏移量,和队列的最大偏移量(通过这两个值,能够知道当前消费者消费到哪个消息,还有多少没消费)
  • 在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/7786.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全志F1C200S嵌入式驱动开发(spi-nor image制作)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 一般soc系统里面添加spi-nor flash芯片,特别是对linux soc来说,都是把它当成文件系统来使用的。spi-nor flash和spi-nand flash相比,虽然空间小了点,但是胜在稳定,这是很多工业…

(二)RabbitMQ【安装Erlang、安装RabbitMQ 、账户管理、管控台、Docker安装 】

Lison <dreamlison163.com>, v1.0.0, 2023.06.22 RabbitMQ【安装Erlang、安装RabbitMQ 、账户管理、管控台、Docker安装 】 文章目录 RabbitMQ【安装Erlang、安装RabbitMQ 、账户管理、管控台、Docker安装 】**安装Erlang**安装RabbitMQ账户管理管控台Docker安装RabbitM…

大数据技术之Hive2

目录标题 3、Hive 数据类型3.1 基本数据类型&#xff1a;3.2 集合数据类型&#xff1a;3.3 类型转化 4、DDL数据定义4.1 创建数据库4.2 查询数据库4.3 创建表4.4 管理表4.5 外部表4.6 管理表与外部表的相互转换4.7 分区表4.7.1 分区表基本操作4.7.2 分区表注意事项 4.7 修改表4…

【后端面经】微服务架构(1-4) | 降级:为什么每次大促的时候总是要把退款之类的服务停掉?

文章目录 一、 前置知识1、什么是降级?2、降级的典型应用3、为什么要降级?4、降级的分类5、如何降级?A) 降级的应用场景B) 跨服务降级C) 提供有损服务二、面试环节1、面试准备2、基本流程3、亮点方案A) 读写服务降级写服务B) 快慢路径降级慢路径三、章节汇总 在熔断章…

Flutter Widget Life Cycle 组件生命周期

Flutter Widget Life Cycle 组件生命周期 视频 前言 了解 widget 生命周期&#xff0c;对我们开发组件还是很重要的。 今天会把无状态、有状态组件的几个生命周期函数一起过下。 原文 https://ducafecat.com/blog/flutter-widget-life-cycle 参考 https://api.flutter.dev/f…

chrome解决http自动跳转https问题

1.地址栏输入&#xff1a; chrome://net-internals/#hsts 2.找到底部Delete domain security policies一栏&#xff0c;输入想处理的域名&#xff0c;点击delete。 3.再次访问http域名不再自动跳转https了。

【Linux后端服务器开发】HTTPS协议

目录 一、加密算法 二、中间人攻击 三、CA认证 一、加密算法 HTTPS协议是什么&#xff1f;HTTPS协议也是一个应用层协议&#xff0c;是在HTTP协议的基础上引入了一个加密层。 HTTP协议内容是按照文本的方式明文传输的&#xff0c;这就导致在传输过程中出现一些被篡改的情况…

vray GPU渲染如何设置?最适合 VRay 渲染的 GPU 是什么?

Chaos 提供的 Vray GPU是一个独立的渲染引擎&#xff0c;提供 GPU 硬件加速。它还与 CPU&#xff08;处理器&#xff09;配合使用&#xff0c;并利用 CPU 和 GPU 硬件进行无缝混合渲染。 GPU 渲染使 Vray GPU 渲染引擎能够在系统的 GPU 而不是 CPU 上执行光线跟踪计算。由于 G…

STM32MP157驱动开发——按键驱动(阻塞与非阻塞)

“阻塞与非阻塞 ”机制&#xff1a; 阻塞&#xff1a; 使用 休眠唤醒机制&#xff0c;read函数会休眠&#xff0c;是阻塞的使用 poll 时&#xff0c;如果传入的超时时间不为 0&#xff0c;表示 read函数会休眠&#xff0c;这种访问方法也是阻塞的。 非阻塞 使用 poll 时&am…

解析数据可视化工具:如何选择最合适的软件

在当今信息爆炸的时代&#xff0c;数据已成为各行各业的重要资源。为了更好地理解和分析数据&#xff0c;数据可视化成为一种必不可少的工具。市面上数据可视化工具不说上千也有上百&#xff0c;什么帆软、powerbi、把阿里datav&#xff0c;腾讯云图、山海鲸可视化等等等等&…

N位分频器的实现

N位分频器的实现 一、 目的 使用verilog实现n位的分频器&#xff0c;可以是偶数&#xff0c;也可以是奇数 二、 原理 FPGA中n位分频器的工作原理可以简要概括为: 分频器的作用是将输入时钟频率分频,输出低于输入时钟频率的时钟信号。n位分频器可以将输入时钟频率分频2^n倍…

redis中缓存雪崩,缓存穿透,缓存击穿的原因以及解决方案

一 redis的缓存雪崩 1.1 缓存雪崩 在redis中&#xff0c;新&#xff0c;旧数据交替时候&#xff0c;旧数据进行了删除&#xff0c;新数据没有更新过来&#xff0c;造成在高并发环境下&#xff0c;大量请求查询redis没有数据&#xff0c;直接查询mysql&#xff0c;造成mysql的…

算法竞赛入门【码蹄集新手村600题】(MT1040-1060)

算法竞赛入门【码蹄集新手村600题】(MT1040-1060&#xff09; 目录MT1041 求圆面积和周长MT1042 求矩形的面积和周长MT1043 椭圆计算MT1044 三角形面积MT1045 平行四边形MT1046 菱形MT1047 梯形MT1048 扇形面积MT1049 三角形坐标MT1050 空间三角形MT1051 四边形坐标MT1052 直角…

LabVIEW使用支持向量机对脑磁共振成像进行图像分类

LabVIEW使用支持向量机对脑磁共振成像进行图像分类 医学成像是用于创建人体解剖学图像以进行临床研究、诊断和治疗的技术和过程。它现在是医疗技术发展最快的领域之一。通常用于获得医学图像的方式是X射线&#xff0c;计算机断层扫描&#xff08;CT&#xff09;&#xff0c;磁…

Python基于PyTorch实现循环神经网络回归模型(LSTM回归算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 LSTM网络是目前更加通用的循环神经网络结构&#xff0c;全称为Long Short-Term Memory&#xff0c;翻…

全志F1C200S嵌入式驱动开发(spi-nor驱动)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 和v3s一样,f1c200s本身也支持spi-nor flash。当然,不管是norflash,还是nandflash,都是为了能够让程序脱离sd卡,直接依靠板子上面的flash,就可以完成正常地加载和运行工作。tf…

flutter开发实战-jsontodart及 生成Dart Model类

flutter开发实战-jsontodart及 生成Dart Model类。 在开发中&#xff0c;经常遇到请求的数据Json需要转换成model类。这里记录一下Jsontodart生成Dart Model类的方案。 一、JSON生成Dart Model类 在开发中经常用到将json转成map或者list。通过json.decode() 可以方便 JSON 字…

华为数通HCIP-BGP基础

AS&#xff08;自治系统/路由域&#xff09; 定义&#xff1a;运行相同路由协议&#xff0c;具有相同管理规则的区域&#xff08;一般为一个企业网&#xff09;&#xff1b; 指具有相同路由管理策略的区域合集&#xff0c;一般为一个园区网&#xff1b; 标识&#xff1a;…

制作一个简易的计算器app

1、Ui开发 笔者的Ui制作的制作的比较麻烦仅供参考&#xff0c;在这里使用了多个LinearLayout对屏幕进行了划分。不建议大家这样做最好使用GridLayout会更加快捷简单 笔者大致划分是这样的&#xff1a; 使用了四个大框&#xff0c;在第四个大框里面有多个小框 最终界面如下&am…

机器学习 day31(baseline)

语音识别的Jtrain、Jcv和人工误差 对于逻辑回归问题&#xff0c;Jtrain和Jcv可以用分类错误的比例&#xff0c;这一方式来代替单单只看Jtrain&#xff0c;不好区分是否高偏差。可以再计算人类识别误差&#xff0c;即人工误差&#xff0c;作为基准线Jtrain与baseline对比只高了…