Spring集成

目录

  • 概述
  • 1 声朋一个简单的集成流
    • 1.1 使用XML定义集成流
    • 1.2 使用Java配置集成流
    • 1.3 使用Spring lntegration 的 DSL 配置
  • 2 Spring integration 功能概览
    • 2.1 消息通道
    • 2.2 过滤器
    • 2.3 转换器
    • 2.4 路由器
    • 2.5 切分器
    • 2.6 服务激活器
    • 2.7 网关
    • 2.8 通道适配器
    • 2.9 端点模块

概述

就像我们需要连接互联网才能提高生产效率一样,很多应用都需要连接外部系统才能完成它们的功能。应用程序可能需要读取或发送电子邮件、与外部 API 交或者对写人数据库的数据做出反应。而且,由于数据是在外部系统读取或写人的,应用可能需要以某种方式处理这些数据,将其转换为应用程序自己的领域类。
因此,在本文中我们会看到如何使用 Spring Integration 实现通用的集成模式。Spring Integration 是众多集成模式的现成实现,这些模式在Gregor Hohpe 和 Bobby Woolf编写的Enterprise Integration Patterns (Addison-Wesley,2003 年)中进行了归类。每个模式都实现为一个组件,消息会通过该组件在管道中传递数据。借助 Spring 配置,可以将这些组件组装成一个管道,数据可以通过这个管道来流动。我们从定义一个简单的集成流开始,这个流包含了Spring Integration 的众多特性和特点。

1 声朋一个简单的集成流

通常来讲, Spring Integration 可以创建集成流,通过集成流,应用程序能够接收向应用程序之外的资源发送数据。应用程序可能集成的资源之一就是文件系统。因此Spring integration 的很多组件都有读入和写入文件的通道适配器(channel adapter)。为了熟悉 Spring Integration,我们会创建一个集成流,这个流会写人数据到文件综中。首先,需要添加Spring Integration 到项目的构建文件中。对于Maven 构建来讲必要的依赖如下所示:

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>X.X.X.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-file</artifactId><version>X.X.X.RELEASE</version></dependency>

第一项依赖是 Spring Integration的 Spring Boot starter。不管我们与哪种流进行交互对于Spring Integration 流的开发来讲,这个依赖都是必需的。与所有的Spring Boot starter 一样,在 Initializr表单中,这个依赖也可以通过复选框选择。
第二项依赖是Spring Integration的文件端点模块。这个模块是与外部系统集成的20余模块之一。我们会在 2.9 小节中更加详细地讨论端点模块。但是目前,我们只需要知道文件端点模块提供了将文件从文件系统导人集成流和将流中的数据写人文件系统的能力。
接下来,我们需要为应用创建一种方法,让它能够发送数据到集成流中,这样它才能写人文件。为了实现这一点,我们需要创建一个网关接口,这样的网关接口程序如下:

package sia6;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.handler.annotation.Header;@MessagingGateway(defaultRequestChannel = "textInChannel") //声明消息网关
public interface FileWriterGateway {void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data);
}

尽管这只是一个很简单的Java 接口,但是关于 FileWriterGateway,有很多东西需要介绍。我们首先看到,它使用了@MessagingGateway 注解。这个注解会告诉 Spring Integration 要在运行时生成该接口的实现,这与 Spring Data 在运行时生成存储库接口的实现非常类似。其他地方的代码在希望写人文件时将会调用它。
@MessagingGatewaydefaultRequestChannel属性表明接口方法调用时所返回的消息要发送至给定的消息通道(message channel)。在本例中,我们声明调用 writeToFile() 所形成的消息应该发送至名为 textInChannel的通道中。
对于writeToFile()方法来说,它以 String类型的形式接受一个文件名,另外一个String包含了要写人文件的文本。关于这个方法的签名,还需要注意 filename 参数上带有@Header。在本例中,@Header 注解表明传递给 filename 的值应该包含在消息头信息中(通过 FileHeaders.FILENAME 声明,它将会被解析成file_name),而不是放到消息载荷(payload)中。
现在,我们已经有了消息网关,接下来就需要配置集成流了。尽管我们往构建文件中添加的 Spring Integration starter 依赖能够启用 Spring Integration 的自动配置功能,但是满足应用需求的流定义则需要我们自行编写额外的配置。在声明集成流方面,我们有3种配置方案可供选择:

  • XML 配置;
  • Java 配置;
  • 使用DSL的Java配置。

我们会依次了解Spring Integration 的这3种配置风格,从较为老式的XML配置开始。

1.1 使用XML定义集成流

尽管在开发中,我们应尽量避免使用XML配置,但是Spring Integration 有使用XML定义集成流的漫长历史。所以,展现一个 XML 定义集成流的样例还是很有价值的。下面的程序展现了如何使用XML配置示例集成流。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:int="http://www.springframework.org/schema/integration"xmlns:int-file="http://www.springframework.org/schema/integration/file"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/integrationhttp://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/filehttp://www.springframework.org/schema/integration/file/spring-integration-file.xsd"><!--    声明textInChannel--><int:channel id="textInChannel" /><!--    转换文本--><int:transformer id="upperCase"input-channel="textInChannel"output-channel="fileWriterChannel"expression="payload.toUpperCase()" /><!--    声明WriterChanncl--><int:channel id="fileWriterChannel"/><!--    将文本写人文件  低版本可能不支持特性  append-new-line--><int-file:outbound-channel-adapter id="writer"channel="fileWriterChannel"directory="/tmp/sia6/files"mode="APPEND"append-new-line="true" auto-create-directory="true"/>
</beans>

