3W字!带你玩转「消息队列」

1. 消息队列解决了什么问题

消息中间件是目前比较流行的一个中间件,其中RabbitMQ更是占有一定的市场份额,主要用来做异步处理、应用解耦、流量削峰、日志处理等等方面。

1. 异步处理

一个用户登陆网址注册,然后系统发短信跟邮件告知注册成功,一般有三种解决方法。

  1. 串行到依次执行,问题是用户注册后就可以使用了,没必要等验证码跟邮件。

  2. 注册成功后,邮件跟验证码用并行等方式执行,问题是邮件跟验证码是非重要的任务,系统注册还要等这俩完成么?

  3. 基于异步MQ的处理,用户注册成功后直接把信息异步发送到MQ中,然后邮件系统跟验证码系统主动去拉取数据。

2. 应用解耦

比如我们有一个订单系统,还要一个库存系统,用户下订单了就要调用下库存系统来处理,直接调用到话库存系统出现问题咋办呢?

3. 流量削峰

举办一个 秒杀活动,如何较好到设计?服务层直接接受瞬间搞密度访问绝对不可以起码要加入一个MQ。

4. 日志处理

用户通过WebUI访问发送请求到时候后端如何接受跟处理呢一般?

2. RabbitMQ 安装跟配置

官网:https://www.rabbitmq.com/download.html

开发语言:https://www.erlang.org/

正式到安装跟允许需要Erlang跟RabbitMQ俩版本之间相互兼容!我这里图省事直接用Docker 拉取镜像了。下载:开启:管理页面 默认账号:guest  默认密码:guest 。Docker启动时候可以指定账号密码对外端口以及

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management 

启动:用户添加:vitrual hosts 相当于mysql中的DB。创建一个virtual hosts,一般以/ 开头。对用户进行授权,点击/vhost_mmr,至于WebUI多点点即可了解。

3. 实战

RabbitMQ 官网支持任务模式:https://www.rabbitmq.com/getstarted.htm

l创建Maven项目导入必要依赖:

    <dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.0.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.5</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency></dependencies>

0. 获取MQ连接

package com.sowhat.mq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtils {/*** 连接器* @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/vhost_mmr");factory.setUsername("user_mmr");factory.setPassword("sowhat");Connection connection = factory.newConnection();return connection;}
}

1. 简单队列

P:Producer 消息的生产者 中间:Queue消息队列 C:Consumer 消息的消费者

package com.sowhat.mq.simple;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String QUEUE_NAME = "test_simple_queue";public static void main(String[] args) throws IOException, TimeoutException {// 获取一个连接Connection connection = ConnectionUtils.getConnection();// 从连接获取一个通道Channel channel = connection.createChannel();// 创建队列声明AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg = "hello Simple";// exchange,队列,参数,消息字节体channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("--send msg:" + msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.simple;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者获取消息*/
public class Recv {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {newApi();oldApi();}private static void newApi() throws IOException, TimeoutException {// 创建连接Connection connection = ConnectionUtils.getConnection();// 创建频道Channel channel = connection.createChannel();// 队列声明  队列名,是否持久化,是否独占模式,无消息后是否自动删除,消息携带参数channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);// 定义消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Override  // 事件模型,消息来了会触发该函数public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("---new api recv:" + s);}};// 监听队列channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);}// 老方法 消费者 MQ 在3。4以下 用次方法,private static void oldApi() throws IOException, TimeoutException, InterruptedException {// 创建连接Connection connection = ConnectionUtils.getConnection();// 创建频道Channel channel = connection.createChannel();// 定义队列消费者QueueingConsumer consumer = new QueueingConsumer(channel);//监听队列channel.basicConsume(Send.QUEUE_NAME, true, consumer);while (true) {// 发货体QueueingConsumer.Delivery delivery = consumer.nextDelivery();byte[] body = delivery.getBody();String s = new String(body);System.out.println("---Recv:" + s);}}
}

右上角有可以设置页面刷新频率,然后可以在UI界面直接手动消费掉,如下图:简单队列的不足:耦合性过高,生产者一一对应消费者,如果有多个消费者想消费队列中信息就无法实现了。

2. WorkQueue 工作队列

Simple队列中只能一一对应的生产消费,实际开发中生产者发消息很简单,而消费者要跟业务结合,消费者接受到消息后要处理从而会耗时。「可能会出现队列中出现消息积压」。所以如果多个消费者可以加速消费。

1. round robin 轮询分发

代码编程一个生产者两个消费者:

package com.sowhat.mq.work;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取 channelChannel channel = connection.createChannel();// 声明队列AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i <50 ; i++) {String msg = "hello-" + i;System.out.println("WQ send " + msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*20);}channel.close();connection.close();}
}---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");}}};boolean autoAck = true;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000 );} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");}}};boolean autoAck = true;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}

