apache camel_学习Apache Camel –实时索引推文

apache camel

在大多数软件开发项目中,有一点需要使应用程序开始与其他应用程序或第三方组件通信。

无论是发送电子邮件通知,调用外部api,写入文件还是将数据从一个地方迁移到另一个地方,您都可以推出自己的解决方案或利用现有框架。

对于Java生态系统中的现有框架,我们可以发现Tibco BusinessWorks和Mule ESB ,另一方面是Spring Integration和Apache Camel 。

在本教程中,我将通过一个示例应用程序向您介绍Apache Camel ,该示例应用程序从Twitter的示例提要中读取推文,并使用Elastic Search实时对这些推文进行索引。

什么是Apache Camel?

和3-300x300

将应用程序与生态系统中的内部或外部组件集成是软件开发中最复杂的任务之一,如果操作不正确,则可能导致巨大的混乱,并导致长期维护的真正痛苦。

幸运的是,Camel是Apache托管的开源集成框架,它基于企业集成模式 ,这些模式可以帮助编写更易读和可维护的代码。 与Lego相似,这些模式可以用作构建可靠软件设计的基础。

Apache Camel还支持各种各样的连接器,以将您的应用程序与不同的框架和技术集成在一起。 顺便说一下,它也可以与Spring很好地配合使用。

如果您不熟悉Spring,那么您可能会发现这篇文章很有帮助: 使用Spring Boot处理Twitter feed 。

在以下各节中,我们将介绍一个示例应用程序,其中Camel与Twitter示例提要和ElasticSearch集成在一起。

什么是ElasticSearch?

类似于Apache Solr的 ElasticSearch是基于Apache Lucene的高度可扩展的基于Java的开源全文搜索引擎。

在此示例应用程序中,我们将使用ElasticSearch实时索引推文,并在这些推文上提供全文本搜索功能。

其他使用过的技术

除了Apache Camel和ElasticSearch,我还在此应用程序中包括其他框架: Gradle作为构建工具, Spring Boot作为Web应用程序框架,以及Twitter4j,用于从Twitter示例提要中读取推文。

入门

该项目的框架是在http://start.spring.io生成的,在该项目中,我检查了Web依赖项选项,填写了“项目元数据”部分,然后选择“ Gradle Project”作为项目类型。

生成项目后,您可以下载并将其导入您喜欢的IDE。 我现在不打算在Gradle上详细介绍,但是这是build.gradle文件中所有依赖项的列表:

def camelVersion = '2.15.2'
dependencies {compile("org.springframework.boot:spring-boot-starter-web")compile("org.apache.camel:camel-core:${camelVersion}")compile("org.apache.camel:camel-spring-boot:${camelVersion}")compile("org.apache.camel:camel-twitter:${camelVersion}")compile("org.apache.camel:camel-elasticsearch:${camelVersion}")compile("org.apache.camel:camel-jackson:${camelVersion}")compile("joda-time:joda-time:2.8.2")testCompile("org.springframework.boot:spring-boot-starter-test")
}

使用骆驼路线进行整合

骆驼实现了面向消息的体系结构,它的主要构建模块是描述消息流的路由

可以用XML(旧方式)或Java DSL(新方式)描述路由。 我们将仅在本文中讨论Java DSL,因为这是首选且更优雅的选择。

好吧,让我们看一个简单的Route:

from("file://orders").convertBodyTo(String.class).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");