讲解

  • 我们首先配置了一个名为 textInChannel 的通道。可以发现,它就是 FileWriterGateway的请求通道。当FileWriterGatewaywriteToFile()方法被调用的时候,结果形成的消息会发布到这个通道上。
  • 我们还配置了一个转换器(transformer ),它会从 textInChannel 接收消息。它使用Spring 表达式语言(Spring Expression Language,SpEL)为消息载荷调用toUpperCase()方法。进行大写操作之后的结果会发布到 fileWriterChannel 上。
  • 随后,我们配置了名为 fileWriterChannel 的通道。这个通道会作为一根导线将转换器与出站通道适配器 (outbound channel adapter) 连接在一起。
    最后,我们使用 int-file 命名空间配置了出站通道适配器。这个 XML 命名空间是由Spring Integration 的文件模块提供的,实现文件写入的功能。按照我们的配置,它从fileWriterChannel接收消息,并将消息的载荷写入一个文件,这个文件的名称是由消息头信息中的 file_name属性指定的,而存入的目录则是由这里的 directory 属性指定的。如果文件已经存在,会以新行的方式进行追加文件内容,而不会覆盖原文件。

图,使用Enterprise Integration Patterns(EIP) 中的图形元素样式阐述了这个流。

在这里插入图片描述

这个流包含了 5 个组件:一个网关、两个通道、一个转换器和一个通道适配器。能够组装到集成流中的组件有很多,这只是其中很少的一部分。我们会在第 2 节讨论这些组件以及 Spring Integration 支持的其他组件。

如果想要在 Spring Boot 应用中使用XML 配置,需要将XML作为源导人Spring应川最简单的实现方式就是在应用的某个Java 配置类上用Spring的@ImportRerource 注解:

@Confiquration
@ImportResource("classpath:/filowrltor-config.xml")
public class FileWriterIntegrationConfig {....
} 

尽管基于XML的配置能够很好地用于 Spring integration,但是大多数的开发人员对千XML 的使用越来越谨慎。(尽量避免使用 XML 配置)。现在,我们抛开尖括号,看一下 Spring Integration 的 Java 配置风格。

1.2 使用Java配置集成流

大多数的现代 Spring 应用程序都会避免使用XML 配置,而更加青睐于 Java 配置。实际上,在 Spring Boot 应用中,Java 配置是自动化配置功能更自然的补充形式。因此如果要为 Spring Boot 应用添加集成流,最好使用Java 来定义流程。
下列程序展示了使用 Java 配置编写集成流的一个样例。这里的代码依然是功能相同的文件写人集成流,但是这次我们使用 Java 来实现。

package sia6;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.transformer.GenericTransformer;import java.io.File;@Configuration
public class FileWriterIntegrationConfig {@Bean//    声明转换器@Transformer(inputChannel = "textInChannel", outputChannel = "fileWriterChannel")public GenericTransformer<String, String> upperCaseTransformer() {return text -> text.toUpperCase();}@Bean//    声明文件写人器@ServiceActivator(inputChannel = "fileWriterChannel")public FileWritingMessageHandler fileWriter() {FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("/tmp/ala6/tilea"));handler.setExpectReply(false);handler.setFileExistsMode(FileExistsMode.APPEND);handler.setAppendNewLine(true);return handler;}
}

在Java 配中,我们声明了两个 bean: 一个转换器和一个文件写入消息处理器。这里的转换器是 GenericTransformer。因为 GenericTransformer 是函数式接口,所以我们可以使用 lambda 表达式为其提供实现,这里调用了消息文本的 toUpperCase() 方法。我们为转换器 bean 使用了@Transformer 注解,这样会将其声明成集成流中的一个转换器。他接受来自 textInChannel 通道的消息,然后将消息写人名为 fileWriterChannel 的通道。

而负则文件写人的 bean 则使用了@ServicActivator 注解,表明它会接受来fileWriterChannel 的消息,并且会将消息传递给 FileWritingMessageHandler 实例所定义的服务。FileWritingMessageHandler 是一个消息处理器,可以将消息的载荷写入特定目录下的文件,而文件的名称是通过消息的 file_name 头信息指定的。与XML 样例类似,FileWritingMessageHandler 也配置为以新行的方式为文件追加内容。

FileWritingMessageHandler bean 的一个独特之处在于它调用了 setExpectReply(false)方法,能够通过这个方法告知服务激活器(service activator)不要期望存在答复通道(reply channel,通过这样的通道,我们可以将某个值返回到流中的上游组件 )。如果我们不调用setExpectReply(false),那么文件写入 bean 的默认值是 true,尽管管道的功能和预期一样,但是在日志中会看到一些错误信息,提示我们没有设置答复通道。

你会发现,我们在这里没有必要显式声明通道。如果名为 textInChannelfileWriterChannelbean 不存在,这两个通道将会自动创建。但是,如果想要更加精确地控制通道如何配置,可以按照如下的方式显式构建这些 bean:

    @Beanpublic MessageChannel textInChannel() {return new DirectChannel();}@Beanpublic MessageChannel fileWriterChannel() {return new DirectChannel();}

基于 Java 的配置方案可能更易于阅读、更简洁,也符合倡导的纯 Java配置风格。但是,如果使用 Spring Integration 的 Java DSL配置风格,配置过程可以更加流畅·。

1.3 使用Spring lntegration 的 DSL 配置

我们再次尝试文件写人集成流的定义。这一次,我们依然使用 Java 进行定义,但是会使用Spring IntegrationJava DSL。我们不再将流中的每个组件都声明为单独的bean,而是使用一个 bean 来定义整个流,程序如下所示(为集成流的设计提供一个流畅的API)。