现象:消费者1 跟消费者2 处理的数据量完全一样的个数:消费者1:处理偶数 消费者2:处理奇数 这种方式叫轮询分发(round-robin)结果就是不管两个消费者谁忙,「数据总是你一个我一个」,MQ 给两个消费发数据的时候是不知道消费者性能的,默认就是雨露均沾。此时 autoAck = true。

2. 公平分发 fair dipatch

如果要实现公平分发,要让消费者消费完毕一条数据后就告知MQ,再让MQ发数据即可。自动应答要关闭!

package com.sowhat.mq.work;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取 channelChannel channel = connection.createChannel();// s声明队列AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只发送一个消息// 从而限制一次性发送给消费者到消息不得超过1个。int perfetchCount = 1;channel.basicQos(perfetchCount);for (int i = 0; i <50 ; i++) {String msg = "hello-" + i;System.out.println("WQ send " + msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*20);}channel.close();connection.close();}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}

结果:实现了公平分发,消费者2 是消费者1消费数量的2倍。

3. publish/subscribe 发布订阅模式

类似公众号的订阅跟发布,无需指定routingKey:

解读:

  1. 一个生产者多个消费者

  2. 每一个消费者都有一个自己的队列

  3. 生产者没有把消息直接发送到队列而是发送到了交换机转化器(exchange)

  4. 每一个队列都要绑定到交换机上。

  5. 生产者发送的消息经过交换机到达队列,从而实现一个消息被多个消费者消费。

生产者:

package com.sowhat.mq.ps;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 分发= fanout// 发送消息String msg = "hello ps ";channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());System.out.println("Send:" + msg);channel.close();connection.close();}
}

消息哪儿去了?丢失了,在RabbitMQ中只有队列有存储能力,「因为这个时候队列还没有绑定到交换机 所以消息丢失了」。消费者:

package com.sowhat.mq.ps;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  QUEUE_NAME = "test_queue_fanout_email";public static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();// 队列声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机转发器channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.ps;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String  QUEUE_NAME = "test_queue_fanout_sms";public static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();// 队列声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机转发器channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

「同时还可以自己手动的添加一个队列监控到该exchange」

4. routing 路由选择 通配符模式

Exchange(交换机,转发器):「一方面接受生产者消息,另一方面是向队列推送消息」。匿名转发用 ""  表示,比如前面到简单队列跟WorkQueue。fanout:不处理路由键。「不需要指定routingKey」,我们只需要把队列绑定到交换机, 「消息就会被发送到所有到队列中」direct:处理路由键,「需要指定routingKey」,此时生产者发送数据到时候会指定key,任务队列也会指定key,只有key一样消息才会被传送到队列中。如下图

package com.sowhat.mq.routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();// exchangechannel.exchangeDeclare(EXCHANGE_NAME,"direct");String msg = "hello info!";// 可以指定类型String routingKey = "info";channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());System.out.println("Send : " + msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.routing;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  EXCHANGE_NAME = "test_exchange_direct";public static final String QUEUE_NAME = "test_queue_direct_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.basicQos(1);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.routing;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String EXCHANGE_NAME = "test_exchange_direct";public static final String QUEUE_NAME = "test_queue_direct_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);// 绑定种类似 Keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(), false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

WebUI:缺点:路由key必须要明确,无法实现规则性模糊匹配。

5. Topics 主题

将路由键跟某个模式匹配,# 表示匹配 >=1个字符, *表示匹配一个。生产者会带routingKey,但是消费者的MQ会带模糊routingKey。商品:发布、删除、修改、查询。

