RocketMQ 入门:MQ 基础概念、集群搭建与高可用配置,消息示例详解

RocketMQ 入门

视频地址:

千锋教育RocketMQ全套视频教程,快速掌握MQ消息中间件

什么是 MQ ?

Message Queue(消息 队列),从字面上理解:首先它是一个队列。FIFO 先进先出的数据结构 —— 队列。

消息队列就是所谓的存放消息的队列。

消息队列解决的不是存放消息的队列的目的,解决的是通信问题

1、同步通信情况下

比如以电商订单系统为例,如果各服务之间使用同步通信,不仅耗时较久,且过程中受到网络波动的影响,不能保证高成功率。因此,使用异步的通信方式对架构进行改造。

image

2、使用消息队列后。异步通信的情况下

  • 使⽤异步的通信方式对模块间的调⽤进行解耦,可以快速的提升系统的吞吐量。
  • 上游执行完消息的发送业务后立即获得结果,下游多个服务订阅到消息后各⾃消费。
  • 通过消息队列,屏蔽底层的通信协议,使得解藕并行消费得以实现。

image

RocketMQ 的基本概念

技术架构

image

RocketMQ 架构上主要分为四部分,如上图所示:

  1. Producer:消息发布的⻆⾊,⽀持分布式集群⽅式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程⽀持快速失败并且低延迟。

  2. Consumer:消息消费的角色,支持分布式集群方式部署。⽀持以 push 推,pull 拉两种模式对消息进行消费。同时也⽀持集群方式广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  3. NameServer:NameServer 是⼀个非常简单的 Topic 路由注册中心,其⻆⾊类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现

    • 主要包括两个功能:Broker 管理,NameServer 接收 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和⽤于客户端查询的队列信息。

    • 然后 Producer 和 Conumser 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从⽽进⾏消息的投递和消费。

    • NameServer 通常也是集群的⽅式部署,各实例间相互不进⾏信息通讯。因为 Broker 是向每⼀台 NameServer 注册⾃⼰的路由信息,所以每⼀个 NameServer 实例上⾯都保存⼀份完整的路由信息。

    • 当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer,Consumer 仍然可以动态感知 Broker 的路由的信息。

  4. BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker 包含了以下几个重要⼦模块。

    • Remoting Module(负责处理请求):整个 Broker 的实体,负责处理来自 clients 端的请求。
    • Client Manager(负责管理订阅信息):负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息
    • Store Service:提供⽅便简单的 API 接⼝处理消息存储到物理硬盘和查询功能。
    • HA Service(提供数据同步功能):⾼可⽤服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
    • Index Service(索引服务):根据特定的 Message key 对投递到 Broker 的消息进⾏索引服务,以提供消息的快速查询。

简而言之

NameServer 服务器在消息发送时充当了路由信息的提供者,帮助生产者找到正确的 Broker 来发送消息。它起到了注册和发现 Broker 的作用,以确保消息能够准确地路由到目标位置

当【生产者】发送消息时,它会根据 Topic 选择合适的 Broker 来发送消息。Broker 负责存储和管理消息,每个 Broker 上可以管理多个 Topic。因此,Topic 是在 Broker 上创建和管理的,而不是在 NameServer 服务器上。

部署架构

image

RocketMQ 网络部署特点

  1. NameServer 是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步。

  2. Broker 部署相对复杂,Broker 分为 Master 与 Slave,⼀个 Master 可以对应多个 Slave,但是⼀个 Slave 只能对应⼀个 Master

    • Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,⾮ 0 表示 Slave。Master 也可以部署多个。

    • 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。

    • 注意:当前 RocketMQ 版本在部署架构上⽀持⼀ Master 多 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载

  3. Producer 与 NameServer 集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建⽴⻓连接,且定时向 Master 发送⼼跳。Producer 完全⽆状态,可集群部署。

  4. Consumer 与 NameServer 集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建⽴⻓连接,且定时向 Master、Slave 发送⼼跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最⼤偏移量的距离(判断是否读⽼消息,产⽣读 I/O),以及从服务器是否可读等因素建议下⼀次是从 Master 还是 Slave 拉取。

结合部署架构图,描述集群⼯作流程

  1. 启动 NameServer,NameServer 起来后监听端⼝,等待 Broker、Producer、Consumer 连上来,相当于⼀个路由控制中心
  2. Broker 启动,跟所有的 NameServer 保持⻓连接,定时发送⼼跳包
    • 心跳包中包含当前 Broker 信息(IP+端⼝等)以及存储所有 Topic 信息。
    • 注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
    • 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时⾃动创建 Topic。
  3. Producer 发送消息,启动时先跟 NameServer 集群中的其中⼀台建⽴⻓连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择⼀个队列,然后与队列所在的 Broker 建⽴⻓连接从⽽向 Broker 发消息。
  4. Consumer 跟 Producer 类似,跟其中⼀台 NameServer 建⽴⻓连接,获取当前订阅的 Topic 存在哪些 Broker 上,然后直接跟 Broker 建⽴连接通道,开始消费消息。

快速开始

下载 RocketMQ

官网下载地址:https://rocketmq.apache.org/zh/download/

1、运行版【二进制压缩包】:

image

2、另一个是【源码】 – source 下载

