1.概述
众所周知, Spring Integration具有用于与外部系统交互的大量连接器。 Twitter也不例外,而且很长一段时间以来,因为Spring Social一直是一个开箱即用的解决方案,Spring Integration利用该解决方案来连接到社交网络。
1.1Spring社交EOL
不幸的是, Spring Social已经到了使用寿命 ,该项目现在处于维护模式。 Spring团队决定不进一步开发Spring Social的原因是,使API绑定与社交网络的API保持同步变得很繁琐。
除此之外,Spring Framework 5发布后,开发人员希望利用其响应式编程模型,这将要求团队在现有的响应式社交绑定旁边重新实现一个响应式Spring Social绑定。
现在建议开发人员实现自己的绑定或使用专用库之一连接社交网络。
1.2 Spring Integration的Twitter模块已移至扩展
Spring Social现在处于维护模式,这迫使Spring Integration团队将Twitter支持模块从主项目移至扩展。 由于Spring Social不会接收更新,因此它将基于早期的Spring Framework版本构建。 这将导致类路径冲突,也将阻碍Spring Integration的开发。
因此, 从Spring Integration 5.1开始,Twitter模块可作为扩展使用 。
1.3有哪些替代方案?
Twitter4J是Yamas Yusuke开发和维护的Twitter API的非官方Java库。 官方的HBC库(由Twitter构建)是一个Java HTTP Client,用于使用Twitter的Streaming API。 自2016年以来,后者从未见过重大更新,而Twitter4J正在定期更新。
也可以选择实现自己的API绑定。 在使用RestTemplate的基于Spring的项目中,绝对是一个选择,并且这是进行REST调用的简便方法。
本指南以流模式使用Twitter4J,可以将其集成到Spring Integration消息流中。
1.4 Twitter流如何工作?
简而言之, 您的应用打开了一个与Twitter API的单一连接,只要发生新匹配,就会通过该连接发送新结果 。 相反,另一种方法是通过向REST API重复发送请求来批量传送数据。
流提供了一种低延迟的传递机制 ,该机制可以支持非常高的吞吐量,而不必处理速率限制。
2.示例项目
该示例项目展示了Twitter的Streaming API到Spring Integration消息流的集成,可在GitHub上找到 : https : //github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming 。
Maven依赖
由于Spring Social现在是EOL,因此我们不会在此基础上继续发展。 我们引入的只是spring-integration-core和twitter4j-stream 。
<dependencies><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId></dependency><dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-stream</artifactId><version>4.0.1</version></dependency></dependencies>
该项目还使用了Lombok和Spring Boot测试支持,但是这些是可选的。
Spring Integration的可听消息源
Spring Integration提供了对实现入站消息组件的支持。 它们分为轮询和监听行为 。
最初依赖于Inbound Twitter Channel Adapter建立在Spring Social之上,现在已移至扩展, 它是轮询用户 。 也就是说,您必须提供一个轮询器配置才能使用它。 另一方面,Twitter实施速率限制,以管理应用程序获取更新的频率。 使用旧的Twitter Channel适配器时,您应该考虑速率限制,以便您配置的轮询间隔符合Twitter策略。
另一方面, 侦听入站组件更简单,通常只需要实现MessageProducerSupport 。 这样的侦听组件看起来像这样。
public class MyMessageProducer extends MessageProducerSupport {public MyMessageProducer(MessageChannel outputChannel) {// Defining an output channel is requiredsetOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();// Custom initialization - if applicable - comes here}@Overridepublic void doStart() {// Lifecycle method for starting receiving messages}@Overridepublic void doStop() {// Lifecycle method for stopping receiving messages}private void receiveMessage() {// Receive data from upstream serviceSomeData data = ...;// Convert it to a message as appropriate and send it outthis.sendMessage(MessageBuilder.withPayload(data).build());}}
只有两个必需的元素:
- 必须定义输出消息通道
- 每当组件收到消息时,都必须调用
sendMessage
(可选)您可能希望控制组件的初始化并管理其生命周期。
由于Twitter的Streaming API本质上是消息驱动的,因此监听行为自然很合适。 让我们看看如何在这样的上下文中合并Twitter4J。
使用Twitter4J连接到Twitter Streaming API
Twitter4J管理连接处理的细微差别,并从Twitter的Streaming API接收更新。 我们需要做的就是获取一个TwitterStream
实例,附加一个侦听器并定义过滤。
实例化
Twitter4J网站上的流示例表明,应通过TwitterStreamFactory
创建一个TwitterStream
实例。 这完全有道理,但是在Spring应用程序上下文中,我们希望它成为托管bean。
Spring的FactoryBean
工具是包含创建单例TwitterStream
实例的详细信息的简单FactoryBean
方法。
public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {@Overridepublic Class<?> getObjectType() {return TwitterStream.class;}@Overrideprotected TwitterStream createInstance() {return new twitter4j.TwitterStreamFactory().getInstance();}@Overrideprotected void destroyInstance(TwitterStream twitterStream) {twitterStream.shutdown();}}
尽管我们也可以将其公开为普通的bean,而不用由FactoryBean
创建FactoryBean
,但这不会适当地将其关闭。
附加侦听器并定义过滤
这将是我们自定义MessageProducer
实现的责任。
@Slf4j
public class TwitterMessageProducer extends MessageProducerSupport {private final TwitterStream twitterStream;private List<Long> follows;private List<String> terms;private StatusListener statusListener;private FilterQuery filterQuery;public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {this.twitterStream = twitterStream;setOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();statusListener = new StatusListener();long[] followsArray = null;if (!CollectionUtils.isEmpty(follows)) {followsArray = new long[follows.size()];for (int i = 0; i < follows.size(); i++) {followsArray[i] = follows.get(i);}}String[] termsArray = null;if (!CollectionUtils.isEmpty(terms)) {termsArray = terms.toArray(new String[0]);}filterQuery = new FilterQuery(0, followsArray, termsArray);}@Overridepublic void doStart() {twitterStream.addListener(statusListener);twitterStream.filter(filterQuery);}@Overridepublic void doStop() {twitterStream.cleanUp();twitterStream.clearListeners();}public void setFollows(List<Long> follows) {this.follows = follows;}public void setTerms(List<String> terms) {this.terms = terms;}StatusListener getStatusListener() {return statusListener;}FilterQuery getFilterQuery() {return filterQuery;}class StatusListener extends StatusAdapter {@Overridepublic void onStatus(Status status) {sendMessage(MessageBuilder.withPayload(status).build());}@Overridepublic void onException(Exception ex) {log.error(ex.getMessage(), ex);}@Overridepublic void onStallWarning(StallWarning warning) {log.warn(warning.toString());}}
}
MessageProducerSupport
和TwitterStream
的管理界面提供的生命周期方法可以很好地配合使用。 这也将使我们能够在需要时在运行时停止和启动组件。
Java配置
尽管Spring可以自动装配组件,但我还是更喜欢通过手动配置来控制依赖关系。
@Slf4j
@Configuration
public class TwitterConfig {@BeanTwitterStreamFactory twitterStreamFactory() {return new TwitterStreamFactory();}@BeanTwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {return twitterStreamFactory.getInstance();}@BeanMessageChannel outputChannel() {return MessageChannels.direct().get();}@BeanTwitterMessageProducer twitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {TwitterMessageProducer twitterMessageProducer =new TwitterMessageProducer(twitterStream, outputChannel);twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));return twitterMessageProducer;}@BeanIntegrationFlow twitterFlow(MessageChannel outputChannel) {return IntegrationFlows.from(outputChannel).transform(Status::getText).handle(m -> log.info(m.getPayload().toString())).get();}}
这里的重要部分是我们的自定义消息生成器如何与消息流集成。 基本上,除了在生产者的输出通道中列出消息之外,我们不需要执行任何其他操作。
测试中
只有Chuck Norris在生产中测试代码。 但是,像您和我这样的普通凡人,我们确实会编写测试用例。
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public class TwitterMessageProducerTest {@MockBeanprivate TwitterStream twitterStream;@Autowiredprivate PollableChannel outputChannel;@Autowiredprivate TwitterMessageProducer twitterMessageProducer;@Testpublic void shouldBeInitialized() {StatusListener statusListener = twitterMessageProducer.getStatusListener();verify(twitterStream).addListener(statusListener);FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();verify(twitterStream).filter(filterQuery);}@Testpublic void shouldReceiveStatus() {StatusListener statusListener = twitterMessageProducer.getStatusListener();Status status = mock(Status.class);statusListener.onStatus(status);Message<?> statusMessage = outputChannel.receive();assertSame(status, statusMessage.getPayload());}@Import(TwitterConfig.class)static class TestConfig {@BeanMessageChannel outputChannel() {return MessageChannels.queue(1).get();}}}
我喜欢Twitter4J的设计,因为它利用了界面。 该库的大多数重要部分都作为普通接口公开。 TwitterStream
也不例外。 也就是说,在测试用例中可以轻松地将其嘲笑。
六,结论
- Spring Social现在已经停产了 -它不会收到新功能
- Spring Integration的Twitter模块可作为扩展使用 -已从主项目中移出。
- Twitter入站通道适配器是一个轮询用户 –选择轮询间隔时必须处理速率限制
- Twitter的Streaming API符合入站通道适配器的监听行为
翻译自: https://www.javacodegeeks.com/2018/12/streaming-api-spring-integration.html