使用RabbitMQ的SpringBoot消息传递

RabbitMQ是流行的消息代理解决方案之一,并提供可用于各种编程语言的客户端库,包括Java,Scala,.NET,Go,Python,Ruby,PHP等。在本教程中,我们将学习如何使用RabbitMQ消息代理从SpringBoot应用程序发送和接收消息。 我们还将研究如何将消息作为JSON负载发送,以及如何使用Dead Letter Queue(DLQ)处理错误。

首先,按照此处https://www.rabbitmq.com/download.html所述在本地计算机上安装RabbitMQ服务器,或者使用以下docker-compose.yml作为Docker映像运行。

version: '3'
services:rabbitmq:container_name: rabbitmqimage: 'rabbitmq:management'ports:- "5672:5672"- "15672:15672"

现在,您可以使用docker-compose启动RabbitMQ 并在http:// localhost:15672 /启动管理UI。

如果您熟悉ActiveMQ等其他消息传递代理,则通常使用队列和主题发送一对一和发布-订阅通信模型。 在RabbitMQ中,我们将邮件发送到Exchange,并根据路由密钥将邮件转发到队列。 您可以在https://www.rabbitmq.com/tutorials/amqp-concepts.html上阅读有关RabbitMQ概念的更多信息。

您可以在https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo中找到本文的源代码

RabbitMQ的SpringBoot应用程序

现在,让我们从http://start.spring.io/选择WebThymeleafRabbitMQ启动器创建一个SpringBoot应用程序。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sivalabs</groupId><artifactId>springboot-rabbitmq-demo</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RC1</version><relativePath/></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency></dependencies></project>

让我们从RabbitMQ配置开始。 创建RabbitConfig配置类,并定义QueueExchangeBinding Bean,如下所示:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig  
{public static final String QUEUE_ORDERS = "orders-queue";public static final String EXCHANGE_ORDERS = "orders-exchange";@BeanQueue ordersQueue() {return QueueBuilder.durable(QUEUE_ORDERS).build();}@BeanQueue deadLetterQueue() {return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();}@BeanExchange ordersExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build();}@BeanBinding binding(Queue ordersQueue, TopicExchange ordersExchange) {return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS);}
}

在这里,我们声明一个名称为orders-queue的队列和一个名称为orders-exchange的Exchange。
我们还定义了orders-queue和orders-exchange之间的绑定,以便将任何以routing-key作为“ orders-queue”发送到orders-exchange的消息都发送到orders-queue。

我们可以在application.properties中配置RabbitMQ服务器的详细信息,如下所示:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

让我们创建一个Spring bean OrderMessageSender来发送消息到orders-exchange。

Spring Boot自动配置向RabbitMQ代理发送消息或从RabbitMQ代理接收消息所需的基础结构bean。 我们可以简单地通过调用RabbitTemplate.convertAndSend(“ routingKey”,Object)方法自动连接RabbitTemplate并发送消息。

public class Order implements Serializable {private String orderNumber;private String productId;private double amount;//setters & getters
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderMessageSender {private final RabbitTemplate rabbitTemplate;@Autowiredpublic OrderMessageSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendOrder(Order order) {this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order);}
}

默认情况下,Spring Boot使用org.springframework.amqp.support.converter.SimpleMessageConverter并将对象串行化为byte []

现在有了此配置,我们可以通过调用OrderMessageSender.sendOrder(Order)方法将消息发送到RabbitMQ订单队列。

发送消息后,您可以通过使用来宾/来宾凭证登录从Administration UI应用程序中查看消息。 您可以单击“ 交易所 / 队列”选项卡以查看已创建的订单交换订单队列 。 您还可以检查订单交换的绑定,如下所示:

现在转到“队列”选项卡,然后单击“订单队列”。 向下滚动到“ 获取消息”部分,然后单击“ 获取消息”按钮,可以查看消息的内容。

现在,使用@RabbitListener创建订单队列的侦听器。

