spring jms 事务_Spring JMS:处理事务中的消息

spring jms 事务

1.引言

这篇文章将向您展示使用JMS异步接收消息期间使用者执行过程中的错误如何导致消息丢失。 然后,我将解释如何使用本地事务解决此问题。

您还将看到这种解决方案在某些情况下可能导致消息重复(例如,当它将消息保存到数据库中,然后侦听器执行失败时)。 发生这种情况的原因是因为JMS事务独立于其他事务资源(如DB)。 如果您的处理不是幂等的,或者您的应用程序不支持重复消息检测,那么您将不得不使用分布式事务。

分布式事务超出了本文的范围。 如果您对处理分布式事务感兴趣,可以阅读这篇有趣的文章。

我已经实现了一个再现以下情况的测试应用程序:

  1. 发送和接收消息:消费者将处理收到的消息,并将其存储到数据库中。

    生产者将消息发送到队列:

    发送1

    使用者从队列中检索消息并进行处理:

    发送2

  2. 消息处理之前发生错误:使用者检索消息,但是在将消息存储到DB之前执行失败。

    发送3

  3. 处理消息后发生错误:使用者检索消息,将其存储到DB,然后执行失败。

    发送4

  • 该应用程序的源代码可以在github上找到。

2.测试应用

测试应用程序执行两个测试类TestNotTransactedMessagingTestTransactedMessaging 。 这些类都将执行上述三种情况。

让我们看看在没有事务的情况下执行应用程序时的配置。

app-config.xml

应用程序配置。 基本上,它会在指定的包中进行检查以自动检测应用Bean:生产者和使用者。 它还配置了将在其中存储处理后的通知的内存数据库。

<context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/><bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/>
</bean><jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schema.sql" />
</jdbc:embedded-database>

notx-jms-config.xml

配置JMS基础结构,该基础结构是:

  • 经纪人联系
  • JmsTemplate
  • 将通知发送到的队列
  • 侦听器容器,它将发送通知给侦听器以处理它们
<!-- Infrastructure -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/>
</bean><bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><property name="targetConnectionFactory" ref="connectionFactory"/>
</bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="cachingConnectionFactory"/><property name="defaultDestination" ref="incomingQueue"/>
</bean><!-- Destinations -->
<bean id="incomingQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="incoming.queue"/>
</bean><!-- Listeners -->
<jms:listener-container connection-factory="connectionFactory"><jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>

生产者仅使用jmsTemplate发送通知。

@Component("producer")
public class Producer {private static Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate JmsTemplate jmsTemplate;public void convertAndSendMessage(String destination, Notification notification) {jmsTemplate.convertAndSend(destination, notification);logger.info("Sending notification | Id: "+notification.getId());}
}

侦听器负责从队列中检索通知,并将其存储到数据库中。

@Component("notificationProcessor")
public class NotificationProcessor implements MessageListener {private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic void onMessage(Message message) {try {Notification notification = (Notification) ((ObjectMessage) message).getObject();logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));checkPreprocessException(notification);saveToBD(notification);checkPostprocessException(message, notification);} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}}	...
}

当id = 1的通知到达时, checkPreprocessException方法将抛出运行时异常。 这样,在将消息存储到数据库之前,我们将导致错误。

如果到达id = 2的通知, checkPostprocessException方法将引发异常,从而在将其存储到数据库后立即引起错误。

getDeliveryNumber方法返回消息已发送的次数。 这仅适用于事务,因为在侦听器处理失败导致回滚之后,代理将尝试重新发送消息。

最后, saveToDB方法非常明显。 它将通知存储到数据库。

您始终可以通过单击本文开头的链接来检查此应用程序的源代码。

3.测试没有交易的消息接收

我将启动两个测试类,一个不包含事务,另一个在本地事务中。 这两个类都扩展了一个基类,该基类加载了公共应用程序上下文并包含一些实用程序方法:

@ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"})
@DirtiesContext
public class TestBaseMessaging {protected static final String QUEUE_INCOMING = "incoming.queue";protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";@Autowiredprotected JdbcTemplate jdbcTemplate;@Autowiredprotected JmsTemplate jmsTemplate;@Autowiredprotected Producer producer;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from Notifications");}protected int getSavedNotifications() {return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);}protected int getMessagesInQueue(String queueName) {return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {@Overridepublic Integer doInJms(Session session, QueueBrowser browser) throws JMSException {Enumeration<?> messages = browser.getEnumeration();int total = 0;while (messages.hasMoreElements()) {messages.nextElement();total++;}return total;}});}
}

