rocketmq 订阅组_必须先理解的RocketMQ入门手册,才能再次深入解读

推荐阅读一下下

  • 2020年后想跳槽?MQ、ZK、Nginx、Kafk等分布式技术你都掌握了?
  • 阿里架构师推荐学习的《RabbitMQ实战指南》,渣渣的你都看过吗?

RocketMQ入门手册

RocketMQ是一个分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点,

同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

RocketMQ 架构原理分析

RocketMQ 架构

51e489a9e6b0f8768218cac020c7f0f6.png

NameServer (名称服务器):

  • 提供轻量级的服务发现和路由。NameServer接受来自Broker群集的注册,并提供检测信号机制以检查Broker是否还存在
  • 每个NameServer记录完整的路由信息(Broker 相关 Topic 等元信息,并给 Producer 提供 Consumer 查找 Broker 信息),提供相应的读写服务。

Broker(消息服务器): 消息存储中心,接收来自 Producer 的消息并存储, Consumer 从这里取得消息

  • 单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,(其底层通信是基于Netty实现的)
  • Broker负责消息存储,以Topic为维度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
  • 具有上亿级消息堆积能力,同时可严格保证消息的有序性

Producer (生产者):

  • 负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息
  • 生产者支持分布式部署。 分布式生产者通过多种负载平衡模式将消息发送到Broker集群。 发送过程支持快速失败并且延迟低
  • 三种方式发送消息:同步、异步和单向

Consumer(消费者):

  • 负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序
  • 也支持“推和拉”模型中的分布式部署。
  • 它还支持集群使用和消息广播。 它提供了实时消息订阅机制,可以满足大多数消费者的需求。

Broker Server

Broker Server负责消息的存储和传递,消息查询,HA高可用等,Broker Server几个主要模块组成:

2fec24fe3ba10605c42e934839d1c9c8.png

Remoting Module(远程模块):broker入口,处理来自客户端的请求

Client Manager(客户端管理):管理client(生产者/消费者)并维护消费者的主题订阅

Store Service(存储服务):提供简单的API中数据库中存储或查询消息

HA Service(高可用服务):提供master broker和slave broker之间的数据同步功能

Index Service(索引服务):将message建立索引来提供快速的查询能力

RocketMQ 整体流程

9c0a66642a6a8e92a5901c65422e4aed.png
  1. 启动 NameServer,NameServer启动后进行端口监听,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心
  2. Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包 心跳包中,包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息 注册成功后,Namesrv 集群中就有 Topic 跟 Broker 的映射关系
  3. 收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上。也可以在发送消息时自动创建Topic
  4. Producer 发送消息 启动时,先跟 Namesrv 集群中的其中一台建立长连接,并从Namesrv 中获取当前发送的 Topic 存在哪些 Broker 上 然后跟对应的 Broker 建立长连接,直接向 Broker 发消息
  5. Consumer 消费消息 跟其中一台 Namesrv 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上 然后直接跟 Broker 建立连接通道,开始消费消息*RocketMQ的消息领域模型

RocketMQ Message

ee2729b936d6c988162b7332f1a9c136.png

Topic(主题): 表示消息的第一级类型,是最细粒度的订阅单位(生产者传递消息和消费者提取消息标识)

  • 一条消息必须有一个Topic
  • 一个Group可以订阅多个Topic的消息
  • Topic一般为领域范围,比如交易消息

Tag(标签): 表示消息的第二级类型,可以是使用相同的Topic不同的Tag来表示同一业务模块的不同任务的消息,比如交易消息又可以分为:交易创建消息,交易完成消息等

  • 助于保持代码整洁和一致
  • 简化RocketMQ提供的查询系统

Message(消息体): 消息是要传递的信息。 Message中必须包含一个Topic,可选Tag和key-vaule键值对

Message Queue(消息队列): 所有消息队列都是持久化

  • 一个Topic下可以有多个Queue
  • Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力

Group(组): 分为Producer Group(生产者组)和Consumer Group(消费者组),具有相同角色组成Group

  • 原生产者在交易后崩溃,broker可以联系同一生产者组的不同生产者实例以进行提交或回退交易。
  • 消费者组的消费者实例必须具有完全相同的主题订阅

RocketMQ 特性

Message Model(消息模式):

  • Clustering(集群式):当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可
  • Broadcasting(广播式):当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次

Message Order(消息顺序)

  • 使用DefaultMQPushConsumer时,可以决定按顺序或同时使用消息 Orderly:有序地使用消息意味着消息的消费顺序与生产者为每个消息队列发送消息的顺序相同。( 如果要处理必须强制执行全局顺序的情况,请确保您使用的主题只有一个消息队列) 如果指定按顺序使用,则消息使用的最大并发度是使用者组订阅的消息队列数 Concurrently:同时使用消息时,消息使用的最大并发性仅受为每个使用方客户端指定的线程池限制 在此模式下不再保证消息顺序

