搭建服务【有2个】
一个是NameServer还有一个是Broker,要两个服务协同
后台启动服务
允许服务端自己创建topic,如果客户端传来的topic没有的话
然后搞个rocket mq的仪表盘,application.yml改成自己的地址【只需要指定NameServer的地址,不需要Broker地址】
后台启动
前提知识
所有的Broker类似于微服务构建的,NameServer相当于在注册中心 ,所有的Broker都往NameServer上注册,这也就是为什么只要配置NameServer的地址的原因。
主从交错分开,高可用,NameServer三台都要部署
文件夹里有线程的默认配置,只需要改一些参数
BrokerA主节点
brokerId > 0的是从节点
fileReserberTime表示日志保存120小时
deleteWhen表示凌晨四点开始删除过期文件
brokerRole=ASYNC_MASTER主从消息是怎么同步的?
同步:来一条消息同步一条
异步:过一段时间同步一条
flushDiskType:刷盘类型
同步:来一条消息就马上写到磁盘去
异步:先写缓存,攒一批再写到磁盘去
启动服务 -c指定配置文件
BrokerA从节点
启动服务 -c指定配置文件
BrokerB主节点
启动服务 -c指定配置文件
BrokerB从节点
启动服务 -c指定配置文件
Dashaboard无需再去修改,每个NameServer都是平等的,指定哪个都能看到
要修改也可以
假如停掉一台Broker服务,此时broker-b主服务和broker-a从服务停了
我们可以看到broker还是slave并没有切换成master【kafaka会自动主从切换,这个过程还要做消息同步,这个过程会导致消息丢失】
rocket不会自动切换,那就不会消息丢失。
如果此时发1000条消息,那只有master才会接收到消息,slave只会负责从master上备份消息 ,那此时broker-b的从节点就不会接收消息,所以保证了安全性,但是服务的可用性就降低了【必然有一个取舍】
虽然有2个分片了,但还是只有1个分片会收到消息,和没搭集群一个效果
那现在要实现高可用怎么办?
升级高可用集群Dledger【自动选主 从开源组织openMessage移过来的完整的功能模块】
文件夹自带线程的案例,修改一下配置即可
目录下3个文件,教你在一台机器上用3个Broker服务组成一个Dledger集群
原始配置:
Worker1配置
Worker2配置
集群名字相同,dLegerSelfId节点要不一样
Worker3配置
Broker启动的时候要指定配置文件【-C指令】
master一定编号是0
现在停掉master 128
129就变成master
消息不会丢 因为分片内数据是相同的
如果再下线一台呢?129下掉
只剩130并不会自动变成Master了【因为Dledger算法是基于Raft协议的 多数同意算法】
停了2台,1台就不是多数,就无法产生master
Dledger集群还有个作用是在各个节点之间同步消息,这个各个节点之间commitLog日志文件是强一致的,每时每刻都是一致的,所以就不会有消息丢失的问题
既然做到了强一致,肯定会对写文件的效率有影响的,影响了吞吐量【在5.x的版本把选举和日志拆开了】
整体架构
中间这个是服务端的核心服务,氛围NameServer和Broker
Broker:进行消息读写和处理服务端的请求
NameServer:负责对Broker的信息进行注册
客户端【生产者消费者】:只需要和NameServer打交道,只需要知道NameServer在哪,然后请求被转发到某个Broker某个分片上去,内部有个消息路由的机制
然后Broker可以做普通集群也能做Dledger高可用集群
消息转发逻辑
生产者发消息需要指向某个topic,topic会将消息分别存到不同的Broker上去【Kafka里叫partition rocketMq叫做MessageQueue 然后消费者去拉消息】
其实每个NameServer节点之间是互相独立的,没有任何信息的交互,都是全量数据,每个Broker都会往3个NameServer同时注册,他们3个NameServer组成集群的唯一作用是客户端可以指定3个NameServer,如果有其中一个挂了会自动换成另一个NameServer。NameServer上有所有的Broker地址
benchmark可以做压测
这里的topic只是一个逻辑上的概念,topic并不存储消息,消息是存储在一个个MessageQueue里的,尽量平均分配在不同的Broker上面
最小位点和最大位点表示消息的偏移量或者编号,从小到大递增,该Queue消息从0存到2。
使用
1.需要先指明生产者所属的组,可以有多个生产者属于相同的组【理论上代码都要一样】。
2.指定NameServer地址
3.start,说明和Broker是有多次交互的,并不是1次就完了
topic必不可少,tag是用来过滤的,可以理解为一个标识,可以给消息打一个标识。body是消息体
有3种发送消息方式
发消息的三种方式
void sendOneWay()
生产者只是往服务端发一个消息,发完就不管了,不管服务端有没有收到或者响应都不关心,单向发送,性能最高。
SendResult send()
生产者只是往服务端发一个消息,发完以后会拿到一个响应,就能用来判断Broker有没有正常接收到这条消息【安全性高,性能低】
SendCallback()
异步发送,往producer里传了msg外还需要传个接口,如果发送成功了怎么处理,发送失败了怎么处理。
性能比同步高
SendCallback()什么时候调用?
也需要Broker端返回一个消息,那如果Broker没响应怎么办?producer会超时的,默认3秒,可以设置
等待Broker超时的时间,8秒内没响应就会超时。
还有异步要一直处于start状态,不能close,不然就收不到Broker返回的回应了。
知道countDownLatch变成0了才会去关闭producer。
消费者
有两种模式,推模式和拉模式。
推模式【更常用】:
在Broker端会和Consumer建立一个连接,只要有消息到Broker了,就会主动将消息推到Consumer
需要指定消费者组【同一个消费者组的逻辑是相同的】
如果一个消费者组有多个Consumer,那Broker只会给其中一个推送消息
拉模式:
Consumer主动往Broker拉数据,如果有需要的数据就返回,没有就返回空
同样需要指定NameServer【也能通过环境变量指定就是了】
还能指定从哪里开始消费
FROM_TIMESTAMP:从哪个时间戳开始消费 同时要指定具体时间
FIRST_OFFSET:从MessageQueue队列的第一个开始消费
LAST_OFFSET:从MessageQueue队列的我上一次消费到的开始消费
还要订阅哪个topic
subExpression用来做过滤的,默认*就行
消息来了就会触发这个方法
最后需要向Broker返回一个状态,用来更新Broker端的消费者位点
CONSUME_SUCCESS:更新成功后续不会重新推送消息了
RECONSUME_LATER:更新失败,过段时间后重试。或者手动抛异常也会认为是失败【重试失败一定次数后 进入死信队列】
Consumer也需要一直启动,Broker才能往里头推
0号队列有2个消费者,broker-a和broker-b
代理者位点:
表示当前队列里消息屯了多少条消息,1就是1条,2就是2条,消费者位点表示消费者把它消费到哪个地方了
消费者位点:
是以订阅者组为维度来记录的,并不关心是哪个消费者终端
差值表示还有多少条消息没有消费
同一个消费者组,两个不同的终端,如果发2条消息,这两条消息只会交由其中一个消费者处理。【集群消费机制】
广播消息机制【只需修改消费者端代码与生产者无关】
给所有消费者都去推消息【适合做通知类的消息】
Broker并不会再记录消费者位点了,而是移到客户端,客户端自己去保存消费者位点。
这样缺陷本地记录有问题就可能导致消费不连续。所以广播消息不能保证消息的安全性。
Message都是存在Broker上面的不是MessageQueue上。
在RocketMQ中,消息最终是存储在Broker上的。但是,MessageQueue这个概念是与Broker紧密关联的,用于描述Broker中消息的逻辑分布和组织结构。
RocketMQ的架构主要包含以下几个组件:
- Producer:消息生产者,负责发送消息到Broker。
- Broker:消息处理中心,负责消息的存储、转发等。
- Consumer:消息消费者,从Broker拉取消息进行消费。
- NameServer:提供轻量级的服务发现和路由功能。
在Broker内部,消息是组织在不同的MessageQueue中的。每个Topic可以有多个MessageQueue,这些队列分布在一个或多个Broker上。当Producer发送消息时,它会根据负载均衡策略将消息发送到某个特定Topic的MessageQueue中。Broker会负责将消息实际存储到磁盘或内存上。
因此,可以说Broker是物理存储消息的地方,而MessageQueue是Broker内部的逻辑分区,用于管理消息的存储和消费。这种设计使得RocketMQ能够进行水平扩展,因为可以通过增加Broker和MessageQueue数量来提高系统的处理能力和容错性。
顺序消费
发的时候按订单0-9发,步骤按0-5发,可以看到订单顺序不是按发的,但是步骤顺序是对的。【保证1组消息是有序的,不保证消费者消息拉取顺序和生产者发的消息顺序是一致的。】
保证局部有序
生产者
需要多传2个参数,一个MessageQueueSelector对象和orderId。
参数List<MessageQueue>mqs 能拿到服务端Broker下所有的MessageQueue,msg消息,arg就是第三个参数orderId,用来自定义排序选择队列。
这里是按照orderId来选择MessageQueue,效果:orderId相同的会被发到同一个MessageQueue。
如果生产者将消息发送到同一个队列,即使是一批一批地发送,只要消息在生产者端是按顺序发送的,RocketMQ 会保持该队列中消息的顺序。这是因为消息是顺序存储在队列中的,并且在消费时会按照存储的顺序进行消费。
消费者端
需要注册监听器给一个MessageListenerOrderly()对象
它会锁定一个队列,当一个队列消息拉去完才去拉其他队列,它不一定是单线程,也可以是多线程,背后原理是通过线程池来做的并发控制。
返回值多了一个阻塞当前队列一段时间,比如有一条消息消费失败了,就不会处理后续消息,也无法消费后续消息。
之前传的是并发的监听器对象MessageListenerConturrently()
它拉消息是多个线程拉的,没办法保证消息的顺序性。并发消费是指消费者可以同时处理多个消息,不保证消息的顺序,一次拉32条。
如果要保证全局有序怎么办?
那就只能单队列模式,这样性能就非常差了。
kafka拉消息是,即使只有一个topic和partition,但是它好多个批次一起拉,这样就保证不了顺序了。
这3个批次是并行拉的【并不能严格保证顺序】
延迟消息
生产者
直接配置 这里是索引 也就是10s,延迟10秒发送到消费者【rabbit mq没有延时消息 通过死信队列或者插件】
5.x版本支持精确到时间戳【实际按秒发送消息】
实现延时消息的原理:
18个延时级别其实对应这个topic下的18个队列,每个队列按照不同的频率来扫描,等时间到了再转到真正的那个队列里去
批量消息【应对消息非常多的情况】
将所有的消息都放到一个list里【建议一个消息体大小不要超过1MB】
建议将消息分成一个个小的批次
假如我现在有个RPC请求,我的消息非常多,要怎么提升消息传输性能?
这时候就可以用批量消息来处理,减少IO,同时可以用二进制数组传输,比起String字符串占用更小。
过滤消息
生产者端
发消息时指定消息所属的tag
消费者端
同时需要订阅tag,这里只消费tag A 和tag C
那这个过滤是在哪里做的呢?
如果在Broker端做【RocketMq优先保证网络传输性能】
好处:网络传输的数据就更少,网络传输性能提高了
坏处:Broker更繁忙了,需要增加额外逻辑处理
不建议定制很复杂的过滤机制 会加大服务端压力
如果在消费端做
坏处:网络传输性能下降
好处:Broker不用处理这个逻辑了
生产者可以通过putUserProperty除了tag外自己来添加属性【默认false没开启 防止Broker过于繁忙】
sql过于复杂对Broker压力会比较大
消费者端可以写复杂的sql预计实现消息过滤
事务消息
Rocket提供一种机制,落本地库和发送消息要么同时成功,要么同时失败。
Rocket只保证前一半是原子性的,后半部分的原子性由消费者自己保证,因为如果消费者收到消息就证明上游是正常的了,消费者自己保证能把消息写到mysql,搞定。
如果消息写mysql失败了,抛个异常,Broker会多次投递消息,最终保证消息写入mysql。
不是标准的分布式事务解决方案
生产者端【与消费者无关 只与生产者有关】
核心就这一行代码
监听器主要有2个方法
如果tag是 TagA 表示事务要提交,如果是TagB表示回滚
checkLocalTranscation()方法:
C提交事务,D回滚
消费者这边就先收到TagA的数据,后收到TagC的数据 其他就收不到了
所以生产者在发送消息的时候优先触发executeLocalTranscantion()方法,然后过一段时间才会执行checkLocalTranscation()方法
1.先会发个half半消息到Serever端【消费者感知不到 会挪到系统的topic里】,然后Server会返回个消息,用来嗅探Server端是否正常.
2.Server端正常了才去执行自己的本地方法【比如写到数据库】,执行完后向Server端返回一个执行状态,如果是Commit那就会把消息直接推送到MessageQueue然后给消费者。如果是Rollback那就直接把消息扔掉,不会往消费者推送
还有个状态是UNNO如果执行本地方法太久了,一直都没返回,Server端会等一段时间发一条消息给生产者发一个请求来确认本地事务是否执行完成,然后生产者就会去检查本地事务是否执行完成然后来返回信息Commit 或 Rollback,如果还没完成继续返回UNNO。
回查时长和次数可以配置:
时间:
默认回查15次
如果还是UNNO,就直接丢弃消息
使用场景
如果12306,下单完成等待支付,正常是下完单之后启动一个定时任务来检查是否下单完成
用户5分钟下才支付成功,你用定时任务15分钟扫一次,那就不知道用户是否支付成功,就无法分配座位。
我们可以这么使用:【也可以使用延时队列来操作】
下完单写入mysql同时创建一个支付订单,然后生产者主动返回UNNO状态,让Server来回查是否支付成功,检查到支付成功就往下游推送,然后分配座位。
这时候参数灵活配置。
这段代码有什么问题?
想用@Transcation进行事务控制,但是这个代码无法进行事务控制。这个只能控制数据库的事务
假如第三步失败了,第二步是无法回滚的。
改成事务消息机制来做即可。
解决方案
1.上面的在本地事务执行,执行完发送UNNO到Server端,下面的代码回查时候执行,两个都成功了消息提交。
2.让本地事务执行上面和下面代码,执行成功发送Rollback,失败发送Commit,然后如果Server端收到Commit,说明执行失败了,就去删除刚才新插入的订单表.如果删除失败——重试,让Broker再发条信息过来消费。
小总结:
MessageQueue可以配置文件设置,建议是Broker的1.5-2倍
队列不是无界队列,消息最长保留时间的配置的,所以最小点位不会一直是0,最早消息被删掉0就没了
一个消费者可以处理多个MessageQueue,但一个MessageQueue只会交由1个消费者处理
消费者数量最好等于队列数量,要是多了,那多的消费者就是混子。
ACL权限控制机制
需要将权限控制设置为true
重试队列
这些消息曾经已经推送给消费者,但消费者处理失败了进入重试队列。如果出现重试队列,证明消费者服务是有问题的。重试队列是根据消费者组来的,如果你以往的消费者订阅了多个topic,都可能进入一个队列。
- 一个消费者组可以订阅一个或多个Topic,消费者组内的所有消费者实例共同协作消费这些Topic的消息。
- 同一个Topic的消息可以被多个消费者组订阅,每个消费者组独立消费,互不影响。这样可以实现消息的广播(每个消费者组都会收到所有消息)或消息的集群消费(每个消费者组内部的负载均衡)。
- 如果多个消费者实例属于同一个消费者组,并且订阅了同一个Topic,那么这些消费者实例会在内部进行协调,确保每个消息只被一个消费者实例消费一次(集群消费模式)。
总结来说,消费者组是一种逻辑上的分组方式,它允许多个消费者协调消费相同Topic的消息。通过消费者组,RocketMQ不仅能提供消息的高可用性和负载均衡,还能根据不同的业务需求实现不同的消费模式。
死信队列默认无法消费,需要手动修改才能消费。
消费者组代表的一般是相同的处理逻辑,如果想处理逻辑不同也可以,但不利于消息回溯。
Springboot 整合RocketMq
生产者固定写法
这里convertAndSend和send都可以,convertAndSend会自动转成Rocket mq内部的消息格式,
不仅能发消息其实还能收消息,对应其实就是客户端的拉模式
消费者固定写法
在注解里声明一个消费者@RocketMQMesaageListener
然后还要实现RocketMQListener这个接口
自动就会拉取消息,然后再onMessage里处理
和原生的api相比,整合完Springboot后使用也会更简单
顺序消费
同步顺序消费,指定topic和消息,然后直接直接指定一个key,按照这个key的哈希去选择一个队列去发送,如果key相同就选择一个队列去发消息,无需实现这个算法,已经有默认实现了。
异步发送还会注册个回调函数,如果客户端给个返回的话,还能通过回调去判断消息有没有成功,超时之类。
事务消息
只需要发送的时候用sendMessageInTransaction就行了
监听者
直接往spring容器里声明,里面也有两个和原生一样的方法,所以当执行sendMessageInTransaction()时候,就会触发监听器的excuteLocalTransaction()方法执行本地事务,
如果是UNNO就会过段时间回查
那如果我有不同的消费逻辑,不同的事务消息发送者怎么办?都不能用同一个监听器吧?
提供了默认的rocketMQTemplate,对生产的一个封装。
在springboot中没有办法直接声明多个生产者,如果要声明用另外的注解来声明
然后@RocketMQTransactionListener注解有个RocketMQTransactionBeanName属性,也就是这个事务监听器是针对哪个rocketTemplate来配置的,默认是rocketMQTemplate名字。
像刚才上面声明的,他在容器里的名字就是extRocketMQTemplate
源码分析
消费者
为什么一个注解就能消费消息了?
关键都在一个配置类里头
因为继承了SmartInitializingSingleton类,spring加载的时候就会触发afterSingletonsInstantiated()
找到有@RocketMQMesaageListener的所有bean,然后调用registerContainer()方法
注册完了然后启动Container,Container相当于Consumer的一个载体
这里就有个consumer.start(),就是和原生的方式一样
RocketMQTemplate是怎么注入的?
通过spring.factories文件
自动配置类就会触发这个配置
spring.factories 文件在 Spring Boot 中扮演着重要的角色,它是 Spring Boot 自动配置(auto-configuration)机制的核心部分。这个文件通常位于资源目录(resources)下的 META-INF 文件夹中。在 Spring Boot 启动过程中,它会被用来加载预定义的配置类和其他组件。
主要用途包括:
- 自动配置:spring.factories 文件中可以声明一系列自动配置类(auto-configuration classes),这些类通常带有 @Configuration 注解,它们定义了条件化的配置,这样 Spring Boot 就可以根据 classpath 中的类、Bean 的存在性以及其他因素来决定是否应用这些配置。
- 条件化配置:自动配置类通常会使用 @Conditional 相关的注解(如 @ConditionalOnClass、@ConditionalOnMissingBean 等),这些注解让 Spring Boot 能够在满足特定条件时才启用特定的配置。
- 启动过程中的扩展点:除了自动配置之外,spring.factories 文件还可以用于注册其他类型的组件,比如 ApplicationContextInitializer、ApplicationListener、FailureAnalyzer 等,这些组件可以在应用启动过程中的不同阶段提供自定义的行为。
- Spring Boot 特性:比如,Spring Boot 的各种 starter 项目通过这个机制来提供它们的默认配置。当开发者将特定的 starter 添加到项目的依赖中时,相关的自动配置类会被自动加载,从而减少了开发者需要手动配置的工作量。
结构上,spring.factories 文件是一个简单的键值对列表,格式如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.AutoConfig1,\
com.example.AutoConfig2
在上面的例子中,EnableAutoConfiguration 是一个键,它对应多个自动配置类,这些类通过逗号分隔。在 Spring Boot 应用程序启动时,会自动检测到这些配置并应用它们。
这个文件中的每一行都定义了一个接口或类的全限定名(Fully Qualified Name, FQN),后面跟着一个等号(=),然后是一个反斜杠(\)作为续行符,最后是一个或多个实现了该接口或扩展了该类的全限定名,它们使用逗号(,)分隔。在这个例子中:
-
- org.springframework.boot.autoconfigure.EnableAutoConfiguration是一个键,表示Spring Boot在启动时会查找实现了自动配置的类。这个键其实是Spring Boot内部使用的,用来识别所有参与自动配置的类。
- 反斜杠\是一个续行符,表示这一行后面的内容仍然属于前面的键(在这种格式中通常用于提高可读性,避免一行过长)。
- com.example.AutoConfig1和com.example.AutoConfig2是与该键相关联的值,它们都是全限定的类名。这些类包含了自动配置的逻辑,通常是带有@Configuration注解的Java类,定义了当特定条件满足时如何配置Spring应用上下文中的bean。
在Spring Boot应用启动过程中,Spring Boot的自动配置系统会加载spring.factories文件,并查找EnableAutoConfiguration键下列出的所有类。然后,Spring Boot会创建这些类的实例,应用它们提供的配置。如果这些自动配置类有条件注解(如@ConditionalOnClass、@ConditionalOnBean等),那么Spring Boot只有在这些条件满足时才会应用它们的配置逻辑。
这样的设计允许开发者通过简单地将依赖项添加到项目中来启用大量默认配置,这是Spring Boot“约定优于配置”哲学的一部分,极大地简化了Spring应用的配置工作。开发者也可以通过创建自己的spring.factories文件来提供额外的自动配置类,从而扩展或覆盖Spring Boot的默认配置。
总的来说,spring.factories 是 Spring Boot 提供“约定优于配置”理念的关键机制,通过它,开发者可以轻松地利用 Spring Boot 提供的自动配置支持,同样也可以创建自己的自动配置类来简化第三方库的集成。
在这里头就会注入RocketMQTemplate,这里面就有个producer,其实就是用来构造producer的
Springboot整合其实就是在原生api上做了点封装。
NameServer启动过程
1.创建一个Controller
2.启动一个Controller
这里new了两个配置类,NameServer的一些配置就是通过这两个来解析的
NameServer默认就是9876端口
解析命令行的-C参数
启动Broker的时候会指定-c 配置文件地址,NameServer同样也支持
-p可以打印一些核心参数
然后就是构建Controller的一些参数结束。
启动Controller start();
1.初始化定时任务,初始化失败就退出
2.启动服务
初始化
1.重新加载配置文件
2.构建远端服务 响应rpc请求
执行定时任务的一些方法,说明routeInfoManager用来管理broker的【扫描不活跃的Broker】
后面和安全相关 不关心
Start()方法
启动一个远端服务,下面是和TLS安全相关,不关心
那为什么只有RPC的Server端,没有RPC的客户端呢?意味着NameServer只接收请求,不往外发请求。
NameServer和注册中心不一样,它不保证数据的一致性,不同的NameServer之间数据是没有任何交互的
如果要交互肯定要有Clinet端。
小结
Broker服务的启动过程
直接一个start()方法,简单粗暴。
也包括创建Controller和启动Controller
1.设定版本
2.封装对应的参数
3.Broker既要作为服务端响应别人请求,也要作为客户端往别的服务发请求
比起NameServer的核心配置多了个Netty客户端
这里就是指定的服务远端的ip,如果不指定就有可能拿到的是内网网卡地址
消息存储的相关配置
然后如果是SLAVE会比MASTER的内存少10%
-c指定Broker的配置文件
构建Controller,把核心配置传进去
然后进行初始化。
初始化
Broker初始化构建了非常多的东西
构建RPC请求的服务端,基于Netty框架构建的服务端。
下面还构建了个fastRemotingServer
生产者发消息的时候可以指定VIP channel,主要就是用来分摊请求,没啥其他作用,普通业务走普通,几个重要的vip业务走vip,功能都一样。
声明了一些线程池
然后注册Processor(),然后开启一些定时任务,比如持久化到磁盘
Broker要往各个NameServer完成注册
代码最后还有初始化事务【用的java提供的spi机制 这就是Rocket mq 预留的一个扩展点】
所以这里留了三个扩展点
RPCHook【RPC 钩子 在发送RPC请求之前和之后可以做点事】
start()方法
就干了一个事情 Controller 启动,然后打一些日志
启动一些核心服务
其中有个Broker往外发请求的api,这里面就封装了个remotingClinet,启动了个Netty的客户端,相当于和远端NameServer建立了长连接
然后启动了个定时任务 结束。Broker往NameServer注册心跳的方法。
10秒后启动,