MQ 之 RocketMQ

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到教程。

RocketMQ 是出自 A 公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好,目前,RocketMQ 的文档仍然不够丰富 1 2,社区仍然无法与 Kafka 比肩,但 A 公司已经推出了基于 RocketMQ 的云产品 3,相信未来 RocketMQ 也会有不错的发展。本文采用 RocketMQ 3.2.6 进行实验,由于 RocketMQ 与 Kafka 很相似,本文很多地方对两者做出了比较。

基本概念

RocketMQ 由于借鉴了 Kafka 的设计,包括组件的命名也很多与 Kafka 相似,下面摘抄一段《RocketMQ 原理简介》中的介绍,可以与 Kafka 的命名比对一下,

  • Producer,消息生产者,负责产生消息,一般由业务系统负责产生消息。
  • Consumer,消息消费者,负责消费消息,一般是后台系统负责异步消费。
  • Push Consumer,Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立 刻回调 Listener 接口方法。
  • Pull Consumer,Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。
  • Producer Group,一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
  • Consumer Group,一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
  • Broker,消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。

《RocketMQ 原理简介》中还介绍了一些其他的概念,例如,广播消费和集群消费,广播消费是 Consumer Group 中对于同一条消息每个 Consumer 都消费,集群消费是 Consumer Group 中对于同一条消息只有一个 Consumer 消费。Kafka 采用的是集群消费,不支持广播消费(好吧,是我没有找到)。再例如,普通顺序消息和严格顺序消息,普通顺序消息在 Broker 重启情况下不会保证消息顺序性;严格顺序消息即使在异常情况下也会保证消息的顺序性。个人理解,所谓普通顺序消息,应该就是 Kafka 中的 Partition 级别有序,严格顺序消息,应该是 Topic 级别有序,但文中也提到,这样的有序级别是要付出代价的,Broker 集群中只要有一台机器不可用,则整个集群都不可用,降低服务可用性。使用这种模式,需要依赖同步双写,主备自动切换,但自动切换功能目前还未实现(我猜,自动切换仅仅是没开源吧)。说白了,严格顺序消息不具备生产可用性,自己玩玩还行,其应用场景主要是数据库 binlog 同步。

关于 RocketMQ 和 Kafka 的对比,可以参考 RocketMQ Wiki 中的文章 4,看看就行,不必较真。

关于顺序和分区

顺序性的话题,刚才已经提到了一些,RocketMQ 的实现应该不弱于 Kafka。对于分区,RocketMQ 似乎有意弱化了这个概念,只有在 Producer 中有一个参数 defaultTopicQueueNums,分区在 RocketMQ 中有时被称为队列。RocketMQ 的普通顺序消息模式,应该就是分区顺序性,这点与 Kafka 一致。

关于高可用

RocketMQ 实现高可用的方式有多种,《RocketMQ 用户指南》文档中提到的有:多主模式、多主多从异步复制模式、多主多从同步复制模式。多主模式下,性能较好,但是在 Broker 宕机的时候,该 Broker 上未消费的交易不可消费;多主多从异步复制模式,与 Kafka 的副本模式比较类似,主 Broker 宕机后,会自动切换到从 Broker,消息的消费不会出现间断;多主多从同步复制模式更进一步,采用同步刷盘的方式,避免了主 Broker 宕机带来的消息丢失,但是,目前不支持自动切换。

虽然 RocketMQ 提供了多种高可用方式,但是目前能生产使用的就只有多主多从异步复制模式,即使在这个模式上,其实现也比 Kafka 要差。因为 RocketMQ 的机制中,主从关系是人为指定的,主 Broker 上承担所有的消息派发,而 Kafka 的主从关系是通过选举的方式选出来的,每个分区的主节点都是不一样的,可以从不同的节点派发消息。Kafka 的模式是分散模式,有利于负载均衡,而且当一个 Broker 宕机的时候,只影响部分 Topic,而 RocketMQ 一旦主 Broker 宕机,会影响所有的 Topic。另外,Kafka 可以支持 Broker 间同步复制(通过设置 Broker 的 acks 参数),这样比的话,RocketMQ 就差太多了。

关于 RocketMQ 的介绍,网上的文章不算太多,也比较杂,《分布式开放消息系统(RocketMQ)的原理与实践》5 6 7这篇原理介绍的不错,推荐。

RocketMQ 的工具和编程接口