Message Types(消息类型)

  • 事务消息
  • 顺序消息
  • 延迟消息

RocketMQ单机版安装

  • 1. 下载编译源码
  # 下载$   > wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source- >   # 解压$  >unzip rocketmq-all-4.7.0-source-release.zip  > cd rocketmq-all-4.7.0/  # 编译$  > mvn -Prelease-all -DskipTests clean install -U  > cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
  • 2. 启动 Name Server
 # 启动 Name Server 服务 > nohup sh bin/mqnamesrv & # 启动完成后,查看日志$ > tail -f ~/logs/rocketmqlogs/namesrv.log  The Name Server boot success...
  • 3. 启动 Broker

在 conf 目录下,RocketMQ 提供了多种 Broker 的配置文件:

  • broker.conf :单主,异步刷盘。
  • 2m/ :双主,异步刷盘。
  • 2m-2s-async/ :两主两从,异步复制,异步刷盘。
  • 2m-2s-sync/ :两主两从,同步复制,异步刷盘。
  • dledger/ :Dledger 集群,至少三节点
 # 启动 Broker服务 > nohup sh bin/mqbroker -n localhost:9876 & # 启动完成后,查看日志$ > tail -f ~/logs/rocketmqlogs/broker.log   The broker[%s, 172.30.30.233:10911] boot success...

其中,参数:

  • 通过 -c 参数,配置读取的主 Broker 配置
  • 通过 -n 参数,设置 RocketMQ Namesrv 地址
  • 4. Send & Receive Messages(消息发送与接收)
    为简单起见,我们使用环境变量:NAMESRV_ADDR,如下所示:、
 # 设置 Name Servers的地址$ > export NAMESRV_ADDR=localhost:9876 # 生产消息$ > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ... # 消费消息$  > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...

作者:Ccww
原文链接:https://juejin.im/post/5e9571ccf265da47cd35733a

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/356481.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于返回按钮的事情

在微信的项目涉及到多个不同的项目间跳转那么头部的返回按钮怎么办呢? 在项目之间跳转还有在微信菜单进入页面时我们用了一个中转页进行参数处理和页面跳转判断 目前要用到code,userId等从后端写好的带参数的url到前端获取然后去调用后端的一个校验code和…

.net数据源控件绑定mysql_理解asp.net中DropDownList编辑数据源,绑定数据库数据。...

一、理解asp.net绑定数据库终于学习到了连接数据库部分的内容,杨中科老师视频看起来挺轻松的,如果是高清版就更ok了。我发现我学习新的编程语言会有一个特点,都想要赶紧学习数据库,数据就是一切,有了数据才能操作一切的…

jetty代理jetty_如何在Jetty中使用SPDY

