rabbitMQ(3)

RabbitMq 交换机

文章目录

  • 1. 交换机的介绍
  • 2. 交换机的类型
  • 3. 临时队列
  • 4. 绑定 (bindings)
  • 5. 扇形交换机(Fanout ) 演示
  • 6. 直接交换机 Direct exchange
    • 6.1 多重绑定
    • 6.2 direct 代码案例
  • 7. 主题交换机
    • 7.1 Topic 匹配案例
    • 7.2 Topic 代码案例
  • 8. headers 头交换机


前言:

在之前的文章中,我们一直都没有提到交换机,只是在介绍 rabbitmq 的时候 说过一嘴交换机 , 另外在 之前文章的代码案例中,我们其实

是使用到过交换机的 ,

比如:

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());


这里第一个参数就是用来知名交换机的,但是我们用 “” 来作为交换机的名,rabbitmq 就会默认使用 Direct Exchange(直连交换机) , >

既然 之前我们没有讲到过 交换机,下面我们就来学习一下交换机.

1. 交换机的介绍

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

在这里插入图片描述


简单一句话: 交换机是用来将 生产者生产的消息 转发到对应的队列中.


简单看完交换机的介绍,下面我们来看看 rabbitmq 中交换机的几种类型 .

2. 交换机的类型


引用:

  1. Direct Exchange(直连交换机):根据消息的 routing key 将消息路由到与之完全匹配的队列。适用于一对一的消息传递。 比如 一个队列绑定到该交换机上 要求 routing key (路由键) 为 abc ,那么只有被标记为 abc 的消息才能被转发,不会转发 abc.def , 也不会转发 aaa.bbb.ccc 只会转发 abc.

  1. Fanout Exchange(扇形交换机):将消息广播到所有与该交换机绑定的队列。适用于一对多的消息广播。扇形交换机不处理路由键,我们只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。

  1. Topic Exchange(主题交换机):根据消息的 routing key 和交换机与队列绑定时的 routing pattern 进行匹配,将消息路由到满足条件的队列。支持通配符匹配,比如 符号“#” 匹配一个或多个词,符号 * 匹配不多不少一个词。因此 abc.# 能够匹配到 abc.def.ghi,但是 abc.* 只会匹配到 abc.def。

  1. Headers Exchange(头交换机):根据消息的头部属性进行匹配,并将消息路由到满足条件的队列。

    解释: 不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送
    到RabbitMQ 时会 取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的

    匹配规则 x-match 有下列两种类型:

    x-match = all :表示所有的键值对都匹配才能接受到消息

    x-match = any :表示只要有键值对匹配就能接受到消息
  2. Default Exchange(默认交换机):它是一个特殊的直连交换机(类型为 direct),当没有指定交换机时,默认的交换机会将消息根据消息的 routing key 发送到同名的队列上


看完了 交换机的类型,这里先来 学习一下 临时队列 之后在演示绑定交换机 会使用到.

3. 临时队列

在 之前的文章中 ,我们每次创建的队列都是具有特定的名字 比如 hello , ack_queue , 可以说 队列的名字对我们来说是非常重要的 ,因为我们要指定 消费者 去那个队列消费消息 . 但是我们 每次都要去想名字,有时候我们愿 想名字 (取名字啥的最烦了) .为了解决这个问题,我们就可以创建一个具有随机名称的队列 , 或者让服务器为我们选择一个随机对立的名称. 另外 如果 我们 想要 在使用完队列 后 断开连接 就删除 (一次性的队列) 就可以 通过下面这种方式来创建 临时队列.

String queueName = channel.queueDeclare().getQueue();


创建出来的队列:

在这里插入图片描述


知道了如何 创建临时队列,下面我们来看看 绑定 (bindings) ,这里是 使用 交换机 最重要的 一环节 , 如果 没有绑定 交换机 就无法绑定到 队列中,交换机就无法 把 消息 转发给 队列.

4. 绑定 (bindings)