安装 RocketMQ

  1. 准备⼀台装有Linux系统的虚拟机。需要关闭防火墙。

  2. 安装 jdk,上传 jdk 安装包并解压缩在 /usr/local/java ⽬录下。

  3. 安装 rocketmq,上传 rocketmq 安装包并使⽤ unzip 命令解压缩在 /usr/local/rocketmq ⽬录下。

    # 进入指定目录
    cd /usr/local
    # 新建文件夹 
    mkdir rocketmq
    # 解压压缩包
    unzip [压缩包名]
    # 删除压缩包
    rm -rf [压缩包名]# -f:force,强制删除,否则会每个文件都询问是否删除
    # -r:表示递归,移除文件夹的时候需要使用
    
  4. 配置 jdk 和 rocketmq 的环境变量

    1、进入配置文件

    vim /etc/profile
    

    2、i 插入,插入后,ESC + :wq 退出

    export JAVA_HOME=/usr/local/java/jdk1.8.0_171
    export JRE_HOME=/usr/local/java/jdk1.8.0_171/jre
    export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-5.1.2-bin-release
    export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
    export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$ROCKETMQ_HOME/bin:$PATH:$HOME/bin
    

    注意,RocketMQ 的环境变量⽤来加载 ROCKETMQ_HOME/conf 下的配置⽂件, 如果不配置则⽆法启动 NameServer 和 Broker。

    3、使环境变量生效

    source /etc/profile
    

    4、验证一下

    javac
    

    5、修改 bin/runserver.sh ⽂件,由于 RocketMQ 默认设置的 JVM 内存为 4G,但虚拟机⼀般是 2G 内存,因此调整为 512mb。

    cd ./binvim runserver.sh
    

    修改前:

    image

    修改后:

    image

启动 NameServer

1、在 bin ⽬录下使⽤静默⽅式启动

nohup ./mqnamesrv &# 建议使用带名称空间的形式 -n <服务器地址>:<端口>
nohup ./mqnamesrv -n 192.168.194.132:9876 &

2、查看是否启动成功

cat nohup.out

image

启动 Broker

1、修改 broker 的 JVM 参数配置,将默认 8G 内存修改为 512m。修改 bin/runbroker.sh ⽂件

vim runbroker.sh

修改

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
# 改为
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

2、conf/broker.conf ⽂件中加⼊如下配置,开启⾃动创建 Topic 功能

vim broker.conf
# 开启⾃动创建 Topic 功能
autoCreateTopicEnable=true

3、以静默⽅式启动 broker

nohup ./mqbroker -n localhost:9876 &
# 可以使用 localhost,但不建议
nohup ./mqbroker -n 192.168.194.132:9876 &
# 查看 bin/nohup.out ⽇志,看是否开启成功
cat nohup.out

使用发送和接收信息验证 MQ

1、配置 nameserver 的环境变量

在发送/接收消息之前,需要告诉客户端 nameserver 的位置。配置环境变量 NAMESRV_ADDR

export NAMESRV_ADDR=localhost:9876

2、使⽤ bin/tools.sh ⼯具验证消息的发送,默认会发 1000 条消息发送的消息⽇志:

./tools.sh org.apache.rocketmq.example.quickstart.Producer

发送的消息日志:

...
SendResult [sendStatus=SEND_OK, msgId=AC110001E73F0133314B8998AB2503E7, offsetMsgId=C0A8C28400002A9F000000000003AC09, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=249]

3、使⽤ bin/tools.sh ⼯具验证消息的接收

./tools.sh org.apache.rocketmq.example.quickstart.Consumer

看到接收到的消息:

...
ConsumeMessageThread_please_rename_unique_group_name_4_20 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=2, storeSize=241, queueOffset=232, sysFlag=0, bornTimestamp=1687857283710, bornHost=/192.168.194.132:59242, storeTimestamp=1687857283711, storeHost=/192.168.194.132:10911, msgId=C0A8C28400002A9F0000000000036C05, commitLogOffset=224261, bodyCRC=1379786659, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={CONSUME_START_TIME=1687857375912, MSG_REGION=DefaultRegion, UNIQ_KEY=AC110001E73F0133314B8998AA7E03A3, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=TagA, WAIT=true, TRACE_ON=true, MAX_OFFSET=250}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 49], transactionId='null'}]] 

关闭服务器

1、关闭 broker

./mqshutdown broker

2、关闭 nameserver

./mqshutdown namesrv

关闭前:

image

关闭后:

image

搭建 RocketMQ 集群

保存服务的 高性能、高可用。

RocketMQ 集群模式

为了追求更好的性能,RocketMQ 的最佳实践方式都是在集群模式下完成。

RocketMQ 官⽅提供了三种集群搭建⽅式。

2 主 2 从异步通信方式

使用异步方式进行主从之间的数据复制,吞吐量大,但可能会丢消息(因为异步的情况下,从服务器 slave 可能会复制失败)。

使用 conf/2m-2s-async 文件夹内的配置文件做集群配置。

image

2 主 2 从同步通信方式

使⽤同步方式进行主从之间的数据复制,保证消息安全投递,不会丢失,但影响吞吐量(因为此时相对于生产者来说进行了阻塞)。

使用 conf/2m-2s-sync 文件夹内的配置文件做集群配置。

2 主 无 从方式

会存在单点故障,且读的性能没有前两种⽅式好。

使⽤ conf/2m-noslave ⽂件夹内的配置⽂件做集群配置。

Dledger 高可用集群

上述三种官方提供的集群没办法实现⾼可用,即在 master 节点挂掉后,slave 节点没办法⾃动被选举为新的 master,需要人工实现。

RocketMQ 在 4.5 版本之后引⼊了第三方的 Dleger 高可用集群。

搭建主从异步集群

旨在提供高可用性、数据冗余和负载均衡,以确保消息系统的可靠性和性能。

1、准备三台 Linux 服务器

三台 Linux 服务器中 nameserver 和 broker 之间的关系如下:

# 查看 ip 地址
ifconfig
服务器服务器IPNameServerBroker节点部署
服务器1192.168.194.133192.168.194.133:9876
服务器2192.168.194.134192.168.194.134:9876broker-a(master),broker-b-s(slave)
服务器3192.168.194.135192.168.194.135:9876broker-b(master),broker-a-s(slave)