创建一个Spring bean OrderMessageListener ,如下所示:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class OrderMessageListener {static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);@RabbitListener(queues = RabbitConfig.QUEUE_ORDERS)public void processOrder(Order order) {logger.info("Order Received: "+order);}
}

而已!! 通过简单地添加@RabbitListener并定义要监听的队列,我们​​可以创建一个Listener。

现在,如果您向Order-queue发送一条消息,该消息应该由OrderMessageListener.processOrder()方法使用,并且应该看到日志语句“ Order Received:”。

以JSON有效载荷的形式发送和接收消息

如我们所见,默认的序列化机制使用SimpleMessageConverter将消息对象转换为byte [],并在接收端将使用GenericMessageConverter将byte []反序列化为Object类型(在我们的示例中为Order)。

为了更改此行为,我们需要定制Spring Boot RabbitMQ自动配置的bean。

以JSON格式发送消息

一种将消息作为JSON有效负载发送的快速方法是使用ObjectMapper,我们可以将Order对象转换为JSON并发送。

@Autowired
private ObjectMapper objectMapper;public void sendOrder(Order order) {try {String orderJson = objectMapper.writeValueAsString(order);Message message = MessageBuilder.withBody(orderJson.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, message);} catch (JsonProcessingException e) {e.printStackTrace();}
}

但是像这样将对象转换为JSON是一种样板。 相反,我们可以采用以下方法。

我们可以配置让RabbitTemplate使用org.springframework.amqp.support.converter.Jackson2JsonMessageConverter bean,以便将消息序列化为JSON而不是byte []。

@Configuration
public class RabbitConfig 
{......@Beanpublic RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());return rabbitTemplate;}@Beanpublic Jackson2JsonMessageConverter producerJackson2MessageConverter() {return new Jackson2JsonMessageConverter();}
}

现在,当您发送一条消息时,它将转换为JSON并将其发送到Queue。

以JSON格式接收消息

为了将消息有效负载视为JSON,我们应该通过实现RabbitListenerConfigurer来定制RabbitMQ配置。

@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {......@Overridepublic void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());}@BeanMessageHandlerMethodFactory messageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());return messageHandlerMethodFactory;}@Beanpublic MappingJackson2MessageConverter consumerJackson2MessageConverter() {return new MappingJackson2MessageConverter();}
}

使用DeadLetterQueues(DLQ)处理错误和无效消息

我们可能希望将无效消息发送到单独的队列,以便以后可以检查和重新处理它们。 我们可以使用DLQ概念自动执行此操作,而无需手动编写代码来处理这种情况。

我们可以在定义Queue Bean的同时声明Queue的dead-letter-exchangedead-letter-routing-key