实用方法说明如下:

  • getSavedNotifications :返回存储到数据库的通知数。 我使用了queryForObject方法,因为自版本3.2.2开始建议使用该方法。 queryForInt方法已被弃用。
  • getMessagesInQueue :允许您检查指定队列中哪些消息仍在等待处理。 对于此测试,我们有兴趣知道仍有多少通知等待处理。

现在,让我向您展示第一个测试的代码( TestNotTransactedMessaging )。 此测试启动本文开头指示的3种情况。

@Test
public void testCorrectMessage() throws InterruptedException {Notification notification = new Notification(0, "notification to deliver correctly");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}@Test
public void testFailedAfterReceiveMessage() throws InterruptedException {Notification notification = new Notification(1, "notification to fail after receiving");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(0, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}@Test
public void testFailedAfterProcessingMessage() throws InterruptedException {Notification notification = new Notification(2, "notification to fail after processing");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING));
}private void printResults() {logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));logger.info("Total items in DB: "+getSavedNotifications());
}

4,执行测试

好的,让我们执行测试,看看结果是什么:

testCorrectMessage输出:

Producer|Sending notification | Id: 0
NotificationProcessor|Received notification | Id: 0 | Redelivery: 1
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 1

此处没有问题,因为消息已正确接收并存储到数据库,所以队列为空。

testFailedAfterReceiveMessage输出:

Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 0

由于它在事务外部执行,因此使用确认模式(默认为自动)。 这意味着一旦调用onMessage方法并因此将其从队列中删除,就认为该消息已成功传递。 因为侦听器在将消息存储到DB之前失败,所以我们丢失了消息!

testFailedAfterProcessingMessage输出:

2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2
2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1

在这种情况下,在执行失败之前,已从队列(AUTO_ACKNOWLEDGE)中删除了该消息并将其存储到DB。

5,添加本地交易

通常,我们不允许像测试的第二种情况那样丢失消息,因此我们要做的是在本地事务中调用侦听器。 所做的更改非常简单,并不意味着从我们的应用程序中修改一行代码。 我们只需要更改配置文件。

为了测试这3种事务的情况,我将以下配置文件notx-jms-config.xml替换为:

tx-jms-config.xml

首先,我添加了在发生回滚的情况下进行的重新传递的数量(由于侦听器执行中的错误导致):

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/><property name="redeliveryPolicy"><bean class="org.apache.activemq.RedeliveryPolicy"><property name="maximumRedeliveries" value="4"/></bean></property>
</bean>

接下来,我指示侦听器将在事务内执行。 这可以通过修改侦听器容器定义来完成:

<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted"><jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>

这将导致在本地JMS事务中执行对侦听器的每次调用。 收到消息后,事务将开始。 如果侦听器执行失败,则消息接收将回滚。

这就是我们要做的一切。 让我们使用此配置启动测试。

6,测试交易中的消息接收

来自TestTransactedMessaging类的代码实际上与先前的测试相同。 唯一的区别是,它向DLQ(死信队列)添加了查询。 在事务内执行时,如果回退消息接收,则代理会将消息发送到此队列(在所有重新传递失败之后)。

我跳过了成功接收的输出,因为它不会带来任何新的变化。

testFailedAfterReceiveMessage输出:

Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 2
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
...
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 5
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 1
TestTransactedMessaging|Total items in DB: 0

如您所见,第一次接收失败,并且代理尝试将其重新发送四次(如maximumRedeliveries属性中所示)。 由于情况持续存在,因此消息已发送到特殊DLQ队列。 这样,我们不会丢失消息。

testFailedAfterProcessingMessage输出:

Producer|Sending notification | Id: 2
NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
NotificationProcessor|Received notification | Id: 2 | Redelivery: 2
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 0
TestTransactedMessaging|Total items in DB: 2

在这种情况下,这是发生的情况:

  1. 侦听器检索到消息
  2. 它将消息存储到数据库
  3. 侦听器执行失败
  4. 代理重新发送消息。 由于情况已解决,因此侦听器将消息再次存储到DB。 该消息已重复。

7,结论

将本地事务添加到消息接收中可避免丢失消息。 我们必须考虑的是,可能会出现重复的消息,因此我们的侦听器将必须检测到它,否则我们的处理必须是幂等的才能再次进行处理。 如果这不可能,我们将不得不进行分布式事务,因为它们支持涉及不同资源的事务。