注意:这是交叉部署

  • 服务器 2 上的是:broker-a 的主节点和 broker-b 的从节点
  • 服务器 3 上的是:broker-b 的主节点和 broker-a 的从节点

三台服务器都需要安装 jdk 和 rocketmq,安装步骤参考上⼀章节(笔者用的是 CentOS ,不需要安装配置,不知道是不是这个原因)

克隆步骤:

第一步,点击克隆

image

第二步,创建链接克隆

image

2、启动三台 NameServer

nameserver 是一个轻量级的注册中心,broker 把自己的信息注册到 nameserver 上。因为 nameserver 是无状态的,所以直接启动即可。

三台 nameserver 之间不需要通信,而是被请求方来关联三台 nameserver 的地址。
修改三台服务器的的 runserver.sh 文件
修改 JVM 内存默认的 4g 为 512m。

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -
XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

在每台服务器的 bin 目录下执行如下命令:

服务器 1:

nohup ./mqnamesrv -n 192.168.194.133:9876 &

服务器 2:

nohup ./mqnamesrv -n 192.168.194.134:9876 &

服务器 3:

nohup ./mqnamesrv -n 192.168.194.135:9876 &
3、配置 Broker

两对主从节点在不同的服务器上,服务器 1 上没有部署 broker。

需要修改每台 broker 的配置⽂件。

注意,同⼀台服务器上的两个 broker 保存路径不能⼀样。

  1. broker-a 的 master 节点

    服务器 2 上,进⼊到 conf/2m-2s-async ⽂件夹内,修改 broker-a.properties ⽂件

    # 所属集群名称
    brokerClusterName=DefaultCluster
    # broker名字
    brokerName=broker-a
    # broker所在服务器的ip(注意)
    brokerIP1=192.168.194.134
    # broker的id,0表示master,>0表示slave
    brokerId=0
    # 删除⽂件时间点,默认在凌晨4点
    deleteWhen=04
    # ⽂件保留时间为48⼩时
    fileReservedTime=48
    # broker的⻆⾊为master
    brokerRole=ASYNC_MASTER
    # 使⽤异步刷盘的⽅式
    flushDiskType=ASYNC_FLUSH
    # 名称服务器的地址列表(注意)
    namesrvAddr=192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876
    # 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
    defaultTopicQueueNums=4
    # 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # broker对外服务的监听端⼝
    listenPort=10911
    # abort⽂件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    # 消息存储路径
    storePathRootDir=/usr/local/rocketmq/store
    # commitLog存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    # 消费队列存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    # 消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    # checkpoint⽂件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    # 限制的消息⼤⼩
    maxMessageSize=65536
    # commitLog每个⽂件的⼤⼩默认1G
    mapedFileSizeCommitLog=1073741824
    # ConsumeQueue每个⽂件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    
  2. broker-a 的 slave 节点

    服务器 3 上,进⼊到 conf/2m-2s-async ⽂件夹内,修改 broker-a-s.properties ⽂件。

    brokerClusterName=DefaultCluster
    brokerName=broker-a
    # broker所在服务器的ip(注意)
    brokerIP1=192.168.194.135
    # broker的id,0表示master,>0表示slave
    brokerId=1
    deleteWhen=04
    fileReservedTime=48
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    # 名称服务器的地址列表(注意)
    namesrvAddr=192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876
    defaultTopicQueueNums=4
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    listenPort=11011
    abortFile=/usr/local/rocketmq/store-slave/abort
    storePathRootDir=/usr/local/rocketmq/store-slave
    storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
    storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
    storePathIndex=/usr/local/rocketmq/store-slave/index
    storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
    maxMessageSize=65536
    
  3. broker-b 的 master 节点

    服务器 3 上,进⼊到 conf/2m-2s-async ⽂件夹内,修改 broker-b.properties ⽂件。

    brokerClusterName=DefaultCluster
    brokerName=broker-b
    # broker所在服务器的ip(注意)
    brokerIP1=192.168.194.135
    brokerId=0
    deleteWhen=04
    fileReservedTime=48
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    # 名称服务器的地址列表(注意)
    namesrvAddr=192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876
    defaultTopicQueueNums=4
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    listenPort=10911
    abortFile=/usr/local/rocketmq/store/abort
    storePathRootDir=/usr/local/rocketmq/store
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    storePathIndex=/usr/local/rocketmq/store/index
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    maxMessageSize=65536
    
  4. broker-b 的 slave 节点

    服务器 2 上,进⼊到 conf/2m-2s-async ⽂件夹内,修改 broker-b-s.properties ⽂件。

    brokerClusterName=DefaultCluster
    # 
    brokerName=broker-b
    # 
    brokerIP1=192.168.194.134
    #
    brokerId=1
    deleteWhen=04
    fileReservedTime=48
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    # 名称服务器的地址列表(注意)
    namesrvAddr=192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876
    defaultTopicQueueNums=4
    autoCreateTopicEnable=true
    autoCreateSubscriptionGroup=true
    listenPort=11011
    abortFile=/usr/local/rocketmq/store-slave/abort
    storePathRootDir=/usr/local/rocketmq/store-slave
    storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
    storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
    storePathIndex=/usr/local/rocketmq/store-slave/index
    storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
    maxMessageSize=65536
    
  5. 修改服务器 2 和服务器 3 的 bin/runbroker.sh ⽂件

    修改 JVM 内存默认的 8g 为 512m。(CentOS 的不需要)

    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
    
