Spring JMS:处理事务中的消息

1.引言

这篇文章将向您展示在使用JMS异步接收消息期间,使用者执行过程中的错误如何导致消息丢失。 然后,我将解释如何使用本地事务解决此问题。

您还将看到这种解决方案在某些情况下可能导致消息重复(例如,当它将消息保存到数据库中,然后侦听器执行失败时)。 发生这种情况的原因是因为JMS事务独立于其他事务资源(如DB)。 如果您的处理不是幂等的,或者您的应用程序不支持重复消息检测,那么您将不得不使用分布式事务。

分布式事务超出了此职位的范围。 如果您对处理分布式事务感兴趣,可以阅读这篇有趣的文章。

我已经实现了一个再现以下情况的测试应用程序:

  1. 发送和接收消息:使用者将处理收到的消息,并将其存储到数据库中。

    生产者将消息发送到队列:

    send1

    使用者从队列中检索消息并进行处理:

    发送2

  2. 消息处理之前发生错误:使用者检索消息,但是在将消息存储到DB之前执行失败。

    发送3

  3. 处理消息后发生错误:使用者检索消息,将其存储到DB,然后执行失败。

    发送4

  • 该应用程序的源代码可以在github上找到。

2.测试应用

测试应用程序执行两个测试类TestNotTransactedMessagingTestTransactedMessaging 。 这些类都将执行上述三种情况。

让我们看看在没有事务的情况下执行应用程序时的配置。

app-config.xml

应用程序配置。 基本上,它会在指定的软件包内进行检查以自动检测应用Bean:生产者和使用者。 它还配置了将在其中存储处理后的通知的内存数据库。

<context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/><bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/>
</bean><jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schema.sql" />
</jdbc:embedded-database>

notx-jms-config.xml

配置JMS基础结构,该基础结构是:

  • 经纪人联系
  • JmsTemplate
  • 将通知发送到的队列
  • 侦听器容器,它将发送通知给侦听器以处理它们
<!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
</bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="connectionFactory"/>
</bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="defaultDestination" ref="incomingQueue"/>
</bean><!-- Destinations -->
<bean id="incomingQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="incoming.queue"/>
</bean><!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory"><jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>

生产者仅使用jmsTemplate发送通知。

@Component("producer")
public class Producer {private static Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate JmsTemplate jmsTemplate;public void convertAndSendMessage(String destination, Notification notification) {jmsTemplate.convertAndSend(destination, notification);logger.info("Sending notification | Id: "+notification.getId());}
}

侦听器负责从队列中检索通知,并将其存储到数据库中。

@Component("notificationProcessor")
public class NotificationProcessor implements MessageListener {private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic void onMessage(Message message) {try {Notification notification = (Notification) ((ObjectMessage) message).getObject();logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));checkPreprocessException(notification);saveToBD(notification);checkPostprocessException(message, notification);} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}}	...
}

当id = 1的通知到达时, checkPreprocessException方法将抛出运行时异常。 这样,在将消息存储到数据库之前,我们将导致错误。

如果到达id = 2的通知, checkPostprocessException方法将引发异常,从而在将其存储到数据库之后立即引起错误。

getDeliveryNumber方法返回发送消息的次数。 这仅适用于事务,因为在侦听器处理失败导致回滚之后,代理将尝试重新发送消息。

最后, saveToDB方法非常明显。 它将通知存储到数据库。

您始终可以通过单击本文开头的链接来检查此应用程序的源代码。

3.测试没有交易的消息接收

我将启动两个测试类,一个不包含事务,另一个在本地事务中。 这两个类都扩展了一个基类,该基类加载了常见的应用程序上下文并包含一些实用程序方法:

@ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"})
@DirtiesContext
public class TestBaseMessaging {protected static final String QUEUE_INCOMING = "incoming.queue";protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";@Autowiredprotected JdbcTemplate jdbcTemplate;@Autowiredprotected JmsTemplate jmsTemplate;@Autowiredprotected Producer producer;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from Notifications");}protected int getSavedNotifications() {return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);}protected int getMessagesInQueue(String queueName) {return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {@Overridepublic Integer doInJms(Session session, QueueBrowser browser) throws JMSException {Enumeration<?> messages = browser.getEnumeration();int total = 0;while (messages.hasMoreElements()) {messages.nextElement();total++;}return total;}});}
}

