【消息队列】RabbitMQ五种消息模式

RabbitMQ

  • RabbitMQ
    • RabbitMQ安装
  • 常见的消息模型
    • 基本消息队列
    • SpringAMQP
    • WorkQueue
    • 消息预取
    • 发布订阅模式
      • Fanout Exchange
      • DirectExchange
      • TopicExchange
    • 消息转换器

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件
官网地址:https://www.rabbitmq.com/

RabbitMQ安装

我们在Centos虚拟机中使用Docker来安装

  1. 下载镜像,在线拉取
    docker pull rabbitmq
  2. 安装MQ
docker run\
--env RABBITMQ_DEFAULT_USER=itcast \  # 设置环境变量用户名
--env RABBITMQ_DEFAULT_PASS= \  # 设置环境变量密码
--name mq \   # 队列名称
--hostname mq1 \  #配置主机名
-p 15672:15672 \  # MQ管理端口
-p 5672:5672 \   #MQ消息传输端口
-d \   # 后台运行
rabbitmq

在这里插入图片描述
在这里插入图片描述

交换机的创建与消息的发送由虚拟主机来完成,每个用户的虚拟主机是相互隔离的

在RabbitMQ中:
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue,exchange等资源的逻辑分组

常见的消息模型

  1. 基本消息队列
  2. 工作消息队列

这两种并没有用到交换机,而是直接到达队列

  1. 发布订阅(Publish,Subscribe),根据交换机类型不同分为三种:
    Fanout Exchange:广播
    Direct Exchange:路由
    Topic Exchange:主题

基本消息队列

publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息

java模型(消息发布者)

@Test
public void test() throws IOException,TimeoutException{//1.建立连接,与消息队列进行连接ConnetionFactory factory =new ConnetionFactory();//设置连接参数,主机名,端口号,vhost,用户名,密码factory.setHost(192.168.75.136);factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("");//建立连接Connection connection =factory.newConnection();//创建通道Channel,就可以向队列发送消息了Channel channel =connection.createChannel();//创建队列String queuename="hlh";channel.queueDeclare(queuename,false,false,false,null);//发送消息String message="hello";channel.basicPublish("",queuename,null,message.getBytes());//关闭通道和连接channel.close();connection.close();
}

java模型(消息消费者)

    //1.建立连接,与消息队列进行连接ConnetionFactory factory =new ConnetionFactory();//设置连接参数,主机名,端口号,vhost,用户名,密码factory.setHost(192.168.75.136);factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("");//建立连接Connection connection =factory.newConnection();//创建通道Channel,就可以向队列发送消息了Channel channel =connection.createChannel();//创建队列String queuename="hlh";channel.queueDeclare(queuename,false,false,false,null);//订阅消息channel.basicConsume(queuename,true,new DefaultConsumer(channel){@Override//处理消息的代码,绑定函数,有了消息才执行public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{//处理消息String message=new String(body);             }})

注意:上边生产者消费者都创建了队列:

这是为了防止消息队列中的队列不存在,在进行消息队列初始化的时候不知道是先建立消费者,还是先建立生产者,所以都执行创建函数,但是创建的队列只有一个不会重复

SpringAMQP

  • AMQP

是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中的独立性的要求

  • Spring AMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中Spring-amqp是基础抽象,spring-rabbit是底层的默认实现

  • 特征:
  1. 监听器容器,用于异步处理入站消息
  2. 用于发送和接收消息的RabbitTemplate
  3. Rabbitadmin用于自动声明队列,交换和绑定
  • 使用:
  1. 引入spring-amqp的依赖
    在这里插入图片描述
    在yml中配置mq连接信息:
spring: rabbitmq:host: 192.168.75.136 #主机名port: 5672 #端口virtual-host: / #虚拟主机username: itcast #用户名password:   #密码
  1. 在生产者服务中利用RabbitTemplate发送消息到hlh.queue这个队列
public class springamqptest{@Autowiredprivate RabbitTemplate rabbittemplate;@Testpublic void test(){String queuename="hlh.queue";String message="hello";rabbittemplate.convertAndSend(queuename,message);}
}
  1. 在消费者服务端编写消费逻辑,绑定到hlh.queue这个队列中
@Component
public class SpringrabbitListener {@RabbitListener(queues="hlh.queue")public void listenSimple(String msg) throws InterruptedException{//消费逻辑代码}
}

注意:消息一旦消费就会从队列中删除,rabbitmq没有消息回溯功能

WorkQueue

Work queue,工作队列。可以提高消息处理速度,避免队列消息堆积

一个消息队列绑定多个消费者

假设现在生产者每秒循环发送50条消息,此时的消费者怎么处理:

@Component
public class SpringrabbitListener {@RabbitListener(queues="hlh.queue")public void listenSimple(String msg) throws InterruptedException{//消费逻辑代码}@RabbitListener(queues="hlh.queue")public void listenSimple2(String msg) throws InterruptedException{//消费逻辑代码}
}

通过定义多个消费者进行消费,追上生产者生产的速度,同一个消息只能被一个消费者消费,一旦消费完就会在队列中删除

消息预取

指的每个消费者每次取多少条消息:
可以通过配置进行配置:

spring:rabbitmq:host: 192.168.75.136port: 5672virtual-host: /username: itcastpassword: listener:simple:prefecth: 1 #每次只能获取一条消息,处理完才能获得下一个消息

发布订阅模式

发布订阅可以使得同一个消息发送给多个消费者,实现方式是加入了exchange(交换机)
在这里插入图片描述

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

交换机的作用:

  1. 接收生产者的消息,将消息按照规则路由到与之绑定的队列
  2. 不能缓存消息,路由失败,消息丢失
  3. FanoutExchange的会将消息路由到每个绑定的队列

SpringAMQP提过了声明交换机,队列,绑定关系的API:
在这里插入图片描述

Fanout Exchange

Fanout Exchange 会将所有的消息路由到每一个跟其绑定的queue
在创建配置类,在配置类中进行消息队列绑定交换机

@Configuration
public class FanoutConfig{// 声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//声明一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列跟交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
}

此时的生产者如何发送消息:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);
}

监听者如何收到消息

@RabbitListener(queues="fanout.queue1")
public void listener(String msg){//处理得到的消息
}

DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey,Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

一个队列可以指定多个Key

我们可以通过 @RabbitListener声明Exchange,Queue,RoutingKey
在消费者方法上注解

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key={"red","blue"}))
public void Listener(String msg){//进行消息的处理
}

在生产者生产时:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}

TopicExchange

TopicExchange与路由模式类似,区别在于routingKey必须是多个单词的列表,并且以.分隔
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
同样也是使用 @RabbitListener进行声明

@RabbitListener(bindings=@QueueBinding(value=@Queue(name="direct.queue1"),exchange=@Exchange(name="itcast.direct",type=ExchangeTypes.DIRECT),key="hi.#"))
public void Listener(String msg){//进行消息的处理
}

生产者生产消息:

public void test(){//给出交换机名称String exchangeName="itcast.fanout";String message="hello";//发送消息rabbitTemplate.convertAndSend(exchangeName,"hi.now",message);
}

消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是我们可以发送任意对象类型的消息,SpringAMQP会帮助我们序列化为字节后发送

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化

如果要修改只需定义一个MessageConverter 类型的Bean即可,推荐使用JSON方式完成序列化

  1. 引入jackson的依赖
    在这里插入图片描述
  2. 声明MessageConverter:
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

这样发送的消息就会使用自定义的转换类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/830861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【VUE】Vue中实现树状表格结构编辑与版本对比的详细技术实现

Vue中实现树状表格结构编辑与版本对比的详细技术实现 在Vue中,创建一个可编辑的树状表格并实施版本对比功能是一种需求较为常见的场景。在本教程中,我们将使用Vue结合Element UI的el-table组件,来构建一个树状表格,其中包含添加、…

速盾:什么是cdn架构

CDN(Content Delivery Network)即内容分发网络,是一种分布式的架构,用于提高互联网上的内容传输速度和用户体验。CDN架构通过将内容分发到全球多个节点,使用户能够从最近的节点获取内容,从而减少延迟和网络…

第15届蓝桥杯-蒟蒻の反思与总结

基本情况 第15届蓝桥杯,参加c大学A组,完整做出的只有两道填空题。 然后后面的题目基本只拿了20%这样的分数,最后两道15分题目空白。 满分100分,估计总分在15-20分这样。 对于二分答案还是没有太熟练,考试的时候没有…

深入探究C++四大关键特性:初始化列表、友元函数、内部类与static成员

目录 1. 构造函数不为人知的那些事 1.1 构造函数体赋值与初始化列表对比 1.2 explicit关键字与构造函数隐式转换 2. static成员 2.1 static成员的概念 2.2 static成员的特性与应用 2.3 小结 3. C11 成员变量初始化新用法 4. 友元 4.1 友元函数 4.2 友元类 5. 内部类…

数字孪生需要的世界模型

今天看到这篇文章,提到了世界模型,仅是数据驱动的数字孪生已经不能满足需要了,需要应用世界模型使数字孪生具备推理、判断、感知世界的能力。 由于人工智能和机器学习的兴起,使用数据驱动建模来表示复杂系统已变得普遍&#xff0…

Python 中的花卉矩阵组合

使用场景描述 (rib) 协议编写脚本的基础知识。通过创建在 3D 空间中转换的基本几何图形,解决了 xyz 坐标系的基础知识。初步渲染是使用基本着色完成的,因此可以更容易地看到几何体。RenderMan 图1 图 1 是我作为作业参考的示例图片,并尝试匹配 中的图片。为了完成这项任务…

Python | Leetcode Python题解之第61题旋转链表

题目: 题解: class Solution:def rotateRight(self, head: ListNode, k: int) -> ListNode:if k 0 or not head or not head.next:return headn 1cur headwhile cur.next:cur cur.nextn 1if (add : n - k % n) n:return headcur.next headwhi…