@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {public static final String QUEUE_ORDERS = "orders-queue";public static final String EXCHANGE_ORDERS = "orders-exchange";public static final String QUEUE_DEAD_ORDERS = "dead-orders-queue";@BeanQueue ordersQueue() {return QueueBuilder.durable(QUEUE_ORDERS).withArgument("x-dead-letter-exchange", "").withArgument("x-dead-letter-routing-key", QUEUE_DEAD_ORDERS).withArgument("x-message-ttl", 15000) //if message is not consumed in 15 seconds send to DLQ.build();}@BeanQueue deadLetterQueue() {return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();}......
}

现在尝试将无效的JSON消息发送到orders-queue,它将被发送到dead-orders-queue。

您可以在https://github.com/sivaprasadreddy/sivalabs-blog-samples-code/tree/master/springboot-rabbitmq-demo中找到本文的源代码。

翻译自: https://www.javacodegeeks.com/2018/02/springboot-messaging-rabbitmq.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/348130.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1.0jpa 2.0_在JPA 2.1中使用@Convert正确完成映射枚举

1.0jpa 2.0如果您曾经在JPA中使用过Java枚举&#xff0c;那么您肯定会意识到它们的局限性和陷阱。 使用enum作为Entity的属性通常是一个很好的选择&#xff0c;但是2.1之前的JPA不能很好地处理它们。 它给了您2 1个选择&#xff1a; 托肖夫达林 Enumerated(EnumType.ORDINAL…

Drools可执行模型还活着

总览 可执行模型的目的是提供规则集的纯基于Java的表示&#xff0c;以及方便的Java DSL以编程方式创建这种模型。 该模型是低级别的&#xff0c;旨在为用户提供所需的所有信息&#xff0c;例如用于索引评估的lambda。 这样可以使其保持快速运行&#xff0c;并避免在此级别上进行…

java ee cdi_Java EE CDI依赖注入(@Inject)教程

java ee cdi在本教程中&#xff0c;我们将向您展示如何在CDI管理的Bean中实现依赖注入。 特别是&#xff0c;我们将利用CDI API提供的Inject批注将CDI bean注入另一个bean。 通过这种方式&#xff0c;可以在应用程序&#xff08;例如JavaServer Faces应用程序&#xff09;中使用…

Java 8中的可重复注释

使用Java 8&#xff0c;您可以对声明或类型重复相同的注释。 例如&#xff0c;要注册一个类仅应由特定角色在运行时访问&#xff0c;则可以编写如下内容&#xff1a; Role("admin") Role("manager") public class AccountResource { }注意&#xff0c;现在…

java sax解析xml_在Java中使用DOM,SAX和StAX解析器解析XML

java sax解析xml我碰巧通读了有关Java中XML解析和构建API的章节。 我试用了样本XML上的其他解析器。 然后&#xff0c;我想在我的博客上分享它&#xff0c;这样我就可以得到该代码的参考以及任何阅读此代码的参考。 在本文中&#xff0c;我将在不同的解析器中解析相同的XML&…

矩形脉冲信号的_IQ信号的解调学习

前面的文章学习了IQ的调制&#xff0c;本文主要讨论IQ的解调过程。调制波形回顾IQ的shifted DQPSK的调制波形总结如下。IQ的星图映射波形每个symbol有256个采样点&#xff0c;这里可以用单位脉冲或者矩形脉冲。如果只看256个symbol的话&#xff1a;IQ经过FIR滤波之后&#xff0…

Spring Data MongoDB教程

在当今世界&#xff0c;尽快启动并运行应用程序非常重要。 该应用程序还应该易于开发和维护。 Spring是这样的框架&#xff0c;它提供了与许多不同框架的集成的简便性&#xff0c;这使得使用Spring开发应用程序变得容易。 一种这样的集成是Spring与MongoDB的集成。 1.简介 在…

块内拉升lisp_求大神告知,如何用LISP实现块数量分类统计程序,最好统计后列出表格。谢谢了。...

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼这个可以统计各种块的数量(基于块名)&#xff0c;但是不能生成表格。(prompt "\n命令为: BC ,用法如下: ")(prompt "\n先选择要统计的块,然后选择要统计的区域,结束后按F2显示出统计数量.");;;***************…

arm 交叉编译找不到so_嵌入式杂谈之交叉编译

这次扯一下嵌入式开发过程中经常用到的交叉编译器&#xff0c;虽说在之前的文章也提到过这个问题&#xff0c;不过上次是着重介绍为什么使用交叉编译器(主要是为了劝服自己从单片机的思想中脱离出来&#xff0c;慢慢的接受嵌入式Linux开发的一些约定俗成的工具与方法)&#xff…

Java 10将如何改变您的编码方式

突出显示Java 10中新的Java局部变量类型推断功能 早在2016年&#xff0c;Java社区就掀起了新的JDK增强提案&#xff08;JEP&#xff09;&#xff1a; JEP 286 。 现在&#xff0c;两年后&#xff0c;局部变量类型推断可以说是Java 10中最值得注意的功能。这是Java语言开发人员…

中兴zxr10路由器重启命令_蒲公英路由器刷第三方固件(一)

蒲公英路由器是由上海贝锐信息科技股份有限公司(oray)在2015年10月14日推出的一款路由器。它是一款采用VPC[3]技术实现智能组网的路由器。2台或多台使用&#xff0c;无需公网IP&#xff0c;能将异地局域网通过蒲公英组建成一个网络。这次刷固件的教程为蒲公英路由器x3/x3pro刷第…

还有更多REST技巧

在以前的博客文章中&#xff0c;我介绍了一些实现REST体系结构的想法和技巧。 在这篇文章中&#xff0c;我将介绍更多的想法和技巧。 快取 缓存是原始论文的很大一部分。 见5.1.4节 策略包括验证&#xff08; 客户端检查它是否具有最新版本 &#xff09;和到期&#xff08; 客…

javaml_一些基于Java的AI框架:Encog,JavaML,Weka

javaml在进行编程收集情报工作时&#xff0c;我发现自己花了很多时间将Python代码转换为Java&#xff0c;通常对我的进度缓慢感到不耐烦&#xff0c;所以我一直在寻找替代方法。 我发现3&#xff1a; Encog – Heaton研究 Java语言 威卡 这绝不是深入的研究&#xff0c;我…

Spring Cloud Netflix尤里卡

本教程是关于Spring云Netflix Eureka的。 在这里&#xff0c;我们将创建eureka发现服务器和微服务&#xff0c;这些服务本身将注册到发现服务器和使用netflix客户端API的客户端中&#xff0c;以使用示例示例来发现服务并使用该服务公开的微服务。因此&#xff0c;我们将开发每个…

掘进循环作业图表_Word版本。煤矿安全生产标准化评分方法(2020)8.4 掘进

注&#xff1a;以下内容来自于网络,若有侵权请联系QQ3609400292进行删除。煤矿安全生产标准化管理体系基本要求及评分方法(试行)8.4 掘 进一、工作要求(风险管控)1.生产组织(1)煤巷、半煤岩巷宜采用综合机械化掘进&#xff0c;综合机械化程度不低于50%&#xff0c;并持续提…

vc++调用jni_通过JNI使用C ++尖叫快速进行Lucene搜索

vc调用jni最终&#xff0c;当Lucene执行查询时&#xff0c;在初始设置后&#xff0c;真正的热点通常是相当基本的代码&#xff0c;该代码对整数docID&#xff0c;术语频率和位置的顺序块进行解码&#xff0c;并将它们匹配&#xff08;例如&#xff0c;对BooleanQuery并集或交集…

hide show vue 动画_jQuery动画效果

-------------------------------------------------------------------------------------------------------1. 隐藏和显示show()方法和hide()方法是jQuery中最基本的动画方法。在HTML文档里&#xff0c;为一个元素调用hide()方法&#xff0c;会将该元素的display样式改为“n…

使用Java的Selenium:Google搜索

1.概述 在本教程中&#xff0c;我们将探讨如何将Selenium与Java结合使用。 我们将使用Selenium打开Goog​​le&#xff0c;进行搜索&#xff0c;然后单击URL。 该代码在Github上可用。 2.什么是硒&#xff1f; Selenium使Web浏览器自动化。 就是这样 Selenium使我们能够模拟…

润乾报表分组求和_一招搞定各种报表合计需求

一、常用合计方案在有专业报表工具之前&#xff0c;常用的实现方案有&#xff1a;1&#xff09; SQL“select sum(字段) from 表 group by 字段 order by 字段“可以实现简单的分组数据合计、条件合计&#xff0c;这种方式有一个很明显的缺点&#xff0c;就是随着合计需求复杂度…

Spring Security 6.x 系列(12)—— Form表单认证登录注销自定义配置

一、前言 在本系列文章中介绍了 Form 表单认证和注销流程&#xff0c;对部分源码也进行详细分析。 本章主要学习 Spring Security 中表单认证登录注销的相关自定义配置。 二、自定义登录页面 Spring Security 表单认证默认规则中对未认证的请求会重定向到默认登录页面&…