参考: Spring JMS:在XavierPadró的Blog博客上处理来自JCG合作伙伴 Xavier Padro的事务内的消息 。

翻译自: https://www.javacodegeeks.com/2014/02/spring-jms-processing-messages-within-transactions.html

spring jms 事务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/343866.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[渝粤教育] 西南科技大学 智能交通系统 在线考试复习资料

智能交通系统——在线考试复习资料 一、单选题 1.运用多种方式将路线优化结果告知用户的过程称为( )。 A.数据挖掘 B.路线诱导 C.模式识别 D.预测分析 2.( )是根据国家现行财税制度和借个体系,分析、计算投资者或项目直接发生的财务效益和费用,编制财务报表,计算评价指标,考察…

php 脏数据,使用 PHP Masked Package 屏蔽敏感数据

Fuko \ Masked 是 Kaloyan Tsvetkov 的一个小型 PHP 库&#xff0c;用于通过用编辑后的元素替换列入黑名单的元素来屏蔽敏感数据。以下是 软件包 readme 的基本用法示例&#xff1a;use Fuko\Masked\Protect;//隐藏$secret_key var中的值Protect::hideValue($secret_key);//隐藏…

视频数据复用光端机故障排除方法

数字光端机是一种通过数字电路控制光信号&#xff0c;使用类似于0、1代码来实现光通信的机器&#xff0c;广泛应用于光纤通信等领域。那么&#xff0c;我们在使用数据光端机的时候&#xff0c;如果遇到故障该如何来解决呢&#xff1f;接下来我们就跟随飞畅科技的小编一起来详细…

[渝粤教育] 西南科技大学 电子商务原理及应用 在线考试复习资料(1)

电子商务原理及应用——在线考试复习资料 一、单选题 1.( )接受商家的送货要求,将商品送到消费者手中。 A.邮局 B.快递公司 C.送货公司 D.物流中心 2.卓越属于( )类型的B2C电子商务企业: A.经营着离线商店的零售商 B.没有离线商店的虚拟零售企业 C.商品制造商 D.网络交易服务公…

使用AWS Lambda,S3和AWS CloudFront进行动态内容缓存

快速提供内容对于任何网站或应用程序具有更好的客户体验至关重要。 如果您将网站或应用程序托管在AWS Cloud中&#xff0c;那么无论从何处访问应用程序&#xff0c;都可以以较低的延迟快速提供内容。 AWS提供了CloudFront服务&#xff0c;用于将内容缓存在每个用户地理位置本地…

[渝粤教育] 西南科技大学 组织行为学 在线考试复习资料

组织行为学——在线考试复习资料 一、单选题 1.( )是指一个团队的综合情绪控制调节能力。 A.团队学习 B.团队情商 C.团队成员角色 D.团队创建 2.在沟通过程中,由信息发送者选择来向接收者传送信息的媒介物,叫( )。 A.编码 B.解码 C.反馈 D.通道 3.组织风俗属于组织文化的( )。…

php获取页面指定内容,php获取页面指定标签内容的实现代码分享