JS从入门到精通

1.JS概述 window.sessionStorage.setItem("flag", flag);原生JS也可以存SessionStorage 尚硅谷的视频教程: 不用在服务端,在客户端就验证了。 解释型VS编译型;事件驱动;客户端的脚本语言;脚本语言&#xff0…

机器学习的两种典型任务

机器学习中的典型任务类型可以分为分类任务(Classification)和回归任务(Regression) 分类任务 回归任务 简单的理解,分类任务是对离散值进行预测,根据每个样本的值/特征预测该样本属于类 型A、类型B 还是类…

Django后台项目开发实战四

用户可以浏览工作列表以及工作详情 第四阶段 在 jobs 文件夹下创建 templates 文件夹&#xff0c;在里面创建 base.html 网页&#xff0c;内容如下 <!-- base.html --> <div style"text-align:center;"><h1 style "margin:auto; width:50%;&…

MATLAB - 自定义惯性矩阵

系列文章目录 前言 一、关键惯性约定 Simscape 多体软件在惯性定义中采用了一系列约定。请注意这些约定&#xff0c;因为如果手动进行惯性计算&#xff0c;这些约定可能会影响计算结果。如果您的惯性数据来自 CAD 应用程序或其他第三方软件&#xff0c;这些约定还可能影响到您需…

Mac好用又好看的终端iTerm2 + oh-my-zsh

Mac好用又好看的终端iTerm2 1. iTerm2的下载安装2. oh-my-zsh的安装2.1 官网安装方式2.2 国内镜像源安装方式 3. oh-my-zsh配置3.1 存放主题的路径3.2 存放插件的路径3.3 配置文件路径 1. iTerm2的下载安装 官网下载&#xff1a; iTerm2 2. oh-my-zsh的安装 oh-my-zsh是一…

C语言 | Leetcode C语言题解之第60题排列序列

题目&#xff1a; 题解&#xff1a; char* getPermutation(int n, int k) {int factorial[n];factorial[0] 1;for (int i 1; i < n; i) {factorial[i] factorial[i - 1] * i;}--k;char* ans malloc(n 1);ans[n] \0;int valid[n 1];for (int i 0; i < n; i) {val…

飞书API(6):使用 pandas 处理数据并写入 MySQL 数据库

一、引入 上一篇了解了飞书 28 种数据类型通过接口读取到的数据结构&#xff0c;本文开始探讨如何将这些数据写入 MySQL 数据库。这个工作流的起点是从 API 获取到的一个完整的数据&#xff0c;终点是写入 MySQL 数据表&#xff0c;表结构和维格表结构类似。在过程中可以有不同…

【Leetcode每日一题】 动态规划 - 简单多状态 dp 问题 - 按摩师(难度⭐)(64)

1. 题目解析 题目链接&#xff1a;面试题 17.16. 按摩师 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 2.算法原理 一、状态定义 在解决这类动态规划问题时&#xff0c;首先我们需要明确状态的定义。对于本题&#xff0c;我们…

在mac上安装node.js及使用npm,yarn相关命令教程

1、安装node.js 官网&#xff1a;Node.js — Download Node.js 选择需要的版本&#xff0c;点击DownLoad 2、点击继续&#xff0c;直到安装成功。 2.1打开终端输入命令node -v 显示版本号则说明已安装成功 3、全局安装yarn命令 1、sudo npm install --global yarn &#xf…

Git学习笔记(五)IDEA使用Git

在前面几篇文章中&#xff0c;我们已经介绍了git的基础知识&#xff0c;知道了其主要作用是用来进行代码的版本管理&#xff1b;并且已经介绍了Git操作的常用命令。在日常的开发环境下&#xff0c;除了通过Bash命令行来操作Git之外&#xff0c;我们另外一种常用的操作方式则是直…

Redis从入门到精通

第一章> 1、Redis概述 2、Mysql的演进 3、当今企业的架构分析 4、到底什么是nosql 5、阿里巴巴数据架构演进 第二章> 6、NoSql四大分类 7、Redis概述 8、Windows安装redis…

基于STC12C5A60S2系列1T 8051单片机的Proteus中的单片机发送一帧或一串数据给串口调试助手软件接收区显示出来的串口通信应用

基于STC12C5A60S2系列1T 8051单片机的Proteus中的单片机发送一帧或一串数据给串口调试助手软件接收区显示出来的串口通信应用 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机串口通信介绍STC12C5A60S2系列1T 8051单片机串口通信的结构基于STC12C5A60S2系列…

华为鸿蒙HarmonyOS应用开发者高级认证答案

判断 1只要使用端云一体化的云端资源就需要支付费用&#xff08;错&#xff09; 2所有使用Component修饰的自定义组件都支持onPageShow&#xff0c;onBackPress和onPageHide生命周期函数。&#xff08;错&#xff09; 3 HarmonyOS应用可以兼容OpenHarmony生态&#xff08;对…