apache camel_学习Apache Camel –实时索引推文

apache camel

在大多数软件开发项目中,有一点需要使应用程序开始与其他应用程序或第三方组件通信。

无论是发送电子邮件通知,调用外部api,写入文件还是将数据从一个地方迁移到另一个地方,您都可以推出自己的解决方案或利用现有框架。

对于Java生态系统中的现有框架,我们可以发现Tibco BusinessWorks和Mule ESB ,另一方面是Spring Integration和Apache Camel 。

在本教程中,我将通过一个示例应用程序向您介绍Apache Camel ,该示例应用程序从Twitter的示例提要中读取推文,并使用Elastic Search实时对这些推文进行索引。

什么是Apache Camel?

和3-300x300

将应用程序与生态系统中的内部或外部组件集成是软件开发中最复杂的任务之一,如果操作不正确,则可能导致巨大的混乱,并导致长期维护的真正痛苦。

幸运的是,Camel是Apache托管的开源集成框架,它基于企业集成模式 ,这些模式可以帮助编写更易读和可维护的代码。 与Lego相似,这些模式可以用作构建可靠软件设计的基础。

Apache Camel还支持各种各样的连接器,以将您的应用程序与不同的框架和技术集成在一起。 顺便说一下,它也可以与Spring很好地配合使用。

如果您不熟悉Spring,那么您可能会发现这篇文章很有帮助: 使用Spring Boot处理Twitter feed 。

在以下各节中,我们将介绍一个示例应用程序,其中Camel与Twitter示例提要和ElasticSearch集成在一起。

什么是ElasticSearch?

类似于Apache Solr的 ElasticSearch是基于Apache Lucene的高度可扩展的基于Java的开源全文搜索引擎。

在此示例应用程序中,我们将使用ElasticSearch实时索引推文,并在这些推文上提供全文本搜索功能。

其他使用过的技术

除了Apache Camel和ElasticSearch,我还在此应用程序中包括其他框架: Gradle作为构建工具, Spring Boot作为Web应用程序框架,以及Twitter4j,用于从Twitter示例提要中读取推文。

入门

该项目的框架是在http://start.spring.io生成的,在该项目中,我检查了Web依赖项选项,填写了“项目元数据”部分,然后选择“ Gradle Project”作为项目类型。

生成项目后,您可以下载并将其导入您喜欢的IDE。 我现在不打算在Gradle上详细介绍,但是这是build.gradle文件中所有依赖项的列表:

def camelVersion = '2.15.2'
dependencies {compile("org.springframework.boot:spring-boot-starter-web")compile("org.apache.camel:camel-core:${camelVersion}")compile("org.apache.camel:camel-spring-boot:${camelVersion}")compile("org.apache.camel:camel-twitter:${camelVersion}")compile("org.apache.camel:camel-elasticsearch:${camelVersion}")compile("org.apache.camel:camel-jackson:${camelVersion}")compile("joda-time:joda-time:2.8.2")testCompile("org.springframework.boot:spring-boot-starter-test")
}

使用骆驼路线进行整合

骆驼实现了面向消息的体系结构,它的主要构建模块是描述消息流的路由

可以用XML(旧方式)或Java DSL(新方式)描述路由。 我们将仅在本文中讨论Java DSL,因为这是首选且更优雅的选择。

好吧,让我们看一个简单的Route:

from("file://orders").convertBodyTo(String.class).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");