实用方法说明如下:

  • getSavedNotifications :返回存储到数据库的通知数。 我使用了queryForObject方法,因为自版本3.2.2开始建议使用该方法。 queryForInt方法已被弃用。
  • getMessagesInQueue :允许您检查指定队列中哪些消息仍在等待处理。 对于此测试,我们有兴趣知道仍有多少通知等待处理。

现在,让我向您展示第一个测试的代码( TestNotTransactedMessaging )。 此测试启动本文开头指示的3种情况。

@Test
public void testCorrectMessage() throws InterruptedException {Notification notification = new Notification(0, "notification to deliver correctly");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}@Test
public void testFailedAfterReceiveMessage() throws InterruptedException {Notification notification = new Notification(1, "notification to fail after receiving");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(0, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}@Test
public void testFailedAfterProcessingMessage() throws InterruptedException {Notification notification = new Notification(2, "notification to fail after processing");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}private void printResults() {logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));logger.info("Total items in DB: "+getSavedNotifications());
}

4,执行测试

好的,让我们执行测试,看看结果是什么:

testCorrectMessage输出:

Producer|Sending notification | Id: 0
NotificationProcessor|Received notification | Id: 0 | Redelivery: 1
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 1

此处没有问题,因为消息已正确接收并存储到数据库,所以队列为空。

testFailedAfterReceiveMessage输出:

Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 0

由于它在事务外部执行,因此使用确认模式(默认为自动)。 这意味着一旦调用onMessage方法并因此将其从队列中删除,就认为该消息已成功传递。 因为侦听器在将消息存储到数据库之前失败,所以我们丢失了消息!

testFailedAfterProcessingMessage输出:

2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2
2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1

在这种情况下,在执行失败之前,该消息已从队列(AUTO_ACKNOWLEDGE)中删除并存储到DB。

5,添加本地交易

通常,我们不允许像测试的第二种情况那样丢失消息,因此我们要做的是在本地事务中调用侦听器。 所做的更改非常简单,并不意味着从我们的应用程序中修改一行代码。 我们只需要更改配置文件。

为了测试这3种涉及事务的情况,我将以下配置文件notx-jms-config.xml替换为:

tx-jms-config.xml

首先,我添加了在发生回滚的情况下进行的重新传递的数量(由于侦听器执行中的错误导致):

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/><property name="redeliveryPolicy"><bean class="org.apache.activemq.RedeliveryPolicy"><property name="maximumRedeliveries" value="4"/></bean></property>
</bean>

接下来,我指示侦听器将在事务内执行。 这可以通过修改侦听器容器定义来完成:

<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted"><jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>

这将导致在本地JMS事务中执行对侦听器的每次调用。 收到消息后,事务将开始。 如果侦听器执行失败,则消息接收将回滚。

这就是我们要做的一切。 让我们使用此配置启动测试。

6,测试交易中的消息接收

来自TestTransactedMessaging类的代码实际上与先前的测试相同。 唯一的区别是,它向DLQ(死信队列)添加了查询。 在事务内执行时,如果回退消息接收,则代理会将消息发送到此队列(在所有重新传递失败之后)。

我跳过了成功接收的输出,因为它不会带来任何新的变化。

testFailedAfterReceiveMessage输出:

Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 2
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
...
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 5
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 1
TestTransactedMessaging|Total items in DB: 0

如您所见,第一次接收失败,并且代理尝试将其重新发送四次(如maximumRedeliveries属性中所示)。 由于情况持续存在,因此消息已发送到特殊DLQ队列。 这样,我们不会丢失消息。

testFailedAfterProcessingMessage输出:

Producer|Sending notification | Id: 2
NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
NotificationProcessor|Received notification | Id: 2 | Redelivery: 2
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 0
TestTransactedMessaging|Total items in DB: 2

在这种情况下,这是发生的情况:

  1. 侦听器检索到消息
  2. 它将消息存储到数据库
  3. 侦听器执行失败
  4. 代理重新发送消息。 由于情况已解决,因此侦听器将消息再次存储到DB。 该消息已重复。

7,结论

将本地事务添加到消息接收中可避免丢失消息。 我们必须考虑的是,可能会出现重复的消息,因此我们的侦听器将不得不检测到它,否则我们的处理必须是幂等的才能再次进行处理。 如果这不可能,我们将不得不进行分布式事务,因为它们支持涉及不同资源的事务。