package com.sowhat.mq.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();// exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");String msg = "商品!";// 可以指定类型String routingKey = "goods.find";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());System.out.println("Send : " + msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.topic;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  EXCHANGE_NAME = "test_exchange_topic";public static final String QUEUE_NAME = "test_queue_topic_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.basicQos(1);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.topic;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String EXCHANGE_NAME = "test_exchange_topic";public static final String QUEUE_NAME = "test_queue_topic_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);// 此乃重点channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(), false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

6. MQ的持久化跟非持久化

因为消息在内存中,如果MQ挂了那么消息也丢失了,所以应该考虑MQ的持久化。MQ是支持持久化的,

// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);/*** Declare a queue* @see com.rabbitmq.client.AMQP.Queue.Declare* @see com.rabbitmq.client.AMQP.Queue.DeclareOk* @param queue the name of the queue* @param durable true if we are declaring a durable queue (the queue will survive a server restart)* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)* @param arguments other properties (construction arguments) for the queue* @return a declaration-confirm method to indicate the queue was successfully declared* @throws java.io.IOException if an error is encountered*/Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

boolean durable就是表明是否可以持久化,如果我们将程序中的durable = false改为true是不可以的!因为我们已经定义过的test_work_queue,这个queue已声明为未持久化的。结论:MQ 不允许修改一个已经存在的队列参数。

7. 消费者端手动跟自动确认消息

// 自动应答boolean autoAck = false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);

当MQ发送数据个消费者后,消费者要对收到对信息应答给MQ。

如果autoAck = true 表示「自动确认模式」,一旦MQ把消息分发给消费者就会把消息从内存中删除。如果消费者收到消息但是还没有消费完而MQ中数据已删除则会导致丢失了正在处理对消息。

如果autoAck = false表示「手动确认模式」,如果有个消费者挂了,MQ因为没有收到回执信息可以把该信息再发送给其他对消费者。

MQ支持消息应答(Message acknowledgement),消费者发送一个消息应答告诉MQ这个消息已经被消费了,MQ才从内存中删除。消息应答模式「默认为 false」

8. RabbitMQ生产者端消息确认机制(事务 + confirm)

在RabbitMQ中我们可以通过持久化来解决MQ服务器异常的数据丢失问题,但是「生产者如何确保数据发送到MQ了」?默认情况下生产者也是不知道的。如何解决 呢?

1. AMQP事务

第一种方式AMQP实现了事务机制,类似mysql的事务机制。txSelect:用户将当前channel设置为transition模式。txCommit:用于提交事务。txRollback:用于回滚事务。

以上都是对生产者对操作。

package com.sowhat.mq.tx;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TxSend {public static final String QUEUE_NAME = "test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg = "hello tx message";try {//开启事务模式channel.txSelect();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());int x = 1 / 0;// 提交事务channel.txCommit();} catch (IOException e) {// 回滚channel.txRollback();System.out.println("send message rollback");} finally {channel.close();connection.close();}}
}
---
package com.sowhat.mq.tx;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TxRecv {public static final String QUEUE_NAME = "test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv[tx] msg:" + new String(body, "utf-8"));}});channel.close();connection.close();}
}

缺点就是大量对请求尝试然后失败然后回滚,会降低MQ的吞吐量。

2. Confirm模式。