什么是 绑定 呢,绑定 其实是 exchange 和 queue 之间的桥梁,它告诉我们 交换机 和 那个队列进行了绑定关系。

比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定

在这里插入图片描述

关于 临时队列 和 绑定 预备知识点看完后,我们就来 使用 交换机.

5. 扇形交换机(Fanout ) 演示

扇形交换机: 将消息广播到所有与该交换机绑定的队列。适用于一对多的消息广播。

演示: 这里我们创建一个简单的日志系统来完成代码样式 .

创建一个消费者 用来 生产消息 ,创建两个消费者 ,一个消费者 将接收到的消息 显示到 控制台,另外一个将消息 存储到 本地磁盘 .


大致:

在这里插入图片描述


代码案例:


消费者:

ReceiveLogs01

package org.example.five;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者1 --> 将消息显示到控制台
public class ReceiveLogs01 {// 交换机名字private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机 --> 扇形 fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除String queueName = channel.queueDeclare().getQueue();// 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""channel.queueBind(queueName, EXCHANGE_NAME, "");// 接受消息 -->  消费者接受消息时的回调DeliverCallback deliverCallback = (tag, message) -> {System.out.println("控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));};channel.basicConsume(queueName, true, deliverCallback, tag -> {});}
}


ReceiveLogs02

package org.example.five;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeoutException;// 消费者2 --> 将消息 写入本地
public class ReceiveLogs02 {// 交换机名字private static final String EXCHANGE_NAME = "logs";private static final String path = "E:\\java\\java_lx\\practice\\javaTest\\blog_rabbitmq\\src\\main\\java\\org\\example\\five";public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机 --> 扇形 fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除String queueName = channel.queueDeclare().getQueue();// 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""channel.queueBind(queueName, EXCHANGE_NAME, "");// 接受消息 -->  消费者接受消息时的回调DeliverCallback deliverCallback = (tag, message) -> {// 文件操作File file = new File(path + "/test.txt");// 文件不存在创建文件if (!file.exists()) {file.createNewFile();}// 使用 Files.newOutputStream 共创创建 outputStream 对象try (OutputStream outputStream = Files.newOutputStream(file.toPath());) {outputStream.write(message.getBody());} catch (IOException e) {throw new RuntimeException("写入文件时发生错误: " + e.getMessage());}};channel.basicConsume(queueName, true, deliverCallback, tag -> {});}
}


生产者: EmitLog

package org.example.five;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeoutException;// 消费者2 --> 将消息 写入本地
public class ReceiveLogs02 {// 交换机名字private static final String EXCHANGE_NAME = "logs";private static final String path = "E:\\java\\java_lx\\practice\\javaTest\\blog_rabbitmq\\src\\main\\java\\org\\example\\five";public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机 --> 扇形 fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除String queueName = channel.queueDeclare().getQueue();// 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""channel.queueBind(queueName, EXCHANGE_NAME, "");// 接受消息 -->  消费者接受消息时的回调DeliverCallback deliverCallback = (tag, message) -> {// 文件操作File file = new File(path + "/test.txt");// 文件不存在创建文件if (!file.exists()) {file.createNewFile();}// 使用 Files.newOutputStream 共创创建 outputStream 对象 , ture 表示 写文件不覆盖之前的内容try (OutputStream outputStream = new FileOutputStream(file, true)) {outputStream.write(message.getBody());} catch (IOException e) {throw new RuntimeException("写入文件时发生错误: " + e.getMessage());}};channel.basicConsume(queueName, true, deliverCallback, tag -> {});}
}


注意:

先启动两个消费者再启动生产者。

生产者生产消息后,如果没有对应的消费者接收,则该消息是遗弃的消息


启动看看效果:

在这里插入图片描述


可以看到 一个 生产者生产者的消息被多个消费者消费 ,到此 fanout 交换机 就 演示完成了 .

6. 直接交换机 Direct exchange

直接交换机: 根据消息的 routing key 将消息路由到与之完全匹配的队列。适用于一对一的消息传递。


