今年是不平凡的一年,对于每个个体都是。不论我们在哪儿,经历了什么,向前走总没错。虽然方向也很重要,但是不要在一个地方停太久,You young
编者荐语:RocketMQ 逐渐成为最主流的消息队列,学习 RocketMQ 是每个攻城狮要做的事
一、消息中间件
什么是消息系统
二、RocketMq简介
发展
概念术语
架构组成
三、安装 RocketMq
系统环境
安装部署 RocketMQ
RocketMq插件(可视化)
四、配置 RocketMq
启动RocketMQ
验证启动是否成功
生产环境 ACL 权限认证
五、RocketMq 集群
RocketMq 集群、拓展阅读
六、SpringBoot 集成 RocketMq
七、SpringMVC 架构中使用 Demo
参考
一、消息中间件
什么是消息系统
简单来说:消息被发送到队列中。“消息队列”(Message Queue)是在消息的传输过程中保存消息的容器。
- 拓展阅读
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题) 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死) 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统) 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测) 目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:
• 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持) • 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提) • 支持18个级别的延迟消息(rabbitmq和kafka不支持) • 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认) • 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持) • 支持重复消费(rabbitmq不支持,kafka支持)
二、RocketMq简介
官网地址:http://rocketmq.apache.org/
发展
阿里巴巴消息中间件起源于2001年的五彩石项目,Notify 在这期间应运而生,用于交易核心消息的流转。
至2010年,B2B开始大规模使用ActiveMQ作为消息内核,随着阿里业务的快速发展,急需一款支持顺序消息,拥有海量消息堆积能力的消息中间件,MetaQ 1.0在2011年诞生。
到2012年,MetaQ已经发展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ。随后,将RocketMQ进行了开源,阿里的消息中间件正式走入了公众的视野。
到2015年,RocketMQ已经经历了多年双十一的洗礼,在可用性、可靠性以及稳定性等方面都有出色的表现。与此同时,云计算大行其道,阿里消息中间件基于RocketMQ推出了Aliware MQ 1.0,开始为阿里云上成千上万家企业提供消息服务。
到今年,MetaQ在2016年双十一承载了万亿级消息的流转,跨越了一个新的里程碑,同时RocketMQ进入Apache 孵化。
RocketMQ可以保证严格的消息顺序
概念术语
Producer Group
标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。若事务消息,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其 他producer,确认这条消息应该commit还是rollback。但开源版本并不支持事务消息。
Consumer Group
标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。
消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。
注:RocketMQ要求同一个Consumer Group的消费者必须要拥有相同的注册信息,即必须要听一样的topic(并且tag也一样)。
Topic
标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。
Tag
RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic1的时候,如果你订阅的时候指定的是tagA,那么tagB的消息将不会投递。
Message Queue
简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若Topic同时创建在不通的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。
无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q地发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。
每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。
Offset
RocketMQ中,有很多offset的概念。但通常我们只关心暴露到客户端的offset。一般我们不特指的话,就是指逻辑Message Queue下面的offset。
注:逻辑offset的概念在RocketMQ中字面意思实际上和真正的意思有一定差别,这点在设计上显得有点混乱。祥见下面的解释。
可以认为一条逻辑的message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset。
max offset
字面上可以理解为这是标识message queue中的max offset表示消息的最大offset。但是从源码上看,这个offset实际上是最新消息的offset+1,即:下一条消息的offset。
min offset:
标识现存在的最小offset。而由于消息存储一段时间后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长。这意味着比min offset要小的那些消息已经不在broker上了,无法被消费。
consumer offset
字面上,可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是下次拉取的offset位置。
消费者拉取消息的时候需要指定offset,broker不主动推送消息, offset的消息返回给客户端。
consumer刚启动的时候会获取持久化的consumer offset,用以决定从哪里开始消费,consumer以此发起第一次请求。
每次消息消费成功后,这个offset在会先更新到内存,而后定时持久化。在集群消费模式下,会同步持久化到broker,而在广播模式下,则会持久化到本地文件。
架构组成
三、安装 RocketMq
系统环境
JDK8 Windows10
安装部署 RocketMQ
- 下载
地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/rocketmq-all-4.3.0-bin-release.zip
选择 Binary 进行下载
解压
环境变量配置
必须配置
变量名:ROCKETMQ_HOME 变量值(绝对路径):D:\rocketMq\rocketmq-all-4.3.0-bin-release
环境变量 Path 中也配置:%ROCKETMQ_HOME%\bin
RocketMq插件(可视化)
选择性安装
- 下载
git clone https://github.com/apache/rocketmq-externals.git
解压
修改配置
server.contextPath=
server.port=8088
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=127.0.0.1:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
- 编译
cd \rocketmq-externals\rocketmq-console
mvn clean package -Dmaven.test.skip=true
- 启动
cd \rocketmq-externals\rocketmq-console\target
java -jar rocketmq-console-ng-2.0.0.jar
- 启动成功
浏览器访问:http://127.0.0.1:8088
四、配置 RocketMq
启动RocketMQ
默认配置启动
- 启动
启动NAMESERVER:
Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqnamesrv.cmd’,启动NAMESERVER。成功后会弹出提示框,此框勿关闭。
启动BROKER:
Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,启动BROKER。成功后会弹出提示框,此框勿关闭。
验证启动是否成功
验证生产消息正常
执行如下命令:
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
能看到类似如下输出:
SendResult [sendStatus=SEND_OK, msgId=C0A82BC5F36C511D50C05B41...
验证消费消息正常
执行如下命令:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
能看到类似如下输出:
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, stor....
其他命令
RocketMQ 自带的连接工具是 maadmin
,执行 mqadmin.cmd
就可以查看有哪些命令参数可用。
$ mqadmin.cmd
The most commonly used mqadmin commands are:
updateTopic Update or create topic
deleteTopic Delete topic from broker and NameServer.
updateSubGroup Update or create subscription group
deleteSubGroup Delete subscription group from broker.
updateBrokerConfig Update broker's config
updateTopicPerm Update topic perm
topicRoute Examine topic route info
topicStatus Examine topic Status info
topicClusterList get cluster info for topic
brokerStatus Fetch broker runtime status data
queryMsgById Query Message by Id
queryMsgByKey Query Message by Key
queryMsgByUniqueKey Query Message by Unique key
queryMsgByOffset Query Message by offset
queryMsgByUniqueKey Query Message by Unique key
printMsg Print Message Detail
printMsgByQueue Print Message Detail
sendMsgStatus send msg to broker.
brokerConsumeStats Fetch broker consume stats data
producerConnection Query producer's socket connection and client version
consumerConnection Query consumer's socket connection, client version and subscription
consumerProgress Query consumers's progress, speed
consumerStatus Query consumer's internal data structure
cloneGroupOffset clone offset from other group.
clusterList List all of clusters
topicList Fetch all topic list from name server
updateKvConfig Create or update KV config.
deleteKvConfig Delete KV config.
wipeWritePerm Wipe write perm of broker in all name server
resetOffsetByTime Reset consumer offset by timestamp(without client restart).
updateOrderConf Create or update or delete order conf
cleanExpiredCQ Clean expired ConsumeQueue on broker.
cleanUnusedTopic Clean unused topic on broker.
startMonitoring Start Monitoring
statsAll Topic and Consumer tps stats
allocateMQ Allocate MQ
checkMsgSendRT check message send response time
clusterRT List All clusters Message Send RT
getNamesrvConfig Get configs of name server.
updateNamesrvConfig Update configs of name server.
getBrokerConfig Get broker config by cluster or special broker!
queryCq Query cq command.
sendMessage Send a message
consumeMessage Consume message
See 'mqadmin help <command>' for more information on a specific command.
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
启动成功后先简单执行一个命令(在 linux 上去掉 .cmd 即可)
- 查询 topic 列表
mqadmin.cmd topicList -n '127.0.0.1:9876'
生产环境 ACL 权限认证
修改配置
使用默认配置,像上一节我们就可以启动成功。但是在生产环境中,我们一定要加权限认证
打开 aclEnable 开关
在文件
conf/broker.conf
新增aclEnable=true
新建文件
conf/plain_acl.yml
,填入如下文本
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- TopicTest=PUB
groupPerms:
# the group should convert to retry topic
- oms_consumer_group=DENY
- accessKey: admin
secretKey: 12345678
whiteRemoteAddress:
# if it is admin, it could access all resources
admin: true
- 重启 RocketMQ
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
名词介绍
globalWhiteRemoteAddresses
全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:
空 表示不设置白名单,该条规则默认返回false。“” 表示全部匹配,该条规则直接返回true,将会阻断其他规则的判断,请慎重使用。192.168.0.{100,101} 多地址配置模式,ip地址的最后一组,使用{},大括号中多个ip地址,用英文逗号(,)隔开。192.168.1.100,192.168.2.100 直接使用,分隔,配置多个ip地址。192.168..或192.168.100-200.10-20 每个IP段使用 "" 或"-"表示范围。
accounts
配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。
accessKey 登录用户名,长度必须大于6个字符。
secretKey 登录密码。长度必须大于6个字符。
whiteRemoteAddress 用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。
admin boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。
UPDATE_AND_CREATE_TOPIC 更新或创建主题。UPDATE_BROKER_CONFIG 更新Broker配置。DELETE_TOPIC_IN_BROKER 删除主题。UPDATE_AND_CREATE_SUBSCRIPTIONGROUP 更新或创建订阅组信息。DELETE_SUBSCRIPTIONGROUP 删除订阅组信息。5. defaultTopicPerm 默认topic权限。该值默认为DENY(拒绝)。
defaultGroupPerm 默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。
topicPerms 设置topic的权限。其类型为数组,其可选择值在下节介绍。
groupPerms 设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。
五、RocketMq 集群
RocketMq 集群、拓展阅读
参考:https://www.jianshu.com/p/2838890f3284
六、SpringBoot 集成 RocketMq
代码会同步到 GitHub,回复github领取
七、SpringMVC 架构中使用 Demo
参考
安装部署参考博客:https://www.imooc.com/article/290089
完整中文参考文章:http://www.itmuch.com/books/rocketmq/operation.html
阿里文档参考:https://help.aliyun.com/document_detail/29533.html
我是pub哥,Java服务端工程师,对大数据懂一点,三观要正。我们下期再见
财经相关:
最近几天白酒、半导体、新能源都在上涨,应该有很多被洗人,投资建议,有持仓的韭菜,下跌再抛出。