RocketMQ 的工具

相比较 Kafka 而言,RocketMQ 提供的工具要少一些,如下,

bin/mqadminbin/mqbrokerbin/mqbroker.numanode0bin/mqbroker.numanode1bin/mqbroker.numanode2bin/mqbroker.numanode3bin/mqfiltersrvbin/mqnamesrvbin/mqshutdown

除了进程启停之外,常用的运维命令都在 mqadmin 中,详见《RocketMQ 运维指令》文档。我实验中常用的一些命令如下,

sh mqnamesrv &sh mqbroker -c async-broker-a.properties &sh mqbroker -c async-broker-a-s.properties &sh mqadmin topicList -n 192.168.232.23:9876sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin clusterList -n 192.168.232.23:9876sh mqadmin deleteTopic -c DefaultCluster -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin consumerProgress -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin deleteSubGroup -c DefaultCluster -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin consumerConnection -n 192.168.232.23:9876 -g ConsumerGroupNamecc4

RocketMQ 使用了自己的 name server 来做调度(Kafka 用了 Zookeeper),使用 sh mqnamesrv 来启动,默认监听端口9876,sh mqnamesrv -m 可以查看所有默认参数,使用 -c xxxx.properties 参数来指定自定义配置。sh mqbroker 是用于启动 Broker 的命令,参数比较多,详细可以通过 sh mqbroker -m 查看默认参数,配置项细节后文再说。sh mqadmin 是运维命令入口,topicList 是列出所有 Topic;topicRoute 是列出单个 Topic 的详细信息;clusterList 是列出集群的信息;deleteTopic 是删除 Topic。consumerProgress 是查看消费者消费进度,deleteSubGroup 是删除消费者的订阅,consumerConnection 是查询消费者订阅的情况。

Broker 的配置是最多的,实验中我修改到的部分如下,其他使用默认,

brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23brokerName=broker-abrokerId=0namesrvAddr=192.168.232.23:9876listenPort=10911deleteWhen=04fileReservedTime=120storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERflushDiskType=ASYNC_FLUSH

配置文件中的多数配置看例子就可以知道意思,挑几个说一下。brokerName 和 brokerId, 同名的 Broker,ID 是0的是主节点,其他是从节点;deleteWhen,删除文件时间点,默认凌晨4点;fileReservedTime,文件保留时间,设置为120小时;brokerRole,Broker 的角色,ASYNC_MASTER 是异步复制主节点,SYNC_MASTER 是同步双写主节点,SLAVE 是备节点。

其实,这些工具的写法也基本一致,都是先做一些检查,最后运行 Java 程序,JVM 系统上的应用应该差不多都这样。

RocketMQ 的 Java API

RocketMQ 是用 Java 语言开发的,因此,其 Java API 相对是比较丰富的,当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供的功能包括,

  1. 广播消费,这个在之前已经提到过;
  2. 消息过滤,支持简单的 Message Tag 过滤,也支持按 Message Header、body 过滤;
  3. 顺序消费和乱序消费,之前也提到过,这里的顺序消费应该指的是普通顺序性,这一点与 Kafka 相同;
  4. Pull 模式消费,这个是相对 Push 模式来说的,Kafka 就是 Pull 模式消费;
  5. 事务消息,这个好像没有开源,但是 example 代码中有示例,总之,不推荐用;
  6. Tag,RocketMQ 在 Topic 下面又分了一层 Tag,用于表示消息类别,可以用来过滤,但是顺序性还是以 Topic 来看;

单看功能的话,即使不算事务消息,也不算 Tag,RocketMQ 也远超 Kafka,Kafka 应该只实现了 Pull 模式消费 + 顺序消费这2个功能。RocketMQ 的代码示例在 rocketmq-example 中,注意,代码是不能直接运行的,因为所有的代码都少了设置 name server 的部分,需要自己手动加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");

先来看一下生产者的 API,比较简单,只有一种,如下,