「生产者端confirm实现原理」生产者将信道设置为confirm模式,一旦信道进入了confirm模式,所以该信道上发布的信息都会被派一个唯一的ID(从1开始),一旦消息被投递到所有的匹配队列后,Broker就回发送一个确认给生产者(包含消息唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息跟队列是可持久化的,那么确认消息会在消息写入到磁盘后才发出。broker回传给生产者到确认消息中deliver-tag域包含了确认消息到序列号,此外broker也可以设置basic.ack的multiple域,表示这个序列号之前所以信息都已经得到处理。

Confirm模式最大的好处在于是异步的。第一条消息发送后不用一直等待回复后才发第二条消息。

开启confirm模式:channel.confimSelect()编程模式:

1. 普通的发送一个消息后就 waitForConfirms()
package com.sowhat.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send1 {public static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 将channel模式设置为 confirm模式,注意设置这个不能设置为事务模式。channel.confirmSelect();String msg = "hello confirm message";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());if (!channel.waitForConfirms()) {System.out.println("消息发送失败");} else {System.out.println("消息发送OK");}channel.close();connection.close();}
}
---
package com.sowhat.confirm;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {public static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv[tx] msg:" + new String(body, "utf-8"));}});}
}
2. 批量的发一批数据 waitForConfirms()
package com.sowhat.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send2 {public static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 将channel模式设置为 confirm模式,注意设置这个不能设置为事务模式。channel.confirmSelect();String msg = "hello confirm message";// 批量发送for (int i = 0; i < 10; i++) {channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}// 确认if (!channel.waitForConfirms()) {System.out.println("消息发送失败");} else {System.out.println("消息发送OK");}channel.close();connection.close();}
}
---
接受信息跟上面一样
3. 异步confirm模式,提供一个回调方法。

Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(包含当前发出消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉响应的一条(multiple=false)或多条(multiple=true)记录,从运行效率来看,unconfirm集合最好采用有序集合SortedSet存储结构。

package com.sowhat.mq.confirm;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class Send3 {public static final String QUEUE_NAME = "test_queue_confirm3";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//生产者调用confirmSelectchannel.confirmSelect();// 存放未确认消息final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());// 添加监听通道channel.addConfirmListener(new ConfirmListener() {// 回执有问题的public void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("--handleNack---multiple");confirmSet.headSet(deliveryTag + 1).clear();} else {System.out.println("--handleNack-- multiple false");confirmSet.remove(deliveryTag);}}// 没有问题的handleAckpublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("--handleAck---multiple");confirmSet.headSet(deliveryTag + 1).clear();} else {System.out.println("--handleAck--multiple false");confirmSet.remove(deliveryTag);}}});// 一般情况下是先开启 消费者,指定好 exchange跟routingkey,如果生产者等routingkey 就会触发这个return 方法channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("---- handle return----");System.out.println("replyCode:" + replyCode );System.out.println("replyText:" +replyText );System.out.println("exchange:" + exchange);System.out.println("routingKey:" + routingKey);System.out.println("properties:" + properties);System.out.println("body:" + new String(body));}});String msgStr = "sssss";while(true){long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());confirmSet.add(nextPublishSeqNo);Thread.sleep(1000);}}
}

总结:AMQP模式相对来说没Confirm模式性能好些,推荐使用后者。

9. RabbitMQ延迟队列 跟死信

淘宝订单付款,验证码等限时类型服务。

        Map<String,Object> headers =  new HashMap<String,Object>();headers.put("my1","111");headers.put("my2","222");AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();

死信的处理:

10. SpringBoot Tpoic Demo

需求图:新建SpringBoot 项目添加如下依赖:

       <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
1. 生产者

application.yml

spring:rabbitmq:host: 127.0.0.1username: adminpassword: admin

测试用例:

package com.sowhat.mqpublisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class MqpublisherApplicationTests {@Autowiredprivate AmqpTemplate amqpTemplate;@Testvoid userInfo() {/*** exchange,routingKey,message*/this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");}
}
2. 消费者

application.xml

spring:rabbitmq:host: 127.0.0.1username: adminpassword: admin# 自定义配置
mq:config:exchange_name: log.topic# 配置队列名称queue_name:info: log.infoerror: log.errorlogs: log.logs

三个不同的消费者:

package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @QueueBinding value属性:用于绑定一个队列。@Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回* type = ExchangeTypes.TOPIC 指定交换器类型。默认的direct交换器*/
@Service
public class ErrorReceiverService {/*** 把一个方法跟一个队列进行绑定,收到消息后绑定给msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${mq.config.queue_name.error}"),exchange = @Exchange(value = "${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),key = "*.log.error"))public void process(String msg) {System.out.println(msg + " Logs...........");}
}
---
package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @QueueBinding value属性:用于绑定一个队列。* @Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回*/
@Service
public class InfoReceiverService {/*** 添加一个能够处理消息的方法*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value ="${mq.config.queue_name.info}"),exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),key = "*.log.info"))public void process(String msg){System.out.println(msg+" Info...........");}
}
--
package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @QueueBinding value属性:用于绑定一个队列。* @Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回*/
@Service
public class LogsReceiverService {/*** 添加一个能够处理消息的方法*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value ="${mq.config.queue_name.logs}"),exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),key = "*.log.*"))public void process(String msg){System.out.println(msg+" Error...........");}
}

详细安装跟代码看参考下载:

总结

如果需要指定模式一般是在消费者端设置,灵活性调节。

模式生产者Queue生产者exchange生产者routingKey消费者exchange消费者queueroutingKey
Simple(简单模式少用)指定不指定不指定不指定指定不指定
WorkQueue(多个消费者少用)指定不指定不指定不指定指定不指定
fanout(publish/subscribe模式)不指定指定不指定指定指定不指定
direct(路由模式)不指定指定指定指定指定消费者routingKey精确指定多个
topic(主题模糊匹配)不指定指定指定指定指定消费者routingKey可以进行模糊匹配


往期推荐

用好MySQL的21个好习惯!

2020-11-25

这么简单的三目运算符,竟然这么多坑?

2020-11-24

5种SpringBoot热部署方式,你用哪种?

2020-11-23

关注我,每天陪你进步一点点!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/545182.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

okhttp_utils的使用以及与服务端springboot交互中遇到的问题

okhttp_utils的使用以及与服务端springboot交互中遇到的问题1_okhttp_utils在Android studio中的引入方法2_okhttputils的使用举例3_get和post的简单使用3_图片的上传3.1_单张图片的上传3.1.1_获取安卓本地图片问题3.1.2_okhttputils上传图片代码3.1.3_服务端接收图片3.2_单张图…

算法系列之图--DFS

深度优先搜索使用的策略是&#xff0c;只要与可能就在图中尽量“深入”。DFS总是对最近才发现的结点v出发边进行探索&#xff0c;知道该结点的所有出发边都被发现为止。一旦v的所有出发边都被发现了&#xff0c;搜索就回溯到v的前驱结点&#xff08;v是经该结点才被发现的&…

这8种常见的SQL错误用法,你还在用吗?

来源 | yq.aliyun.com/articles/72501MySQL 在近几年仍然保持强劲的数据库流行度增长趋势。越来越多的客户将自己的应用建立在 MySQL 数据库之上&#xff0c;甚至是从 Oracle 迁移到 MySQL上来。但也存在部分客户在使用 MySQL 数据库的过程中遇到一些比如响应时间慢&#xff0c…

千万不要这样写代码!9种常见的OOM场景演示

《Java虚拟机规范》里规定除了程序计数器外&#xff0c;虚拟机内存的其他几个运行时区域都有发生 OutOfMemoryError 异常的可能&#xff0c;我们本文就来演示一下这些错误的使用场景。一. StackOverflowError1.1 写个 bugpublic class StackOverflowErrorDemo {public static v…

MySQL数据库安装与配置详解

目录 一、概述 二、MySQL安装 三、安装成功验证 四、NavicatforMySQL下载及使用 一、概述 MySQL版本&#xff1a;5.7.17 下载地址&#xff1a;http://rj.baidu.com/soft/detail/12585.html?ald 客户端工具&#xff1a;NavicatforMySQL 绿色版下载地址&#xff1a;http://www.c…

求求你,不要再使用!=null判空了!

对于Java程序员来说&#xff0c;null是令人头痛的东西。时常会受到空指针异常&#xff08;NPE&#xff09;的骚扰。连Java的发明者都承认这是他的一项巨大失误。那么&#xff0c;有什么办法可以避免在代码中写大量的判空语句呢&#xff1f;有人说可以使用 JDK8提供的 Optional …

JDBC(Java语言连接数据库)

JDBC&#xff08;Java语言连接数据库&#xff09;JDBC本质整体结构基层实现过程&#xff08;即用记事本而不是idea&#xff09;第一种实现方式第二种实现方式乐观锁和悲观锁乐观锁悲观锁JDBC本质 整体结构 基层实现过程&#xff08;即用记事本而不是idea&#xff09; 第一种实…

那些牛逼的数据分析师,SQL用的到底有多溜

从各大招聘网站中可以看到&#xff0c;今年招聘信息少了很多&#xff0c;但数据分析相关岗位有一定增加&#xff0c;而数据分析能力几乎已成为每个岗位的必备技能。是什么原因让企业如此重视“数据人才”&#xff1f;伴随滴滴出行、智慧营销等的落地商用&#xff0c;部分企业尝…

knn机器学习算法_K-最近邻居(KNN)算法| 机器学习

knn机器学习算法Goal: To classify a query point (with 2 features) using training data of 2 classes using KNN. 目标&#xff1a;使用KNN使用2类的训练数据对查询点(具有2个要素)进行分类。 K最近邻居(KNN) (K- Nearest Neighbor (KNN)) KNN is a basic machine learning…

Linux 指令的分类 (man page 可查看)

man page 常用按键 转载于:https://www.cnblogs.com/aoun/p/4324350.html

Springboot遇到的问题

Springboot遇到的问题1_访问4041.1_url错误1.2_controller和启动项不在同级目录1.3_未加ResponseBody2_字母后端显示大写&#xff0c;传到前端变为小写2.1_Data注释问题1_访问404 1.1_url错误 1.2_controller和启动项不在同级目录 1.3_未加ResponseBody 在方法上面加&#…

45 张图深度解析 Netty 架构与原理

作为一个学 Java 的&#xff0c;如果没有研究过 Netty&#xff0c;那么你对 Java 语言的使用和理解仅仅停留在表面水平&#xff0c;会点 SSH 写几个 MVC&#xff0c;访问数据库和缓存&#xff0c;这些只是初等 Java 程序员干的事。如果你要进阶&#xff0c;想了解 Java 服务器的…

ajax实现浏览器前进后退-location.hash与模拟iframe

为什么80%的码农都做不了架构师&#xff1f;>>> Aajx实现无数据刷新时&#xff0c;我们会遇到浏览器前进后退失效的问题以及URL不友好的问题。 实现方式有两种 1、支持onhashchange事件的&#xff0c;通过更新和读取location.hash的方式来实现 /* 因为Javascript对…

java环境变量配置以及遇到的一些问题

java环境变量配置以及遇到的一些问题1_下载2_配置环境变量2.1_配置JAVA_HOME2.2_配置CLASS_PATH2.2_配置系统路径PATH3_遇到的问题3.1_输入java -version无效3.2_javac无效1_下载 2_配置环境变量 打开我的电脑&#xff0c;右击空白处点击属性 点击高级系统设置 点击环境变量…

c fputc 函数重写_使用示例的C语言中的fputc()函数

c fputc 函数重写C中的fputc()函数 (fputc() function in C) Prototype: 原型&#xff1a; int fputc(const char ch, FILE *filename);Parameters: 参数&#xff1a; const char ch, FILE *filenameReturn type: int 返回类型&#xff1a; int Use of function: 使用功能&a…

信息系统状态过程图_操作系统中的增强型过程状态图

信息系统状态过程图The enhanced process state diagram was introduced for maintaining the degree of multiprogramming by the Operating System. The degree of multiprogramming is the maximum number of processes that can be handled by the main memory at a partic…

Java中竟有18种队列?45张图!安排

今天我们来盘点一下Java中的Queue家族&#xff0c;总共涉及到18种Queue。这篇恐怕是市面上最全最细讲解Queue的。本篇主要内容如下&#xff1a;本篇主要内容帮你总结好的阻塞队列&#xff1a;18种Queue总结一、Queue自我介绍 队列原理图1.1 Queue自我介绍hi&#xff0c;大家好&…

肯德尔相关性分析_肯德尔的Tau机器学习相关性

肯德尔相关性分析Before we begin I hope you guys have a basic understanding of Pearson’s and Spearmans correlation. As the name suggests this correlation was named after Maurice Kendall in the year 1938. 在开始之前&#xff0c;我希望你们对皮尔逊和斯皮尔曼的…

40 张图带你搞懂 TCP 和 UDP

我们本篇文章的组织脉络如下运输层位于应用层和网络层之间&#xff0c;是 OSI 分层体系中的第四层&#xff0c;同时也是网络体系结构的重要部分。运输层主要负责网络上的端到端通信。运输层为运行在不同主机上的应用程序之间的通信起着至关重要的作用。下面我们就来一起探讨一下…

腾讯推出高性能 RPC 开发框架

Tars是基于名字服务使用Tars协议的高性能RPC开发框架&#xff0c;同时配套一体化的服务治理平台&#xff0c;帮助个人或者企业快速的以微服务的方式构建自己稳定可靠的分布式应用。Tars是将腾讯内部使用的微服务架构TAF&#xff08;Total Application Framework&#xff09;多年…