natty的异步通信框架
在本系列的前一部分中,我们介绍了OpenHub框架 。 这部分显示了框架最强大的功能之一- 异步消息传递模型 。
当源系统无法等待目标系统的响应时,将使用系统之间的异步通信。 有以下几个原因:
- 源系统必须尽可能地响应 ,并且不受外部影响(通信缓慢,目标系统不稳定等)的影响
- 在目标系统中处理需要很多时间
- 异步通信对性能和流量产生积极影响
异步方案
当您决定异步通信时,您必须考虑可能的情况:
- 目标系统必须确认传入消息已成功保存 ,并准备进行进一步处理。 是否应将异步过程的最终结果通知源系统?
- 如果异步处理失败该怎么办? 如果存在暂时性技术错误 (例如,与另一个系统的通信失败), 请尝试几次,或者因为存在业务错误 (例如,输入数据无效)而停止进一步处理 。
- 在异步处理期间会调用其他系统- 如果对第一个系统的调用可以,但对第二个系统的调用失败,该怎么办? 异步处理必须是幂等的,并跳过第一个成功的呼叫,然后仅重试第二个呼叫。
- 异步过程可能很棘手,因此最好将一个大进程(父)划分为较小的(子)进程 。 如果处理了子进程,则父进程也将完成。
- 有时,您必须保证传入请求的顺序 (请求的到达顺序与发送的顺序不相同)并按照确切的顺序进行处理。
- 它是异步处理,您需要对其进行监视或在发生意外情况 (例如异步过程失败) 时自动得到通知 。
- 有时,您需要保存数据或异步过程的当前状态以尝试成功完成之间,例如,第一次调用外部系统的结果就是第二次调用的输入。
当您开始考虑所有这些情况时,您会发现从头开始实施它并不容易。 OpenHub框架内置了对异步消息处理的支持。 它易于使用,但同时又强大又灵活。 并且也是可配置的,例如,该过程应再次运行多少次? 在哪个时间间隔?
异步路由实现
使用OpenHub框架的路由实现有两个子路由:
- 一种用于处理传入消息(
RouteIn
) - 一种用于异步过程实现(
RouteOut
)
/*** Route definition for asynchronous operation "translate" via web services.*/
@CamelConfiguration(value = AsyncTranslateWsRoute.ROUTE_BEAN)
public class AsyncTranslateWsRoute extends AbstractBasicRoute {static final String ROUTE_BEAN = "asyncTranslateWsRouteBean";private static final String OPERATION_NAME = "asyncTranslateWs";static final String ROUTE_ID_ASYNC_IN = getInRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);static final String ROUTE_ID_ASYNC_OUT = getOutRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);static final String URI_ASYNC_OUT = "direct:" + ROUTE_ID_ASYNC_OUT;@Overrideprotected void doConfigure() throws Exception {// asyncTranslate - input asynch messagecreateAsyncRouteIn();// asyncTranslate - process delivery (=asynchronous execution)createAsyncRouteOut();}/*** Route for asynchronous <strong>asyncTranslate</strong> input operation.* <p/>* Prerequisite: none* <p/>* Output: {@link AsyncTranslateResponse}*/private void createAsyncRouteIn() {Namespaces ns = new Namespaces("h", TranslateWebServiceConfig.TRANSLATE_SERVICE_NS);// note: mandatory parameters are set already in XSD, this validation is extraXPathValidator validator = new XPathValidator("/h:asyncTranslateRequest", ns, "h:inputText");AsynchRouteBuilder.newInstance(ServiceEnum.TRANSLATE, OPERATION_NAME,getInWsUri(new QName(TranslateWebServiceConfig.TRANSLATE_SERVICE_NS, "asyncTranslateRequest")),new AsynchResponseProcessor() {@Overrideprotected Object setCallbackResponse(CallbackResponse callbackResponse) {AsyncTranslateResponse res = new AsyncTranslateResponse();res.setConfirmAsyncTranslate(callbackResponse);return res;}}, jaxb(AsyncTranslateResponse.class)).withValidator(validator).build(this);}/*** Route for <strong>asyncTranslate</strong> operation - process delivery (=asynchronous execution).* Only input text is logged in this case.* <p/>* Prerequisite: none*/private void createAsyncRouteOut() {final String URI_LOG_INPUT_PARAMS = "direct:logInputParams";from(URI_ASYNC_OUT).routeId(ROUTE_ID_ASYNC_OUT)// xml -> AsyncTranslateRequest.unmarshal(jaxb(AsyncTranslateRequest.class)).to("extcall:message:" + URI_LOG_INPUT_PARAMS);from(URI_LOG_INPUT_PARAMS).validate(body().isInstanceOf(AsyncTranslateRequest.class)).log(LoggingLevel.DEBUG, "Asynchronous execution - input text '${body.inputText}' (lang: ${body.inputLang})");}
}
RouteIn
使用AsynchRouteBuilder
通过以下功能轻松配置:
- 定义哪个传入的Web Service请求应开始此路由
- 定义源系统的确认响应。 输入路由成功执行后,将返回对源系统的同步响应。
- 定义验证器,该验证器检查传入请求中是否存在元素inputText
RouteOut
定义了异步过程本身。 在这种情况下,仅记录输入请求( AsyncTranslateRequest
)。
就这样。 周围的一切都由OpenHub框架实现。
外部通话
您的路线实施通常会调用外部系统或其他路线。 如果实现异步过程,则必须遵守幂等规则-可以多次调用过程的每个部分,并且必须确保所有调用的行为相同。 有时外部系统/路由本身是幂等的,然后您可以根据需要多次调用它。 如果没有,那么您必须在实现中对其进行控制。 因此,我们将Camel组件设为extcall 。
上例中的组件excall
确保即使整个异步过程运行了多次,使用URI_LOG_INPUT_PARAMS
路由也会被精确地调用一次。
描述:
- 在异步消息处理期间调用了两个外部系统
- 在处理过程中我们可以返回两个extcall的停靠点
- 如果在对外部系统1的第一次请求之前发生错误,则下一个处理尝试将从头开始,与新消息到达相同
漏斗和节流组件
其他强大的组件是funnel
和throttling
。
Funnel
组件用于过滤特定集成点处的并发消息。 这种过滤可确保在该位置仅处理一个特定类型的消息或同时具有特定信息的消息,即使是以保证的顺序(可选选项)进行处理。 对于与外部系统进行通信很有用,该外部系统一次只能接受一个特定实体(例如,订购系统中的一个特定客户)的一条输入消息。
第二个组件throttling
允许您确保特定端点不会过载,或者我们不会超出任何外部服务的商定SLA。 Throttling
组件也可以用于同步消息。
所有组件都支持群集。
实施细节
OpenHub需要保存的所有内容都保存在数据库中–类型没有限制。 无需调整JMS / MQ系统即可支持异步消息传递。 然后,您可以使用自己喜欢的任何工具进行日常工作- 数据模型简单,清晰并且文档齐全。 数据库工具比JMS / MQ系统更多。
有时我们听说在这种情况下使用数据库是一种反模式,从性能的角度来看,在某些情况下它可能是瓶颈。 这取决于实际项目中的集成用例,但在处理数十万个并发请求的实际项目中,我们仍然没有严格的性能限制。 我们准备添加JMS / MQ实现,但是到目前为止还不需要。
不必仅通过传入请求启动异步过程–您还可以使用调度作业在需要的任何时候启动路由,然后将其留给OpenHub框架。
所有示例都可以在GitHub的参考实现中找到–参见https://github.com/OpenWiseSolutions/openhub-ri
翻译自: https://www.javacodegeeks.com/2017/10/asynchronous-communication-made-openhub-framework.html
natty的异步通信框架