import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.MessageQueueSelector;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("192.168.232.23:9876");producer.start();for (int i = 0; i < 10; i++)try {{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID188",// key("RocketMQ "+String.format("%05d", i)).getBytes());// bodySendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, i));System.out.println(String.format("%05d", i)+sendResult);}}catch (Exception e) {e.printStackTrace();}producer.shutdown();}}

可以发现,相比 Kafka 的 API,只多了 Tag,但实际上行为有很大不同。Kafka 的生产者客户端,有同步和异步两种模式,但都是阻塞模式,send 方法返回发送状态的 Future,可以通过 Future 的 get 方法阻塞获得发送状态。而 RocketMQ 采用的是同步非阻塞模式,发送之后立刻返回发送状态(而不是 Future)。正常情况下,两者使用上差别不大,但是在高可用场景中发生主备切换的时候,Kafka 的同步可以等待切换完成并重连,最后返回;而 RocketMQ 只能立刻报错,由生产者选择是否重发。所以,在生产者的 API 上,其实 Kafka 是要强一些的。

另外,RocketMQ 可以通过指定 MessageQueueSelector 类的实现来指定将消息发送到哪个分区去,Kafka 是通过指定生产者的 partitioner.class 参数来实现的,灵活性上 RocketMQ 略胜一筹。

再来看消费者的API,由于 RocketMQ 的功能比较多,我们先看 Pull 模式消费的API,如下,

import java.util.HashMap;import java.util.Map;import java.util.Set;import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;import com.alibaba.rocketmq.client.consumer.PullResult;import com.alibaba.rocketmq.client.consumer.store.OffsetStore;import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import com.alibaba.rocketmq.common.message.MessageQueue;public class PullConsumer {private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");for (MessageQueue mq : mqs) {System.out.println("Consume from the queue: " + mq);SINGLE_MQ: while (true) {try {long offset = consumer.fetchConsumeOffset(mq, true);PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);if (null != pullResult.getMsgFoundList()) {for (MessageExt messageExt : pullResult.getMsgFoundList()) {System.out.print(new String(messageExt.getBody()));System.out.print(pullResult);System.out.println(messageExt);}}putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:// TODObreak;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offseTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offseTable.get(mq);if (offset != null)return offset;return 0;}}

这部分的 API 其实是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分区,而 Kafka 可以自动管理(当然也可以手动管理),并且不需要指定分区(分区是在 Kafka 订阅的时候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore 接口,提供了两种管理方式,本地文件和远程 Broker。这部分感觉两者差不多。

下面再看看 Push 模式顺序消费,代码如下,

import java.util.List;import java.util.concurrent.atomic.AtomicLong;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;}else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;}else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;}else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}

虽然提供了 Push 模式,RocketMQ 内部实际上还是 Pull 模式的 MQ,Push 模式的实现应该采用的是长轮询,这点与 Kafka 一样。使用该方式有几个注意的地方,

  1. 接收消息的监听类要使用 MessageListenerOrderly
  2. ConsumeFromWhere 有几个参数,表示从头开始消费,从尾开始消费,还是从某个 TimeStamp 开始消费;
  3. 可以控制 offset 的提交,应该就是 context.setAutoCommit(false); 的作用;

控制 offset 提交这个特性非常有用,某种程度上扩展一下,就可以当做事务来用了,看代码 ConsumeMessageOrderlyService 的实现,其实并没有那么复杂,在不启用 AutoCommit 的时候,只有返回 COMMIT 才 commit offset;启用 AutoCommit 的时候,返回 COMMITROLLBACK(这个比较扯)、SUCCESS 的时候,都 commit offset。

后来发现,commit offset 功能在 Kafka 里面也有提供,使用新的 API,调用 consumer.commitSync

再看一个 Push 模式乱序消费 + 消息过滤的例子,消费者的代码如下,

import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}


接收消息的监听类使用的是 MessageListenerConcurrently;这个例子与之前顺序消费不同的地方在于,

  1. 回调方法中,使用的是自动 offset commit;
  2. 订阅的时候增加了消息过滤类 MessageFilterImpl

消息过滤类 MessageFilterImpl 的代码如下,

import com.alibaba.rocketmq.common.filter.MessageFilter;import com.alibaba.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg) {String property = msg.getUserProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if ((id % 3) == 0 && (id > 10)) {return true;}}return false;}}

RocketMQ 执行过滤是在 Broker 端,Broker 所在的机器会启动多个 FilterServer 过滤进程;Consumer 启动后,会向 FilterServer 上传一个过滤的 Java 类;Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer。这种过滤方法可以节省网络流量,但是增加了 Broker 的负担。可惜我没有实验出来使用过滤的效果,即使是用 github wiki 上的例子8也没成功,不纠结了。RocketMQ 的按 Tag 过滤的功能也是在 Broker 上做的过滤,能用,是个很方便的功能。

