简述
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到教程。
分布式消息中间件,主要是实现分布式系统中解耦、异步消息、流量销锋、日志处理等场景。生产中用的最多的消息队列有Activemq,rabbitmq,kafka,rocketmq等。
以 Jms 规范和 rocketmq 为主来分享。版本基于 3.2.6 。
主要分享:JMS规范、Rocketmq的介绍、部署方式、特性的一些使用。
JMS规范
rocketmq虽然不完全基于jms规范,但参考了jms规范和 CORBA Notification 规范,且青出于蓝而胜于蓝。
什么是jms呢
jms其实就是类似于jdbc的一套接口规范,不同的是他是面向的消息服务,提供一套标准API接口。大部分厂商都会参考jms规范,不过 rocketmq 却没有严格遵守jms规范。
常见的jms厂商有:IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,还有APACHE开源的ActiveMQ。京东商城采用的就是 Activemq 。
基本概念
发送者( Sender)
---- 也就是消息的生产者,创建并发送消息的 JMS 客户端。接收者( Receiver)
---- 消息消费者,接收订制消息并按相应业务逻辑进行处理,最终将结果反馈给 mq 的服务端。
- 点对点( Point-to-Point(P2P) )
点对点是一对一的关系,一个消息发出只有一个接受者所处理。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
- 发布订阅( Publish/Subscribe(Pub/Sub) )
1、客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
2、如果你希望发送的消息不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型
- 消息队列(Queue)
一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
- 主题(Topic)
一种支持发送消息给多个订阅者的机制。
- 发布者(Publisher)
同生产者
- 订阅者(Subscriber)
针对同一主题的多个消费者
点对点
点对点的关系图
发布订阅
发布订阅的关系图
对象模型
- (1) ConnectionFactory
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种(基于点对点和和发布订阅的两种方式分别创建连接工厂的)。可以通过JNDI来查找ConnectionFactory对象。
- (2) Destination
Destination 是消息生产者的消息发送目标,或者是消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。
- (3) Connection
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
- (4) Session
Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
- (5) 消息的生产者
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
- (6) 消息消费者
消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
- (7) MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
消息消费
在JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
○ 同步
订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
○ 异步
订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的 onMessage 方法。
编程实例
通过 activemq 的部分代码来简单说明一下上面说到的一些JMS规范
举个例子:
public void init(){try {//创建一个链接工厂(用户名,密码,broker的url地址)connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);//从工厂中创建一个链接connection = connectionFactory.createConnection();//开启链接connection.start();//创建一个会话session = connection.createSession(true,Session.SESSION_TRANSACTED);} catch (JMSException e) {e.printStackTrace();}}
公共部分:也就是说不管你是消息的生产者还是消息的消费者都需要这些步骤
- 首先我们需要创建一个连接工厂,当然这里我们需要输入用户性和密码还有就是broker的url
- 然后我们根据连接工厂创建了一个连接,此刻这个工厂并没有和broker简历连接
- 调用start方法就和broker建立了连接,这里我大概解释一下broker
- broker:消息队列核心,相当于一个控制中心,负责路由消息、保存订阅和连接、消息确认和控制事务,activemq可以配置多个
- 创建一个session,上面我们提到过所有的消息操作都是与session进行的
public void sendMsg(String queueName){try {//创建一个消息队列(此处也就是在创建Destination)Queue queue = session.createQueue(queueName);//消息生产者MessageProducer messageProducer = null;if(threadLocal.get()!=null){messageProducer = threadLocal.get();}else{messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while(true){Thread.sleep(1000);int num = count.getAndIncrement();//创建一条消息TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:生产消息,count:"+num);//发送消息messageProducer.send(msg);//提交事务session.commit();}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
生产:配置完上面的公共部分我们就迫不及待的把消息生产出来吧,我这边说的是点对点的方式
- 通过session创建一个Destination,我这边直接就用了queue了
- 接下来我们需要创建一个消息的生产者
- 我这边就循环每1s发送一条消息
- 这边看到我们的消息也是用session来创建的,这里面我们用的是文本的消息类型
- 发送消息
- 提交这次发送,至此我们的消息就发送到了broker上了,用过activemq的同学都知道,activemq提供了一个很好用的界面可以查到你的消息的状态,包括是否消费等。
消费:消费我们上面也提到了两种方式,同步和异步,我这边准备了两份代码分别说明了一下
public void doMessage(String queueName){try {//创建DestinationQueue queue = session.createQueue(queueName);MessageConsumer consumer = null;while(true){Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if(msg!=null) {msg.acknowledge();System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());}else {break;}}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
同步:可以看到消息会一直阻塞到有消息才会继续
- 通过session创建一个Destination,我这边直接就用了queue了。
- 创建了一个Consumer。
- 做了一个死循环,类似于ServerSocket的accept方法,我们的receive会阻塞到这里,直到有消息。
- 如果消息不为空告知消息消费成功。
consumer.setMessageListener(MessageListener { public void onMessage(Message msg) { try { String message = ((TextMessage) msg).getText(); if(msg != null){msg.acknowledgeSystem.out.println("成功消费消息:"+message);} } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
);
异步:前两部和上面是一样的,我们从第三步说起
3、注册了一个监听接口的实现,当有消息时就调用onMessage的实现,后面就一样了
RocketMQ
简介
rocketmq是阿里巴巴开源的一款分布式的消息中间件,源于jms规范,但是不遵守jms规范。rocketmq天生就是分布式的,可以说是broker、provider、consumer等各种分布式。
大概特点:
- 能够保证严格的消息顺序(需要集群的支持)
- 提供丰富的消息拉取模式(可以任意定义你的拉取方式,exmaple中也提供了一个很好的例子)
- 高效的订阅者水平扩展能力(通过一个consumerGroup的方式做到consumer的方便扩容)
- 实时的消息订阅机制(消息的实时推送,类似于上面咱们的异步消费的方式)
- 亿级消息堆积能力(轻松完成系统销锋)
选择的理由
rocketmq 的特性
- 强调集群无单点,可扩展,任意一点高可用,水平可扩展
方便集群配置,而且容易扩展(横向和纵向),通过slave的方式每一点都可以实现高可用
- 支持上万个队列,顺序消息
顺序消费是实现在同一队列的,如果高并发的情况就需要队列的支持,rocketmq可以满足上万个队列同事存在
- 任性定制你的消息过滤
rocketmq提供了两种类型的消息过滤,也可以说三种可以通过topic进行消息过滤、可以通过tag进行消息过滤、还可以通过filter的方式任意定制过滤
- 消息的可靠性(无Buffer,持久化,容错,回溯消费)
消息无buffer就不用担心buffer回满的情况,rocketmq的所有消息都是持久化的,生产者本身可以进行错误重试,发送者也会按照时间阶梯的方式进行消息重发,消息回溯说的是可以按照指定的时间进行消息的重新消费,既可以向前也可以向后(前提条件是要注意消息的擦除时间)
- 海量消息堆积能力,消息堆积后,写入低延迟
针对于provider需要配合部署方式,对于consumer,如果是集群方式一旦master返现消息堆积会向consumer下发一个重定向指令,此时consumer就可以从slave进行数据消费了
- 分布式事务
我个人感觉 rocketmq3.2.6 对这一块说的不是很清晰,而且官方也说现在这块存在缺陷(会令系统pagecache过多),所以线上建议还是少用为好,这块后面有列子。
- 消息失败重试机制
针对provider的重试,当消息发送到选定的broker时如果出现失败会自动选择其他的broker进行重发,默认重试三次,当然重试次数要在消息发送的超时时间范围内。
针对consumer的重试,如果消息因为各种原因没有消费成功,会自动加入到重试队列,一般情况如果是因为网络等问题连续重试也是照样失败,所以rocketmq也是采用阶梯重试的方式。
- 定时消费
出了上面的配置,在发送消息是也可以针对message设置setDelayTimeLevel
- 活跃的开源社区
现在rocketmq成为了apache的一款开源产品,活跃度也是不容怀疑的
- 成熟度(经过双十一考验)
针对本身的成熟度,我们看看这么多年的双十一就可想而知了
术语
- NameServer
可以理解成类似于zk的一个注册中心,而且rocketmq最初也是基于zk作为注册中心的,现在相当于为rocketmq自定义了一个注册中心,代码不超过1000行。RocketMQ 有多种配置方式可以令客户端找到 Name Server, 然后通过 Name Server 再找到 Broker,分别如下,
优先级由高到低,高优先级会覆盖低优先级。客户端提供 http 和 ip + 端口号的两种方式,推荐使用 http 的方式可以实现nameserver 的热部署
- Push Consumer
Consumer 的一种,应用通常通过 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法,类似于 activemq 的方式
- Pull Consume
Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制
- Producer Group
一类producer的集合名称,这类 producer 通常发送一类消息,且发送逻辑一致
- Consumer Group
同上,consumer的集合名称
- Broker
消息中转的角色,负责存储消息(实际的存储是调用的store组件完成的),转发消息,一般也成为 server,同于 jms 中的provider
- Message Filter
可以实现高级的自定义的消息过滤,java编写
- Master/Slave
集群的主从关系,broker 的 name 相同,brokerid=0 的为主,大于 0 的为从
部署方式
物理部署
NameServer :类似云zk的集群,主要是维护了broker的相关内容,进行存取;节点之间无任何数据同步
1、接收broker的注册,注销请求
2、Producer获取topic下所有的BrokerQueue,put消息
3、Consumer获取topic下所有的BrokerQueue,get消息
Broker :
部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应Master。Master和Slave的对应关系通过制定相同的BrokerName来确定,通过制定BrokerId来区分主从,如果是0则为Master,如果大于0则为Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer
Producer:
与Name sever集群中的其中一个节点(随意选择)建立长连接,定期的从Name Server取Topic路由信息,并向提供Topic服务的Master 建立长连接,且定时向Master发送心跳。Producer完全无状态,可以集群部署。
Consumer:
与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic的Master、Slave简历长连接,且定时向Master、Slave发送心跳,Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则有Broker配置决定。
逻辑部署
Producer Group:
用来表示一个发送消息应用,一个Producer Group下办好多个Producer实例,可是多台机器,也可以是一台机器的多个线程,或一个进程的多个Producer对象,一个Producer Group可以发送多个Topic消息,Producer Group的作用如下:
1、标识一类Producer(分布式)
2、可以通过运维工具查询这个发送消息应用有多少个Producer
3、发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
Consumer Group:
表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊方式消费消息。如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。
单Master模式
只有一个 Master节点
- 优点:配置简单,方便部署
- 缺点:这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
多Master模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
- 优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。多 Master 多 Slave 模式,异步复制
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响
多Master多Slave模式(异步复制)
每个 Master 配置一个 Slave,有多对Master-Slave, HA,采用异步复制方式,主备有短暂消息延迟,毫秒级。
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
- 缺点: Master 宕机,磁盘损坏情况,会丢失少量消息。
多Master多Slave模式(同步双写)
每个 Master 配置一个 Slave,有多对Master-Slave, HA采用同步双写方式,主备都写成功,向应用返回成功。
- 优点:数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
- 缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
特性使用
Quick start
Producer:
mport com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;/*** Producer,发送消息* */
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("pay_topic_01");producer.setNamesrvAddr("100.8.8.88:9876");producer.start();for (int i = 0; i < 1000; i++) {try {Message msg = new Message("TopicTest",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes()// body);SendResult sendResult = producer.send(msg);System.out.println(sendResult);}catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}
}
1、创建一个Producer的,这里我们看到rocketmq的创建producer很简单只输入一个Group Name名字就可以。
2、第二步就是设定Name Server的地址,这里注意两点,一个就是nameserver的默认端口是9876,另一个就是多个nameserver集群用分号来分割。
3、我这边循环发送了1000个消息。
4、消息创建也很简单,第一个参数是topic,第二个就是tags(多个tag用 || 连接),第三个参宿是消息内容。
5、调用send方法就能发送成功了(不用像 actimemq, 还需要commit)。
Consumer:
import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;/*** Consumer,订阅消息*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");consumer.setNamesrvAddr("100.8.8.88:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}
}
1、前两步和Producer是一样的
2、可以设置从哪个位置开始读取消息,一般从头部开始读取消息,系统中注意去重,即幂等。
3、订阅topic,第一个参数是topic名字,第二个是tag,如果为 * 的就是全部消息
4、注册一个监听,如果有消息就会实时的推送到Consumer,调用consumeMessage进行消费,这里我们看到msgs是一个List,默认每次推送的是一条消息。
5、进行消息的消费逻辑,消费成功后会返回 CONSUME_SUCCESS 状态
消息过滤
RocketMq的消息过滤是从订阅的时候开始的,我们看到刚才的例子都是通过topic的tags进行的过滤,这个要求Producer发送的时候指定tags,这个和前面有点矛盾,但是前面只是进行了分组,并未进行过滤。Consumer在订阅消费的时候指定了tags才能对消息进行过滤,这种是简单的过滤方式,不过也可以满足我们大部分的消息过滤。更高级的过滤如下:
1、前面和后面部分不变,红色框部分需要指定一个过滤类,之前这里是 tags
2、我们看到所有的过滤类都要直接或者间接实现MessageFilter接口,并且需要覆盖match方法
3、在方法里面就可以写自己的过滤逻辑了,这个地方出了用事先制定的属性也可以反序列化这些消息内容进行消息解析,针对消息体的过滤
顺序消息
一些消息需要按照顺序消费才有意义。比如: 订单创建 --> 分批 --> 打包 --> 外发 ... 必须严格按照顺序才有意义。rocketmq实现的方式也很简单,只要把这些消息都放到一个队列中就能顺序消费了。实际上rocketmq的顺序消费有两种方式:一种是普通的顺序消费(多Master多Slave的异步复制);另一种是严格的顺序消费(多Master多Slave的同步双写)。
import java.util.List;import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,发送顺序消息*/
public class Producer {public static void main(String[] args) {try {MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("100.8.8.88:9876");producer.start();String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };for (int i = 0; i < 100; i++) {// 订单ID相同的消息要有序int orderId = i % 10;Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.println(sendResult);}producer.shutdown();}catch (MQClientException e) {e.printStackTrace();}catch (RemotingException e) {e.printStackTrace();}catch (MQBrokerException e) {e.printStackTrace();}catch (InterruptedException e) {e.printStackTrace();}}
}
1、首先要保障消息要同时在一个topic中
2、要保障要发送的消息有相同的tag
3、在发送时要保障将数据发送到同一个队列(queue),我们这里采用的取模的方式
前面说过 rocketmq 可以同时支持上万个队列,这也是为了顺序消费而考虑的
事务消息
比如有两个账户:张三、李四,张三要给李四转10块钱。以下都在同一个事务中进行,锁定是通过事务来完成的
1、锁定张三和李四的账户
2、判断张三的账户是否大于等于10块钱,如果大于等于则继续,小于则返回。(只讨论大于等于的)
3、从张三的账户上减去10块
4、向李四的账户增加10块
5、解锁账户完成交易
update account set amount = amount - 100 where userNo='zhangsan' and amount >=10
update account set amount = amount + 100 where userNo='lisi'
分布式事务就要考虑到两个用户账户的一致性,从分布式的角度来分析一下
1、锁定张三的账户,同时通过网络锁定李四的账户(可以理解成冻结金额)
2、判断张三的账户是否大于等于10块钱,如果大于等于则继续,小于则返回(只讨论大于等于的)
3、从张三的账户上减去10块
4、通过网络向李四的账户增加10块
5、解锁张三账户完成交易,通过网络解锁李四的账户,时间基本上是累计的
通过rocketmq怎么实现呢,首先要分清角色,张三为事务的发起者 = 消息的发送者,李四就是消息的消费者了。rocketmq可以理解成中间账户,默认 Consumer 都会成功,如果不成功官方推荐人工介入。
1、判断张三的账户金额大于10
2、同时张三的账户减去10
3、同时丢出一个mq消息给rocketmq,两个要确保放在一个db事务中(此时的消息只是处于prapared阶段,不会被Consumer所消费)
4、如果本地事务执行成功则向 rocketmq 发送 commit
5、如果第四部出现了本 Consumer 宕机,也就是 rocketmq 没有收到 commit,此刻消息是是未知,所以他会向任意一台Producer 来确认当前消息的状态
6、从此保障了本地账户和 rocketmq 的一致性
中控如下:
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;/*** 发送事务消息例子* */
public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("100.8.8.88:9876");// 事务回查最小并发数producer.setCheckThreadPoolMinSize(2);// 事务回查最大并发数producer.setCheckThreadPoolMaxSize(2);// 队列数producer.setCheckRequestHoldMax(2000);producer.setTransactionCheckListener(transactionCheckListener);producer.start();String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();for (int i = 0; i < 100; i++) {try {Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);System.out.println(sendResult);}catch (MQClientException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}
}
本地事务:
import java.util.concurrent.atomic.AtomicInteger;import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;/*** 执行本地事务*/
public class TransactionExecuterImpl implements LocalTransactionExecuter {private AtomicInteger transactionIndex = new AtomicInteger(1);@Overridepublic LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {int value = transactionIndex.getAndIncrement();if (value == 0) {throw new RuntimeException("Could not find db");}else if ((value % 5) == 0) {return LocalTransactionState.ROLLBACK_MESSAGE;}else if ((value % 4) == 0) {return LocalTransactionState.COMMIT_MESSAGE;}return LocalTransactionState.UNKNOW;}
}
回调检查点:
import java.util.concurrent.atomic.AtomicInteger;import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;/*** 未决事务,服务器回查客户端*/
public class TransactionCheckListenerImpl implements TransactionCheckListener {private AtomicInteger transactionIndex = new AtomicInteger(0);@Overridepublic LocalTransactionState checkLocalTransactionState(MessageExt msg) {System.out.println("server checking TrMsg " + msg.toString());int value = transactionIndex.getAndIncrement();if ((value % 6) == 0) {throw new RuntimeException("Could not find db");}else if ((value % 5) == 0) {return LocalTransactionState.ROLLBACK_MESSAGE;}else if ((value % 4) == 0) {return LocalTransactionState.COMMIT_MESSAGE;}return LocalTransactionState.UNKNOW;}
}
点对点/广播
点对点、发布订阅两种模式,在 consumer 里面配置 MessageModel 即可。
需要注意的是:如果配置了发布订阅模式,那么 Consumer 的负载均衡将不生效(Consumer Name)
//发布订阅consumer.setMessageModel(MessageModel.BROADCASTING);//集群消费(默认)//consumer.setMessageModel(MessageModel.CLUSTERING);
推送/拉取
上面都是消息推送模式,注册监听,当有消息产生时就会实时的推送到Consumer进行消费。
消息拉取方式则相当于把主动权交给了应用自己,当然这样也给消费增加了复杂性。比如说offset的存储、定时拉取等。
阿里给我们提供了一个demo(文件夹名是simple),可以参考下。
import java.util.HashMap;
import java.util.Map;
import java.util.Set;import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageQueue;/*** PullConsumer,订阅消息*/
public class PullConsumer {private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");for (MessageQueue mq : mqs) {System.out.println("Consume from the queue: " + mq);SINGLE_MQ: while (true) {try {PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.println(pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:// TODObreak;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offseTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offseTable.get(mq);if (offset != null)return offset;return 0;}}
消息回溯
根据时间来设置消费进度,设置之前要关闭这个订阅组的所有consumer,设置完再启动,方可生效。
- 回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,Broker 在Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据, Broker 要提供一种机制,可以按照时间维度来回退消费
- RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯
- 操作: mqadmin resetOffsetByTime
转自:https://my.oschina.net/izhangll/blog/1581254,有作部分调整。