参考: Spring JMS:在XavierPadró的Blog博客上处理来自JCG合作伙伴 Xavier Padro的事务内的消息 。

翻译自: https://www.javacodegeeks.com/2014/02/spring-jms-processing-messages-within-transactions.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/364749.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux下Python编译安装

1.安装python3 1.1下载python源码包 网址&#xff1a;https://www.python.org/downloads/release/python-362/ 下载地址&#xff1a;https://www.python.org/ftp/python/3.6.2/Python-3.6.2.tgz 1.1.1安装python前的库环境&#xff0c;非常重要 yum install gcc patch libffi-d…

OO第二单元作业总结

一&#xff1a;设计策略 第一次作业&#xff1a;第一次是单电梯傻瓜调度策略&#xff0c;因此我把调度器当作共享资源对象&#xff0c;有一个put和一个get方法&#xff0c;因为只有一个电梯&#xff0c;并且单次取出和投放一个请求&#xff0c;因此只需要同步控制一下这两个方法…

jmeter找不到java.dll_Windows下Jmeter安装出现Not able to find Java executable or version问题解决方案...

最近在做一个开放接口平台性能测试 , 指标是最少达到1000/s的并发 , 接口鉴权 百万级的表 在1s内完成..在众多压测工具中 ,,选择了Apache的jmeter ,于官网下载了最新版本http://jmeter.apache.org/download_jmeter.cgi (jmeter下载地址)由于jmeter运行是基于java的,所以需要…

迭代加深搜索 C++解题报告 :[SCOI2005]骑士精神

题目 此题根据题目可知是迭代加深搜索。 首先应该枚举空格的位置&#xff0c;让空格像一个马一样移动。 但迭代加深搜索之后时间复杂度还是非常的高&#xff0c;根本过不了题。 感觉也想不出什么减枝&#xff0c;于是便要用到了乐观估计函数&#xff08;Optimistic Estimation …

一个web项目web.xml的配置中context-param配置作用

<context-param>的作用:web.xml的配置中<context-param>配置作用1. 启动一个WEB项目的时候,容器(如:Tomcat)会去读它的配置文件web.xml.读两个节点: <listener></listener> 和 <context-param></context-param>2.紧接着,容器创建一个Servl…

51Nod 1362 搬箱子 —— 组合数(非质数取模) (差分TLE)

题目&#xff1a;http://www.51nod.com/Challenge/Problem.html#!#problemId1362 首先&#xff0c;\( f[i][j] \) 是一个 \( i \) 次多项式&#xff1b; 如果考虑差分&#xff0c;用一个列向量维护 0 次差分到 \( n \) 次差分即可&#xff0c;在第 \( n \) 次上差分数组已经是一…

错误处理在Spring Integration中如何工作

1.引言 这篇文章的目标是向您展示将消息传递系统与Spring Integration结合使用时如何处理错误。 您将看到同步和异步消息传递之间的错误处理有所不同。 和往常一样&#xff0c;我将跳过聊天并继续进行一些示例。 您可以在github上获取源代码。 2&#xff0c;样品申请 我将使用…

原生js、jQuery实现选项卡功能

在大家在网上平常浏览网页的时候&#xff0c;想必各位都会看到选项卡功能&#xff0c;在这里给大家详解一下用原生js、jQuery如何来写一些基本的选项卡 话不多说&#xff0c;先给各位看一下功能图&#xff1a; 好了&#xff0c;下边开始写代码了&#xff1a; HTML代码&#x…

.NET core2.0 发布至IIS中

.NET CORE和asp.net 发布时不太一样&#xff0c;ASP.NET Core不再是由IIS工作进程&#xff08;w3wp.exe&#xff09;托管&#xff0c;而是使用自托管Web服务器&#xff08;Kestrel&#xff09;运行&#xff0c;IIS则是作为反向代理的角色转发请求到Kestrel不同端口的ASP.NET Co…

如何用纯 CSS 创作一个文本淡入淡出的 loader 动画

效果预览 在线演示 按下右侧的“点击预览”按钮可以在当前页面预览&#xff0c;点击链接可以全屏预览。https://codepen.io/comehope/pen/ERwpeG可交互视频此视频是可以交互的&#xff0c;你可以随时暂停视频&#xff0c;编辑视频中的代码。请用 chrome, safari, edge 打开观…

