RocketMQ 是一款由阿里巴巴集团开发并开源给Apache软件基金会的分布式消息及流处理平台。以其高吞吐量、低延迟、高可用性等特点而广受欢迎。支持Java,C++, Python, Go, .NET等。
异步解耦:可以实现上游和下游业务系统的松耦合设计,使得服务部分节点异常不会影响到核心交易系统的正常运转。在电商、金融等分布式系统中,这种解耦设计尤为重要。
削峰填谷:在如秒杀、大促等大型活动中,系统会面临巨大的流量冲击。RocketMQ利用其高性能的消息处理能力,可以有效地应对这种流量冲击,保证系统的稳定运行。
顺序消息:支持顺序消息(分区有序),可以确保消息的先进先出。这在交易系统中的订单创建、支付、退款等流程中尤为重要,因为这些流程对消息的顺序有严格要求。
分布式事务消息:支持分布式事务消息,可以保证分布式事务的强一致性。这在涉及多个服务的分布式系统中非常有用,可以确保数据的一致性和完整性。
RocketMQ优点
高吞吐量和低延迟:能够处理大规模消息流,并提供低延迟的消息传递。这使得它非常适合处理高并发的应用场景,如电子商务和金融交易系统。
可靠性:具有高度可靠的消息传递机制。它支持消息持久化和复制,确保消息不会丢失,并能够在故障发生时进行自动恢复。
分布式扩展:支持水平扩展,可以方便地添加新的消息生产者和消费者来应对负载增加的情况。
易于部署:提供开箱即用的部署方式,非常适合在分布式系统中使用。
RocketMQ架构
生产者(Producer)
用于产生消息的运行实体,通常集成在业务系统的上游。
主题(Topic)
消息传输和存储的分组容器,内部由多个队列组成。
队列(MessageQueue)
消息传输和存储的实际单元容器,类似于其他消息队列中的分区。
消息(Message)
RocketMQ 的最小传输单元,具备不可变性。
消费者分组(ConsumerGroup)
发布订阅模型中定义的独立的消费身份分组,用于统一管理多个消费者。
消费者(Consumer)
消费消息的运行实体,集成在业务系统的下游。
订阅关系(Subscription)
发布订阅模型中消息过滤、重试、消费进度的规则配置。
部署RocketMQ(docker)
安装 Docker:
#debiancurl -fsSL https://get.docker.com -o get-docker.shsudo sh get-docker.sh
拉取 RocketMQ 镜像:
使用以下命令从 Docker Hub 拉取最新的 RocketMQ 镜像:
docker pull apache/rocketmq:latest
启动 RocketMQ NameServer:
RocketMQ 的 NameServer 是负责管理所有 Broker 节点的目录服务。可以使用以下命令启动 NameServer:
docker run -d --name rmqnamesrv -p 9876:9876 apache/rocketmq:latest sh mqnamesrv
```bash
## 启动 RocketMQ Broker:
RocketMQ 的 Broker 负责存储消息并处理生产者和消费者的请求。使用以下命令启动 Broker:```bash
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "BROKER_NAME=broker-a" -e "BROKER_ID=0" -e "AUTO_CREATE_TOPIC_ENABLE=true" -e "AUTO_CREATE_SUBSCRIPTION_GROUP=true" -p 10911:10911 -p 10909:10909 apache/rocketmq:latest sh mqbroker
验证
查看 NameServer 日志:
docker logs -f rmqnamesrv
查看 Broker 日志:
docker logs -f rmqbroker
使用 RocketMQ Console:
如果需要可视化管理 RocketMQ,可以运行 RocketMQ Console:
docker pull styletang/rocketmq-console-ng
docker run -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" styletang/rocketmq-console-ng
总结
# 拉取 RocketMQ 镜像
docker pull apache/rocketmq:latest# 启动 NameServer
docker run -d --name rmqnamesrv -p 9876:9876 apache/rocketmq:latest sh mqnamesrv# 启动 Broker
docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -p 10911:10911 -p 10909:10909 apache/rocketmq:latest sh mqbroker# 启动 RocketMQ Console
docker pull styletang/rocketmq-console-ng
docker run -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" styletang/rocketmq-console-ng
如果是外网环境
run Broker 外网IP
mkdir -p /usr/data/rocketMQ/data/broker/logs
mkdir -p /usr/data/rocketMQ/data/broker/store
mkdir -p /usr/data/rocketMQ/data/broker/conf/
broket.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 外网ip:9876
brokerIP1 = 外网ip
autoCreateTopicEnable=true
docker run -p 0.0.0.0:10911:10911 -p 0.0.0.0:10909:10909 -d -v /usr/data/rocketMQ/data/broker/logs:/root/logs -v /usr/data/rocketMQ/data/broker/store:/root/store -v /usr/data/rocketMQ/data/broker/conf/broker.conf:/opt/rocketmq/conf/broker.conf --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "autoCreateTopicEnable=true" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq/conf/broker.conf
.NET
NuGet NewLife.RocketMQ
Producer
//mainXTrace.UseConsole();var producer = new Producer{NameServerAddress = "x.x.x.x:9876",Topic = "t0529"};try{producer.Start();// 发送一条测试消息,以确保 Topic 被创建for (var i = 0; i < 10; i++){var str = "mqm" + i;//var str = Rand.NextString(1337);var sr = producer.Publish(str, "TagA");}}catch (Exception ex){Console.WriteLine($"Exception: {ex.Message}");}finally{producer.Stop();}
Consumer
var consumer = new Consumer{Topic = "t0529",Group = "test",NameServerAddress = "x.x.x.x:9876",FromLastOffset = false,//SkipOverStoredMsgCount = 0,//BatchSize = 20,Log = XTrace.Log,ClientLog = XTrace.Log,};consumer.OnConsume = OnConsume;consumer.Configure(MqSetting.Current);consumer.Start();_consumer = consumer;