在使用 Fanout 交换机 ,完成了对日志系统的构造 ,但是还不够,因为 日志 有很多 ,我们并不好 一下找到 严重错误 , 我们可以让一个消费者去操作文件存入全部日志, 让 另外一个消费者 消费严重错误 (如果是严重的错误 才发送给消费者) 将 严重错误的消息存盘 . 此时就好找错误了.


我们要实现这个功能 扇形交换机就不好完成,因为扇形交换机会将消息传播给全部绑定的队列中 ,这里我们就可以使用直接交换机 (direct)


使用 queueBind 方法 绑定路由键

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");


绑定玩路由键后 ,消息只会去 它绑定的 路由键队列中.

在这里插入图片描述


在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.


在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black ,green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

6.1 多重绑定

在这里插入图片描述


当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

6.2 direct 代码案例


这里我们 实现 让 生产者发送多个消息 ,多个消费者 消费不同的消息.

图:

在这里插入图片描述


C1 消费者:绑定 console 队列,routingKey 为 info、warning

C2 消费者:绑定 disk 队列,routingKey 为 error


当生产者生产消息到 direct_logs 交换机里,该交换机会检测消息的 routingKey 条件,然后分配到满足条件的队列里,最后由消费者从队列消费消息。


代码: 这里就不写文件操作了,直接将消费者拿到的消息 放到 控制到上.


生产者

package org.example.five;import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;public class DirectLogs {// 交换机public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();Scanner sc = new Scanner(System.in);while (sc.hasNext()) {String message = sc.next();// 路由键为 infochannel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));System.out.println("生产者发出消息: " + message);}}
}


启动后 先用 路由键为 info 发送多个消息 ,然后更改 为 error 和 warning 发送消息.


消费者 c1

package org.example.five;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者 c1 -- 消费 路由键为 info 和 warning 的消息
public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 声明一个队列channel.queueDeclare("console", false, false, false, null);// 绑定 --> 队列 console , 交换机 direct_logs , 路由键 infochannel.queueBind("console", EXCHANGE_NAME, "info");// 绑定 --> 队列 console , 交换机 direct_logs , 路由键 warningchannel.queueBind("console", EXCHANGE_NAME, "warning");// 接受消息DeliverCallback deliverCallback = (tag, message) -> {System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));};channel.basicConsume("console", true, deliverCallback, (tag) -> {});}
}


消费者 c2

package org.example.five;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者 c2 -- 消费 路由键为 error
public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 声明一个队列channel.queueDeclare("disk", false, false, false, null);// 绑定 --> 队列 console , 交换机 direct_logs , 路由键 infochannel.queueBind("disk", EXCHANGE_NAME, "error");// 接受消息DeliverCallback deliverCallback = (tag, message) -> {System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));};channel.basicConsume("disk", true, deliverCallback, (tag) -> {});}
}


效果:


图一:

在这里插入图片描述


图二:

在这里插入图片描述

7. 主题交换机


主题交换机: 根据消息的 routing key 和交换机与队列绑定时的 routing pattern 进行匹配,将消息路由到满足条件的队列。支持通配符匹配,

上面我们使用 直接交换机改进了 日志记录系统 ,。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。


尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性——比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型


另外 使用 topic 需要注意:

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表以点号分隔开。这些单词可以是任意单词

比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit” 这种类型的。

当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的:

  • *(星号)可以代替一个位置
  • #(井号)可以替代零个或多个位置

7.1 Topic 匹配案例


绑定关系图

