Spring整合RabbitMQ-注解方式

maven导入

            <dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.2.7.RELEASE</version></dependency>

5.2.1 消息的生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;import java.nio.charset.StandardCharsets;public class ProducterApplication {public static void main(String[] args) throws Exception {AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);RabbitTemplate template = context.getBean(RabbitTemplate.class);//构造消息属性对象MessageProperties msgBuild = MessagePropertiesBuilder.newInstance()//设置消息的类型为文本.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)//消息的编码方式为UTF-8.setContentEncoding(StandardCharsets.UTF_8.name())//自定义消息头信息.setHeader("test.header", "test.value").build();//对象消息进行编码操作Message msg = MessageBuilder.withBody("你好 RabbitMQ!".getBytes(StandardCharsets.UTF_8)).andProperties(msgBuild).build();template.send("ex.anno.fanout", "routing.anno", msg);context.close();}}

RabbitConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import java.net.URI;@Configurable
public class RabbitConfig {/*** 连接工厂** @return*/@Beanpublic ConnectionFactory getConnectionFactory() {URI uri = URI.create("amqp://root:123456@node1:5672/%2f");ConnectionFactory factory = new CachingConnectionFactory(uri);return factory;}/*** RabbitTemplate*/@Bean@Autowiredpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);return rabbitTemplate;}/*** RabbitAdmin*/@Bean@Autowiredpublic RabbitAdmin rabbitAdmin(ConnectionFactory factory) {RabbitAdmin admin = new RabbitAdmin(factory);return admin;}/*** Queue*/@Beanpublic Queue queue() {Queue queue = QueueBuilder.nonDurable("queue.anno")//是否排外,即是否只有当前这个连接才能看到。//.exclusive()//是否自动删除//.autoDelete().build();return queue;}/*** Exchange*/@Beanpublic Exchange exchange() {Exchange exchange = new FanoutExchange("ex.anno.fanout", false, false, null);return exchange;}/*** Binding*/@Bean@Autowiredpublic Binding binding(Queue queue, Exchange exchange) {//创建一个不指定参数的绑定Binding binding = BindingBuilder.bind(queue).to(exchange).with("routing.anno").noargs();return binding;}
}

提示:

ConnectionFactory有三个实现

CachingConnectionFactory 基于channel的缓存模式 最常用是这个。

LocalizedQueueConnectionFactory 直接连接某个节点的方式。如果是集群,此种不太适合。

SimpleRoutingConnectionFactory 在当前的连接工厂中按查找的KEY获取连接工厂。

运行消息的生产者,查看消息发送信息

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ ex.anno.fanout     │ fanout  │
├────────────────────┼─────────┤
│ ex.busi.topic      │ topic   │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│ ex.direct          │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌────────────────┬─────────────┬──────────────────┬──────────────────┬──────────────┬───────────┐
│ source_name    │ source_kind │ destination_name │ destination_kind │ routing_key  │ arguments │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│                │ exchange    │ queue.msg        │ queue            │ queue.msg    │           │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│                │ exchange    │ queue.anno       │ queue            │ queue.anno   │           │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│ ex.anno.fanout │ exchange    │ queue.anno       │ queue            │ routing.anno │           │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│ ex.direct      │ exchange    │ queue.msg        │ queue            │ routing.q1   │           │
└────────────────┴─────────────┴──────────────────┴──────────────────┴──────────────┴───────────┘
[root@nullnull-os ~]# rabbitmqctl list_queues --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌────────────┬──────────┐
│ name       │ messages │
├────────────┼──────────┤
│ queue.msg  │ 0        │
├────────────┼──────────┤
│ queue.anno │ 1        │
└────────────┴──────────┘
[root@nullnull-os ~]# 

通过检查发现,消息已经成功的发送到了队列