package sia6;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;import java.io.File;@Configuration
public class FileWriterIntegrationConfig {@Beanpublic IntegrationFlow fileWriterFlow() {return IntegrationFlows.from(MessageChannels.direct("textInChannel")).<String, String>transform(t -> t.toUpperCase())//声明转换器.handle(Files.//处理文件写人outboundAdapter(new File("/tmp/sia6/files")).fileExistsMode(FileExistsMode.APPEND).appendNewLine(true)).get();}
}

这种新的配置方式在一个bean 方法中定义了整个流做到了尽可能简洁。IntegrationFlows类初始化构建器 API,我们可以通过这个API来定义流。
在上面程序中,我们首先从名为 textInchannel的通道接收消息,然后,消息进人一个转换器,这个转换器会将消息载荷转换成大写形式。在转换器之后,消息会交由出站通道适配器处理,这个适配器是由 Spring Integration file模块的 Files类型创建的。最后,通过对 get()的调用返回要构建的IntegrationFlow。简言之,这个 bean 方法定义了与XMLJava配置样例相同的集成流。
你可能已经发现,与 Java 配置样例类似,我们不需要显式声明通道 bean。我们引用了textInChannel,如果该名字对应的通道不存在,Spring Integration 会自动创建它。不过,我们也可以显式声明 bean
对于连接转换器和出站通道适配器的通道,我们甚至没有通过名字引用它。如果需要显式配置通道,可以在流定义的时候,通过调用 channel()来引用它的名称:

    @Beanpublic IntegrationFlow fileWriterFlow() {return IntegrationFlows.from(MessageChannels.direct("textInChannel")).<String, String>transform(t -> t.toUpperCase())//声明转换器.channel(MessageChannels.direct("FileWriterChannel")).handle(Files//处理文件写人.outboundAdapter(new File("/tmp/sia6/files")).fileExistsMode(FileExistsMode.APPEND).appendNewLine(true)).get();}

使用 Spring Integration 的Java DSL (与其他的 fluent API类似)时,必须要巧妙地使用空格来保持可读性。在这里的样例中,我小心翼翼地使用缩进来保证代码块的可谈性。对于更长、更复杂的流,我们甚至可以考虑将流的一部分抽取到单独的方法或子流中,以实现更好的可读性。
现在,我们已经看到了如何使用3 种不同的方式来定义一个简单的流,接下来,我们回过头来看一下Spring Integration 的全景。

2 Spring integration 功能概览

Spring Integration 涵盖了大量的集成场景。如果想将所有的内容放到一章中,就像把一头大象装进信封一样不现实。在这里,只会向你展示 Spring Integration 这头大象的照片,而不是对 Spring Integration 进行面面俱到的讲解,目的就是让你能够了解它是如何运行的。随后,我们会再创建一个集成流,为 Taco Cloud 应用添加新的功能。
集成流是由一个或多个如下介绍的组件组成的。在继续编写代码之前,我们先看一下这些组件在集成流中所扮演的角色。

  • 通道 (channel): 将消息从一个元素传递到另一个元素。

  • 过滤器 (filter): 基于某些断言,条件化地允许某些消息通过流。

  • 转换器(transformer): 改变消息的值、将消息载荷从一种类型转换成另一种类型。

  • 路由器(router): 将消息路由至一个或多个通道,通常会基于消息的头信息进行路由。

  • 切分器(splitter): 将传入的消息切分成两份或更多份,然后发送至不同的通道。

  • 聚合器(aggregator): 与切分器的操作相反,将来自不同通道的多个消息合并成一个消息。

  • 服务激活器(service activator): 将消息传递给某个 Java 方法处理,并将返回值发布到输出通道上。

  • 通道适配器 (channel adapter): 将通道连接到某些外部系统或传输方式。可以接受输入,也可以写出到外部系统。

  • 网关 (gateway): 通过接口,将数据传递到集成流中。

在定义文件写人集成流时,我们已经看过其中的一些组件了。FileWriterGateway 是个网关,通过它,应用可以提交要写人文件的文本。我们还定义了一个转换器,将给定的文本转换成大写的形式,随后,我们定义了一个出站通道适配器,它执行将文本写人文件的任务。这个流有两个通道:textInChannelfileWriterChannel,它们将应用中的其他组件连接在一起。现在,我们按照承诺快速看一下这些集成流组件。

2.1 消息通道

消息通道是消息穿行集成通道的一种方式(如下图)。它们是连接 Spring Integration其他组成部分的管道。

Spring Integration 提供了多种通道实现。

  • PublishSubscribeChannel: 发送到 PublishSubscribeChannel的消息会传递到一个或多个消费者中。如果有多个消费者,则它们都会接收到消息。
  • QueueChannel: 发送到 QueueChannel 的消息会存储到一个队列中,按照 FIFO的方式被拉取。如果有多个消费者,只有其中的一个消费者会接收到消息。
  • PriorityChannel:与QueueChannel类似,但它不是FIFO的方式,而是会基于消息的 priority 头信息被消费者拉取。
  • RendezvousChannel:与 QueueChannel 类似,但是发送者会一直阻塞通道,直到消费者接收到消息。它实际上会同步发送者和消费者。
  • DirectChannel:与 PublishSubscribeChannel 类似,但是消息只会发送至一个消费者。它会在与发送者相同的线程中调用消费者。这种方式允许跨通道的事务.。
  • ExecutorChannel:与 DirectChannel类似,但消息分发是通过 TaskExecutor 实现的,这样会在与发送者独立的线程中执行。这种通道类型不支持跨通道的事务。
  • FluxMessageChannel: 反应式流的发布者消息通道,基于 Reactor 项目的 Flux

在 Java 和JavaDSL 中,输人通道都自动创建的,默认使用 DirectChannel 但是,如果想要使用不同的通道实现,就需要将通道声例为 bean 并在集应流中引用它。可例如,要声明 PublishSubscribeChannel,需要明如下的@Bean 方法:

    @Beanpublic MessageChannel orderChannel(){return new PublishSubscribeChannel();}

随后,可以在集成流定义中根据通道名称引用它。例如,如果这个通道要被一个服务激活器 bean 所消费,我们可以在@ServiceActivator 注解的 inputChannel 属性中用它:

	@ServicenActivator(inputChannel = "orderChannel")

或者,使用Java DSL配置风格,可以调用 channel() 来引用它:

    @Beanpublic IntegrationFlow orderFlow() {return IntegrationFlows.....channel("orderChannel")....get();}

很重要的一点是,如果使用 QueueChannel,消费者必须配置一个 poller。例如,假设我们声明了一个这样的QueueChannel bean:

    @Beanpublic MessageChannel orderChannel() {return new QueueChannel();}

那么,我们需要确保消费者配置成轮询该通道的消息。如果是消息激活器@ServiceActivator 注解可能会如下所示:

 	@ServiceActivator(inputChannel ="orderChannel",poller= @Poller(fixedRate ="1000"))

在本例中,服务激活器每秒(或者说每 1000 毫秒)都会轮询名为 orderChannel 的通道。

2.2 过滤器

过滤器放置于集成管道的中间,它能够根据断言允许或拒绝消息进入流程的下一步。

在这里插入图片描述

例如,假设消息包含了整型的值,要通过名为 numberChannel 进行发布,但是我们只想让偶数进人名为 evenNumberChannel 的通道。在这种情况下,可以使用 @Filter 注解定义一个过滤器:

    @Filter(inputChannel = "numberChannel",outputChannel ="evenNumberChannel")public boolean evenNumberFilter(Integer number) {return number % 2 == 0;}

作为替代方案,如果使用 Java DSL 配置风格来定义集成流,可以按照如下的方式来调用 filter():

    @Beanpublic IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {return IntegrationFlows....<Integer > filter((p) -> p % 2 == 0)....get();}

在本例中,我们使用 lambda 表达式来实现过滤器。但实际上,filter()方法会接受GenericSelector 作为参数。这意味着,如果我们的过滤器过于复杂,不适合放到一个简单的lambda 表达式中,那么我们可以实现 GenericSelector 接口作为替代方案。

2.3 转换器

转换器会对消息执行一些操作,一般会导致不同的消息形成,还有可能会产生不同的载荷类型(如图所示)。转换过程可以非常简单,比如执行数字的数学运算或者操作String值。转换过程也可以比较复杂,比如根据代表 ISBNString 值查询并返回对应图书的详细信息。

在这里插入图片描述
例如,假设整型值会通过名为 numberChannel 的通道进行发布,我们希望将这些宇转换成它们的罗马数字形式,以 String 类型来表示。在这种情况下,可以声明一个GenericTransforer类型的 bean 并为其添加@Transfommer 注解:

    @Bean@Transformer(inputChannel = "numberChannel", outputChannel = "romanNumberChannel")public GenericTransformer<Integer, String> romanNumTransformer() {return RomanNumbers::toRoman;}

@Transformer注解可以将这个 bean 声明为转换器 bean,它会从名为 numberChannel的通道接收 Integer 值,然后使用静态方法 toRoman()进行转换(toRoman()是静态方法定义在名为 RomanNumbers 的类中,这里使用方法引用来使用它)。转换后的结果会发布到名为romanNumberChannel的通道中。
在Java DSL配置风格中,调用 transform()会更加简单,我们只需将对toRoman()的方法引用传递进来:

    @Beanpublic IntegrationFlow transformerFlow() {return IntegrationFlows....transform(RomanNumbers::toRoman)....get();}

尽管这两个转换器代码中都使用了方法引用,但是转换器也可以使用 lambda 表达式声明。或者,如果转换器足够复杂,需要使用一个单独的类,那么可以将其作为一个bean注人流定义,并将引用传递给 transform()方法:

    @Bean public RomanNumberTransformer romanNumberTransformer() {return new RomanNumberTransformer();}@Beanpublic IntegrationFlow transformerFlow(RomanNumberTransformer romanNumberTransformex) {return IntegrationFlows....transform(romanNumberTransformer)....get();}

在这里,我们声明了RomanNumberTransformer类型的bean,它本身是Spring Integration TransfomerGenericTransfomer 接口的实现。这个bean注人了 tansformerFlow()方法并且在定义集成流的时候传递给了 transform()方法。

2.4 路由器

路由器能够基于其个路由断言,实现集成流的分支,从而将消息发送至不同的通道上,如图所示。

例如,很设我们有一个名为 numberChannel 的通道,它会传输整型值。我们想要将带有偶数的消息定向到名为 evenChannnel 的通道,将带有奇数的消息定向到名为 oddChannel的通道。要在集成流中创建这样一个路由器,我们可以声明一个 AbstractMessageRouter类型的 bean,并为其添加@Router 注解:

    @Bean@Router(inputChannel = "numberChannel")public AbstractMessageRouter evenOddRouter() {return new AbstractMessageRouter() {@Overrideprotected Collection<MessageChannel>determineTargetChannels(Message<?> message) {Integer number = (Integer) message.getPayload();if (number % 2 == 0) {return Collections.singleton(evenChannel());}return Collections.singleton(oddChannel());}};}@Beanpublic MessageChannel evenChannel() {return new DirectChannel();}@Beanpublic MessageChannel oddChannel() {return new DirectChannel();}

这里定义的AbstractMessageRouter 接收名为 numberChannel 的输人通道的消息。它的实现以匿名内部类的形式检查消息的载荷,如果是偶数,返回名为 evenChannel 的通道(在路由器 bean 之后同样以 bean 的方式进行了声明)。否则,通道载荷中的数字必然是奇数在这种情况下,返回名为 oddChannel 的通道 (同样以 bean 方法的形式进行了声明)。

Java DSL 风格中,路由器是通过在流定义中调用 route()方法来声明的,如下所式:

	@Beanpublic IntegrationFlow numberRoutingFlow(AtomicInteger source) {return IntegrationFlows....<Integer, String > route(n -> n % 2 == 0 ? "EVEN" : "ODD", mapping -> mapping.subFlowMapping("EVEN", sf -> sf.<Integer, Integer>transform(n -> n * 10).handle((i, h) -> {...})).subFlowMapping("ODD", sf -> sf.transform(RomanNumbers::toRoman).handle((i, h) -> ( ... )))).get();}

尽管我们依然可以定义 AbstractMessageRouter 并将其传递到 route(),但是在这个样例中使用了 lambda 表达式来确定消息载荷是偶数还是奇数:对于偶数,返回 EVEN:对于奇数,返回 ODD。然后这些值会用来确定该使用哪个子映射处理消息。

2.5 切分器

在集成流中,有时候将一个消息切分为多个消息独立处理可能会非常有用。切分器将会负责切分并处理这些消息,如图所示。

在很多场景中,切分器都非常有用,尤其是以下两种特殊的场景。

  • 消息载荷中包含了相同类型条目的一个列表。我们希望将它们作为单独的消息载荷来进行处理。例如,消息中携带了一个商品列表,可以切分为多个消息,每个消息的载荷分别对应一件商品。
  • 消息载荷所携带的信息尽管有所关联,但是可以拆分为两个或更多个不同类型的消息。例如,一个购买订单可能会包含投递信息、账单、商品项的信息。可以将投递细节交由某个子流来处理,账单交由另一个子流来处理,而商品项再交由其他的子流来处理。在这种情况下,切分器后面通常会紧跟一个路由器根据消息的载荷类型进行路由,确保数据都由正确的子流处理。

在我们将消息我荷切分为两个成更多个不同类型的消息时,通常定义一个POJO 就足够了。它提取传人消息不同的组成部分,并将其以元素集合的形式返回。
例如,假设我们想要将带有购买订单的消息切分为两个消息,其中一个会携带账单信息,另一个携带商品项的信息。如下的 OrderSplitter 就可以完成该任务;

    public class OrderSplltter {public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {ArrayLlst<Object> parts = new ArrayList<>();parts.add(po.getBillIngInfo()) ;parts.add(po.getLineItema());return Parts;}}

接下来,我们声明一个 OrderSplitter bean,并通过@Splitter 注解将其作为集成流的一部分:

    @Bean@Splitter(inputChannel ="poChannel",outputChannel="splitOrderChannel")public OrderSplitter orderSplitter() {return new OrderSplitter();}

在这里,购买订单会到达名为 poChannel 的通道,它们会被 OrderSplitter 切分。然后,所返回集合中的每个条目都会作为集成流中独立的消息发布到名为 splitOrderChannel 的通道中。此时,我们可以在流中声明一个 PayloadTypeRouter,将账单信息和商品项分别路由至它们自己的子流:

    @Bean@Router(inputChannel = "splitOrderChannel")public MessageRouter splitOrderRouter() {PayloadTypeRouter router = new PayloadTypeRouter();router.setChannelMapping(BillingInfo.class.getName(), "billingInfoChannel");router.setChannelMapping(List.class.getName(), "lineItemsChannel");return router;}

顾名思义,PayloadTypeRouter 会根据消息的载荷将它们路由至不同的通道。按照这里的配置,载荷为 BillingInfo 类型的消息将会被路由至名为 billingInfoChannel 的通道供后续进行处理。至于商品项,它们会放到一个java.util.List 集合中,因此,我们将 List类型的载荷映射到名为lineItemsChannel的通道中。
按照目前的状况,流将会被切分成两个子流,一个是 BillingInfo 对象的流,另一个则是 List<LineItem>的流。假设我们想要进一步进行拆分,例如不想处理 LineItems 的列表,而是想要分别处理每个 LineItem,又该怎么办呢? 要将商品列表拆分为多个消息,其中每个消息包含一个条目,只需要编写一个方法(而不是一个bean),这个方法带有@Splitter 注解并且返回 LineItem 的集合,如下所示:

    @Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {return lineItems;}

当带有 List<LineItem>载荷的消息抵达名为 lineItemsChannel 通道时,消息会进入lineItemSplitter()。按照切分器的规则,这个方法必须要返回切分后条目的集合。在本例中,我们已经有了 LineItem 的集合,所以我们直接返回这个集合就可以了。这样做的结果是,集合中的每个 LineItem 都会发布到一个消息中,这些消息会被发送到名为
lineItemChannel的通道中。如果想要使用 Java DSL 声明相同的 splitter/router 配置,则可以通过调用 split()route()来实现:

    return IntegrationFlows.from(MessageChannels.direct("textInChannel")).split(orderSplitter()).<Object, String>route(p -> {if (p.getClass().isAssignableFrom(BillingInfo.class)) {return "BILLING INFO";} else {return "LINE ITEMS";}}, mapping -> mapping.subFlowMapping("BILLING INFO", sf -> sf.<BillingInfo>handle((billingInfo, h) -> {...})).subFlowMapping("LINE ITEMS", sf -> sf.split().<LineItem>handle((lineItem, h) -> {...}))).get();

DSL 所组成的流定义相当简洁,但是可能有点难以理解。它使用与 Java 配置样例相同的 OrderSplitter 来切分订单。我们可以将 lambda 表达式抽取到方法中,使其更为整洁,例如使用如下所示的 3 个方法来取代流定义中的 lambda 表达式:

private String route(Object p) {return p.getClass().isAssignableFrom(BillingInfo.class)?"BILLING INFO":"LINE ITEMS";
}private BillingInfo handleBillingInfo(BillingInfo billingInfo, MessageHeaders h){// ...
}private LineItem handleLineItems(LineItem lineItem, MessageHeaders h) {// ...
}

然后,使用方法引用重写集成流:

return IntegrationFlows....split().route(this::route,mapping -> mapping.subFlowMapping("BILLING INFO",sf -> sf.<BillingInfo> handle(this::handleBillingInfo)).subFlowMapping("LINE ITEMS",sf -> sf.split().<LineItem> handle(this::handleLineItems)));

不管采用哪种方式,都会像 Java 配置样例那样,使用相同的 OrderSplitter 的切分订单。在订单切分之后,根据类型路由至两个独立的子流。

2.6 服务激活器

服务激活器接收来自输人通道的消息并将这些消息发送至一个 MessageHandler 的实现,如图所示。


Spring Integration 提供了多个 “开箱即用” 的MessageHandler (PayloadTypeRouter甚至就是 MessageHandler 的一个实现),但是我们通常会需要为其提供一些自定义的实现作为服务激活器。作为样例,如下的代码展现了如何声明 MessageHandler bean 并将其配置为服务激活器:

    @Bean@ServiceActivator(inputChannel ="someChannel")public MessageHandler sysoutHandler() {return message -> {System.out.println("Message payload: " + message.getPayload());};}

这个 bean 使用了@ServiceActivator 注解,表明它会作为一个服务活器处来自someChannel 通道的消息。对于 MessageHandler 本身,它是通过一个 lambda 表达式现的。这是一个简单的 MessageHandler,当得到消息之后,它会将消息的载荷打印至标准输出流。
我们还可以声明一个服务激活器,让它在返回新载荷之前处理输入消息中的数据在这种情况下,bean 应该是 GenericHandler,而不是 MessageHandler:

    @Bean@ServiceActivator(inputChannel = "orderChannel",outputChannel ="completeChannel")public GenericHandler<EmailOrder> orderHandler(OrderRepository orderRepo) {return (payload, headers) -> {return orderRepo.save(payload);};}

在本例中,服务激活器是一个GenericHandler,它会接收载荷类型为 EmailOrder 的消息。订单抵达时,我们会通过一个存储库将它保存起来,并返回保存之后的EmailOrder,这个 EmailOrder 随后被发送至名为 completeChannel的输出通道。
你可能已经注意到了,GenericHandler 不仅能够得到载荷,还能得到消息头(虽然我们这个样例根本没有用到这些头信息 )。我们还可以在Java DSL配置风格中使用服激活器,只需将 MessageHandlerGenericHandler 传递到流定义的 handle()方法中:

    public IntegrationFlow someFlow() {return IntegrationFlows....handle(msg -> {System.out.println("Message payload: " + msg.getPayload());}).get();}

在本例中,MessageHandler 会得到一个 lambda 表达式,但是我们也可以为其提供个方法引用,甚至实现 MessageHandler 接口的类实例。如果想要为其提供 lambda 表达式或方法引用,需要记住它们均接受消息作为其参数。
类似地,如果不想将服务激活器作为流的终点,handle()还可以接受 GenericHandler如果要将前面提到的订单保存服务激活器添加进来,可以按照如下的形式使用 Java DSL配置流:

    public IntegrationFlow orderFlow(OrderRepository orderRepo) {return IntegrationFlows....<EmailOrder > handle((payload, headers) -> {return orderRepo.save(payload);})....get();}

使用 GenericHandler 时,lambda 表达式或方法引用会接受消息载荷和头信息作为参数。如果选择使用 GenericHandler 作为流的终点,就需要其返回 null,否则就会出现错误,提示没有指定输出通道。

2.7 网关

通过网关,应用可以提交数据到集成流中,并且能够可选地接收流的结果作为响应,网关会声明为接口,借助 Spring Integration 的实现,应用可以调用它来向集成流发送消息(如图所示)。

我们已经看过消息网关的样例,也就是 FileWriterGatewayFileWriterGateway 是一个单向的网关,有一个接受 String 类型的方法,该方法会将文本写入到文件中,并返回void。编写双向的网关同样简单。在编写网关接口时,需要确保方法要返回某个值以便推送到集成流中。
作为样例,假设网关面对的是一个简单的集成流,这个流会接受一个 String 并将给定的 String 转换成全大写的形式。这个网关接口大致如下所示:

package sia6;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;@Component
@MessagingGateway(defaultRequestChannel = "inChannel",defaultReplyChannel= "outChannel")
public interface UpperCaseGateway {String uppercase(String in);
}

让人开心的是,这个接口不需要实现。Spring lntegration 会在运行时自动提供一个通过特定通道发送和接收消息的实现。

uppercase()被调用时,给定的 String 会发布到集成流中,进人名为inChannel通道。不管流是如何定义的、千了些什么,当数据进入名为 outChannel 通道时,都会从uppercase()方法返回。
我们这个用以转换大写格式的集成流是一个非常简单的流,只需要将一个 String 转换成大写格式的步骤。它可以通过Java DSL配置声明如下:

    @Beanpublic IntegrationFlow uppercaseFlow() {return IntegrationFlows.from("inChannel").<String, String>transform(s -> s.toUpperCase()).channel("outChannel").get();}

按照这里的定义,这个流随着进入 inChannel 通道的数据开始。消息荷会由转换器处理,执行大写操作(在这里是通过lambda 表达式定义的)。形成的结果消息会到名为outChannel的通道,也就是我们在UpperCaseGateway中声明的答复通道。

2.8 通道适配器

通道适配器代表了集成流的入口和出口。数据通过入站通道适配器(inbound channel adapter)进人一个集成流,通过出站通道适配器离开一个集成流。如图所示。

在这里插入图片描述

根据要引人集成流的数据源,入站通道适配器可以有很多形式。例如,我们可以声明一个入站通道适配器,将来自 AtomicInteger 的、不断递增的数引入流。使用Java配置,则如下所示:

    @Bean@InboundChannelAdapter(poller = @Poller(fixedRate = "1000"), channel = "numberChannel")public MessageSource<Integer> numberSource(AtomicInteger source) {return () -> {return new GenericMessage<>(source.getAndIncrement());};}

这个@Bean方法通过@InboundChannelAdapter 注解声明了一个入站通道适配器,它根据注人的 AtomicInteger 每隔一秒(也就是 1000 毫秒提交一个数字给名为 numberChannel
的通道。
使用 Java 配置时,我们可以通过@InboundChannelAdapter 注解声明入站通道适配器,而使用Java DSL 定义集成流时,我们需要使用 form()方法完成同样的事情。如下的流定义展现了类似的入站通道适配器,它是使用 Java DSL 定义的:

    @Beanpublic IntegrationFlow someFlow (AtomicInteger integerSource) {return IntegrationFlows.from(integerSource,"getAndIncrement",c -> c.poller(Pollers.fixedRate(1000)))....get();}

通常,通道适配器是由 Spring Integration 的众多端点模块提供的。假设我们需要一个人站通道适配器监控一个特定的目录,并将写入该目录的文件以消息的形式提交到 file-channel通道中。如下的 Java 配置使用来自 Spring Integration file 端点模块的 FileReadingMessageSource 实现该功能:

    @Bean@InboundChannelAdapter(channel="file-channel",poller=@Poller(fixedDelay="1000"))public MessageSource<File> fileReadingMessageSource(){FileReadingMessageSource sourceReader = new FileReadingMessageSource();sourceReader.setDirectory(new File(INPUT_DIR));sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));return sourceReader;}

如果要使用 Java DSL 编写同等功能的人站通道适配器,可以使用 Files 类的inboundAdapter()方法。出站通道适配器是集成流的终点,会将最终的消息传递给应用或其他外部系统:

    @Beanpublic IntegrationFlow fileReaderFlow() {return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)).patternFilter(FILE_PATTERN)).get();}

我们通常会将消息激活器实现为消息处理器,让它作为出站通道适配器,对数据需要传递给应用本身的情况更是如此。我们已经讨论过消息激活器,这里就没有必要重复讨论了。
但是,要注意,Spring Integration 端点模块为多个通用场景提供了消息处理器。在1.2小节使用Java配置来定义集成流的程序中,我们已经见过这种出站通道适配器的样例 FileWritingMessageHandler。提到Spring Integration 端点模块,不妨看一下都有哪些直接可用的集成端点模块。

2.9 端点模块

Spring Integration 允许我们创建自己的通道适配器,这一点非常好,但更棒的是Spring Integration 提供了 20 余个包含通道适配器(同时包括入站和出站的适配器)的点模块,用于和各种常见的外部系统实现集成,如下表。

模块依赖的 artifact ID ( Group ID: org.springframework.integration )
AMQPspring-integration-amqp
应用事件spring-integration-event
Atom和RSSspring-integration-feed
电子邮件spring-integration-mail
文件系统spring-integration-file
FTP/FTPSspring-integration-ftp
GemFirespring-integration-gemfire
HTTPspring-integration-http
JDBCspring-integration-jdbc
JMSspring-integration-jms
JMXspring-integration-jmx
JPAspring-integration-jpa
Kafkaspring-integration-kafka
MongoDBspring-integration-mongodb
MQTTspring-integration-mqtt
R2DBCspring-integration-r2dbc
Redisspring-integration-redis
RMIspring-integration-rmi
RSocketspring-integration-rsocket
SFTPspring-integration-sftp
STOMPspring-integration-stomp
Streamspring-integration-stream
Syslogspring-integration-syslog
TCP/UDPspning-integration-ip
WebFluxspring-integration-webflux
Web Servicesspring-integration-ws
WebSocketspring-integration-websocket
XMPPspring-integration-xmpp
ZeroMQspring-integration-zeromq
ZooKeeperspring-integration-zookeeper

