通过Spring Integration消费Twitter Streaming API

1.概述

众所周知, Spring Integration具有用于与外部系统交互的大量连接器。 Twitter也不例外,而且很长一段时间以来,因为Spring Social一直是一个开箱即用的解决方案,Spring Integration利用该解决方案来连接到社交网络。

1.1Spring社交EOL

不幸的是, Spring Social已经到了使用寿命 ,该项目现在处于维护模式。 Spring团队决定不进一步开发Spring Social的原因是,使API绑定与社交网络的API保持同步变得很繁琐。

除此之外,Spring Framework 5发布后,开发人员希望利用其响应式编程模型,这将要求团队在现有的响应式社交绑定旁边重新实现一个响应式Spring Social绑定。

现在建议开发人员实现自己的绑定或使用专用库之一连接社交网络。

1.2 Spring Integration的Twitter模块已移至扩展

Spring Social现在处于维护模式,这迫使Spring Integration团队将Twitter支持模块从主项目移至扩展。 由于Spring Social不会接收更新,因此它将基于早期的Spring Framework版本构建。 这将导致类路径冲突,也将阻碍Spring Integration的开发。

因此, 从Spring Integration 5.1开始,Twitter模块可作为扩展使用 。

1.3有哪些替代方案?

Twitter4J是Yamas Yusuke开发和维护的Twitter API的非官方Java库。 官方的HBC库(由Twitter构建)是一个Java HTTP Client,用于使用Twitter的Streaming API。 自2016年以来,后者从未见过重大更新,而Twitter4J正在定期更新。

也可以选择实现自己的API绑定。 在使用RestTemplate的基于Spring的项目中,绝对是一个选择,并且这是进行REST调用的简便方法。

本指南以流模式使用Twitter4J,可以将其集成到Spring Integration消息流中。

1.4 Twitter流如何工作?

简而言之, 您的应用打开了一个与Twitter API的单一连接,只要发生新匹配,就会通过该连接发送新结果 。 相反,另一种方法是通过向REST API重复发送请求来批量传送数据。

流提供了一种低延迟的传递机制 ,该机制可以支持非常高的吞吐量,而不必处理速率限制。

2.示例项目

该示例项目展示了Twitter的Streaming API到Spring Integration消息流的集成,可在GitHub找到 : https : //github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming 。

Maven依赖

由于Spring Social现在是EOL,因此我们不会在此基础上继续发展。 我们引入的只是spring-integration-core和twitter4j-stream 。

<dependencies><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId></dependency><dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-stream</artifactId><version>4.0.1</version></dependency></dependencies>

该项目还使用了Lombok和Spring Boot测试支持,但是这些是可选的。

Spring Integration的可听消息源

Spring Integration提供了对实现入站消息组件的支持。 它们分为轮询和监听行为

最初依赖于Inbound Twitter Channel Adapter建立在Spring Social之上,现在已移至扩展, 它是轮询用户 。 也就是说,您必须提供一个轮询器配置才能使用它。 另一方面,Twitter实施速率限制,以管理应用程序获取更新的频率。 使用旧的Twitter Channel适配器时,您应该考虑速率限制,以便您配置的轮询间隔符合Twitter策略。

另一方面, 侦听入站组件更简单,通常只需要实现MessageProducerSupport 。 这样的侦听组件看起来像这样。

public class MyMessageProducer extends MessageProducerSupport {public MyMessageProducer(MessageChannel outputChannel) {// Defining an output channel is requiredsetOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();// Custom initialization - if applicable - comes here}@Overridepublic void doStart() {// Lifecycle method for starting receiving messages}@Overridepublic void doStop() {// Lifecycle method for stopping receiving messages}private void receiveMessage() {// Receive data from upstream serviceSomeData data = ...;// Convert it to a message as appropriate and send it outthis.sendMessage(MessageBuilder.withPayload(data).build());}}

只有两个必需的元素:

  • 必须定义输出消息通道
  • 每当组件收到消息时,都必须调用sendMessage

(可选)您可能希望控制组件的初始化并管理其生命周期。

由于Twitter的Streaming API本质上是消息驱动的,因此监听行为自然很合适。 让我们看看如何在这样的上下文中合并Twitter4J。

使用Twitter4J连接到Twitter Streaming API

Twitter4J管理连接处理的细微差别,并从Twitter的Streaming API接收更新。 我们需要做的就是获取一个TwitterStream实例,附加一个侦听器并定义过滤。

实例化

Twitter4J网站上的流示例表明,应通过TwitterStreamFactory创建一个TwitterStream实例。 这完全有道理,但是在Spring应用程序上下文中,我们希望它成为托管bean。

Spring的FactoryBean工具是包含创建单例TwitterStream实例的详细信息的简单FactoryBean方法。

public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {@Overridepublic Class<?> getObjectType() {return TwitterStream.class;}@Overrideprotected TwitterStream createInstance() {return new twitter4j.TwitterStreamFactory().getInstance();}@Overrideprotected void destroyInstance(TwitterStream twitterStream) {twitterStream.shutdown();}}

尽管我们也可以将其公开为普通的bean,而不用由FactoryBean创建FactoryBean ,但这不会适当地将其关闭。

附加侦听器并定义过滤

这将是我们自定义MessageProducer实现的责任。

@Slf4j
public class TwitterMessageProducer extends MessageProducerSupport {private final TwitterStream twitterStream;private List<Long> follows;private List<String> terms;private StatusListener statusListener;private FilterQuery filterQuery;public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {this.twitterStream = twitterStream;setOutputChannel(outputChannel);}@Overrideprotected void onInit() {super.onInit();statusListener = new StatusListener();long[] followsArray = null;if (!CollectionUtils.isEmpty(follows)) {followsArray = new long[follows.size()];for (int i = 0; i < follows.size(); i++) {followsArray[i] = follows.get(i);}}String[] termsArray = null;if (!CollectionUtils.isEmpty(terms)) {termsArray = terms.toArray(new String[0]);}filterQuery = new FilterQuery(0, followsArray, termsArray);}@Overridepublic void doStart() {twitterStream.addListener(statusListener);twitterStream.filter(filterQuery);}@Overridepublic void doStop() {twitterStream.cleanUp();twitterStream.clearListeners();}public void setFollows(List<Long> follows) {this.follows = follows;}public void setTerms(List<String> terms) {this.terms = terms;}StatusListener getStatusListener() {return statusListener;}FilterQuery getFilterQuery() {return filterQuery;}class StatusListener extends StatusAdapter {@Overridepublic void onStatus(Status status) {sendMessage(MessageBuilder.withPayload(status).build());}@Overridepublic void onException(Exception ex) {log.error(ex.getMessage(), ex);}@Overridepublic void onStallWarning(StallWarning warning) {log.warn(warning.toString());}}
}

MessageProducerSupportTwitterStream的管理界面提供的生命周期方法可以很好地配合使用。 这也将使我们能够在需要时在运行时停止和启动组件。

Java配置

尽管Spring可以自动装配组件,但我还是更喜欢通过手动配置来控制依赖关系。

@Slf4j
@Configuration
public class TwitterConfig {@BeanTwitterStreamFactory twitterStreamFactory() {return new TwitterStreamFactory();}@BeanTwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {return twitterStreamFactory.getInstance();}@BeanMessageChannel outputChannel() {return MessageChannels.direct().get();}@BeanTwitterMessageProducer twitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {TwitterMessageProducer twitterMessageProducer =new TwitterMessageProducer(twitterStream, outputChannel);twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));return twitterMessageProducer;}@BeanIntegrationFlow twitterFlow(MessageChannel outputChannel) {return IntegrationFlows.from(outputChannel).transform(Status::getText).handle(m -> log.info(m.getPayload().toString())).get();}}

这里的重要部分是我们的自定义消息生成器如何与消息流集成。 基本上,除了在生产者的输出通道中列出消息之外,我们不需要执行任何其他操作。

测试中

只有Chuck Norris在生产中测试代码。 但是,像您和我这样的普通凡人,我们确实会编写测试用例。

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public class TwitterMessageProducerTest {@MockBeanprivate TwitterStream twitterStream;@Autowiredprivate PollableChannel outputChannel;@Autowiredprivate TwitterMessageProducer twitterMessageProducer;@Testpublic void shouldBeInitialized() {StatusListener statusListener = twitterMessageProducer.getStatusListener();verify(twitterStream).addListener(statusListener);FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();verify(twitterStream).filter(filterQuery);}@Testpublic void shouldReceiveStatus() {StatusListener statusListener = twitterMessageProducer.getStatusListener();Status status = mock(Status.class);statusListener.onStatus(status);Message<?> statusMessage = outputChannel.receive();assertSame(status, statusMessage.getPayload());}@Import(TwitterConfig.class)static class TestConfig {@BeanMessageChannel outputChannel() {return MessageChannels.queue(1).get();}}}