4、启动 broker
  • 在服务器 2 中启动 broker-a(master)和 broker-b-s(slave)

    -c 表示指定哪个配置文件,需要注意的是路径

nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
  • 在服务器 3 中启动 broker-b(master)和 broker-a-s(slave)
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &

可以用 jps 命令查看一下服务

image

验证集群

使用 RocketMQ 提供的 tools 工具验证集群是否正常工作。

  1. 在服务器 2 上配置环境变量

用于被 tools 中的生产者和消费者程序读取该变量。

vim /etc/profileexport NAMESRV_ADDR='192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876'source /etc/profile
  1. 启动生产者
./tools.sh org.apache.rocketmq.example.quickstart.Producer

报错:

[root@localhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 268435456, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 268435456 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/rocketmq/rocketmq-all-5.1.2-bin-release/bin/hs_err_pid9766.log

原因:内存不够

  1. 启动消费者
./tools.sh org.apache.rocketmq.example.quickstart.Consumer

可视化管理控制平台安装

RocketMQ 没有提供可视化管理控制平台,可以使⽤第三方管理控制平台:https://github.com/apache/rocketmq-externals/tree/rocketmq-console-1.0.0/rocketmq-console

  • 下载管理控制平台

  • 解压缩在 linux 服务器上(放在 /rocketmq/rocketmq-externals 目录下)

1、修改 rocketmq-externals/rocketmq-externals-master/rocketmqconsole/src/main/resources/application.properties 配置⽂件中的 nameserver 地址

rocketmq.config.namesrvAddr=192.168.194.133:9876;192.168.194.134:9876;192.168.194.135:9876

2、回到 rocketmq-externals/rocketmq-externals-master/rocketmqconsole 路径下执⾏ maven 命令进⾏打包

  • 安装 maven 环境(可以先输入 mvn 看是否有环境)
apt install maven
  • 打包
mvn clean package -Dmaven.test.skip=true

报错:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:2.6:resources (default-resources) on project rocketmq-console-ng: Execution default-resources of goal org.apache.maven.plugins:maven-resources-plugin:2.6:resources failed: A required class was missing while executing org.apache.maven.plugins:maven-resources-plugin:2.6:resources: org/apache/maven/shared/filtering/MavenFilteringException
[ERROR] -----------------------------------------------------
[ERROR] realm =    plugin>org.apache.maven.plugins:maven-resources-plugin:2.6
[ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy
[ERROR] urls[0] = file:/usr/local/maven/repository/org/apache/maven/plugins/maven-resources-plugin/2.6/maven-resources-plugin-2.6.jar
[ERROR] urls[1] = file:/usr/local/maven/repository/org/codehaus/plexus/plexus-utils/2.0.5/plexus-utils-2.0.5.jar
[ERROR] urls[2] = file:/usr/local/maven/repository/org/apache/maven/shared/maven-filtering/1.1/maven-filtering-1.1.jar
[ERROR] urls[3] = file:/usr/local/maven/repository/org/codehaus/plexus/plexus-interpolation/1.13/plexus-interpolation-1.13.jar
[ERROR] Number of foreign imports: 1
[ERROR] import: Entry[import  from realm ClassRealm[maven.api, parent: null]]
[ERROR] 
[ERROR] -----------------------------------------------------
[ERROR] : org.apache.maven.shared.filtering.MavenFilteringException
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginContainerException

解决方法:在 pom.xml 文件里添加两个插件,然后重新打包 jar 包

      <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-resources-plugin</artifactId><version>3.1.0</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-war-plugin</artifactId><version>3.1.0</version></plugin>

参考博客:https://blog.csdn.net/csdn565973850/article/details/125785792

也可以看一下这篇文章:可视化控制台安装

运行完成之后,进入到 /target 目录下就可以看到生成的 jar 包了

  • /target 目录下运行 jar 包(jar包版本注意看target目录下的内容)
java -jar rocketmq-console-ng-1.0.0.jar

报错,显示端口被占用,进入 application.properties 文件修改端口号(笔者修改的是 8081)

  • 访问所在服务器的 8081 端口,查看集群界面,可以看到之前部署的集群。

image

消息示例

1、构建基础 Java 基础环境

在 maven 项⽬中构建出 RocketMQ 消息示例的基础环境,即创建生产者程序和消费者程序。

通过⽣产者和消费者了解 RocketMQ 操作消息的原生 API

image

  • 引入依赖
        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.1</version></dependency>
  • 编写生产者程序
public class BaseProducer {public static void main(String[] args) throws Exception {//1.创建生产者DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");//2.指定nameserver的地址producer.setNamesrvAddr("192.168.194.133:9876");//3.启动生产者producer.start();//4.创建消息for (int i = 0; i < 10; i++) {Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8));//5.发送消息SendResult sendResult = producer.send(message);System.out.println(sendResult);}//6.关闭生产者producer.shutdown();}}

创建消息 new Message() 的参数含义:

  1. topic(主题):指定往哪个 topic 去发消息
  2. tags:过滤消息
  3. keys:消息的 key
  4. flag:消息的标记
  5. body:消息具体的内容
  6. waitStoreMsgOK:指定是否等待消息存储操作完成,默认值为 true,此时发送方法会等待消息在 Broker 存储操作完成后才返回。
  • 编写消费者程序
public class BaseConsumer {public static void main(String[] args) throws MQClientException {//1.创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");//2.指明nameserver的地址consumer.setNamesrvAddr("192.168.194.133:9876");//3.订阅主题:topic 和 过滤消息用的tag表达式consumer.subscribe("MyTopic1","*");//4.创建一个监听器,当broker把消息推过来时调用consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
//                    System.out.println("收到的消息:"+new String(msg.getBody()));System.out.println("收到的消息:"+msg);}// 返回一个消费成功状态// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;return null;}});//5.启动消费者consumer.start();System.out.println("消费者已启动");}}
  • 启动消费者和生产者,验证消息的收发

发送消息

image

消费消息

image

具体流程图

消息队列中的消费者通过订阅或者轮询的方式来获取消息队列中的消息。

  • 在一些消息队列系统中,消费者可以根据消息的偏移量(Offset)来获取 Broker 中的消息。这通常用于支持消息的顺序消费或者消息的重放机制
  • 对于消息的重放机制,消费者可以根据消息的偏移量重新消费一条或多条消息。通过指定特定的偏移量,消费者可以重新消费该偏移量之后的消息,以实现消息的重放或者回溯消费
  • 不同的消息队列系统可能有不同的机制和方式来处理消息的偏移量。一些消息队列系统使用偏移量来定位消息在日志文件中的位置,而另一些系统可能使用消息的序号或者其他标识来标识消息的顺序或位置。

image

每个 broker 有 2 个主节点和 2 个从节点(副本)。当消息进入系统后,它会在这 4 个服务器节点之间进行轮询,以实现消息的分发和冗余备份。

在消息发送时,每个 Topic 通常会与一个 broker 绑定。然后,在该 broker(包含主从)上创建的队列会被分配【四个队列ID】。这四个队列 ID 用于标识该 Topic 在 broker 上的不同分区或分片。每个队列 ID 对应一个特定的消息队列,用于存储和处理消息。通过使用多个队列 ID,可以实现消息的并行处理和负载均衡。

2、简单消息示例

简单消息分成三种: 同步消息、异步消息、单向消息。

同步消息

生产者发送消息后,必须等待 broker 返回信息后才继续之后的业务逻辑,在 broker 返回信息之前,生产者阻塞等待。

同步消息的应用场景:如重要通知消息、短信通知、短信营销系统等。(注重安全性,不希望消息丢失)

/*** 同步生产者*/
public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {// Instantiate with a producer group name. -- 使用生产者组名进行123实例化DefaultMQProducer producer = newDefaultMQProducer("producerGroup1");// Specify name server addresses. -- 指定名称服务器地址producer.setNamesrvAddr("192.168.194.133:9876");// Launch the instance. -- 启动实例producer.start();for (int i = 0; i < 100; i++) {// Create a message instance, specifying topic, tag and message body.// 创建消息实例,指定主题、标记和消息主体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// Call send message to deliver message to one of brokers.// 调用发送消息以将消息传递到 broker集群 之一SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}// Shut down once the producer instance is not longer in use. -- 关闭生产者producer.shutdown();}}

异步消息

生产者发完消息后,不需要等待 broker 的回信,可以直接执行之后的业务逻辑。生产者提供一个回调函数SendCallback())供 broker 调用,体现了异步的方式。

当消费者处理消息完成后,通过回调函数通知生产者,生产者可以根据需要进行后续的处理。

具体的流程如下:

  1. 生产者发送消息到消息队列中,同时提供一个回调函数作为参数。
  2. 消息队列将消息保存,并将消息与回调函数进行关联。
  3. 消费者从消息队列中获取消息进行处理。
  4. 消费者处理完消息后,消息队列会调用与该消息关联的回调函数。
  5. 生产者在回调函数中执行相应的操作,例如更新状态、记录日志、发送通知等。

异步消息的应用场景:一般用于响应时间敏感的业务场景(注重时间)。

/*** 异步消息生产者*/
public class AsyncProducer {public static void main(String[] args) throws Exception {// Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("192.168.194.133:9876");// Launch the instance.producer.start();producer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败时的重试次数int messageCount = 100;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;Message msg = new Message("Jodie_topic_1023","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {// 定义回调函数@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}System.out.println("============="); // 会在消息反馈前打印countDownLatch.await(5, TimeUnit.SECONDS); // 等待全部消息返回完成producer.shutdown();}}

单向消息

生产者发送完消息后不需要等待任何回复,直接进行之后的业务逻辑。

应用场景:单向传输用于需要中等可靠性的情况,例如日志收集.

/*** 单向消息生产者*/
public class OnewayProducer {public static void main(String[] args) throws Exception {// Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("192.168.194.133:9876");// Launch the instance.producer.start();for (int i = 0; i < 100; i++) {// Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// Call send message to deliver message to one of brokers.producer.sendOneway(msg);}// Wait for sending to complete -- 等待发送完成Thread.sleep(5000);producer.shutdown();}}

3、顺序消息

顺序消息指的是:消费者消费消息的顺序按照发送者发送消息的顺序执行。

顺序消息分成两种: 局部顺序和全局顺序。

局部顺序

局部消息指的是:消费者消费某个 topic 的某个队列中的消息是顺序的。

消费者使用 MessageListenerOrderly 类做消息监听,实现局部顺序。

/*** 顺序消息*/
public class OrderProducer {public static void main(String[] args) throws Exception {// Instantiate with a producer group name.MQProducer producer = new DefaultMQProducer("example_group_name");//名字服务器的地址已经在环境变量中配置好了:NAMESRV_ADDR=192.168.194.133:9876//Launch the instance.producer.start();for (int i = 0; i < 10; i++) {int orderId = i;for(int j = 0 ; j <= 5 ; j ++){Message msg =new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() { // 消息队列选择器@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}}// server shutdownproducer.shutdown();}}

环境变量配置:

image

运行结果:

image

从纵向来看,是乱序的;从横向来看,是有序的:

image

全局顺序

消费者消费全部消息都是顺序的(消费者按顺序消费全部消息),通常需要满足特定主题只有一个队列,并确保消费者按顺序从队列中获取消息,这种应用场景相对较少且性能较差。(因为只有一个 broker,也就意味着不能实现高可用)

  1. 某个特定主题(topic)只有一个队列:这意味着所有相关的消息都被发送到同一个队列中,确保消息按照顺序存储在队列中。
  2. 消费者从该队列中顺序消费消息:消费者按照先后顺序从队列中获取消息,并且保证每个消息都被处理完毕后再获取下一个消息,以确保顺序性。

image

乱序消费

消费者消费消息不需要关注消息的顺序。消费者使用 MessageListenerConcurrently 类做消息监听。

public class OrderConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("OrderTopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() { // 一个普通的监听器@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for(MessageExt msg:msgs){System.out.println("消息内容:"+new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

如下图所示:

image

4、广播消息

广播是向主题 (topic) 的所有订阅者发送消息。订阅同一个 topic 的多个消费者,能全量收到生产者发送的所有消息。

消费者

public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// set to broadcast mode -- 设置为广播模式consumer.setMessageModel(MessageModel.BROADCASTING);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for(MessageExt msg:msgs){System.out.println("消息内容:"+new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}
}

生产者

public class BroadcastProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.start();for (int i = 0; i < 100; i++){Message msg = new Message("TopicTest","TagA","OrderID188",("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}
}

5、延迟消息

延迟消息与普通消息的不同之处在于,它们要等到指定的时间之后才会被传递

延迟消息生产者

public class ScheduledProducer {public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// Launch producerproducer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延迟等级为 3 级,此消息将在 10 秒后传递给消费者。message.setDelayTimeLevel(3);// Send the messageproducer.send(message);}// Shutdown producer after use.producer.shutdown();}
}

延迟等级如下所示:

image

系统为这 18 个等级配置了 18 个 topic,⽤于实现延迟队列的效果:

在商业版 RocketMQ 中,不仅可以设置延迟等级,还可以设置具体的延迟时间,但是在社区版 RocketMQ 中,只能设置延迟等级。

image

延迟消息消费者

public class ScheduledConsumer {public static void main(String[] args) throws MQClientException {// Instantiate message consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");// Subscribe topicsconsumer.subscribe("TestTopic", "*");// Register message listenerconsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time period -- 打印大约延迟时间周期System.out.println("Receive message[msgId=" + message.getMsgId() + "] "+ (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// Launch consumerconsumer.start();}
}

6、批量消息

批量发送消息提高了传递小消息的性能

使用批量消息

public class BatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.start();String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));producer.send(messages);producer.shutdown();}
}

超出限制的批量消息

官方建议批量消息的总大小不应超过 1m,实际不应超过 4m。

如果超过 4m 的批量消息需要进行分批处理,同时设置 broker 的配置参数为 4m (在 broker 的配置文件中修改: maxMessageSize=4194394

上面默认配置的是:

# 限制的消息⼤⼩
maxMessageSize=65536

具体代码如下:

public class MaxBatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");producer.start();//large batchString topic = "BatchTest";// 发送 一万 条信息List<Message> messages = new ArrayList<>(100*1000);for (int i = 0; i < 100*1000; i++) {messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));}
//        producer.send(messages);//split the large batch into small ones: 将大批分成小批ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {List<Message> listItem = splitter.next();producer.send(listItem);}producer.shutdown();}}

在使用 ListSplitter 进行消息拆分时,需要注意以下限制:

  1. 同一批次的消息应具有相同的主题,即拆分的消息列表应属于同一个主题。
  2. 同一批次的消息应具有相同的 waitStoreMsgOK 参数,即等待消息存储成功的设置应相同。
  3. ListSplitter 不支持拆分延迟消息,即消息的延迟级别应为 0。
  4. ListSplitter 不支持拆分事务消息,即不支持对事务消息使用该工具类进行拆分。

7、过滤消息

在大多数情况下,标签是一种简单而有用的设计,可以用来选择您想要的消息。

tag 过滤的生产者

public class TagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 15; i++) {Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);// 将 sendResult 的值格式化为字符串,并以新的一行123打印到控制台System.out.printf("%s%n", sendResult);}producer.shutdown();}}

"%s%n" 的含义:

  • %s 是格式化字符串中的占位符,表示将要替换为字符串类型的值。
  • %n 则是换行符,表示在输出结果中添加一个新的空行。

tag 过滤的消费者

public class TagConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");consumer.subscribe("TagFilterTest", "TagA || TagC");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

使用 SQL 过滤

SQL 功能可以通过您在发送消息时输入的属性进行一些计算

在 RocketMQ 定义的语法下,可以实现一些有趣的逻辑。这是一个例子:

------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
  • 语法

RocketMQ 只定义了一些基本的语法来支持这个特性,也可以轻松扩展它。

  1. 数值比较,如 >, >=, <, <=, BETWEEN, =
  2. 字符比较,如 =, <>, IN
  3. IS NULLIS NOT NULL
  4. 逻辑 AND, OR, NOT

常量类型有:

  1. 数字,如 123、3.1415
  2. 字符,如 'abc',必须用单引号
  3. NULL,特殊常数
  4. 布尔值,TRUEFALSE

使用注意: 只有推模式的消费者可以使用 SQL 过滤。拉模式是用不了的。

SQL 过滤的生产者示例

public class SQLProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 15; i++) {Message msg = new Message("SqlFilterTest",tags[i % tags.length],("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties. -- 设置一些属性msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}
}

SQL 过滤的消费者示例

使用前提

在 broker 的配置文件中加上 enablePropertyFilter=true 属性。

public class SQLConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Don't forget to set enablePropertyFilter=true in brokerconsumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

8、事务消息

事务消息的定义

它可以被认为是一个两阶段的提交消息实现,以确保分布式系统的最终一致性。

事务性消息确保本地事务的执行和消息的发送可以原子地执行。

事务消息有三种状态

  1. TransactionStatus.CommitTransaction: 提交事务,表示允许消费者消费该消息。
  2. TransactionStatus.RollbackTransaction: 回滚事务,表示该消息将被删除,不允许消费。
  3. TransactionStatus.Unknown: 中间状态,表示需要 MQ 回查才能确定状态。

事务消息的实现流程

事务消息的实现流程

场景联想:

下订单 – 支付,这两个阶段。

half 消息在哪里?

image

生产者

public class TransactionProducer {public static void main(String[] args) throws Exception {// 事务监听器TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("192.168.194.133:9876");// 线程池ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 10; i++) {try {Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}
}

本地事务处理 – 事务监听器实现类 TransactionListenerImpl

public class TransactionListenerImpl implements TransactionListener {/*** When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.* 当发送事务准备(半)消息成功时,将调用此方123法 执行123本地事务。** @param msg Half(prepare) message* @param arg Custom business parameter 自定义业务参数* @return 事务状态*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {String tags = msg.getTags();if(StringUtils.contains(tags,"TagA")){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.contains(tags,"TagB")){return LocalTransactionState.ROLLBACK_MESSAGE;}else{return LocalTransactionState.UNKNOW;}}/*** 回查本地事务** @param msg Check message* @return Transaction state*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {String tags = msg.getTags();if(StringUtils.contains(tags,"TagC")){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.contains(tags,"TagD")){return LocalTransactionState.ROLLBACK_MESSAGE;}else{return LocalTransactionState.UNKNOW;}}
}

消费者

public class TransactionConsumer {public static void main(String[] args) throws MQClientException {//1.创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");//2.指明nameserver的地址consumer.setNamesrvAddr("192.168.194.133:9876");//3.订阅主题:topic 和过滤消息用的tag表达式consumer.subscribe("TopicTest","*");//4.创建一个监听器,当broker把消息推过来时调用consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
//                    System.out.println("收到的消息:"+new String(msg.getBody()));System.out.println("收到的消息:"+msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumer.start();System.out.println("消费者已启动");}
}

使用限制

  • 事务性消息没有调度和批处理⽀持。
  • 为避免单条消息被检查次数过多,导致半队列消息堆积,我们默认将单条消息的检查次数限制为 15 次,但⽤户可以通过更改 “transactionCheckMax” 来更改此限制。参数在 broker 的配置中,如果⼀条消息的检查次数超过 “transactionCheckMax” 次,broker 默认会丢弃这条消息,同时打印错误日志。用户可以通过重写 “AbstractTransactionCheckListener” 类来改变这种行为。
  • 事务消息将在⼀定时间后检查,该时间由代理配置中的参数 “transactionTimeout” 确定。并且用户也可以在发送事务消息时通过设置用户属性 “CHECK_IMMUNITY_TIME_IN_SECONDS” 来改变这个限制,这个参数优先于 “transactionMsgTimeout” 参数。
  • ⼀个事务性消息可能会被检查或消费不止⼀次。
  • 提交给用户目标主题的消息 reput 可能会失败。目前,它取决于日志记录。高可用是由 RocketMQ 本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使用同步双写机制
  • 事务性消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务性消息允许向后查询。MQ 服务器通过其⽣产者 ID 查询客户端。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/240583.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

postgresql|数据库|LVM快照热备冷恢复数据库的思考

一&#xff0c; LVM快照备份的意义 数据库备份一直是数据库运维工作中的重点&#xff0c;一个完备的备份不仅仅是仅有后悔药的功能&#xff0c;还可能有迁移数据库的作用。 那么&#xff0c;数据库备份系统我们需要的&#xff0c;也就是看重的是四个点&#xff0c;甚至更多的…

【LeetCode刷题笔记】前缀树

208. 实现 Trie (前缀树) 解题思路: 1. 前缀树 Map实现 ,使用一个 Map<Character, Trie> 来存储 每个字符 对应的 若干子节点 ,在构造函数中初始化 根节点 root 为 当前对象实例 , 在 插入

java的XWPFDocument3.17版本学习

maven依赖 <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>3.17</version> </dependency> 测试类&#xff1a; import org.apache.poi.openxml4j.exceptions.InvalidFormatExcep…

Python+Playwright自动化测试--playwright处理浏览器多窗口切换

1.简介 浏览器多窗口的切换问题相比大家不会陌生吧&#xff0c;之前小编在javaselenium系列文章中就有介绍过。大致步骤就是&#xff1a;使用selenium进行浏览器的多个窗口切换测试&#xff0c;如果我们打开了多个网页&#xff0c;进行网页切换时&#xff0c;我们需要先获取各…

R软件包ConsensusCluster进行共识聚类(Consensus Clustering)

从下面论文看到这个方法&#xff1a; Wang, Xin, et al. "Deep learning using bulk RNA-seq data expands cell landscape identification in tumor microenvironment." Oncoimmunology 11.1 (2022): 2043662. 这篇论文基于 AI 方法对 bulk RNA-seq 数据识别肿瘤微环…

10、基于LunarLander登陆器的Dueling DDQN强化学习(含PYTHON工程)

10、基于LunarLander登陆器的Dueling DDQN强化学习&#xff08;含PYTHON工程&#xff09; LunarLander复现&#xff1a; 07、基于LunarLander登陆器的DQN强化学习案例&#xff08;含PYTHON工程&#xff09; 08、基于LunarLander登陆器的DDQN强化学习&#xff08;含PYTHON工程…

【动态规划算法(dp算法)】之背包问题

文章目录 背包问题动规五部曲一、0-1背包问题 &#xff1a;限制物品不可重复 (要么不选 要么选一个)二、完全背包问题&#xff1a;不限制重复&#xff08;要么不选 要么可以多选&#xff09;&#xff08;完全背包可以转化为0-1背包问题&#xff09; 动态规划&#xff1a;01背包…

使用OpenCV4实现工业缺陷检测的六种方法

目录 1 机器视觉2 缺陷检测3 工业上常见缺陷检测方法 1 机器视觉 机器视觉是使用各种工业相机&#xff0c;结合传感器跟电气信号实现替代传统人工&#xff0c;完成对象识别、计数、测量、缺陷检测、引导定位与抓取等任务。其中工业品的缺陷检测极大的依赖人工完成&#xff0c;…

企业数据治理:(1)概述

目录 确定数据治理的规范与标准 设立与数据治理规范和标准相匹配的组织架构 明确数据治理的范围 制定切实可行的治理目标与实施计划 数据治理是企业IT系统建设当中的重要组成部分&#xff0c;是一种综合性的方法和实践&#xff0c;目的在确保数据的完整性、准确性和一致性。…

部署tomcat单机多实例,keepalived+mysql的互为主从高可用,mysql+keepalived高可用

部署tomcat单机多实例 在Tomcat中部署单机多实例是一种常见的做法&#xff0c;它允许您在同一台服务器上运行多个独立的Tomcat实例&#xff0c;每个实例都有自己的配置、日志和应用程序。 安装jdk环境 首先配置java环境 [roottomcat ~]# tar xf jdk-8u211-linux-x64.tar.gz…

nodejs+vue+ElementUi会员制停车场车位系统

总之&#xff0c;智能停车系统使停车场管理工作规范化&#xff0c;系统化&#xff0c;程序化&#xff0c;避免停车场管理的随意性&#xff0c;提高信息处理的速度和准确性&#xff0c;能够及时、准确、有效的查询和修改停车场情况。 三、任务&#xff1a;小组任务和个人任务 智…

旅游海报图怎么做二维码展示?扫码即可查看图片

现在旅游攻略的海报可以做成二维码印刷在宣传单单页或者分享给用户来了解目的地的实际情况&#xff0c;出行路线、宣传海报等。用户只需要扫描二维码就可以查看内容&#xff0c;更加的方便省劲&#xff0c;那么旅游海报的图片二维码制作的技巧有哪些呢&#xff1f;使用图片二维…

2015年第四届数学建模国际赛小美赛A题飞机上的细长座椅解题全过程文档及程序

2015年第四届数学建模国际赛小美赛 A题 飞机上的细长座椅 原题再现&#xff1a; 航空公司座位是指在旅途中乘客可以乘坐的座位。一些航空公司现在推出了新的经济舱“超薄”座位。这些座椅除了重量较轻外&#xff0c;理论上还允许航空公司在不显著影响乘客舒适度的情况下增加运…

STL中优先队列的模拟实现与仿函数的介绍

文章目录 仿函数优先队列的模拟实现 仿函数 上回我们说到&#xff0c;优先队列的实现需要用到仿函数的特性 让我们再回到这里 这里我们发现他传入的用于比较的东西竟然是一个类模板&#xff0c;而不是我们所见到的函数 我们可以先创建一个类&#xff0c;用于比较大小 struc…

陶建辉在 CIAS 2023 谈“新能源汽车的数字化”

近年&#xff0c;中国的新能源汽车发展迅猛&#xff0c;在全球竞争中表现出色&#xff0c;已经连续 8 年保持全球销量第一。在新兴技术的推动下&#xff0c;新能源汽车的数字化转型也正在加速进行&#xff0c;从汽车制造到能源利用、人机交互&#xff0c;各个环节都在进行数字化…

RobotMaster学习——工序导入,参数设置,轨迹生成

目录 引出1.导入工序2.修改刀具其他刀具参数 3.进行工序分配4.设置TCP5.设置工作站6.工序整体导入配置7.进行计算 总结 引出 RobotMaster的操作流程&#xff0c;从导入工序到生产轨迹。 1.导入工序 2.修改刀具 要选择第七把刀具 其他刀具参数 第一把刀具 第二把刀具 第三把刀…

C语言的分支和循环语句

各位少年&#xff0c;今天和大家分享的是分支语句循环体语句&#xff0c;C语言是结构体的程序设计语言&#xff0c;这里的结构指的是&#xff08;顺序结构&#xff09;&#xff08;选择结构&#xff09;&#xff08;循环结构&#xff09;C语言是能够实现这三种结构的&#xff0…

作为程序员,你知道 Notion 吗?

Notion 是一款极其出色的个人笔记软件&#xff0c;它将“万物皆对象”的思维运用到笔记中&#xff0c;让使用者可以天马行空地去创造、拖拽、链接。也适用于康奈尔笔记法哦。 不知大家会不会有如下烦恼&#xff1a; 1.当你下载了许多 APP&#xff0c;也注册了许多账号&#x…

Prometheus-JVM

一. JVM监控 通过 jmx_exporter 启动端口来实现JVM的监控 Github Kubernetes Deployment Java 服务&#xff0c;修改 wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.19.0/jmx_prometheus_javaagent-0.19.0.jar# 编写配置文件&#xff0…

Nacos单机安装

采用的版本是Nacos Release 2.3.0 (Nov 30, 2023) alibaba/nacos GitHub 依赖于jdk&#xff0c;要先安装好jdk1.8。 修改配置 下载解压后&#xff0c;修改配置文件&#xff1a;conf/application.properties。 nacos.core.auth.plugin.nacos.token.secret.key 官方文档Na…