这里有几件事要注意:

  • 消息在由URI表示并使用URI配置的端点之间流动
  • 路由只能有一个消息生产者端点(在本例中为“ file:// orders”,它从orders文件夹中读取文件)和多个消息消费者端点:
    • “ log:com.mycompany.order?level = DEBUG”,它将文件内容记录在com.mycompany.order日志记录类别下的调试消息中,
  • 在端点之间,可以更改消息,即:convertBodyTo(String.class)将消息正文转换为String。

另请注意,相同的URI可以在一个路由中用于使用者端点,而在另一路由中用于生产者端点:

from("file://orders").convertBodyTo(String.class).to("direct:orders");from("direct:orders).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");

Direct端点是通用端点之一,它允许将消息从一条路由同步传递到另一条路由。

这有助于创建可读代码并在代码的多个位置重用路由。

索引推文

现在,让我们看一下代码中的一些路由。 让我们从简单的事情开始:

private String ES_TWEET_INDEXER_ENDPOINT = "direct:tweet-indexer-ES";...from("twitter://streaming/sample?type=EVENT&consumerKey={{twitter4j.oauth.consumerKey}}&consumerSecret={{twitter4j.oauth.consumerSecret}}∾cessToken={{twitter4j.oauth.accessToken}}∾cessTokenSecret={{twitter4j.oauth.accessTokenSecret}}").to(ES_TWEET_INDEXER_ENDPOINT);

这是如此简单,对吧? 到现在为止,您可能已经知道,该路由会从Twitter示例提要中读取推文,并将它们传递到“ direct:tweet-indexer-ES”端点。 请注意,consumerKey,consumerSecret等已配置并作为系统属性传递(请参见http://twitter4j.org/en/configuration.html )。

现在,让我们看一下一个稍微复杂的Route,它从“ direct:tweet-indexer-ES”端点读取,并将Tweets批量插入到Elasticsearch中(有关每个步骤的详细说明,请参见注释):

@Value("${elasticsearch.tweet.uri}")private String elasticsearchTweetUri;...from(ES_TWEET_INDEXER_ENDPOINT)// groups tweets into separate indexes on a weekly basis to make it easier clean up old tweets:.process(new WeeklyIndexNameHeaderUpdater(ES_TWEET_INDEX_TYPE))// converts Twitter4j Tweet object into an elasticsearch document represented by a Map:.process(new ElasticSearchTweetConverter())// collects tweets into weekly batches based on index name:.aggregate(header("indexName"), new ListAggregationStrategy())// creates new batches every 2 seconds.completionInterval(2000)// makes sure the last batch will be processed before the application shuts down:.forceCompletionOnStop()// inserts a batch of tweets to elasticsearch: .to(elasticsearchTweetUri).log("Uploaded documents to ElasticSearch index ${headers.indexName}: ${body.size()}");

关于此路线的注意事项:

  • elasticsearchTweetUri是一个字段,其值由Spring从application.properties文件(elasticsearch.tweet.uri = elasticsearch:// tweet-indexer?operation = BULK_INDEX&ip = 127.0.0.1&port = 9300)中获取并注入到该字段中
  • 为了在Route中实现自定义处理逻辑,我们可以创建实现Processor接口的类。 参见WeeklyIndexNameHeaderUpdater和ElasticSearchTweetConverter
  • 使用自定义ListAggregationStrategy策略聚合推文,该策略将消息聚合到ArrayList中,稍后每2秒(或在应用程序停止时)传递到下一个终结点
  • Camel实现了一种表达语言 ,我们正在使用它来记录批处理的大小(“ $ {body.size()}”)和插入消息的索引的名称($ {headers.indexName})。

在Elasticsearch中搜索推文

现在我们已经在Elasticsearch中索引了推文,是时候对其进行一些搜索了。

首先,让我们看一下接收搜索查询的Route和限制搜索结果数量的maxSize参数:

public static final String TWEET_SEARCH_URI = "vm:tweetSearch";...from(TWEET_SEARCH_URI).setHeader("CamelFileName", simple("tweet-${body}-${header.maxSize}-${date:now:yyyyMMddHHmmss}.txt"))// calls the search() method of the esTweetService which returns an iterator// to process search result - better than keeping the whole resultset in memory:.split(method(esTweetService, "search"))// converts Elasticsearch doucment to Map object:.process(new ElasticSearchSearchHitConverter())// serializes the Map object to JSON:.marshal(new JacksonDataFormat())// appends new line at the end of every tweet.setBody(simple("${body}\n"))// write search results as json into a file under /tmp folder:.to("file:/tmp?fileExist=Append").end().log("Wrote search results to /tmp/${headers.CamelFileName}");

当消息传递到“ vm:tweetSearch”端点(使用内存队列异步处理消息)时,将触发此路由。

SearchController类实现REST api,从而允许用户通过使用Camel的ProducerTemplate类将消息发送到“ vm:tweetSearch”端点来运行tweet搜索:

@Autowiredprivate ProducerTemplate producerTemplate;@RequestMapping(value = "/tweet/search", method = { RequestMethod.GET, RequestMethod.POST },produces = MediaType.TEXT_PLAIN_VALUE)@ResponseBodypublic String tweetSearch(@RequestParam("q") String query,@RequestParam(value = "max") int maxSize) {LOG.info("Tweet search request received with query: {} and max: {}", query, maxSize);Map<String, Object> headers = new HashMap<String, Object>();// "content" is the field in the Elasticsearch index that we'll be querying:headers.put("queryField", "content");headers.put("maxSize", maxSize);producerTemplate.asyncRequestBodyAndHeaders(CamelRouter.TWEET_SEARCH_URI, query, headers);return "Request is queued";}

这将触发Elasticsearch的执行,但是结果不会在响应中返回,而是写入/ tmp文件夹中的文件(如前所述)。

此路由使用ElasticSearchService类在ElasticSearch中搜索推文。 当执行此Route时,Camel调用search()方法并传递搜索查询和maxSize作为输入参数:

public SearchHitIterator search(@Body String query, @Header(value = "queryField") String queryField, @Header(value = "maxSize") int maxSize) {boolean scroll = maxSize > batchSize;LOG.info("Executing {} on index type: '{}' with query: '{}' and max: {}", scroll ? "scan & scroll" : "search", indexType, query, maxSize);QueryBuilder qb = termQuery(queryField, query);long startTime = System.currentTimeMillis();SearchResponse response = scroll ? prepareSearchForScroll(maxSize, qb) : prepareSearchForRegular(maxSize, qb);return new SearchHitIterator(client, response, scroll, maxSize, KEEP_ALIVE_MILLIS, startTime);}

请注意,根据maxSize和batchSize,代码将执行常规搜索以返回单页结果,或者执行滚动请求以使我们能够检索大量结果。 在滚动的情况下, SearchHitIterator将随后调用Elasticsearch以分批检索结果。

安装ElasticSearch

  1. 从https://www.elastic.co/downloads/elasticsearch下载Elasticsearch。
  2. 将其安装到本地文件夹($ ES_HOME)
  3. 编辑$ ES_HOME / config / elasticsearch.yml并添加以下行:
    cluster.name:tweet-indexer
  4. 安装BigDesk插件以监视Elasticsearch:$ ES_HOME / bin / plugin -install lukas-vlcek / bigdesk
  5. 运行Elasticsearch:$ ES_HOME / bin / elasticsearch.sh或$ ES_HOME / bin / elasticsearch.bat

这些步骤将允许您以最少的配置运行独立的Elasticsearch实例,但请记住,它们并非供生产使用。

运行

这是应用程序的入口点,可以从命令行运行。

package com.kaviddiss.twittercamel;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

要运行该应用程序,请从您喜欢的IDE运行Application.main()方法,或者从命令行执行以下代码:

$GRADLE_HOME/bin/gradlew build && java -jar build/libs/twitter-camel-ingester-0.0.1-SNAPSHOT.jar

一旦应用程序启动,它将自动开始索引推文。 转到http:// localhost:9200 / _plugin / bigdesk /#cluster可视化索引:

推特索引

要搜索推文,请在浏览器中输入与此类似的URL: http:// localhost:8080 / tweet / search?q = toronto&max = 100 。

推特搜索

使用BigDesk插件,我们可以监视Elasticsearch如何索引推文:

推特索引

结论

在Apache Camel的简介中,我们介绍了如何使用此集成框架与Twitter提要feed和Elasticsearch之类的外部组件进行通信,以实时索引和搜索推文。

  • 示例应用程序的源代码可从https://github.com/davidkiss/twitter-camel-ingester获得 。

翻译自: https://www.javacodegeeks.com/2015/09/learn-apache-camel-indexing-tweets-in-real-time.html

apache camel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/337113.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

让你不再害怕指针——C指针详解(经典,非常详细)

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删前言:复杂类型说明要了解指针,多多少少会出现一些比较复杂的类型,所以我先介绍一下如何完全理解一个复杂类型,要理解复杂类型其实很简单,一个类型…

C语言实现可写入文件的账号密码登录系统

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删账号登录系统在很多系统设计时都时必不可少的&#xff0c;今天这个登录系统功能较全&#xff0c;可以注册&#xff0c;登录&#xff0c;找回密码…

一文搞懂 | Linux 内核的 4 大 IO 调度算法

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删Linux 内核包含4个IO调度器&#xff1a;Noop IO schedulerAnticipatory IO schedulerDeadline IO scheduler CFQ IO scheduler。anticipatory, 预…

众神进入瓦尔哈拉_一时冲动:“通往瓦尔哈拉之路的冒险”

众神进入瓦尔哈拉通过所有有关Java 9和Project Jigsaw的讨论&#xff0c;我们不应忽视Java的另一重大变化。 希望在第10版或第11版中&#xff0c; Valhalla项目能够实现并介绍价值类型和专业化。 那么&#xff0c;这是什么一回事&#xff0c;项目进展如何&#xff0c;面临什么…

当电子工程师十余年,感慨万千!

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删当电子工程师也一些年头了&#xff0c;不算有出息&#xff0c;环顾四周&#xff0c;也没有看见几个有出息的。回顾工程师生涯&#xff0c;感慨万…

面试大全 | C语言高级部分总结,2.6万字长文

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删一、内存大话题1.0、内存就是程序的立足之地&#xff0c;体现内存重要性。1.1、内存理解&#xff1a;内存物理看是有很多个 Bank&#xff08;就是…

程序如何运行,编译、链接、装入?

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删一、地址概念和程序如何运行在多道程序环境下&#xff0c;要使程序运行&#xff0c;必须先为之创建进程。而创建进程的第一件事&#xff0c;便是…

NSA:建议从 C/C++ 切换到内存安全语言

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删美国国家安全局&#xff08;NSA&#xff09;发布了一份指南&#xff0c;旨在帮助软件开发商和运营商预防和缓解软件内存安全问题。其鼓励组织将编…

探索cqrs和事件源_编写基于事件的CQRS读取模型

探索cqrs和事件源关于事件源和CQRS的讨论似乎通常集中在CQRS上下文中的整体系统架构或领域驱动设计的各种形式。 但是&#xff0c;尽管也有一些有趣的考虑&#xff0c;但读取模型经常被忽略。 在本文中&#xff0c;我们将介绍通过使用事件流来填充视图模型的示例实现。 总览 …

C# 11正式发布

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删C# 11 现已发布。公告称&#xff0c;“随着每个版本的发布&#xff0c;社区的参与度越来越高&#xff0c;贡献了从建议、见解和错误报告到整个功…

分享一个通用的嵌入式驱动层

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删C 语言面向对象编程的最佳实践一、前言以STM32为例&#xff0c;打开网络上下载的例程或者是购买开发板自带的例程&#xff0c;都会发现应用层中会…

cks32和stm32_cks子,间谍,局部Mo子和短管

cks32和stm32本文是我们名为“ 用Mockito测试 ”的学院课程的一部分。 在本课程中&#xff0c;您将深入了解Mockito的魔力。 您将了解有关“模拟”&#xff0c;“间谍”和“部分模拟”的信息&#xff0c;以及它们相应的存根行为。 您还将看到使用测试双打和对象匹配器进行验证…

CSON+CJSON,解析json数据更优雅?

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删前言json是目前最为流行的文本数据传输格式&#xff0c;特别是在网络通信上广泛应用&#xff0c;随着物联网的兴起&#xff0c;在嵌入式设备上&a…

让C语言源码可知自身函数的实际地址与大小

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删事情的起因大概是这样……在很久很久以前&#xff0c;我最早用的是MASM&#xff08;Win32ASM&#xff09;写程序&#xff0c;从平台兼容性、开发…

java设计模式迭代器模式_迭代器设计模式示例

java设计模式迭代器模式本文是我们名为“ Java设计模式 ”的学院课程的一部分。 在本课程中&#xff0c;您将深入研究大量的设计模式&#xff0c;并了解如何在Java中实现和利用它们。 您将了解模式如此重要的原因&#xff0c;并了解何时以及如何应用模式中的每一个。 在这里查…

平衡二叉树 C语言代码实现

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删1.什么是平衡二叉树平衡二叉树&#xff0c;我们也称【二叉平衡搜索树/AVL】,树中任何节点的两个子树的高度最大差别为1&#xff0c;巴拉巴拉。。…

UDP/TCP 包的大小限制知多少

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删一、MTU 简述 - 分包后数据包最大长度1、定义Maximum Transmission Unit&#xff08;最大可传输单元&#xff09; 的缩写&#xff0c;它的单位是…

java ee的小程序_扩展Java EE应用程序的基础

java ee的小程序老实说&#xff0c;“可扩展性”是一个详尽的主题&#xff0c;并且通常没有被很好地理解。 通常&#xff0c;它被认为与高可用性相同。 我已经看到新手程序员和“经验丰富”的建筑师都建议将“ 群集 ”作为可伸缩性和HA的解决方案。 它实际上没有任何问题&#…

28 张图,一次性说清楚 TCP,速度

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删做IT相关的工作&#xff0c;肯定都离不开网络&#xff0c;网络中最重要的协议是TCP。无论是实际工作还是笔试面试&#xff0c;你看哪里能少得了T…

晨风机器人怎么买奴隶_潮牌复刻和正品该怎么抉择???带你了解了解

今天带你们聊一聊潮牌复刻和正品&#xff0c;简单介绍一下我自己&#xff0c;在复刻圈子五年&#xff0c;我的原则从始至终就是质量放在第一位&#xff0c;之所以能走这么久也是这个原因。回归正题&#xff0c;接着往下看。无论是正品还是复刻&#xff0c;其实还要根据自己的能…