《机器学习基石》第一周 —— When Can Machine Learn?

&#xff08;注&#xff1a;由于之前进行了吴恩达机器学习课程的学习&#xff0c;其中有部分内容与机器学习基石的内容重叠&#xff0c;所以以下该系列的笔记只记录新的知识&#xff09; 《机器学习基石》课程围绕着下面这四个问题而展开&#xff1a; 主要内容&#xff1a; 一、…

如何用纯 CSS 创作一盘传统蚊香

效果预览 在线演示 按下右侧的“点击预览”按钮可以在当前页面预览&#xff0c;点击链接可以全屏预览。https://codepen.io/comehope/pen/BVpvMz可交互视频教程此视频是可以交互的&#xff0c;你可以随时暂停视频&#xff0c;编辑视频中的代码。请用 chrome, safari, edge 打…

[Unity3D]Unity3D游戏开发之怪物AI

大家好。欢迎大家关注由我为大家带来的Unity3D游戏开发系列文章&#xff0c;我的博客地址为&#xff1a;http://blog.csdn.net/qinyuanpei。在上一篇文章中&#xff0c;我们基本上实现了一个小地图的功能&#xff0c;今天呢&#xff0c;我们来实现怪物AI&#xff0c;所谓怪物AI…

如何把握好 transition 和 animation 的时序,创作描边按钮特效

效果预览 在线演示 按下右侧的“点击预览”按钮可以在当前页面预览&#xff0c;点击链接可以全屏预览。https://codepen.io/comehope/pen/mKdzZM可交互视频教程此视频是可以交互的&#xff0c;你可以随时暂停视频&#xff0c;编辑视频中的代码。请用 chrome, safari, edge 打…

使用PHREAK算法实现Drools 6性能

Drools 6引入了新的惰性匹配算法。 该算法的详细信息已在之前的两个博客中介绍&#xff1a; RIP RETE时间获得PHREAKY 基于PHREAK堆栈的评估和向后链接 第一篇文章讨论了性能以及为什么算法的批处理和惰性方面难以比较。 “性能的最后一点。 通常&#xff0c;使用PHREAK的单…

PAT 1131 Subway Map

题目链接&#xff1a; https://pintia.cn/problem-sets/994805342720868352/problems/994805347523346432 思路&#xff1a; 说多了都是泪&#xff0c; Dijstra超时&#xff0c;采用dfs 利用map<pair<int,int>,int>&#xff0c;表示两个点和他们中间的地铁线号 每次…

专访Vue作者尤雨溪:Vue CLI 3.0重构的原因

1、为什么要对 Vue CLI 进行大规模修改&#xff1f; 尤雨溪认为旧版本的 Vue CLI 本质上只是从 GitHub 拉取模版&#xff0c;这种拉模版的方式有几个问题&#xff1a; &#xff08;1&#xff09; 在单个模版里面同时支持太多选项会导致模版本身变得极其复杂和难以维护&#x…

java秒杀时间与服务器时间_Javascript实现秒杀倒计时(时间与服务器时间同步)...

现在有很多网站都在做秒杀商品&#xff0c;而这其中有一个很重要的环节就是倒计时。关于倒计时&#xff0c;有下面几点需要注意&#xff1a;1.应该使用服务器时间而不是本地时间(本地时间存在时区不同、用户自行设置等问题)。2.要考虑网络传输的耗时。3.获取时间时可直接从AJAX…

Python3.5-20190501-廖老师的

python是一门解释型\脚本语言(和js特别像,如果同时学习js和python完全搅浑了.) 在运行py时候是一句一句翻译成cpu识别的机器码,所以速度比较慢.而C程序是运行前直接编译成CPU能执行的机器码&#xff0c;所以非常快. 学习python,就需要安装python.安装的同时会有一个解释器,就是…

如何用纯 CSS 创作一个冒着热气的咖啡杯

效果预览 在线演示 按下右侧的“点击预览”按钮在当前页面预览&#xff0c;点击链接全屏预览。https://codepen.io/zhang-ou/pen/xjXxoz可交互视频教程此视频是可以交互的&#xff0c;你可以随时暂停视频&#xff0c;编辑视频中的代码。请用 chrome, safari, edge 打开观看。…