我喜欢Twitter4J的设计,因为它利用了界面。 该库的大多数重要部分都作为普通接口公开。 TwitterStream也不例外。 也就是说,在测试用例中可以轻松地将其嘲笑。

六,结论

  • Spring Social现在已经停产了 -它不会收到新功能
  • Spring Integration的Twitter模块可作为扩展使用 -已从主项目中移出。
  • Twitter入站通道适配器是一个轮询用户 –选择轮询间隔时必须处理速率限制
  • Twitter的Streaming API符合入站通道适配器的监听行为

翻译自: https://www.javacodegeeks.com/2018/12/streaming-api-spring-integration.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/344564.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql5.6 1g内存_1G内存用MySQL5.6还是用MySQL5.5比较好

mysql的50版本和51版本的区别&#xff1a;一、5.0 增加了stored procedures、views、cursors、triggers、xa transactions的支持&#xff0c;增加了inforation_schema系统数据库。二、5.1 增加了event scheduler&#xff0c;partitioning&#xff0c;pluggable storage engine …

java aspectj_Java:AspectJ的异常翻译

java aspectj在这篇博客文章中&#xff0c;我描述了如何使用AspectJ自动将一种异常类型转换为另一种异常类型。 问题 有时&#xff0c;我们处于必须将异常&#xff08;通常由第三方库引发&#xff09;转换为另一种异常的情况。 假设您正在使用像hibernate这样的持久性框架&…

mysql5.6.33安装教程_Linux下mysql5.6.33安装配置教程

本教程为大家分享了mysql5.6.33在linux下的安装配置方法&#xff0c;供大家参考&#xff0c;具体内容如下1、下载下载地址&#xff1a;http://dev.mysql.com/downloads/mysql/5.6.html#downloads下载版本&#xff1a;我这里选择的5.6.33&#xff0c;通用版&#xff0c;linux下6…

【渝粤教育】广东开放大学 劳动关系理论与实务 形成性考核 (1)

选择题 题目&#xff1a; 劳动关系的主体是&#xff08; &#xff09;。 答案&#xff1a;看左侧 题目&#xff1a; 主要研究企业对员工的管理政策、策略和实践的学派被称为&#xff08; &#xff09;。 答案&#xff1a;看左侧 题目&#xff1a; 工会以&#xff08;…

【渝粤教育】广东开放大学 土木工程测量 形成性考核 (45)

选择题 题目&#xff1a;地面点沿法线方向至参考椭球面的距离称为该点的&#xff08;&#xff09; 题目&#xff1a;地面点沿铅垂线方向至大地水准面的距离称为该点的&#xff08;&#xff09; 题目&#xff1a;我国目前采用的高程基准是 题目&#xff1a;象限角的取值范围 题目…

基于表达式的访问控制

1.概述 今天&#xff0c;我们将回顾基于表达式的访问控制&#xff08;EBAC&#xff09;&#xff0c;基于角色的访问控制&#xff08;RBAC&#xff09;和基于属性的访问控制&#xff08;ABAC&#xff09;之间的区别&#xff0c;并将重点放在EBAC上。 2.什么是基于表达式的访问控…

excel数据命令导入mysql_如何将EXCEL数据导入MYSQL

VBA对MySql数据库进行读取和写入操作时间:2009-10-06 09:18:47来源:网络 作者:未知 点击:178次以下代码用于32位系统,Office 2003,环境,MySql版本5.1在使用前需要先安装MySql的驱动&#xff0c;进行正确配置注意&#xff1a;必须给出正确的服务器名、数据库名、表名、数据库连接…

【渝粤教育】广东开放大学 工程力学 形成性考核 (27)

选择题 题目&#xff1a;物体受平面内三个互不平行的力作用而平衡&#xff0c;三个力的作用线&#xff08;&#xff09;。 题目&#xff1a;力偶对物体的作用效应&#xff0c;决定于&#xff08;&#xff09;。 题目&#xff1a;平面平行力系的独立平衡方程数目一般有&#xff…

node mysql安装目录_nodejs 指定全局安装路径和缓存路径

1、前提&#xff1a;已安装 nodejs(nodejs官网 https://nodejs.org), 并且已将其添加到了环境变量 path 中&#xff1b;2、进入cmd命令行&#xff0c;然后输入 node -v &#xff0c;测试是否安装成功&#xff0c;出现版本号就表示安装成功3、进入cmd命令行&#xff0c;然后输入…

【渝粤教育】广东开放大学 房屋建筑学 形成性考核 (50)