从表中可以清楚地看到,Spring Integration 提供了用途广泛的一组组件,能够满足非常多的集成需求。虽然大多数应用程序使用的功能只是 Spring Integration 所提供功能的九牛一毛,但我们最好知道 Spring Integration能够提供哪些功能。
另外,我们不可能在一篇文章中介绍表中的所有的通道适配器。我们已经看到了如何使用文件系统模块写人文件的样例。其他的如果需要请自行查阅相关文档。
对于每个端点模块的通道适配器,我们可以在 Java 配置中将其声明为 bean,也可以在 Java DSL 配置中以静态方法的方式引用它们。我建议你探索一下自己最感兴趣的其他端点模块。你会发现它们在使用方式上是非常一致的。

最后本篇文章到此结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/623039.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JDK8-JDK17版本升级

局部变量类型推断 switch表达式 文本块 Records 记录Records是添加到 Java 14 的一项新功能。它允许你创建用于存储数据的类。它类似于 POJO 类&#xff0c;但代码少得多&#xff1b;大多数开发人员使用 Lombok 生成 POJO 类&#xff0c;但是有了记录&#xff0c;你就不需要使…

逸学Docker【java工程师基础】3.1安装Jenkins

1.下载镜像 docker pull jenkins/jenkins:lts 2.运行容器 docker run -d -u root -p 8080:8080 -p 50000:50000 -v /var/jenkins_home:/var/jenkins_home -v /etc/localtime:/etc/localtime --name jenkins jenkins/jenkins:lts 3.要启动名为 jenkins 的 Docker 容器 docker st…

