1.引言
这篇文章的目标是向您展示将消息传递系统与Spring Integration结合使用时如何处理错误。 您将看到同步和异步消息传递之间的错误处理有所不同。 和往常一样,我将跳过聊天并继续进行一些示例。
- 您可以在github上获取源代码。
2,样品申请
我将使用一个基本示例,因为我想专注于异常处理。 该应用程序包含一个订单服务,该服务接收订单,处理订单并返回确认。
下面我们可以看到消息传递系统的配置方式:
int-config.xml
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/><int:channel id="requestChannel"/><int:router input-channel="requestChannel" ref="orderRouter" method="redirectOrder"/><int:channel id="syncChannel"/><int:channel id="asyncChannel"><int:queue capacity="5"/>
</int:channel><int:service-activator method="processOrder" input-channel="syncChannel" ref="orderProcessor"/><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/>
</int:service-activator>
网关是消息传递系统的入口点。 它将接收订单并将其发送到直接通道“ requestChannel”,路由器将根据订单ID将其重定向到适当的通道:
- syncChannel:一个直接通道 ,它将订单发送到订阅该通道的订单处理器。
- asyncChannel:一个队列通道 ,订单处理器将从中主动检索订单。
处理订单后,订单确认将发送回网关。 这是代表此的图形:
好的,让我们从最简单的情况开始,使用直接通道进行同步发送。
3.与直接通道同步发送
订单处理器已订阅“ syncChannel”直接渠道。 “ processOrder”方法将在发送者的线程中调用。
public OrderConfirmation processOrder(Order order) {logger.info("Processing order {}", order.getId());if (isInvalidOrder(order)) {logger.info("Error while processing order [{}]", ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}return new OrderConfirmation("confirmed");
}
现在,我们将执行一个测试,该测试将通过发送无效订单来引发异常。 此测试将向网关发送订单:
public interface OrderService {@Gatewaypublic OrderConfirmation sendOrder(Order order);
}
考试:
TestSyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestSyncErrorHandling {@Autowiredprivate OrderService service;@Testpublic void testCorrectOrder() {OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order"));Assert.assertNotNull(confirmation);Assert.assertEquals("confirmed", confirmation.getId());}@Testpublic void testSyncErrorHandling() {OrderConfirmation confirmation = null;try {confirmation = service.sendOrder(new Order(1, "an invalid order"));Assert.fail("Should throw a MessageHandlingException");} catch (MessageHandlingException e) {Assert.assertEquals(InvalidOrderException.class, e.getCause().getClass());Assert.assertNull(confirmation);}}
}
我们运行测试,看看在订单处理器中如何引发异常并到达测试。 没关系; 我们想验证发送无效订单是否引发了异常。 发生这种情况是因为测试发送了订单,并阻止等待在同一线程中处理订单。 但是,当我们使用异步通道时会发生什么? 让我们继续下一节。
4,与队列通道异步发送
此部分的测试发送一个命令,该命令将由路由器重定向到队列通道。 网关如下所示:
public interface OrderService {@Gatewaypublic Future<OrderConfirmation> sendFutureOrder(Order order);
}
请注意,这次网关正在返回Future 。 如果我们不返回此值,则网关将阻止测试线程。 通过返回Future,网关将变为异步状态,并且不会阻塞发送方的线程。
考试:
TestKoAsyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestKoAsyncErrorHandling {@Autowiredprivate OrderService service;@Test(expected=MessageHandlingException.class)public void testAsyncErrorHandling() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));}
}
好的,现在我们将启动测试并看到引发异常的信息…
java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException
糟糕,测试失败,因为没有异常到达测试! 发生了什么? 好吧,解释如下:
<int:channel id="asyncChannel"><int:queue capacity="5"/>
</int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/>
</int:service-activator>
由于我们使用的是异步通道(队列),因此发送者发送订单并继续前进。 然后,接收方从另一个线程轮询订单。 因此,不可能将Exception抛回到发送方。 让我们表现得好像什么都没发生吗? 好吧,您最好不要,还有其他选择。
5,异步错误处理
当使用异步消息传递时,Spring Integration通过将异常发布到消息通道来处理它们。 引发的异常将包装到MessagingException中,并成为消息的有效负载。
错误消息发送到哪个通道? 首先,它将检查请求消息是否包含名为“ errorChannel”的标头。 如果找到,错误消息将被发送到那里。 否则,该消息将被发送到所谓的全局错误通道。
5.1全局错误通道
默认情况下,Spring Integration创建一个名为“ errorChannel”的全局错误通道。 该频道是发布-订阅频道。 这意味着我们可以为该频道订阅多个端点。 实际上,已经有一个端点订阅了它:一个日志记录处理程序 。该处理程序将记录到达通道的消息的有效负载,尽管可以将其配置为不同的行为。
现在,我们将向该全局通道订阅一个新的处理程序,并通过将其存储到数据库中来测试它是否接收到异常消息。
首先,我们需要在配置中进行一些更改。 我创建了一个新文件,因此它不会干扰我们之前的测试:
int-async-config.xml
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="asyncChannel" service-interface="xpadro.spring.integration.service.OrderService" error-channel="errorChannel"/><int:channel id="asyncChannel"><int:queue capacity="5"/>
</int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/>
</int:service-activator><int:service-activator input-channel="errorChannel" ref="orderErrorHandler" method="handleFailedOrder"/><bean id="orderErrorHandler" class="xpadro.spring.integration.activator.OrderErrorHandler"/>
网关 :我添加了一个错误通道。 如果调用失败,错误消息将发送到该通道。 如果我没有定义错误通道,则网关会将异常传播给调用者,但是在这种情况下,由于这是一个异步网关,因此无法正常工作。
错误处理程序 :我已经定义了一个新的端点,该端点已订阅了全局错误通道。 现在,任何发送到全局错误通道的错误消息都将传递给我们的处理程序。
我还添加了一个配置文件以配置数据库。 我们的错误处理程序会将收到的错误插入此数据库:
db-config.xml
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/>
</bean><!-- in-memory database -->
<jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schemas/schema.sql" />
</jdbc:embedded-database>
错误处理程序非常简单。 它收到错误消息,并将其信息插入数据库:
public class OrderErrorHandler {@Autowiredprivate JdbcTemplate jdbcTemplate;@ServiceActivatorpublic void handleFailedOrder(Message<MessageHandlingException> message) {Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload();saveToBD(requestedOrder.getId(), message.getPayload().getMessage());}private void saveToBD(int orderId, String errorMessage) {String query = "insert into errors(orderid, message) values (?,?)";jdbcTemplate.update(query, orderId, errorMessage);}
}
好的,现在一切就绪。 让我们实施一个新的测试:
TestOkAsyncErrorHandlingTest.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-async-config.xml","/xpadro/spring/integration/config/db-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestOkAsyncErrorHandling {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate OrderService service;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from errors");}@Testpublic void testCorrectOrder() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(7, "another correct order"));OrderConfirmation orderConfirmation = confirmation.get();Assert.assertNotNull(orderConfirmation);Assert.assertEquals("confirmed", orderConfirmation.getId());}@Testpublic void testAsyncErrorHandling() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));Thread.sleep(2000);Assert.assertEquals(1, getSavedErrors());validateSavedError(6);}private int getSavedErrors() {return jdbcTemplate.queryForObject("select count(*) from errors", Integer.class);}private void validateSavedError(int orderId) {String query = "select * from errors where orderid=?";Map<String, Object> result = jdbcTemplate.queryForMap(query, orderId);Assert.assertEquals(6, result.get("orderid"));assertThat((String)result.get("message"), containsString("Order ID is invalid"));}
}
这次测试成功,错误消息已存储到数据库。
5.2其他机制
自定义错误通道 :您可以定义错误通道并将其定义为队列通道,而不是默认的发布-订阅通道:
<int:poller id="defaultPoller" default="true" fixed-delay="5000" /><int:channel id="errorChannel"><int:queue capacity="10"/>
</int:channel>
ErrorMessageExceptionTypeRouter :这个Spring Integration专用路由器将解析将错误消息发送到的通道。 它基于错误的最具体原因做出决定:
<int:exception-type-router input-channel="errorChannel" default-output-channel="genericErrorChannel"><int:mapping exception-type="xpadro.spring.integration.exception.InvalidOrderException" channel="invalidChannel" /><int:mapping exception-type="xpadro.spring.integration.exception.FooException" channel="fooChannel" />
</int:exception-type-router>
六,结论
我们已经了解了使用Spring Integration时错误处理的不同机制是什么。 有了这个基础,您将能够通过实现转换器从错误消息中提取信息,使用标头扩展器设置错误通道或实现自己的路由器等来扩展它并配置错误处理。
翻译自: https://www.javacodegeeks.com/2014/02/how-error-handling-works-in-spring-integration.html