选择题 题目&#xff1a;以下基础一般应用于单层厂房的基础是 题目&#xff1a;以下属于刚性防水的是 题目&#xff1a;装配整体式钢筋混凝土楼板包括 题目&#xff1a;当主梁的跨度为7m时&#xff0c;以下主梁高设计尺寸合理为 题目&#xff1a;是指为了防止建筑物构件由于气候…

【渝粤教育】广东开放大学 文化项目管理 形成性考核 (36)

选择题 题目&#xff1a; ( )是依据一定的标准和程序&#xff0c;对政策的效益、效率及价值进行判断的一种政治行为&#xff0c;目的在于取得有关这些方面的信息&#xff0c;作为决定政策变化、政策改进和制定新政策的依据 选择一项&#xff1a; 答案&#xff1a;看左侧 题目&…

mysql的sql执行原理图_性能测试MySQL之SQL运行原理

一&#xff0c;MySQL运行原理两个一样的图1&#xff0c;SQL语句执行的过程详细说明如上图所示&#xff0c;当向MySQL发送一个请求的时候&#xff0c;MySQL到底做了什么&#xff1a;a, 客户端发送一条查询给服务器。b, 服务器先检查查询缓存&#xff0c;如果命中了缓存&#xff…

穹顶灯打不出阴暗面_Java 8星期五:Java 8的阴暗面

穹顶灯打不出阴暗面在Data Geekery &#xff0c;我们喜欢Java。 而且&#xff0c;由于我们真的很喜欢jOOQ的流畅的API和查询DSL &#xff0c;我们对Java 8将为我们的生态系统带来什么感到非常兴奋。 Java 8星期五 每个星期五&#xff0c;我们都会向您展示一些不错的教程风格的…

【渝粤教育】广东开放大学 物业服务营销管理 形成性考核 (59)

选择题 题目&#xff1a; 单选 &#xff08; &#xff09;是物业服务产品的期望产品。 答案&#xff1a;看左侧 题目&#xff1a; 单选 &#xff08; &#xff09;是指模仿市场上旺销的其他物业服务企业的产品而开发的某种新产品。 答案&#xff1a;看左侧 题目&#xff1a…

【渝粤教育】广东开放大学 系统工程 形成性考核 (25)

选择题 题目&#xff1a;系统工程与系统科学的区别是&#xff0c;前者是工程技术&#xff0c;后者是基础理论。 答案&#xff1a;看左侧 题目&#xff1a;"有机论”生物学认为&#xff0c;有机体可分解为各个部分&#xff0c;各个部分的功能完全决定了系统的功能和特性。 …

Java 11:JOIN表,获取Java流

是否曾经想过如何将联接的数据库表转换为Java Stream&#xff1f; 阅读这篇简短的文章&#xff0c;并了解如何使用Speedment Stream ORM完成它。 我们将从Java 8示例开始&#xff0c;然后研究Java 11的改进。 Java 8和JOIN 速度允许将动态JOIN&#xff1a;ed数据库表作为标准J…

mysql宽字节注入_转宽字节注入详解

在mysql中&#xff0c;用于转义的函数有addslashes&#xff0c;mysql_real_escape_string&#xff0c;mysql_escape_string等&#xff0c;还有一种情况是magic_quote_gpc&#xff0c;不过高版本的PHP将去除这个特性。首先&#xff0c;宽字节注入与HTML页面编码是无关的&#xf…

【渝粤教育】电大中专中药制剂学 (2)作业 题库

1.根据药典、药品标准等将药物加工制成具有一定规格&#xff0c;可直接用于临床的药物制品&#xff0c;称为&#xff08;&#xff09;。 A.制剂 B.剂型 C.单味中药 D.中成药 E.炮制 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;D 2.药材经过炮制后可直接用于中医临…

mysql集群软件有哪些_浅谈数据库集群软件优缺点有哪些

满心狼藉回答时间&#xff1a;2019-12-05向TA提问集群(Cluster)是由两台或多台节点机(服务器)构成的一种松散耦合的计算节点集合&#xff0c;为用户提供网络服务或应用程序(包括数据库、Web服务和文件服务等)的单一客户视图&#xff0c;同时提供接近容错机的故障恢复能力。集群…

【渝粤教育】电大中专办公设备使用与维护 (2)作业 题库

1以下哪个不是现代办公硬件需求的主要依赖&#xff08;&#xff09;。 A扫描仪 B计算机 C办公桌 D打印机 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;B 2现代办公设备可分为计算机、通信&#xff08;&#xff09;三大类。 A电子工具 B办公机械 C路由器 D碎纸机 错…