HarmonyOS-LocalStorage:页面级UI状态存储

管理应用拥有的状态概述 上一个章节中介绍的装饰器仅能在页面内&#xff0c;即一个组件树上共享状态变量。如果开发者要实现应用级的&#xff0c;或者多个页面的状态数据共享&#xff0c;就需要用到应用级别的状态管理的概念。ArkTS根据不同特性&#xff0c;提供了多种应用状态…

OpenGauss源码分析-SQL引擎

所讨论文件大多位于src\common\backend\parser文件夹下 总流程 start_xact_command()&#xff1a;开始一个事务。pg_parse_query()&#xff1a;对查询语句进行词法和语法分析&#xff0c;生成一个或者多个初始的语法分析树。进入foreach (parsetree_item, parsetree_list)循环…

LeetCode 每日一题 Day 37-43

终于考完试了&#xff0c;寒假期间将会每天持续更新&#xff01; 447. 回旋镖的数量(Day 37) 给定平面上 n 对 互不相同 的点 points &#xff0c;其中 points[i] [xi, yi] 。回旋镖 是由点 (i, j, k) 表示的元组 &#xff0c;其中 i 和 j 之间的欧式距离和 i 和 k 之间的欧…

通过开源端点可见性改善网络安全响应

在当今复杂的数字环境中&#xff0c;企业内的许多不同端点&#xff08;从数据中心的服务器到咖啡店的笔记本电脑&#xff09;创建了巨大且多样化的攻击面。每个设备都存在网络安全威胁的机会&#xff0c;每个设备都有其独特的特征和复杂性。攻击者使用的多种攻击媒介不仅是一个…