php获取页面指定标签内容的实现代码分享可以匹配任意可闭合带id标签header ( "Content-type: text/html; charsetutf-8" );/** 参数说明: $tag_id:所要获取的元素Tag Id $url:所要获取页面的Url $tag:所要获取的标签 $data*/function getWebTag($tag_id, $url false…

数据光端机设备性能指标介绍

作为安防监控工程&#xff0c;设备的可靠性应该是第一考虑要素。而数据光端机设备的可靠性是设备厂商在产品设计时就必需考虑的&#xff0c;但是&#xff0c;有些厂商可能会因为某些原因而不愿做或不知道怎么做这方面的工作&#xff0c;在这里着重从工程的角度简单地讨论以下问…

[渝粤教育] 西南科技大学 行政法学与行政诉讼法学 在线考试复习资料(1)

行政法学与行政诉讼法学——在线考试复习资料 一、单选题 1.某省工商局与税务局联名对某公司作出处罚,吊销其营业执照,罚款100万元。该公司提起复议,复议机关是( ) A.国家工商总局 B.国家税务总局 C.国务院 D.省政府 2.行政相对人对下列行为不能申请行政复议的是哪一种?( ) A…

光电转换器有什么作用?光纤收发器如何保养?

光电转换器可以使原来的快速以太网平滑升级&#xff0c;并能充分保护用户原来的网络资源&#xff0c;它也可以称为光纤收发器。光电转换器可以实现交换机和计算机之间的互联&#xff0c;也可以作为传输中继&#xff0c;还可以进行单多模转换。光纤收发器在应用过程中&#xff0…

php中id如何与删除关联,ThinkPHP查询语句与关联查询用法实例

这篇文章主要介绍了ThinkPHP查询语句与关联查询用法,以实例的形式常见的查询方法,包括数组作为查询条件及对象方式来查询等技巧,需要的朋友可以参考下本文实例讲述了ThinkPHP查询语句与关联查询用法。分享给大家供大家参考。具体如下&#xff1a;在thinkphp框架页面中我们可以直…

[渝粤教育] 西南科技大学 货币银行学 在线考试复习资料

货币银行学——在线考试复习资料 一、单选题 1.最严重的恶性通货膨胀的最终结果是( )。 A.突发性的商品抢购 B.挤兑银行 C.货币制度崩溃 D.投机盛行 2.凯恩斯的货币需求函数非常重视( )。 A.恒久收入的作用 B.货币供应量的作用 C.利率的作用 D.汇率的作用 3.下列西方的中央银…

latex段落悬挂缩进_使用正则表达式在Java中悬挂缩进段落

latex段落悬挂缩进这篇文章显示了如何使用正则表达式将缩进的长段落挂起。 该方法将考虑单词边界&#xff0c;这意味着它不会破坏缩进单词。 为了说明此问题&#xff0c;请考虑以下示例&#xff1a; 近年来&#xff0c;人们越来越努力从自然语言文本中提取实体之间的关系。 在…

【渝粤教育】电大中专会计电算化 (2)作业 题库

1下列有关会计电算化狭义概念的说法正确的是()。 A以会计理论为主体的电子信息技术在会计工作中的应用 B与实现电算化有关的所有工作 C以电子计算机为主体的电子信息技术在会计工作中的应用 D与实现电算化有关的主要工作 错误 正确答案&#xff1a;左边查询 学生答案&#xff1…

光纤模块与光纤收发器的区别

随着科技的发展&#xff0c;城市信息化速度的加快&#xff0c;对于通信技术的要求越来越高&#xff0c;光纤以其传输速度快、距离远、安全稳定、抗干扰、扩容便捷等优点越来越成为人们在通讯敷设时的首选。我们经常看到在建筑智能化项目中的远距离数据传输需求&#xff0c;基本…

html中输出PHP的下拉列表,html中关于下拉列表select的图文代码详解

HTML中的下拉列表&#xff1a;Html代码VolvoSaabOpelAudi其中select是显示一个下拉列表(drop down list)出来&#xff0c;option是下拉列表中的项目(item)&#xff0c;而option的文本内容(text content)是下拉列表项目中显示到页面上的值&#xff0c;value是真正需要提交到服务…

【渝粤教育】电大中专学前儿童语言教育 (2)作业 题库

作业视频教务托管&#xff0c;壹叁路路贰陆陆壹〇肆〇 认为儿童天生就有学习语言能力且体现在一种语言获得装置&#xff08;LAD&#xff09;中的教育家是( )。 A.皮亚杰 B.乔姆斯基 C.伍顿 D.斯金纳 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;未作答 2.语言是( )…

光纤收发器tx和rx的区别?

光纤收发器&#xff0c;是一种将短距离的双绞线电信号和长距离的光信号进行互换的以太网传输媒体转换单元&#xff0c;在很多地方也被称之为光电转换器&#xff08;Fiber Converter&#xff09;。产品一般应用在以太网电缆无法覆盖、必须使用光纤来延长传输距离的实际网络环境中…

【渝粤教育】电大中专幼儿园课程论 (10)作业 题库

作业视频教务托管&#xff0c;壹叁路路贰陆陆壹〇肆〇 下列哪种不是具有代表性的课程定义( ) A.课程即知识 B.课程即目标 C.课程即科目 D.课程即经验 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;未作答 2.幼儿园课程目标要素不包括( ) A.内容 B.目标 C.特征 D.评价…

php缓存实例,一个PHP缓存类实例

一个PHP缓存类实例发布于 2014-08-05 21:44:28 | 104 次阅读 | 评论: 0 | 来源: 网友投递PHP开源脚本语言PHP(外文名: Hypertext Preprocessor&#xff0c;中文名&#xff1a;“超文本预处理器”)是一种通用开源脚本语言。语法吸收了C语言、Java和Perl的特点&#xff0c;入门门…