还有一种广播消费模式,比较简单,可以去看代码,不再列出。

总之,RocketMQ 提供的功能比较多,比 Kafka 多很多易用的 API。

RocketMQ 的主备模式

按之前所说,只有 RocketMQ 的多主多从异步复制是可以生产使用的,因此只在这个场景下测试。另外,消息采用 Push 顺序模式消费。

假设集群采用2主2备的模式,需要启动4个 Broker,配置文件如下,

brokerName=broker-abrokerId=0listenPort=10911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERbrokerName=broker-abrokerId=1listenPort=10921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlogbrokerRole=SLAVEbrokerName=broker-bbrokerId=0listenPort=20911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlogbrokerRole=ASYNC_MASTERbrokerRole=ASYNC_MASTERbrokerName=broker-bbrokerId=1listenPort=20921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlogbrokerRole=SLAVE

另外,每个机构共通的配置项如下,

brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23namesrvAddr=192.168.232.23:9876deleteWhen=04fileReservedTime=120flushDiskType=ASYNC_FLUSH

其他设置均采用默认。启动 NameServer 和所有 Broker,并试运行一下 Producer,然后看一下 TestTopic1 当前的情况,

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}

可见,TestTopic1 在2个 Broker 上,且每个 Broker 备机也在运行。下面开始主备切换的实验,分别启动 Consumer 和 Producer 进程,消息采用 Pull 顺序模式消费。在消息发送接收过程中,使用 kill -9 停掉 broker-a 的主进程,模拟突然宕机。此时,TestTopic1 的状态如下,

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}

此时,RocketMQ 已经恢复。

再来看看 Producer 和 Consumer 的日志,先看 Producer 的,如下,

......00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]......01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]

日志中显示,在发送完00583条消息之后,开始发生异常 connect to <192.168.232.23:10911> failed,原因应该是 broker-a 的主节点被 kill 掉。之后,从00596条消息开始,RocketMQ 又恢复正常,原因是 broker-b 已经开始提供服务,承担了所有的工作。然后,又重新启动了 broker-a 主节点,由于该节点的加入,从01392条消息开始,broker-a 又开始恢复工作。实验中可以验证,RocketMQ 所谓的多主多备模式,实际上,备机被弱化到无以复加,在主节点宕机的时候,备机无法接替主机的工作,而只是将尚未发送的数据发送出去,由剩下的主节点接替工作。也就是说,N 主 N 备的 RocketMQ 集群中,总共有 2N 台机器,实际工作的只有 N 台,如果有一台挂了,就只有 N-1 台工作了,机器的利用率太低了。

再来看一下 Consumer 的日志,如下,

RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=697, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00567PullResult [pullStatus=FOUND, nextBeginOffset=698, minOffset=0, maxOffset=698, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=697, sysFlag=0, bornTimestamp=1469175033005, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021533, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078A54, commitLogOffset=494164, bodyCRC=2054744282, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=698, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00575PullResult [pullStatus=FOUND, nextBeginOffset=699, minOffset=0, maxOffset=699, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=698, sysFlag=0, bornTimestamp=1469175033286, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021814, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078CB0, commitLogOffset=494768, bodyCRC=225294519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=699, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00583PullResult [pullStatus=FOUND, nextBeginOffset=700, minOffset=0, maxOffset=700, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=699, sysFlag=0, bornTimestamp=1469175033586, bornHost=/192.168.234.98:51987, storeTimestamp=1469175022113, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078F0C, commitLogOffset=495372, bodyCRC=1670775117, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=700, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00591PullResult [pullStatus=FOUND, nextBeginOffset=701, minOffset=0, maxOffset=701, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=700, sysFlag=0, bornTimestamp=1469175037890, bornHost=/192.168.234.98:51987, storeTimestamp=1469175026418, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079168, commitLogOffset=495976, bodyCRC=344150304, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=701, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00599PullResult [pullStatus=FOUND, nextBeginOffset=702, minOffset=0, maxOffset=702, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=701, sysFlag=0, bornTimestamp=1469175042200, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030734, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000793C4, commitLogOffset=496580, bodyCRC=442030354, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=702, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00603PullResult [pullStatus=FOUND, nextBeginOffset=703, minOffset=0, maxOffset=703, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=702, sysFlag=0, bornTimestamp=1469175042345, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030872, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079620, commitLogOffset=497184, bodyCRC=688469276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=703, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00607PullResult [pullStatus=FOUND, nextBeginOffset=704, minOffset=0, maxOffset=704, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=703, sysFlag=0, bornTimestamp=1469175042481, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031008, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007987C, commitLogOffset=497788, bodyCRC=778367237, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=704, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00611PullResult [pullStatus=FOUND, nextBeginOffset=705, minOffset=0, maxOffset=705, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=704, sysFlag=0, bornTimestamp=1469175042615, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031143, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079AD8, commitLogOffset=498392, bodyCRC=1578919281, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=705, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00615PullResult [pullStatus=FOUND, nextBeginOffset=706, minOffset=0, maxOffset=706, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=705, sysFlag=0, bornTimestamp=1469175042753, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031280, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079D34, commitLogOffset=498996, bodyCRC=1500619112, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=706, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00619PullResult [pullStatus=FOUND, nextBeginOffset=707, minOffset=0, maxOffset=707, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=706, sysFlag=0, bornTimestamp=1469175042887, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031414, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079F90, commitLogOffset=499600, bodyCRC=1355279683, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=707, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00623PullResult [pullStatus=FOUND, nextBeginOffset=708, minOffset=0, maxOffset=708, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=707, sysFlag=0, bornTimestamp=1469175043021, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031548, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A1EC, commitLogOffset=500204, bodyCRC=457136030, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=708, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00627PullResult [pullStatus=FOUND, nextBeginOffset=709, minOffset=0, maxOffset=709, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=708, sysFlag=0, bornTimestamp=1469175043154, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031681, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A448, commitLogOffset=500808, bodyCRC=475173767, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=709, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00631PullResult [pullStatus=FOUND, nextBeginOffset=710, minOffset=0, maxOffset=710, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=709, sysFlag=0, bornTimestamp=1469175043299, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031826, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A6A4, commitLogOffset=501412, bodyCRC=1814693875, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=710, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00635PullResult [pullStatus=FOUND, nextBeginOffset=711, minOffset=0, maxOffset=711, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=710, sysFlag=0, bornTimestamp=1469175043435, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031962, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A900, commitLogOffset=502016, bodyCRC=1799865322, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=711, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468572196808, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191827, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011C60, commitLogOffset=72800, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196876, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011EB0, commitLogOffset=73392, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196903, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191928, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012100, commitLogOffset=73984, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00001PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=3, sysFlag=0, bornTimestamp=1468572718149, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713175, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001222B, commitLogOffset=74283, bodyCRC=1133127810, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00005PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718178, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713210, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012487, commitLogOffset=74887, bodyCRC=1156050075, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......[queueId=1, storeSize=151, queueOffset=22, sysFlag=0, bornTimestamp=1469170324786, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313333, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D3AA, commitLogOffset=1102762, bodyCRC=1707898805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00477PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=23, sysFlag=0, bornTimestamp=1469170325237, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313771, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D606, commitLogOffset=1103366, bodyCRC=1654764460, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00481PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=24, sysFlag=0, bornTimestamp=1469170325652, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314163, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D862, commitLogOffset=1103970, bodyCRC=207227478, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00485PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=25, sysFlag=0, bornTimestamp=1469170326066, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314595, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010DABE, commitLogOffset=1104574, bodyCRC=188206671, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]......RocketMQ 01370PullResult [pullStatus=FOUND, nextBeginOffset=895, minOffset=0, maxOffset=895, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=894, sysFlag=0, bornTimestamp=1469175070573, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059101, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095A89, commitLogOffset=613001, bodyCRC=1094080495, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=895, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01374PullResult [pullStatus=FOUND, nextBeginOffset=896, minOffset=0, maxOffset=896, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=895, sysFlag=0, bornTimestamp=1469175070712, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059251, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095CE5, commitLogOffset=613605, bodyCRC=1180406774, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=896, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01378PullResult [pullStatus=FOUND, nextBeginOffset=897, minOffset=0, maxOffset=897, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=896, sysFlag=0, bornTimestamp=1469175070899, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059427, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095F41, commitLogOffset=614209, bodyCRC=1340989405, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=897, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01382PullResult [pullStatus=FOUND, nextBeginOffset=898, minOffset=0, maxOffset=898, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=897, sysFlag=0, bornTimestamp=1469175071054, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059582, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000009619D, commitLogOffset=614813, bodyCRC=681585164, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=898, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01386PullResult [pullStatus=FOUND, nextBeginOffset=899, minOffset=0, maxOffset=899, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=898, sysFlag=0, bornTimestamp=1469175071203, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059731, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000963F9, commitLogOffset=615417, bodyCRC=802024981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=899, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01390PullResult [pullStatus=FOUND, nextBeginOffset=900, minOffset=0, maxOffset=900, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=899, sysFlag=0, bornTimestamp=1469175071338, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059866, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000096655, commitLogOffset=616021, bodyCRC=1605728865, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=900, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468571752640, bornHost=/192.168.234.98:56433, storeTimestamp=1468571747895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011B38, commitLogOffset=72504, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196772, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191803, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011BCC, commitLogOffset=72652, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196865, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191886, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011E1C, commitLogOffset=73244, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=3, sysFlag=0, bornTimestamp=1468572196899, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191917, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001206C, commitLogOffset=73836, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00000PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718127, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713166, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012194, commitLogOffset=74132, bodyCRC=881661972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00004PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=5, sysFlag=0, bornTimestamp=1468572718170, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713197, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000000123F0, commitLogOffset=74736, bodyCRC=870374413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......RocketMQ 00560PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=140, sysFlag=0, bornTimestamp=1469175032756, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021285, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126922, commitLogOffset=1206562, bodyCRC=1679588729, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00568PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=141, sysFlag=0, bornTimestamp=1469175033043, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021570, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126B7E, commitLogOffset=1207166, bodyCRC=1791489355, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00576PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=142, sysFlag=0, bornTimestamp=1469175033320, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021848, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126DDA, commitLogOffset=1207770, bodyCRC=342157581, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01392PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=143, sysFlag=0, bornTimestamp=1469175071411, bornHost=/192.168.234.98:52034, storeTimestamp=1469175059951, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127036, commitLogOffset=1208374, bodyCRC=834345805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01400PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=144, sysFlag=0, bornTimestamp=1469175071746, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060289, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127292, commitLogOffset=1208978, bodyCRC=188274605, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

可以看到,Consumer 在 broker-a 宕机时间的附近,也出现了异常,connect to <192.168.232.23:10911> failed。虽然还能保持分区上的顺序性,但是已经某种程度上出现了一些紊乱,例如,将我在实验之前的数据给取了出来(Hello MetaQ的消息)。可是,我在实验前,明明做过删除这个 Topic 的动作,看来 RocketMQ 所谓的删除,并未删除 Topic 的数据。之后,broker-a 主机重启之后,又恢复正常。

RocketMQ Pull模式消费需要手动管理 offset 和指定分区,这个在调用的时候不觉得,实际运行的时候才会发现每次总是消费一个分区,消费完之后,才开始消费下一个分区,而下一个分区可能已经堆积了很多消息了,手动做消息分配又比较费事。或许,Push 顺序模式消费才是更好的选择。

另外还有几个比较异常的情况,实验中有几次出现了 CODE: 17 DESC: topic[TopicTest1] not exist, apply first please! 这样的错误,实际上,这时候我只是关掉了 Producer;还有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1 明明文档中说可以用来新增 Topic,而实际上不行。

补充一下:之后,我又使用 Push 顺序模式消费重做了上述实验,结论差不多。只是因为有多线程的原因,日志看起来偶尔有错位,这个问题不大,可以解决。而且,在关闭重启 Broker 的附近,往往伴随着多次的消息重发,不过,RocketMQ 也不保证消息只收到一次就是了。消息重复的问题,Kafka 要比 RocketMQ 显得不那么严重一些。Push 顺序模式消费不需要指定 offset,不需要指定分区,第二次启动可以自动从前一次的 offset 后开始消费。功能上这个与 Kafka 的 Consumer 更类似,虽然 RocketMQ 采用的是异步模式。

RocketMQ 最佳实践

实际上,RocketMQ 自己就有一份《RocketMQ 最佳实践》的文档,里面提到了一些系统设计的问题,例如消费者要幂等,一个应用对应一个 Topic,如此等等。这些经验不仅仅是对 RocketMQ 有用,对 Kafka 也颇有借鉴意义。

后记

这里谈谈我对选择 RocketMQ 还是 Kafka 的个人建议。以上已经做了多处 RocketMQ 和 Kafka 的对比,我个人觉得,Kafka 是一个不断发展中的系统,开源社区比 RocketMQ 要大,也要更活跃一些;另外,Kafka 最新版本已经有了同步复制,消息可靠性更有保障;还有,Kafka 的分区机制,几乎实现了自动负载均衡,这绝对是个杀手级特性;RocketMQ 虽然提供了很多易用的功能,远超出 Kafka,但这些功能并不一定都能用得上,而且多数可以绕过。相比之下,Kafka 的基本功能更加吸引我,再处理故障恢复的时候,细节上要胜过 RocketMQ。当然,如果是 A 公司内部,或者所在公司使用了 A 公司的云产品,那么 RocketMQ 的企业级特性更多一些,或许我会选择 RocketMQ。

 

转自:http://valleylord.github.io/post/201607-mq-rocketmq/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/448602.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

好久没敲代码了(强行补上今天的博客。。。)

流水账日记&#xff08;哈哈&#xff09; 今天没课&#xff0c;早上好好的睡了个懒觉&#xff08;虽然还是很困- -&#xff09;&#xff1b; 哥几个把昨天买的排骨洗了做了个排骨汤&#xff0c;虽然不如家里做的好吃&#xff0c;但对此时的我们来说已经是美味了&#xff0c;晚…

Ubuntu下使用AMD APP编写OpenCL程序

对于Ubuntu或其近亲&#xff08;Lubuntu、Kubuntu、Mint等&#xff09;编写OpenCL程序也不会太难。由于本例用的是AMD APP SDK&#xff0c;因此需要AMD的GPU以及相关驱动。首先&#xff0c;去AMD官网下载GPU驱动——AMD Catalyst。如果你用的是APU并且还有一块独立显卡的话&…

jdk的安装与配置

Linux一、安装JDK 从sun网站上直接下载JDK&#xff1a;http://java.sun.com/j2se/1.4.2/download.html提供了两个下载j2re-1_4_2_10-linux-i586.bin 13.75 MB, j2re-1_4_2_10-linux-i586-rpm.bin 13.27 MB&#xff1a;1、RPM in self-extracting file (j2re-1_4_2_10-linux…

李洋疯狂C语言之n个人报数,报到3的退出,最后留在场上的是原来的第几位(约瑟夫环)

今天老师布置了个题目&#xff0c;约瑟夫环&#xff0c;俗称猴子选大王。n个人报数&#xff0c;报到3的退出&#xff0c;最后留在场上的时原来的第几位 #include <stdio.h>int main() {int i, n, q, p 0; //计数 i ,人数 n ,报数 p ,场上人数 qprintf ("input…

搭建Vue脚手架(vue-cli)并创建一个项目

1、 安装nodejs环境 官网下载&#xff1a;https://nodejs.org/en/download/ 一直默认就行&#xff0c;路径可以改变但要记得到 安装完成后cmd&#xff0c;输入node -v ,npm -v 如果能看到node和npm的版本号了&#xff0c;说明已经安装成功 2、安装vue-cli 有npm和cnpm两种方式…

NPM 使用介绍

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 NPM是随同NodeJS一起安装的包管理工具&#xff0c;能解决NodeJS代码部署上的很多问题&#xff0c;常见的使用场景有以下几种&#xff1a…

人生致命的八个经典问题

问题一&#xff1a;如果你家附近有一家餐厅&#xff0c;东西又贵又难吃&#xff0c;桌上还爬着蟑螂&#xff0c;你会因为它很近很方便&#xff0c;就一而再、再而三地光临吗&#xff1f; 回答&#xff1a;你一定会说&#xff0c;这是什么烂问题&#xff0c;谁那么笨&#xff0c…

RabbitMQ学习总结(5)——发布和订阅实例详解

2019独角兽企业重金招聘Python工程师标准>>> 一、Publish/Subscribe&#xff08;发布/订阅&#xff09;&#xff08;using the Java Client&#xff09; 在前面的教程中,我们创建了一个work Queue&#xff08;工作队列&#xff09;。工作队列背后的假设是每个任务是…

iOS有哪些数据类型/基本数据类型?

简述 本文主要探究使用OC作为iOS开发语言时&#xff0c;我们能使用哪些数据类型。 一切类型始于C。 C语言的类型 基本数据类型&#xff1a; 基本数据类型&#xff08;fundamental data types&#xff09;也叫原始数据类型&#xff08;primitive data types&#xff09; 整型、字…

李洋疯狂C语言之将”you are come from shanghai ”倒置为”shanghai from come are you”,将句子中的单词位置倒置,而不改变单词内部结构

题目: 编写一个C函数,将”you are come from shanghai ”倒置为”shanghai from come are you”,及将句子中的单词位置倒置,而不改变单词内部结构 #include <stdio.h> #include <string.h> void change(char *p1, char *p2); //函数声明 int main() {char str[] …

马桶怎么清洗才干净无异味?

方法/步骤 在马桶水箱中一定要放上洁厕宝&#xff1a; 洁厕宝里面含有多种去除马桶中杂质以及异味的功能&#xff0c;另外它还带有香香的味道&#xff0c;我们一按冲马桶的按钮&#xff0c;放出来的总是蓝色的水&#xff0c;十分的美观和好看&#xff0c;但是这并不是花瓶般的作…

白话解说:阻塞和非阻塞,同步和异步

阻塞和非阻塞&#xff0c;同步和异步是node.js里经常遇到的词汇&#xff0c;举例说明&#xff1a; 我要看足球比赛&#xff0c;但是妈妈叫我烧水&#xff0c;电视机在客厅&#xff0c;烧水要在厨房。家里有2个水壶&#xff0c;一个是普通的水壶&#xff0c;另一个是水开了会叫的…

苏嵌点滴(一)

来苏嵌也有12天了&#xff0c;也渐渐开始习惯这样的生活&#xff0c;每天睁眼到闭眼&#xff0c;全都是代码。每天都得学习很多新的知识&#xff0c;C语言学到现在也学得差不多了&#xff0c;还有明天一天课。 指针、数组这些C语言中的重点&#xff0c;还是需要一点时间消化的…

Mysql学习总结(8)——MySql基本查询、连接查询、子查询、正则表达查询讲解...

2019独角兽企业重金招聘Python工程师标准>>> 查询数据指从数据库中获取所需要的数据。查询数据是数据库操作中最常用&#xff0c;也是最重要的操作。用户可以根据自己对数据的需求&#xff0c;使用不同的查询方式。通过不同的查询方式&#xff0c;可以获得不同的数据…

安装OpenCL和AMD驱动程序

我们将安装AMD OpenCL软件开发工具包&#xff08;SDK&#xff09;和AMD驱动程序。 userubuntu:~$ mkdir AMD-APP-SDK-v2.5-lnx64 userubuntu:~$ cd AMD-APP-SDK-v2.5-lnx64/ userubuntu:~$ wgethttp://developer.amd.com/Downloads/AMD-APP-SDK-v2.5-lnx64.tgz userubuntu:~$ t…

Node.js -- Stream 使用小例 ( 流运用 :读取、写入、写出、拷贝)

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 Stream 是一个抽象接口&#xff0c;Node 中有很多对象实现了这个接口。例如&#xff0c;对http 服务器发起请求的request 对象就是一个 …

李洋疯狂C语言之有关“you are come from shanghai”逆序(二)

今天将指针和函数全部学完了&#xff0c;之前这题的做法&#xff0c;现在看来有点繁琐&#xff0c;于是乎做了一些修改&#xff0c;下面是新的代码 //you are from shanghai逆序#include <stdio.h> #include <string.h> //下面要用到strlenvoid reverse(c…

sync - 清空文件系统缓冲区

总览 (SYNOPSIS) sync [OPTION] 描述 (DESCRIPTION) 强迫把更改的块写入磁盘&#xff0c; 并更新超级块。 --help显示帮助然后终止。--version显示版本信息然后终止。 转载于:https://www.cnblogs.com/fanweisheng/p/11101219.html

学会用好 Visual Studio Code

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 Visual Studio Code是个牛逼的编辑器&#xff0c;启动非常快&#xff0c;完全可以用来代替其他文本文件编辑工具。又可以用来做开发&…

苏嵌点滴(二)

今天把指针和函数讲完了&#xff0c;这些都还能接受&#xff0c;之后老师和我们讲了递归&#xff0c;有点难度。 晚上电脑还出了点状况&#xff0c;一个晚自习全用来重装系统和学习软件套装X_X&#xff0c;调试完已经接近下课&#xff0c;递归还没来得及看。 放学后&#xff…