在这里插入图片描述

  • Q1–>绑定的是
    • 中间带 orange 带 3 个单词的字符串 (*.orange.*)
  • Q2–>绑定的是
    • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
    • 第一个单词是 lazy 的多个单词 (lazy.#)


对上面 q1 和 q2 绑定的路由键 , 举几个例子

例子说明
uick.orange.rabbit被队列 Q1Q2 接收到
azy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2


如果一个队列绑定的路由键为 # , 此时队列可以接收到所有的数据,此时就相当于绑定了 一个 fanout (扇形) 交换机了 ,如果 一个队列 绑定的路由键 没有 # 和 * 此时就可以认为 绑定了一个 direct(直接) 交换机了.


看完这些,下面就来看看代码案例

7.2 Topic 代码案例

创建一个生产者 ,生产多个消息到交换机,交换机按照通配符分配消息到不同的队列中,队列由消费者进行消费


生产者 EmitLogTopic:

package org.example.six;import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class EmitLogTopic {// 交换机的名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();/*** Q1-->绑定的是*      中间带 orange 带 3 个单词的字符串(*.orange.*)* Q2-->绑定的是*      最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)*      第一个单词是 lazy 的多个单词(lazy.#)*/HashMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");// 遍历 map 发送消息for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println("生产者发出消息: " + message);}}
}


消费者c1

package org.example.six;// 消费者 c1 --> 绑定的路由键为 *.orange.*import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ReceiveLogsTopic01 {// 交换机名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 声明队列String queueName = "Q1";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");System.out.println("等待接受消息.....");// 接收到消息的回调DeliverCallback deliverCallback = (tag, message) -> {System.out.println(new String(message.getBody(), "UTF-8"));System.out.println("接收队列:" + queueName + "  绑定键:" + message.getEnvelope().getRoutingKey());};// 接受消息channel.basicConsume(queueName, true, deliverCallback, (message) -> {});}
}


消费者2

package org.example.six;// 消费者 c2 绑定的路由键为 *.*.rabbit  , lazy.#import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ReceiveLogsTopic02 {// 交换机名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 声明队列String queueName = "Q2";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");System.out.println("c2 等待接受消息.....");// 接收到消息的回调DeliverCallback deliverCallback = (tag, message) -> {System.out.println(new String(message.getBody(), "UTF-8"));System.out.println("接收队列:" + queueName + "  绑定键:" + message.getEnvelope().getRoutingKey());};// 接受消息channel.basicConsume(queueName, true, deliverCallback, (message) -> {});}
}


效果展示:

在这里插入图片描述

8. headers 头交换机


headers 皮皮额 AMQP 消息的 headr 而不是 路由键 ,此外 headers 交换机和 direct 交换机完全一致 ,但是性能差很多 ,目前几乎 用不到 ,了解即可.


消费方指定的 headers 中必须包含一个 “x-match” 的键 。

键 “x-match” 的值有两个

  1. x-match = all :表示所有的键值对都匹配才能接收到消息
  2. x-mathc = any : 表示只有键值对匹配就能接受到消息

在这里插入图片描述


代码演示:

生产者:

package org.example.six;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class ProducerHeaders {public static String EXCHANGE = "header_exchange";public static String QUEUE = "header_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS, true, false, null);// 声明一个队列channel.queueDeclare(QUEUE, true, false, false, null);Map<String, Object> headerMap = new HashMap<>();headerMap.put("name", "abcdef");headerMap.put("sex", "男");AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(headerMap);// 发送消息String message = "hello_header";channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());}}

消费者:

package org.example.six;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class ConsumerHeader {public static String EXCHANGE = "header_exchange";public static String QUEUE = "header_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();DeliverCallback deliverCallback = (tag, message) -> {System.out.println("接收到消息: " + new String(message.getBody()));};CancelCallback callback = (tag) -> {System.out.println("消息被中断");};Map<String, Object> headerMap = new HashMap<>();headerMap.put("x-match", "all");// 除了 all 还有 any headerMap.put("name", "abcdef");headerMap.put("sex", "男");channel.queueBind(QUEUE, EXCHANGE, "", headerMap);channel.basicConsume(QUEUE, true, deliverCallback, callback);}
}

效果:

图一:

在这里插入图片描述

图二:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/116478.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kubernetes-进阶(Pod生命周期/调度/控制器,Ingress代理,数据存储PV/PVC)

Kubernetes-进阶 Pod详解 每个Pod中都可以包含一个或多个容器&#xff0c;这些容器可以分两类 用户程序所在容器&#xff0c;数量用户决定Pause容器&#xff0c;这是每个Pod都会有的一个根容器&#xff0c;它的作用有两个 可以以它为依据&#xff0c;评估整个Pod的健康状态可以…

itbuilder软件在线设计数据库模型,AI与数据库擦出的火花

今天要介绍一款强大的软件&#xff0c;它就是itBuilder软件&#xff0c;一款在线设计数据库模型软件&#xff0c;借助人工智能提高效率&#xff0c;可以生成CRUD代码并推送至开发工具中&#xff1b;它涵盖了几乎所有语言&#xff0c;如Java、Python、JavaScript等&#xff0c;并…

4种实用的制作URL 文件的方法

很多小伙伴有自己的博客、淘宝或者共享文件网站&#xff0c;想要分享、推广自己的网址做成url文件&#xff0c;让别人点击这个url文件直接访问自己的网站。URL文件其实就一个超级链接&#xff0c;制作的方法很多&#xff0c;这里列举4种。 收藏网站直接拖拽 1.第一种&#xf…

drf-过滤、排序、异常处理、自封装Response

过滤 过滤就是根据路由url?后的信息过滤出符合&#xff1f;后条件的数据而非全部&#xff0c;比如…/?nameweer就是只查name是weer的数据&#xff0c;其余不返回。 1、安装&#xff1a;pip3 install django-filter2、注册&#xff1a;在settings.py中的app中注册django-filt…

服务器数据恢复-某银行服务器硬盘数据恢复案例

服务器故障&分析&#xff1a; 某银行的某一业务模块崩溃&#xff0c;无法正常使用。排查服务器故障&#xff0c;发现运行该业务模块的服务器中多块硬盘离线&#xff0c;导致上层应用崩溃。 故障服务器内多块硬盘掉线&#xff0c;硬盘掉线数量超过服务器raid阵列冗余级别所允…

【T3】畅捷通T3备份账套提示:超时已过期,错误‘53‘文件不存在。

【问题描述】 针对畅捷通T3软件&#xff0c;进行账套备份&#xff08;账套输出&#xff09;的时候&#xff0c; 先是提示”超时已过期“&#xff1b; 点击确定后&#xff0c;再次提示&#xff1a;运行时错误53&#xff0c;文件未找到。 最终导致账套备份/输出失败。 【解决…

windows10下pytorch环境部署留念

pytorch环境部署留念 第一步&#xff1a;下载安装anaconda 官网地址 &#xff08;也可以到清华大学开源软件镜像站下载&#xff1a;https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/&#xff09; 我安装的是下面这个&#xff0c;一通下一步就完事儿。 第二步&#x…

Java中配置RabbitMQ基本步骤

在Java中配置RabbitMQ&#xff0c;需要遵循以下步骤&#xff1a; 1.添加依赖 在项目的pom.xml文件中添加RabbitMQ的Java客户端依赖&#xff1a; <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><versio…

NLP入门——语言结构/语言建模

一、Linguistics 语言学 wordsmorphology 形态学&#xff1a;词的构成和内部结构研究。如英语的dog、dogs和dog-catcher有相当的关系morpheme 语素&#xff1a;最小的语法单位&#xff0c;是最小的音义结合体lexeme 词位&#xff1a;词的意义的基本抽象单位&#xff0c;是一组…

Scala入门到放弃—02—函数

文章目录 函数方法定义默认参数命名参数可变参数条件语句循环表达式 函数 方法定义 def 方法名(参数: 参数类型): 返回值类型 {//方法体//最后一行作为返回值(不需要使用return) } def max(x: Int, y: Int): Int {if(x > y)xelse y }package org.example object App {de…

2.6.C++项目:网络版五子棋对战之数据管理模块-游戏房间管理模块的设计

文章目录 一、意义二、功能三、作用四、游戏房间类基本框架五、游戏房间管理类基本框架七、游戏房间类代码八、游戏房间管理类代码 一、意义 对匹配成功的玩家创建房间&#xff0c;建立起一个小范围的玩家之间的关联关系&#xff01; 房间里一个玩家产生的动作将会广播给房间里…

基本的爬虫工作原理

爬虫是一种自动化程序&#xff0c;能够模拟人类的浏览行为&#xff0c;从网络上获取数据。爬虫的工作原理主要包括网页请求、数据解析和数据存储等几个步骤。本文将详细介绍爬虫的基本工作原理&#xff0c;帮助读者更好地理解和应用爬虫技术。 首先&#xff0c;爬虫的第一步是…

I/O模型之非阻塞IO

简介 五种IO模型   阻塞IO   非阻塞IO   信号驱动IO   IO多路转接    异步IO 代码书写 非阻塞IO 再次理解IO 什么是IO&#xff1f;什么是高效的IO&#xff1f; 为了理解后面的一个问题&#xff0c;我们首先要再重新理解一下什么是IO 在之前的网络介绍中&#xff…

CUDA学习笔记(二)CUDA简介

本篇博文转载于https://www.cnblogs.com/1024incn/tag/CUDA/&#xff0c;仅用于学习。 CUDA是并行计算的平台和类C编程模型&#xff0c;我们能很容易的实现并行算法&#xff0c;就像写C代码一样。只要配备的NVIDIA GPU&#xff0c;就可以在许多设备上运行你的并行程序&#xf…

浅谈uniapp中开发安卓原生插件

其实官方文档介绍的比较清楚而且详细,但是有时候他太墨迹,你一下子找不到自己想要的,所以我总结了一下开发的提纲,也是为了自己方便下次使用。 1.第一步,下载官方提供的Android的示例工程,然后倒入UniPlugin-Hello-AS工程请在App离线SDK中查找,之后Android studio,编译运行项目…

自编efi文件测试vmware虚拟机如何进入UEFI环境

同事突然让帮忙编一下UEFI&#xff0c;之前完全没有接触过&#xff0c;在此粗鲁记录其过程。 UEFI的开源框架是edk2&#xff0c;开发环境配置起来还是有些麻烦&#xff0c;完全按照文档编译不过&#xff0c;经人帮助总算编译通过&#xff0c;但如何测试又是问题&#xff1b;网…

【T+】畅捷通T+增加会计科目提示执行超时已过期。

【问题描述】 在畅捷通T软件中&#xff0c; 增加会计科目的时候提示&#xff1a; 通过DataTable插入ext扩展表出错:执行超时已过期。 完成操作之前已超时或服务器未响应。 操作已被用户取消。 语句已终止。 【解决方法】 【方法一】 注销用户登录&#xff0c;回到软件登录界面…

FFmpeg和rtsp服务器搭建视频直播流服务

下面使用的是ubuntu的&#xff0c;window系统可以参考&#xff1a; 通过rtsp-simple-server和ffmpeg实现录屏并发布视频直播_rtsp simple server_病毒宇宇的博客-CSDN博客 一、安装rtsp-simple-server &#xff08;1&#xff09;下载rtsp-simple-server 下载地址&#xff1a;R…

vsCode 格式化配置

学习目标&#xff1a; 基于 vsCode 配置格式化工具&#xff0c;提高&#xff08;React、Vue &#xff09;开发效率  1. vsCode 安装 prettier 插件并启用  2. 修改配置文件 setting.json setting.json 位置&#xff1a; 依次点击 替换内容&#xff1a;↓ {"git.enab…

智加科技与东风柳汽达成深度合作 自动驾驶重卡计划2024年初量产交付

&#xff08;2023年10月19日&#xff0c;苏州&#xff09;全球领先的重卡自动驾驶技术公司智加科技与东风柳汽宣布&#xff0c;双方共同开发的自动驾驶重卡H7计划2024年初实现量产交付。未来&#xff0c;双方将携手推出安全可靠、高性价比、性能卓越的自动驾驶重卡产品&#xf…