这里有几件事要注意:

  • 消息在由URI表示并使用URI配置的端点之间流动
  • 路由只能有一个消息生产者端点(在本例中为“ file:// orders”,它从orders文件夹中读取文件)和多个消息消费者端点:
    • “ log:com.mycompany.order?level = DEBUG”,它将文件内容记录在com.mycompany.order日志记录类别下的调试消息中,
  • 在端点之间,可以更改消息,即:convertBodyTo(String.class)将消息正文转换为String。

另请注意,相同的URI可以在一个路由中用于使用者端点,而在另一路由中用于生产者端点:

from("file://orders").convertBodyTo(String.class).to("direct:orders");from("direct:orders).to("log:com.mycompany.order?level=DEBUG").to("jms:topic:OrdersTopic");

Direct端点是通用端点之一,它允许将消息从一条路由同步传递到另一条路由。

这有助于创建可读代码并在代码的多个位置重用路由。

索引推文

现在,让我们看一下代码中的一些路由。 让我们从简单的事情开始:

private String ES_TWEET_INDEXER_ENDPOINT = "direct:tweet-indexer-ES";...from("twitter://streaming/sample?type=EVENT&consumerKey={{twitter4j.oauth.consumerKey}}&consumerSecret={{twitter4j.oauth.consumerSecret}}∾cessToken={{twitter4j.oauth.accessToken}}∾cessTokenSecret={{twitter4j.oauth.accessTokenSecret}}").to(ES_TWEET_INDEXER_ENDPOINT);

这是如此简单,对吧? 到现在为止,您可能已经知道,该路由会从Twitter示例提要中读取推文,并将它们传递到“ direct:tweet-indexer-ES”端点。 请注意,consumerKey,consumerSecret等已配置并作为系统属性传递(请参见http://twitter4j.org/en/configuration.html )。

现在,让我们看一下一个稍微复杂的Route,它从“ direct:tweet-indexer-ES”端点读取,并将Tweets批量插入到Elasticsearch中(有关每个步骤的详细说明,请参见注释):

@Value("${elasticsearch.tweet.uri}")private String elasticsearchTweetUri;...from(ES_TWEET_INDEXER_ENDPOINT)// groups tweets into separate indexes on a weekly basis to make it easier clean up old tweets:.process(new WeeklyIndexNameHeaderUpdater(ES_TWEET_INDEX_TYPE))// converts Twitter4j Tweet object into an elasticsearch document represented by a Map:.process(new ElasticSearchTweetConverter())// collects tweets into weekly batches based on index name:.aggregate(header("indexName"), new ListAggregationStrategy())// creates new batches every 2 seconds.completionInterval(2000)// makes sure the last batch will be processed before the application shuts down:.forceCompletionOnStop()// inserts a batch of tweets to elasticsearch: .to(elasticsearchTweetUri).log("Uploaded documents to ElasticSearch index ${headers.indexName}: ${body.size()}");

关于此路线的注意事项:

  • elasticsearchTweetUri是一个字段,其值由Spring从application.properties文件(elasticsearch.tweet.uri = elasticsearch:// tweet-indexer?operation = BULK_INDEX&ip = 127.0.0.1&port = 9300)中获取并注入到该字段中
  • 为了在Route中实现自定义处理逻辑,我们可以创建实现Processor接口的类。 参见WeeklyIndexNameHeaderUpdater和ElasticSearchTweetConverter
  • 使用自定义ListAggregationStrategy策略聚合推文,该策略将消息聚合到ArrayList中,稍后每2秒(或在应用程序停止时)传递到下一个终结点
  • Camel实现了一种表达语言 ,我们正在使用它来记录批处理的大小(“ $ {body.size()}”)和插入消息的索引的名称($ {headers.indexName})。

在Elasticsearch中搜索推文

现在我们已经在Elasticsearch中索引了推文,是时候对其进行一些搜索了。

首先,让我们看一下接收搜索查询的Route和限制搜索结果数量的maxSize参数:

public static final String TWEET_SEARCH_URI = "vm:tweetSearch";...from(TWEET_SEARCH_URI).setHeader("CamelFileName", simple("tweet-${body}-${header.maxSize}-${date:now:yyyyMMddHHmmss}.txt"))// calls the search() method of the esTweetService which returns an iterator// to process search result - better than keeping the whole resultset in memory:.split(method(esTweetService, "search"))// converts Elasticsearch doucment to Map object:.process(new ElasticSearchSearchHitConverter())// serializes the Map object to JSON:.marshal(new JacksonDataFormat())// appends new line at the end of every tweet.setBody(simple("${body}\n"))// write search results as json into a file under /tmp folder:.to("file:/tmp?fileExist=Append").end().log("Wrote search results to /tmp/${headers.CamelFileName}");

当消息传递到“ vm:tweetSearch”端点(使用内存队列异步处理消息)时,将触发此路由。

SearchController类实现REST api,从而允许用户通过使用Camel的ProducerTemplate类将消息发送到“ vm:tweetSearch”端点来运行tweet搜索:

@Autowiredprivate ProducerTemplate producerTemplate;@RequestMapping(value = "/tweet/search", method = { RequestMethod.GET, RequestMethod.POST },produces = MediaType.TEXT_PLAIN_VALUE)@ResponseBodypublic String tweetSearch(@RequestParam("q") String query,@RequestParam(value = "max") int maxSize) {LOG.info("Tweet search request received with query: {} and max: {}", query, maxSize);Map<String, Object> headers = new HashMap<String, Object>();// "content" is the field in the Elasticsearch index that we'll be querying:headers.put("queryField", "content");headers.put("maxSize", maxSize);producerTemplate.asyncRequestBodyAndHeaders(CamelRouter.TWEET_SEARCH_URI, query, headers);return "Request is queued";}

这将触发Elasticsearch的执行,但是结果不会在响应中返回,而是写入/ tmp文件夹中的文件(如前所述)。

此路由使用ElasticSearchService类在ElasticSearch中搜索推文。 当执行此Route时,Camel调用search()方法并传递搜索查询和maxSize作为输入参数:

public SearchHitIterator search(@Body String query, @Header(value = "queryField") String queryField, @Header(value = "maxSize") int maxSize) {boolean scroll = maxSize > batchSize;LOG.info("Executing {} on index type: '{}' with query: '{}' and max: {}", scroll ? "scan & scroll" : "search", indexType, query, maxSize);QueryBuilder qb = termQuery(queryField, query);long startTime = System.currentTimeMillis();SearchResponse response = scroll ? prepareSearchForScroll(maxSize, qb) : prepareSearchForRegular(maxSize, qb);return new SearchHitIterator(client, response, scroll, maxSize, KEEP_ALIVE_MILLIS, startTime);}

请注意,根据maxSize和batchSize,代码将执行常规搜索以返回单页结果,或者执行滚动请求以使我们能够检索大量结果。 在滚动的情况下, SearchHitIterator将随后调用Elasticsearch以分批检索结果。

安装ElasticSearch

  1. 从https://www.elastic.co/downloads/elasticsearch下载Elasticsearch。
  2. 将其安装到本地文件夹($ ES_HOME)
  3. 编辑$ ES_HOME / config / elasticsearch.yml并添加以下行:
    cluster.name:tweet-indexer
  4. 安装BigDesk插件以监视Elasticsearch:$ ES_HOME / bin / plugin -install lukas-vlcek / bigdesk
  5. 运行Elasticsearch:$ ES_HOME / bin / elasticsearch.sh或$ ES_HOME / bin / elasticsearch.bat

这些步骤将允许您以最少的配置运行独立的Elasticsearch实例,但请记住,它们并非供生产使用。

运行

这是应用程序的入口点,可以从命令行运行。

package com.kaviddiss.twittercamel;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

要运行该应用程序,请从您喜欢的IDE运行Application.main()方法,或者从命令行执行以下代码:

$GRADLE_HOME/bin/gradlew build && java -jar build/libs/twitter-camel-ingester-0.0.1-SNAPSHOT.jar

一旦应用程序启动,它将自动开始索引推文。 转到http:// localhost:9200 / _plugin / bigdesk /#cluster可视化索引:

推特索引

要搜索推文,请在浏览器中输入与此类似的URL: http:// localhost:8080 / tweet / search?q = toronto&max = 100 。

推特搜索

使用BigDesk插件,我们可以监视Elasticsearch如何索引推文:

推特索引

结论

在Apache Camel的简介中,我们介绍了如何使用此集成框架与Twitter提要feed和Elasticsearch之类的外部组件进行通信,以实时索引和搜索推文。

  • 示例应用程序的源代码可从https://github.com/davidkiss/twitter-camel-ingester获得 。

翻译自: https://www.javacodegeeks.com/2015/09/learn-apache-camel-indexing-tweets-in-real-time.html

apache camel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/337113.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

让你不再害怕指针——C指针详解(经典,非常详细)

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删前言:复杂类型说明要了解指针,多多少少会出现一些比较复杂的类型,所以我先介绍一下如何完全理解一个复杂类型,要理解复杂类型其实很简单,一个类型…

dubbo单元测试调用_使用LocalTestServer对HTTP调用进行单元测试

dubbo单元测试调用有时候&#xff0c;您正在对远程服务器进行HTTP调用的单元测试代码。 您可能正在使用诸如ApachesHttpClient或Spring的RestTemplate之类的库。 当然&#xff0c;您不想依靠远程服务进行单元测试。 除了涉及的开销&#xff08;请记住单元测试应该是快速的&…

C语言实现可写入文件的账号密码登录系统

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删账号登录系统在很多系统设计时都时必不可少的&#xff0c;今天这个登录系统功能较全&#xff0c;可以注册&#xff0c;登录&#xff0c;找回密码…

复函数图像怎么画_...1等等.只需大致图象,和大致画法(根据原函数就能画出复合函数的...-复函数的图形-数学-禄凡闷同学...

概述&#xff1a;本道作业题是禄凡闷同学的课后练习&#xff0c;分享的知识点是复函数的图形&#xff0c;指导老师为终老师&#xff0c;涉及到的知识点涵盖&#xff1a;...1等等.只需大致图象&#xff0c;和大致画法(根据原函数就能画出复合函数的...-复函数的图形-数学&#x…

rxjava 循环发送事件_使用RxJava和SseEmitter进行服务器发送的事件

rxjava 循环发送事件Spring Framework 4.2 GA即将发布&#xff0c;让我们看一下它提供的一些新功能。 引起我注意的一个事件是一个简单的新类SseEmitter &#xff0c;它是对Spring MVC控制器中容易使用的发送事件的抽象。 SSE是一项技术&#xff0c;可让您在一个HTTP连接内沿一…

一文搞懂 | Linux 内核的 4 大 IO 调度算法

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删Linux 内核包含4个IO调度器&#xff1a;Noop IO schedulerAnticipatory IO schedulerDeadline IO scheduler CFQ IO scheduler。anticipatory, 预…

众神进入瓦尔哈拉_一时冲动:“通往瓦尔哈拉之路的冒险”

众神进入瓦尔哈拉通过所有有关Java 9和Project Jigsaw的讨论&#xff0c;我们不应忽视Java的另一重大变化。 希望在第10版或第11版中&#xff0c; Valhalla项目能够实现并介绍价值类型和专业化。 那么&#xff0c;这是什么一回事&#xff0c;项目进展如何&#xff0c;面临什么…

当电子工程师十余年,感慨万千!

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删当电子工程师也一些年头了&#xff0c;不算有出息&#xff0c;环顾四周&#xff0c;也没有看见几个有出息的。回顾工程师生涯&#xff0c;感慨万…

canva画图 图片居中裁剪_css实现不定宽高的图片img居中裁剪_类似微信朋友圈图片效果...

需求如下&#xff1a;前端需要显示矩形的缩略图&#xff0c;接口返回的图片尺寸大小不一&#xff0c;宽高不相等&#xff0c;需要前端来处理并显示成正方形&#xff0c;类似微信朋友圈图片的效果&#xff0c;等比例正方形显示在列表中&#xff0c;让图片根据宽高来自适应显示在…

哈希策略_优化哈希策略的简介

哈希策略总览 用于哈希键的策略可以直接影响哈希集合&#xff08;例如HashMap或HashSet&#xff09;的性能。 内置的哈希函数被设计为通用的&#xff0c;并且可以在各种用例中很好地工作。 我们可以做得更好&#xff0c;特别是如果您对用例有一个很好的了解吗&#xff1f; 测…

面试大全 | C语言高级部分总结,2.6万字长文

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删一、内存大话题1.0、内存就是程序的立足之地&#xff0c;体现内存重要性。1.1、内存理解&#xff1a;内存物理看是有很多个 Bank&#xff08;就是…

c#设计12星座速配软件_C#设计模式(12)——组合模式

阅读目录1.组合模式在软件开发中我们经常会遇到处理部分与整体的情况&#xff0c;如我们经常见到的树形菜单&#xff0c;一个菜单项的子节点可以指向具体的内容&#xff0c;也可以是子菜单。类似的情况还有文件夹&#xff0c;文件夹的下级可以是文件夹也可以是文件。举一个例子…

hibernate与jpa_将JPA Hibernate与OptaPlanner集成

hibernate与jpa我们一直在改进OptaPlanner与JEE其余部分的集成&#xff0c;因此更容易构建可以正常工作的最终用户应用程序。 让我们看一下改进的JPA Hibernate集成。 基础 JPA Hibernate和OptaPlanner都可以在POJO&#xff08;普通的旧Java对象&#xff09;上工作&#xff0c…

程序如何运行,编译、链接、装入?

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删一、地址概念和程序如何运行在多道程序环境下&#xff0c;要使程序运行&#xff0c;必须先为之创建进程。而创建进程的第一件事&#xff0c;便是…

python举两种字符串引号的例子_python里的单引号和双引号的有什么作用

在Python当中表达字符串既可以使用单引号&#xff0c;也可以使用双引号&#xff0c;那两者有什么区别吗&#xff1f;python单引号和双引号的区别简单来说&#xff0c;在Python中使用单引号或双引号是没有区别的&#xff0c;都可以用来表示一个字符串。但是这两种通用的表达方式…

枚举对象注释_如何以及何时使用枚举和注释

枚举对象注释本文是我们名为“ 高级Java ”的学院课程的一部分。 本课程旨在帮助您最有效地使用Java。 它讨论了高级主题&#xff0c;包括对象创建&#xff0c;并发&#xff0c;序列化&#xff0c;反射等。 它将指导您完成Java掌握的旅程&#xff01; 在这里查看 &#xff01;…

background 互联网图片_cssbackground-image和layer-background-image的区别

layer-background-image语法&#xff1a;layer-background-image : none | url (url)参数&#xff1a;none :  无背景图url :  使用绝对或相对地址指定背景图像说明&#xff1a;设置或检索对象整个区域的背景图像。示例&#xff1a;code {position: absolute;top: 100px; lef…

纪事本 乱码_纪事日记–可自定义的数据存储

纪事本 乱码总览 使任何数据结构或算法尽可能快的方法是使代码完全执行您想要的操作&#xff0c;而无需执行其他操作。 建立一个可以做任何人想做的每件事的数据存储的问题是&#xff0c;它做得特别不好。 自定义数据存储在性能方面可以实现什么&#xff1f; 您可以支持&#…

datavideo切换台说明书_【新品发布】datavideo SE-650 高清四通道切换台

还在为音乐演唱会的拍摄而烦恼吗&#xff1f;或者为体育比赛的多机位发愁&#xff1f;或者为微课、优课、慕课制作而焦头烂额&#xff1f;大部分用户对多机位的恐惧来源于相关产品令人发指的复杂和专业性&#xff0c;面对满眼的键盘会有无从下手之感&#xff0c;很多学校和工作…

NSA:建议从 C/C++ 切换到内存安全语言

点击蓝字关注我们因公众号更改推送规则&#xff0c;请点“在看”并加“星标”第一时间获取精彩技术分享来源于网络&#xff0c;侵删美国国家安全局&#xff08;NSA&#xff09;发布了一份指南&#xff0c;旨在帮助软件开发商和运营商预防和缓解软件内存安全问题。其鼓励组织将编…