基于Java、Kafka、ElasticSearch的搜索框架的设计与实现


Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用rest api用于文档搜索。

项目主页:https://github.com/chaokunyang/jkes


安装


可以参考jkes-integration-test项目快速掌握jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring Boot Application。

  • 安装jkes-index-connectorjkes-delete-connector到Kafka Connect类路径

  • 安装 Smart Chinese Analysis Plugin

sudo bin/elasticsearch-plugin install analysis-smartcn

配置

  • 引入jkes-spring-data-jpa依赖

  • 添加配置

@EnableAspectJAutoProxy@EnableJkes@Configurationpublic class JkesConfig {@Beanpublic PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {    return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);}
}
  • 提供JkesProperties Bean

@Component@Configurationpublic class JkesConf extends DefaultJkesPropertiesImpl {@PostConstructpublic void setUp() {Config.setJkesProperties(this);}    @Overridepublic String getKafkaBootstrapServers() {        return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";}    @Overridepublic String getKafkaConnectServers() {        return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";}    @Overridepublic String getEsBootstrapServers() {        return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";}    @Overridepublic String getDocumentBasePackage() {        return "com.timeyang.jkes.integration_test.domain";}    @Overridepublic String getClientId() {        return "integration_test";}}

这里可以很灵活,如果使用Spring Boot,可以使用@ConfigurationProperties提供配置

  • 增加索引管理端点 因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在Spring MVC中,可以按照如下方式添加索引端点

@RestController@RequestMapping("/api/search")public class SearchEndpoint {private Indexer indexer;    @Autowiredpublic SearchEndpoint(Indexer indexer) {        this.indexer = indexer;}    @RequestMapping(value = "/start_all", method = RequestMethod.POST)    public void startAll() {indexer.startAll();}    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)    public void start(@PathVariable("entityClassName") String entityClassName) {indexer.start(entityClassName);}    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)    public Map<String, Boolean> stopAll() {        return indexer.stopAll();}    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {        return indexer.stop(entityClassName);}    @RequestMapping(value = "/progress", method = RequestMethod.GET)    public Map<String, IndexProgress> getProgress() {        return indexer.getProgress();}}


快速开始


索引API


使用com.timeyang.jkes.core.annotation包下相关注解标记实体