jetty代理jettySPDY是Google提出的一种新协议,是针对网络的新协议。 SPDY与HTTP兼容,但尝试通过压缩,多路复用和优先级降低网页负载。准确地说,快速的目标是:( http://dev.chromium.org/spdy/spdy-whitepap…

五指棋,贪吃蛇,中国银行ATM源码链接

https://i.cnblogs.com/Files.aspx 转载于:https://www.cnblogs.com/weixiaoling/p/6083710.html

mysql select db error_select error:不能用DB-library(如isql)不能用DB-library(如isql

在我们php连接mssql出现select error:不能用DB-library(如isql)或odbc3.7或更早版本将ntext数据或仅使用Unicode排序规则的Unicode数据发送到客户端不能用DB-Library(如 ISQL)或 ODBC 3.7或更早版本将 ntext 数据Warning: mssql_query() [function.mssql- query]: message: 不能…

排序算法之二分治法

MERGE(A,p,q,r) n1 q-p1 n2 r-q let L[1..n_11] and R[1..n_21] be new arraysfor i 1 to n_1   L[i] A[pi-1]for j 1 to n_2   R[j] A[qj] L[n_11] \infty R[n_21] \infty i 1 j 1for k p to r   ifL[i] \leq R[j]     A[k] L[i]     i i1else A[k…

将Java EE与jOOQ结合使用的初学者指南

Java EE附带了自己的持久性API:JPA。 当您想要将RDBMS实体(表/关系)映射到Java实体(类)时,JPA最强大,主要遵循1:1映射策略。 其背后的想法是,业务逻辑通常并不像关系代数…

niva mysql_Nivacat for mysql是一种第三方提供的()_学小易找答案

【单选题】以下不包括在OSI环境中的是【单选题】以下不是直接推动新疆建省原因的选项是( ) (3.0分)【填空题】TOEFL stands for 1____________________________. More than four thousand American universities and other schools require students seeking 2________ to take…

清理SYSAUX表空间

1.查看SYSAUX表空间中数据分布情况 col SEGMENT_NAME for a30 set lines 999 select * from (select segment_name,PARTITION_NAME,segment_type,bytes/1024/1024 from dba_segments where tablespace_nameSYSAUX order by 4 desc) where rownum<10; 2.删除WRH$_LATCH_CHILD…

mysql备份需要停应用吗_对于含有innodb表的实例进行文件拷贝备份时必须停mysql实例吗...

展开全部在实际环境中&#xff0c;62616964757a686964616fe59b9ee7ad9431333361313261时不时需要备份恢复单个或多个表(注意&#xff1a;这里除非明确指定&#xff0c;所说的表一律指InnoDB表)&#xff0c;而对于innodb引擎恢复单个表需要整体的恢复&#xff0c;xtrabackup也可…

Apache Camel 2.16发布–十大亮点

Apache Camel 2.16于上周五发布。 这篇博客文章是我尝试在此新版本中进行前10名&#xff08;加1作为奖励&#xff09;的亮点。 1.动态到 来自骆驼用户的最常见的常见问题是如何将消息发送到端点&#xff0c;uri应该使用消息中的动态值&#xff08;例如标头&#xff09;。 在…

[原创软件]体验组批量加分工具

软件主要功能&#xff1a; 从excel批量导入用户QQ、昵称、年龄信息批量粘贴用户QQ或昵称进行筛选批量添加积分软件界面截图&#xff1a; 开发环境及语言&#xff1a; c#.NET Framework 4.0Visual Studio 2015更新日志&#xff1a; v1.2&#xff08;2016.9.30&#xff09; 增加格…

java8 filter return_java8新特性Java 8 Streams filter示例 - Java教程

在这篇教程中我将向你展示 filter(), collect(), findAny() 和 orElse() 的用法1. Streams filter() 和 collect()1.1 在 Java 8 以前, 像下面这样过滤一个 List:BeforeJava8.javapackage com.mkyong.java8;import java.util.ArrayList;import java.util.Arrays;import java.ut…

java 线性表的表示和实现_线性表中顺序表的的理解和实现(java)

线性表的顺序表示指的是用一组地址连续的存储单元以此存储线性表的数据元素&#xff0c;这种表示也称作线性表的顺序存储结构或顺序映像。通常&#xff0c;称这种存储结构的线性表为顺序表。特点是&#xff1a;逻辑上相邻的数据元素&#xff0c;其物理次序上也是相邻的。顺序表…

akka连接是什么_什么是Akka?

akka连接是什么在深入探讨什么是Akka之前&#xff0c;让我们退后一步来了解并发编程的概念在应用程序开发世界中是如何演变的。 应用程序已从大型的整体程序演变为面向对象的模型。 随着Java EE和Spring框架的出现&#xff0c;应用程序设计演变为更多的基于流程或任务的设计模型…

java socket 判断断网_java socket 判断对方在线或离线、断线

1 在客户端和服务器端做心跳检测(这个后来我放弃了)2 客户端中断连接&#xff0c;服务器会抛出异常&#xff0c;利用这点3这是我为什么不用心跳包的原因&#xff0c;因为不需要了。 有的时候&#xff0c;我们可能网络突然中断了。但是服务器依然会监听那个连接。而且很长都不抛…

java 可重入读写锁 ReentrantReadWriteLock 详解

详见&#xff1a;http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt206 读写锁 ReadWriteLock读写锁维护了一对相关的锁&#xff0c;一个用于只读操作&#xff0c;一个用于写入操作。只要没有writer&#xff0c;读取锁可以由多个reader线程同时保持。写入锁是独占的…

将JPA Hibernate与OptaPlanner集成

我们一直在改进OptaPlanner与JEE其余部分的集成&#xff0c;因此&#xff0c;构建可以正常工作的最终用户应用程序更加容易。 让我们看一下改进的JPA Hibernate集成。 基础 JPA Hibernate和OptaPlanner都可以在POJO&#xff08;普通的旧Java对象&#xff09;上工作&#xff0c…

c java 系统开发_java开发系统内核:使用C语言开发系统应用程序

更详细的讲解和代码调试演示过程&#xff0c;请参看视频用java开发C语言编译器如果你对机器学习感兴趣&#xff0c;请参看一下链接&#xff1a;机器学习&#xff1a;神经网络导论我们的操作系统通过增添内核接口导出机制后&#xff0c;已经可以作为平台&#xff0c;运行应用程序…

php 跨二级域名 设置cookie

登入也设置cookie setcookie(user_id, authlogin("user_id", "ENCODE"), time()(3600*24*30), "/", ".lfb365.com"); authlogin()方法为将user_id进行编码&#xff0c;非内置方法 退出页清除cookie setcookie("user_id", &q…