正则表达式中的“回引用(回溯)”——别名引用与序号引用的差异及正则表达式中的“P”关键字

读到一段巧妙的正则表达式&#xff0c;勾起我对正则表达式欠缺知识点的探寻&#xff1a; P y t h o n Python Python正则表达式中的“回引用(回溯)”——分组别名引用与序号引用的差异及正则表达式中的“P”关键字详情。 (笔记模板由python脚本于2024年01月14日 07:49:35创建&a…

pytorch集智4-情绪分类器

1 目标 从中文文本中识别出句子里的情绪。和上一章节单车预测回归问题相比&#xff0c;这个问题是分类问题&#xff0c;不是回归问题 2 神经网络分类器 2.1 如何用神经网络分类 第二章节用torch.nn.Sequantial做的回归预测器&#xff0c;输出神经元只有一个。分类器和其区别…

QT——connect的第五个参数 Qt::ConnectionType (及qt和c++的多线程的区别)

一直对QT的多线程和c的多线程的区别有疑惑&#xff0c;直到看到文档中这一部分内容才豁然开朗 一.ConnectionType参数的类型和区别 首先是官方文档中对于该枚举值的区别介绍&#xff1a; 对于队列&#xff08;queued &#xff09;连接&#xff0c;参数必须是 Qt 元对象系统已知…

强化学习应用(四):基于Q-learning的物流配送路径规划研究(提供Python代码)

一、Q-learning算法简介 Q-learning是一种强化学习算法&#xff0c;用于解决基于马尔可夫决策过程&#xff08;MDP&#xff09;的问题。它通过学习一个值函数来指导智能体在环境中做出决策&#xff0c;以最大化累积奖励。 Q-learning算法的核心思想是使用一个Q值函数来估计每…

助力工业园区作业违规行为检测预警,基于YOLOv7【tiny/l/x】不同系列参数模型开发构建工业园区场景下作业人员违规行为检测识别系统

在很多工业园区生产作业场景下保障合规合法进行作业生产操作&#xff0c;对于保护工人生命安全降低安全隐患有着非常重要的作用&#xff0c;但是往往在实际的作业生产中&#xff0c;因为一个安全观念的淡薄或者是粗心大意&#xff0c;对于纪律约束等意思薄弱&#xff0c;导致在…

maven镜像源设置aliyun提升下载速度

一、打开pom.xml project下在添加 <repositories><repository><id>aliyunmaven</id><name>aliyun</name><url>https://maven.aliyun.com/repository/public</url></repository><repository><id>central2&l…

分布形态的度量_峰度系数的探讨

集中趋势和离散程度是数据分布的两个重要特征,但要全面了解数据分布的特点&#xff0c;还应掌握数据分布的形态。 描述数据分布形态的度量有偏度系数和峰度系数, 其中偏度系数描述数据的对称性,峰度系数描述与正态分布的偏离程度。 峰度系数反映分布峰的尖峭程度的重要指标. 当…

【ESP32接入语言大模型之智谱清言】

1. 智谱清言 讲解视频&#xff1a; 随着人工智能技术的不断发展&#xff0c;自然语言处理领域也得到了广泛的关注和应用。智谱清言作为千亿参数对话模型 基于ChatGLM2模型开发&#xff0c;支持多轮对话&#xff0c;具备内容创作、信息归纳总结等能力。可以快速注册体验中国版…

远程开发之vscode端口转发

远程开发之vscode端口转发 涉及的软件forwarded port 通过端口转发&#xff0c;实现在本地电脑上访问远程服务器上的内网的服务。 涉及的软件 vscode、ssh forwarded port 在ports界面中的port字段&#xff0c;填需要转发的IP:PORT&#xff0c;即可转发远程服务器中的内网端…

增强FAQ搜索引擎:发挥Elasticsearch中KNN的威力

英文原文地址&#xff1a;https://medium.com/nerd-for-tech/enhancing-faq-search-engines-harnessing-the-power-of-knn-in-elasticsearch-76076f670580 增强FAQ搜索引擎&#xff1a;发挥Elasticsearch中KNN的威力 2023 年 10 月 21 日 在一个快速准确的信息检索至关重要的…

基于MOD02/MYD02获得亮度温度再转冰温

用HEG处理MOD02/MYD02,提取里面的EV_1KM_Emissive波段,band为11和12(其实就是band 31和32)。注意这里的band和output dile type 1. 获得之后,转辐射亮度。 参考:https://www.cnblogs.com/enviidl/p/16539422.html radiance_scales,和radiance_offset这两项参数代表波段…

【生存技能】git操作

先下载git https://git-scm.com/downloads 我这里是win64&#xff0c;下载了相应的直接安装版本 64-bit Git for Windows Setup 打开git bash 设置用户名和邮箱 查看设置的配置信息 获取本地仓库 在git bash或powershell执行git init&#xff0c;初始化当前目录成为git仓库…

LeetCode讲解篇之216. 组合总和 III

文章目录 题目描述题解思路题解代码 题目描述 题解思路 使用递归回溯算法&#xff0c;当选择数字num后&#xff0c;在去选择大于num的合法数字&#xff0c;计算过程中的数字和&#xff0c;直到选择了k次&#xff0c;如果数组和等于n则加入结果集 从1开始选择数字&#xff0c;直…

ubuntu 2022.04 安装vcs2018和verdi2018

主要参考网站朋友们的作业。 安装时参考&#xff1a; ubuntu18.04安装vcs、verdi2018_ubuntu安装vcs-CSDN博客https://blog.csdn.net/qq_24287711/article/details/130017583 编译时参考&#xff1a; 【ASIC】VCS报Error-[VCS_COM_UNE] Cannot find VCS compiler解决方法_e…