@lombok.Data@Entity@Documentpublic class Person extends AuditedEntity {// @Id will be identified automatically// @Field(type = FieldType.Long)@Id@GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;    @MultiFields(mainField = @Field(type = FieldType.Text),otherFields = {                    @InnerField(suffix = "raw", type = FieldType.Keyword),                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")})    private String name;    @Field(type = FieldType.Keyword)    private String gender;    @Field(type = FieldType.Integer)    private Integer age;    // don't add @Field to test whether ignored// @Field(type = FieldType.Text)private String description;    @Field(type = FieldType.Object)    @ManyToOne(fetch = FetchType.EAGER)    @JoinColumn(name = "group_id")    private PersonGroup personGroup;}
@lombok.Data@Entity@Document(type = "person_group", alias = "person_group_alias")public class PersonGroup extends AuditedEntity {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;    private String name;    private String interests;    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)    private List<Person> persons;    private String description;    @DocumentId@Field(type = FieldType.Long)    public Long getId() {        return id;}    @MultiFields(mainField = @Field(type = FieldType.Text),otherFields = {                    @InnerField(suffix = "raw", type = FieldType.Keyword),                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")})    public String getName() {        return name;}    @Field(type = FieldType.Text)    public String getInterests() {        return interests;}    @Field(type = FieldType.Nested)    public List<Person> getPersons() {        return persons;}    /*** 不加Field注解,测试序列化时是否忽略*/public String getDescription() {        return description;}
}

当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch删除。


搜索API


启动搜索服务jkes-search-service,搜索服务是一个Spring Boot Application,提供rest搜索api,默认运行在9000端口。

  • URI query

curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
  • Nested query

integration_test_person_group/person_group/_search?from=0&size=10{  "query": {    "nested": {      "path": "persons",      "score_mode": "avg",      "query": {        "bool": {          "must": [{              "range": {                "persons.age": {                  "gt": 5}}}]}}}}
}
  • match query

integration_test_person_group/person_group/_search?from=0&size=10{  "query": {      "match": {        "interests": "Hadoop"}}
}
  • bool query

{"query": {"bool" : {"must" : {"match" : { "interests" : "Hadoop" }      },"filter": {"term" : { "name.raw" : "name0" }      },"should" : [{ "match" : { "interests" : "Flink" } },{"nested" : {                "path" : "persons",                "score_mode" : "avg",                "query" : {                    "bool" : {                        "must" : [                        { "match" : {"persons.name" : "name40"} },                        { "match" : {"persons.interests" : "interests"} }                        ],                        "must_not" : {                            "range" : {                              "age" : { "gte" : 50, "lte" : 60 }                            }                          }                    }                }            }        }],"minimum_should_match" : 1,"boost" : 1.0}  }}
  • Source filtering

integration_test_person_group/person_group/_search
{    "_source": false,    "query" : {        "match" : { "name" : "name17" }}
}
integration_test_person_group/person_group/_search
{    "_source": {            "includes": [ "name", "persons.*" ],            "excludes": [ "date*", "version", "persons.age" ]},    "query" : {        "match" : { "name" : "name17" }}
}
  • prefix

integration_test_person_group/person_group/_search
{ "query": {    "prefix" : { "name" : "name" }}
}
  • wildcard

integration_test_person_group/person_group/_search
{    "query": {        "wildcard" : { "name" : "name*" }}
}
  • regexp

integration_test_person_group/person_group/_search
{    "query": {        "regexp":{            "name": "na.*17"}}
}


Jkes工作原理


索引工作原理:


  • 应用启动时,Jkes扫描所有标注@Document注解的实体,为它们构建元数据。

  • 基于构建的元数据,创建indexmappingJson格式的配置,然后通过ElasticSearch Java Rest Client将创建/更新index配置。

  • 为每个文档创建/更新Kafka ElasticSearch Connector,用于创建/更新文档

  • 为整个项目启动/更新Jkes Deleter Connector,用于删除文档

  • 拦截数据操作方法。将* save(*)方法返回的数据包装为SaveEvent保存到EventContainer;使用(* delete*(..)方法的参数,生成一个DeleteEvent/DeleteAllEvent保存到EventContainer

  • 拦截事务。在事务提交后使用JkesKafkaProducer发送SaveEvent中的实体到Kafka,Kafka会使用我们提供的JkesJsonSerializer序列化指定的数据,然后发送到Kafka。

  • SaveEvent不同,DeleteEvent会直接被序列化,然后发送到Kafka,而不是只发送一份数据

  • SaveEventDeleteEvent不同,DeleteAllEvent不会发送数据到Kafka,而是直接通过ElasticSearch Java Rest Client删除相应的index,然后重建该索引,重启Kafka ElasticSearch Connector


查询工作原理:


  • 查询服务通过rest api提供

  • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度

  • 查询服务是一个Spring Boot Application,使用docker打包为镜像

  • 查询服务提供多版本API,用于API进化和兼容

  • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。

  • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。


流程图



模块介绍


jkes-core


jkes-core是整个jkes的核心部分。主要包括以下功能:


  • annotation包提供了jkes的核心注解

  • elasticsearch包封装了elasticsearch相关的操作,如为所有的文档创建/更新索引,更新mapping

  • kafka包提供了Kafka 生产者,Kafka Json Serializer,Kafka Connect Client

  • metadata包提供了核心的注解元数据的构建与结构化模型

  • event包提供了事件模型与容器

  • exception包提供了常见的Jkes异常

  • http包基于Apache Http Client封装了常见的http json请求

  • support包暴露了Jkes核心配置支持

  • util包提供了一些工具类,便于开发。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils


jkes-boot


jkes-boot用于与一些第三方开源框架进行集成。


当前,我们通过jkes-spring-data-jpa,提供了与spring data jpa的集成。通过使用Spring的AOP机制,对Repository方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通过使用我们提供的SearchPlatformTransactionManager,对常用的事务管理器(如JpaTransactionManager)进行包装,提供事务拦截功能。


在后续版本,我们会提供与更多框架的集成。


jkes-spring-data-jpa说明:


  • ContextSupport类用于从bean工厂获取Repository Bean

  • @EnableJkes让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型

  • EventSupport处理事件的细节,在保存和删除数据时生成相应事件存放到EventContainer,在事务提交和回滚时处理相应的事件

  • SearchPlatformTransactionManager包装了客户端的事务管理器,在事务提交和回滚时加入了回调hook

  • audit包提供了一个简单的AuditedEntity父类,方便添加审计功能,版本信息可用于结合ElasticSearch的版本机制保证不会索引过期文档数据

  • exception包封装了常见异常

  • intercept包提供了AOP切点和切面

  • index包提供了全量索引功能。当前,我们提供了基于线程池的索引机制和基于ForkJoin的索引机制。在后续版本,我们会重构代码,增加基于阻塞队列生产者-消费者模式,提供并发性能



jkes-services


jkes-services主要用来提供一些服务。 目前,jkes-services提供了以下服务:


  • jkes-delete-connector

    • jkes-delete-connector是一个Kafka Connector,用于从kafka集群获取索引删除事件(DeleteEvent),然后使用Jest Client删除ElasticSearch中相应的文档。

    • 借助于Kafka Connect的rest admin api,我们轻松地实现了多租户平台上的文档删除功能。只要为每个项目启动一个jkes-delete-connector,就可以自动处理该项目的文档删除工作。避免了每启动一个新的项目,我们都得手动启动一个Kafka Consumer来处理该项目的文档删除工作。尽管可以通过正则订阅来减少这样的工作,但是还是非常不灵活

  • jkes-search-service

    • jkes-search-service是一个restful的搜索服务,提供了多版本的rest query api。查询服务提供多版本API,用于API进化和兼容

    • jkes-search-service目前支持URI风格的搜索和JSON请求体风格的搜索。

    • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度

    • 查询服务是一个Spring Boot Application,使用docker打包为镜像

    • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。

    • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

后续,我们将会基于zookeeper构建索引集群,提供集群索引管理功能


jkes-integration-test


jkes-integration-test是一个基于Spring Boot集成测试项目,用于进行功能测试。同时测量一些常见操作的吞吐率


开发


To build a development version you'll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.


Contribute


  • Source Code: https://github.com/chaokunyang/jkes

  • Issue Tracker: https://github.com/chaokunyang/jkes/issues


LICENSE


This project is licensed under Apache License 2.0.


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/525840.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker是传统的应用发布管理的终结者么?

译者注&#xff1a;使用Docker能真正改善传统的应用发布管理中遇到的问题么&#xff1f;以下是译文&#xff1a;自从2013年发布以来&#xff0c;Docker已经成为每一个操作管理者眼中的最爱。如果你一直与世隔绝&#xff0c;这里恰恰是你错过的部分。Docker是在一个操作环境地址…

基于Mesos/Docker构建数据处理平台

本文深入介绍了去哪儿网利用Mesos和Docker构建私有云服务的全过程&#xff0c;分享了从无状态应用向有状态应用逐步过度的经验与心得。平台概览2014年下半年左右&#xff0c;去哪儿完成了有关构建私有云服务的技术调研&#xff0c;并最终拍定了Docker/Mesos这一方案。下图1展示…

Mesos容器引擎的架构设计和实现解析

引言&#xff1a;提到容器&#xff0c;大家第一时间都会想到Docker&#xff0c;毕竟Docker是目前最为流行的容器开源项目&#xff0c;它实现了一个容器引擎&#xff08;Docker engine&#xff09;&#xff0c;并且为容器的创建和管理、容器镜像的生成、分发和下载提供一套非常便…

阿里的盔甲、未来20年发展的动力以及对未来的洞察

刚刚变身迈克尔杰克逊&#xff0c;用“经济体”、“理想主义”等词刷屏的马云又在教师节那天&#xff0c;赶到2017世界物联网博览会&#xff0c;为阿里的物联网站台。过去18年以来&#xff0c;淘宝网、天猫、聚划算、全球速卖通、阿里巴巴国际交易市场、1688、阿里妈妈、蚂蚁金…

MySQL InnoDB Memcached Plugin在Oray公司的实践

1、应用背景介绍我所在职的Oray是一家提供各种互联网服务且具有海量用户的企业&#xff0c;我们也一直在实践各种新技术新架构&#xff1b;缓存方面&#xff0c;我们从memcached、ttserver、redis等都有较多应用&#xff0c;其中redis在我们的dns体系中有着很深度的集成使用&am…

网易数据运河系统NDC设计与应用

【导语】 NDC是网易近一年新诞生的结构化数据传输服务&#xff0c;它整合了网易过去在数据传输领域的各种工具和经验&#xff0c;将单机数据库、分布式数据库、OLAP系统以及下游应用通过数据链路串在一起。除了保障高效的数据传输外&#xff0c;NDC的设计遵循了单元化和平台化的…

想学区块链技术?来这!

2017年&#xff0c;区块链技术可谓是最热的宠儿。在国务院日前印发《“十三五”国家信息化规划》中&#xff0c;区块链技术和人工智能、虚拟现实、大数据、无人驾驶交通工具、基因编辑等新多项高新技术创新被定义为战略性前沿技术超前布局&#xff0c;在政府大方向认同的情况下…

oracle管理员登录报错,关于Oracle使用管理员账号登录失败的问题

我在本地建的Oracle数据库在调试自己写的存储过程的时候提示缺少 debug connect session 权限&#xff0c;一般情况下根据这个提示直接用管理员账号登录进去&#xff0c;执行grant debug connect session to 你的用户名这样的sql就行了&#xff0c;但是问题来了&#xff0c;当我…

万字长文|深度剖析Service Mesh服务网格新生代Istio

Service Mesh新秀&#xff0c;初出茅庐便声势浩荡&#xff0c;前有Google&#xff0c;IBM和Lyft倾情奉献&#xff0c;后有业界大佬俯首膜拜&#xff0c;这就是今天将要介绍的主角&#xff0c;扛起Service Mesh大旗&#xff0c;掀起新一轮微服务开发浪潮的Istio&#xff01; 今天…

避免大规模故障的微服务架构设计之道

作者&#xff1a;Pter Mrton 译者&#xff1a;Jackyrong 本文首先介绍微服务架构存在的风险&#xff0c;然后针对如何避免微服务架构的故障&#xff0c;提出了多种有效的微服务架构中的方法和技术&#xff0c;其中例如服务降级、变更管理、健康检查和修复、断路器、限流器等。…

AI 线上峰会 | 人工智能技术解析与实战

10月28日&#xff0c; SDCC 2017“人工智能技术实战线上峰会”将在CSDN学院以直播互动的方式举行。 如今人工智能已不单单是发表学术论文、刷新正确率的竞赛&#xff0c;抑或全民参与的新闻事件&#xff0c;它早在为各行各业的先行者们创造着实实在在的利润和商业价值。而且&am…

五阿哥钢铁电商资深运维工程师手把手教你这样玩企业组网

虽说干的是信息化智能化的行当&#xff0c;但每个IT工程师都必定踩过“IT系统不智能”的坑。就拿企业组建局域网来说&#xff0c;为了对网络接入用户身份进行确认&#xff0c;确保用户权限不受办公地点变更的影响&#xff0c;许多IT工程师都习惯开启 “手动模式”和苦逼的“加班…

预告:Intel、Hulu、阿里、京东、携程等大数据实战直播

前言&#xff1a;由CSDN主办的SDCC 2017之大数据技术实战线上峰会将在CSDN学院举行。作为SD系列技术峰会的一部分&#xff0c;本次线上峰会秉承干货实料&#xff08;案例&#xff09;的内容原则&#xff0c;将邀请圈内顶尖的布道师、技术专家和技术引领者&#xff0c;共话大数据…

微服务应用容器化场景中常见问题总结

简介&#xff1a;云原生技术栈是下一代应用转型的必然选择&#xff0c;它包含了微服务架构&#xff0c;DevOps和容器技术。对于微服务架构来说&#xff0c;应用是“第一公民”&#xff0c;他逐渐蚕食原来底层软件或者硬件的功能&#xff0c;例如服务注册与发现以及负载均衡&…

Swarm的进化和大规模应用

目前在容器编排领域&#xff0c;Kubernetes、Mesos以及Swarm呈现“三分天下”的格局&#xff0c;各自都有着不同的应用场景。短期内&#xff0c;很难看到“一统天下”的局面&#xff0c;本文&#xff0c;来自阿里云高级专家陈萌辉将带你了解阿里内部在推行容器化过程中的一些着…

linux可以用dos命令是什么意思,Linux系统常用命令与DOS命令的类似之处和本质区别各是什么?...

满意答案iedsa3641推荐于 2019.09.13采纳率&#xff1a;56% 等级&#xff1a;8已帮助&#xff1a;361人Linux是一个非常优秀的操作系统&#xff0c;与MS&#xff0d;WINDOWS相比具有可靠、稳定、速度快等优点&#xff0c;且拥有丰富的根据UNIX版本改进的强大功能。下面做一个…

从 0 到 300,Instagram 创始人 CTO 分享工程团队成长的经验

最初&#xff0c;Instagram 被 Facebook 收购时公司只有六个工程师&#xff0c;且都是全栈。本文Instagram 创始人兼 CTO Mike Krieger 分享了创业初期并在资源有限的情况下&#xff0c;人才招聘、技术专攻的实践经验&#xff0c;将时间、精力用在最有价值的地方。以下为译文&a…

深度揭秘Twitter的新一代流处理引擎Heron

流计算又称实时计算&#xff0c;是继以Map-Reduce为代表的批处理之后的又一重要计算模型。随着互联网业务的发展以及数据规模的持续扩大&#xff0c;传统的批处理计算难以有效地对数据进行快速低延迟处理并返回结果。由于数据几乎处于不断增长的状态中&#xff0c;及时处理计算…

linux生成图片快捷方式,在Deepin Linux系统下给AppImage格式软件创建快捷方式的方法...

这两天使用deepin的过程中&#xff0c;无意中发现了一个叫krita的程序&#xff0c;是一个图像处理软件&#xff0c;类似Photoshop&#xff0c;于是就下载krita-4.2.8-x86_64的这个版本。但是麻烦的就是他是一个AppImage格式&#xff0c;每次我打开的时候需要打开相应文件夹中的…

图数据库在CMDB领域的应用

【导语】在上期的图数据库介绍中&#xff0c;我们对什么是图数据库&#xff0c;以及图数据库所擅长的领域做了一个初步的介绍&#xff0c;也收到了众多的反馈和咨询&#xff0c;特别要求我们对图数据库在一些具体行业的应用能做一些深入介绍。为此&#xff0c;从本期文档开始&a…