5.2.2 使用拉模式获取消息

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;public class ConsumerGetApplication {public static void main(String[] args) throws Exception {//从指定类加载配制信息AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);RabbitTemplate rabbit = context.getBean(RabbitTemplate.class);Message receive = rabbit.receive("queue.anno");String encoding = receive.getMessageProperties().getContentEncoding();System.out.println("消息信息:" + new String(receive.getBody(), encoding));context.close();}}

RabbitConfig的配制

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import java.net.URI;@Configurable
public class RabbitConfig {/*** 连接工厂** @return*/@Beanpublic ConnectionFactory getConnectionFactory() {URI uri = URI.create("amqp://root:123456@node1:5672/%2f");ConnectionFactory factory = new CachingConnectionFactory(uri);return factory;}/*** RabbitTemplate*/@Bean@Autowiredpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);return rabbitTemplate;}/*** RabbitAdmin*/@Bean@Autowiredpublic RabbitAdmin rabbitAdmin(ConnectionFactory factory) {RabbitAdmin admin = new RabbitAdmin(factory);return admin;}/*** Queue*/@Beanpublic Queue queue() {Queue queue = QueueBuilder.nonDurable("queue.anno")//是否排外,即是否只有当前这个连接才能看到。//.exclusive()//是否自动删除//.autoDelete().build();return queue;}
}

运行主程序,检查控制台的输出。

消息信息:你好 RabbitMQ!

至此使用拉模式,已经成功的获取队列中的数据。

**5.2.3 使用推模式获取数据 **

消费者处理的代码

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageListener {/*** com.rabbitmq.client.Channel to get access to the Channel channel对象* org.springframework.amqp.core.Message  message对象,可以直接操作原生的AMQP消息* org.springframework.messaging.Message to use the messaging abstraction counterpart** @Payload-annotated 注解方法参数,该参数的值就是消息体。   method arguments including the support of validation* @Header-annotated 注解方法参数,访问指定的消息头字段的值。 method arguments to extract a specific header value, including standard AMQP headers defined by AmqpHeaders* @Headers-annotated 该注解的参数获取该消息的消息头的所有字段,参数集合类型对应的MAP argument that must also be assignable to java.util.Map for getting access to all headers.* MessageHeaders 参数类型,访问所有消息头字段  arguments for getting access to all headers.* MessageHeaderAccessor or AmqpMessageHeaderAccessor  访问所有消息头字段。* <p>* 消息监听*/@RabbitListener(queues = "queue.anno")public void whenMessageCome(Message msg) throws Exception {String encoding = msg.getMessageProperties().getContentEncoding();System.out.println("收到的消息:" + new String(msg.getBody(), encoding));}/**// * 使用payload进行消费// *// * 不可同时存在相同的队列被两个监听// *// * @param data// *///@RabbitListener(queues = "queue.anno")//public void whenMessageConsumer(@Payload String data) {//    System.out.println("收到的消息:" + data);//}}

此处存在两种方式,一种是接收Message作为参数,还有一种是使用@Payload接收内容作为参数

配制处理

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.amqp.core.Queue;
import java.net.URI;@EnableRabbit
//@ComponentScan("com.nullnull.learn")
@ComponentScan
@Configurable //xml中也可以使用<rabbit:annotation-driven/> 启用@RabbitListener注解
public class RabbitConfig {@Beanpublic ConnectionFactory connectionFactory() {URI uriInfo = URI.create("amqp://root:123456@node1:5672/%2f");return new CachingConnectionFactory(uriInfo);}@Bean@Autowiredpublic RabbitAdmin rabbitAdmin(ConnectionFactory factory) {return new RabbitAdmin(factory);}@Bean@Autowiredpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory) {return new RabbitTemplate(factory);}@Beanpublic Queue queue() {return QueueBuilder.nonDurable("queue.anno").build();}/*** RabbitListener的容器管理对象* <p>* 使用监听器监听推送过来的消息。在一个应用中可能会有多个监听器。这些监听器是需要一个工厂管理起来的。** @return*/@Bean("rabbitListenerContainerFactory")@Autowiredpublic SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectFactory) {SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();//要管理容器就得有连接containerFactory.setConnectionFactory(connectFactory);containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);//containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//containerFactory.setAcknowledgeMode(AcknowledgeMode.NONE);//设置并发的消费者,即可以同时存在10个消费都消费消息。containerFactory.setConcurrentConsumers(10);//设置并发的最大消费者。containerFactory.setMaxConcurrentConsumers(15);//按照批次处理消息消息。containerFactory.setBatchSize(10);return containerFactory;}}

启动类

import org.springframework.context.annotation.AnnotationConfigApplicationContext;public class ConsumerListenerApplication {public static void main(String[] args) {new AnnotationConfigApplicationContext(RabbitConfig.class);}}

再启动生产者

对生产者作一点改造,让其发送多条

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;import java.nio.charset.StandardCharsets;public class ProducterApplication {public static void main(String[] args) throws Exception {AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);RabbitTemplate template = context.getBean(RabbitTemplate.class);MessageProperties msgBuild = MessagePropertiesBuilder.newInstance().setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setContentEncoding(StandardCharsets.UTF_8.name()).setHeader("test.header", "test.value").build();for (int i = 0; i < 20; i++) {Message msg = MessageBuilder.withBody(("你好 RabbitMQ! id :" + i).getBytes(StandardCharsets.UTF_8)).andProperties(msgBuild).build();template.send("ex.anno.fanout", "routing.anno", msg);}context.close();}}

客户端接收,查看控制台

收到的消息:你好 RabbitMQ! id :4
收到的消息:你好 RabbitMQ! id :9
收到的消息:你好 RabbitMQ! id :8
收到的消息:你好 RabbitMQ! id :7
收到的消息:你好 RabbitMQ! id :6
收到的消息:你好 RabbitMQ! id :2
收到的消息:你好 RabbitMQ! id :3
收到的消息:你好 RabbitMQ! id :5
收到的消息:你好 RabbitMQ! id :14
收到的消息:你好 RabbitMQ! id :17
收到的消息:你好 RabbitMQ! id :1
收到的消息:你好 RabbitMQ! id :0
收到的消息:你好 RabbitMQ! id :13
收到的消息:你好 RabbitMQ! id :15
收到的消息:你好 RabbitMQ! id :12
收到的消息:你好 RabbitMQ! id :16
收到的消息:你好 RabbitMQ! id :18
收到的消息:你好 RabbitMQ! id :19
收到的消息:你好 RabbitMQ! id :11
收到的消息:你好 RabbitMQ! id :10

通过观察发现,此处接收的顺序与并非发送的顺序进行的接收,这是因为批量以及并发的控制在这里起的作用,如果要按顺序,去接批量及并发则就是按顺序接收。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/77380.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1-4 AUTOSAR方法论

总目录——AUTOSAR入门详解AUTOSAR入门详解目录汇总&#xff1a;待续中。。。https://xianfan.blog.csdn.net/article/details/132818463 目录 一、前言 二、方法论 三、单个ECU开发流程 一、前言 汽车生产供应链上有以下角色&#xff1a;OEM、TIER1、TIER2&#xff0c;其主…

国内外大语言模型调研(更新到2023.09.12)

目录 国外 OpenAI-ChatGPT Anthropic-Claude Google-Bard 国内 百度-文心一言 清华大学&智谱AI-ChatGLM 百川智能-百川大模型 科大讯飞-星火 阿里-通义千问 360-360智脑 腾讯-混元大模型 华为-盘古大模型 字节跳动-云雀大模型 好未来-MathGPT 商汤科技-商量…

C基础-操作符详解

操作符分类&#xff1a; 算数操作符&#xff1a; - * / % //算数操作符 // int main() // { // // /除法 1.整数除法(除号两端都是整数) 2浮点数除法&#xff0c;除号的两端只要有一个小数就执行小数除法 // // 除法中&#xff0c;除数为0 // int a 7 / 2; /…

Spring中使用了哪些设计模式

1、工厂模式 在各种BeanFactory以及ApplicationContext创建中都用到了。 2、模板模式 在各种BeanFactory以及ApplicationContext实现中也都用到了。 3、代理模式 Spring AOP 利用了AspectJ AOP实现的&#xff0c;AspectJ AOP底层使用了动态代理。 4、策略模式 加载资源文…

Java基础入门·多线程·线程池ThreadPool篇

前言 特点分析 线程池ThreadPool 销毁线程池 Executor类 ​​​​​​​ ​​​​​​​ ​​​​​​​ Callable接口 线程池使用 ​​​​​​​…

Android EditText setTranslationY导致输入法覆盖问题

平台 RK3288 Android 8.1 显示: 1920x1080 160 dpi 概述 碰到一个问题&#xff1a; 弹出的输入法会覆盖文本输入框。 原因&#xff1a;输入框使用了setTranslationY() 位置偏移后&#xff0c; 输入法无法正确获取焦点的位置。 分析 先上图: 初始布局 调用etTranslation…

抖音小程序开发教学系列(5)- 抖音小程序数据交互

第五章&#xff1a;抖音小程序数据交互 5.1 抖音小程序的网络请求5.1.1 抖音小程序的网络请求方式和API介绍5.1.2 抖音小程序的数据请求示例和错误处理方法 5.2 抖音小程序的数据缓存和本地存储5.2.1 抖音小程序的数据缓存机制和使用方法5.2.2 抖音小程序的本地存储和数据持久化…

使用工厂模式、策略模式、门面模式、单例模式、责任链模式、装饰者模式和访问者模式来实现红包雨

红包雨是一种在移动应用程序中经常出现的营销活动,它可以在特定时间内向用户投放很多红包,来吸引用户参与活动。如何在程序中实现红包雨呢?下面将介绍如何使用设计模式来实现红包雨。 首先,使用工厂模式来创建不同类型的红包对象。在工厂模式中,我们定义一个工厂类,该类…

unity 接收拼接数据进行纹理替换且保存相机纹理到rtsp server(一)

1 rtsp 协议后编码解码 rtsp协议的问题就是&#xff0c;拼接完成后&#xff0c;还需要编码&#xff0c;而unity里面再需要解码&#xff0c;需要的过程多了一步编码再解码&#xff0c;大大加重了 2 rtsp 协议后轻量编码 rtsp协议使用mjpeg进行图片传输。why&#xff1f;这样做…

MFC:程序的托盘显示

介绍 关键技术&#xff0c;API函数Shell_NotifyIcon&#xff0c;具体查看msdn吧 实现的主要代码 #define MY_TRAY_ICON_ID (1)/ //其他代码&#xff1a;略BEGIN_MESSAGE_MAP(CTestShowTrayDlg, CDialogEx)//...ON_MESSAGE(WM_MY_TRAY_ICON, &CTestShowTrayDlg::OnMessag…

vite + react + typescript + uni-app + node 开发一个生态系统

简介 使用 vite react typescript uni-app node 来开发一个简易的生态系统案例&#xff0c;包含 APP&#xff0c;H5&#xff0c;微信小程序&#xff0c;控制台&#xff0c;服务端 开发 admin 技术栈&#xff1a;vite react typescript初始化控制台项目选择自定义预设…

基于开源模型搭建实时人脸识别系统(五):人脸跟踪

继续填坑&#xff0c;之前已经讲了人脸检测&#xff0c;人脸识别实战之基于开源模型搭建实时人脸识别系统&#xff08;二&#xff09;&#xff1a;人脸检测概览与模型选型_开源人脸识别模型_CodingInCV的博客-CSDN博客&#xff0c;人脸检测是定位出画面中人脸的位置&#xff0c…

【C语言】指针的“最后一站”【进阶版】

欢迎各位看官^_^ 目录 1、字符指针 2、指针数组 3、数组指针 3.1数组指针的定义 3.2数组指针的使用 4、数组指针和指针数组的区别 5、数组参数&#xff0c;指针参数 5.1数组参数定义 5.2指针参数定义 5.3一维数组传参 5.4二维数组传参 5.5一级指针传参 5.6二级指…

《Docker 容器化的艺术:深入理解容器技术》

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f405;&#x1f43e;猫头虎建议程序员必备技术栈一览表&#x1f4d6;&#xff1a; &#x1f6e0;️ 全栈技术 Full Stack: &#x1f4da…

javaweb04-vue基础

话不多说&#xff0c;参考官网地址Vue官网集成Vue应用。 一、Vue快速入门 &#xff08;1&#xff09;新建HTML页面&#xff0c;引入Vue.js 我这里用的是CDN方式 <script src"https://unpkg.com/vue3/dist/vue.global.js"></script> &#xff08;2&am…

windows10系统下Python3.11中安装Numpy库教程

Python3.11中安装Numpy库目录 项目场景&#xff1a;问题描述解决方案&#xff1a;①下载Numpy文件②把NumPy文件放到Python安装的Scripts文件夹里。③安装numpy④安装验证 项目场景&#xff1a; numpy是开源的数值计算扩展&#xff0c;用于数据分析、机器学习、科学计算的重要…

敲代码常用快捷键

1、代码拖动 PyCharm&#xff1a;按住 shiftalt鼠标选中某一区域来拖动&#xff0c;即可实现拖动这一区域至指定区域。Visual Studio Code (VSCode): - Windows/Linux&#xff1a;Alt 鼠标左键拖动 - MacOS&#xff1a;Option 鼠标左键拖动 IntelliJ IDEA: - Win…

【Java 基础篇】Java TreeSet 详解:红黑树实现的有序集合

Java 集合框架提供了多种数据结构&#xff0c;用于存储和操作数据。其中&#xff0c;TreeSet 是一种特殊类型的集合&#xff0c;它通过红黑树&#xff08;Red-Black Tree&#xff09;数据结构实现了有序的、唯一元素存储。本篇博客将深入探讨 TreeSet&#xff0c;包括其概念、特…

小程序中使用分包

前言 小程序在未使用的分包的情况下仅支持大小为2M,如果图片等资源过多的情况下可以使用分包功能&#xff0c;使用分包的情况下单个分包大小不能超过2M,总大小不能超过20M&#xff0c;分包有两种情况&#xff1a;普通分包和独立分包&#xff0c;下面介绍的是普通分包。官方文档…

《向量数据库指南》——哪些需求推动了如Milvus Cloud等的向量数据库的更新和迭代?

这个问题需要深入讨论大模型与向量数据库之间的关系。从去年 ChatGPT 推出时这个问题就开始引发我们的思考。在当时,我们敏锐地意识到这将是一个机遇。然而,在国内,这个概念的认知需要更长的时间。我个人在去年四五月份的美国之行中注意到,数据库在美国已经是一个非常热门的…