如何为 Elasticsearch 创建自定义连接器

了解如何为 Elasticsearch 创建自定义连接器以简化数据摄取过程。

作者:JEDR BLASZYK

Elasticsearch 拥有一个摄取工具库,可以从多个来源获取数据。 但是,有时你的数据源可能与 Elastic 现有的提取工具不兼容。 在这种情况下,你可能需要创建自定义连接器以将数据与 Elasticsearch 连接。

在你的应用程序中使用 Elastic 连接器有多种原因。 例如,你可能想要:

  • 将数据从自定义或遗留应用程序引入 Elasticsearch
  • 为你的组织数据引入语义搜索
  • 从 PDF、MS Office 文档等文件中提取文本内容
  • 使用 Kibana UI 管理你的数据源(包括配置、过滤规则、设置定期同步计划规则)
  • 你想要在自己的基础设施上部署 Elastic 连接器(一些 Elastic 支持的连接器可作为 Elastic Cloud 中的本机连接器使用)

用于创建定制连接器的开放代码框架

如果创建你自己的连接器是满足你需求的解决方案,那么连接器框架将帮助你创建一个。 我们创建的框架是为了支持创建自定义连接器并帮助用户将独特的数据源连接到 Elasticsearch。 连接器的代码可在 GitHub 上找到,并且我们有可以帮助你入门的文档。

该框架设计简单且高性能。 它旨在对开发人员友好,因此它是开放代码且高度可定制的。 你创建的连接器可以在你自己的基础设施上进行自我管理。 目标是让开发人员能够轻松地将自己的数据源与 Elasticsearch 集成。

使用连接器框架之前你需要了解什么

该框架是用 async-python 编写的

有几门课程可以学习 async-python。 如果你需要推荐,我们认为这个 LinkedIn 学习课程非常好,但需要订阅。 我们喜欢的一个免费替代方案是这个。

为什么我们选择异步 Python?

摄取受 IO 限制(而非 CPU 限制),因此从资源利用的角度构建连接器时,异步编程是最佳方法。 在 I/O 密集型应用程序中,大部分时间都花在等待外部资源上,例如读取文件、发出网络请求或查询数据库。 在这些等待期间,传统的同步代码会阻塞整个程序,导致资源利用效率低下。

还有其他先决条件吗?

这不是先决条件。 在开始之前,绝对值得阅读《连接器开发人员指南》! 希望你觉得这个有用。

使用连接器框架构建定制连接器

入门很容易。 在与框架相关的术语中,我们将自定义连接器称为源。 我们通过创建一个新类来实现一个新的源,该类的职责是将文档从自定义数据源发送到Elasticsearch。

作为一种可选的入门方式,用户还可以查看此目录源 (directory source) 示例。 这是一个很好但基本的示例,可以帮助你了解如何编写自定义连接器。

步骤概要

一旦你知道要为其创建连接器的自定义数据源,以下是编写新源的步骤概述:

  • 在 connectors/sources 中添加模块或目录
  • 在 requirements.txt 中声明你的依赖项。 确保固定这些依赖项
  • 实现一个类,该类实现 connectors.source.BaseDataSource 中描述的方法
  • (可选,在为 repo 做出贡献时)在 connectors/sources/tests 中添加单元测试,覆盖率 +90%
  • 在源部分声明你的连接器 connectors/config.py
  • 就是这样。 我们完成了! 现在你应该能够运行连接器

在编写定制连接器之前你需要了解什么

为了使 Elasticsearch 用户能够获取数据并在该数据的基础上构建搜索体验,我们提供了一个轻量级的连接器协议。 该协议允许用户轻松获取数据、使用企业搜索功能来操作该数据并创建搜索体验,同时在 Kibana 中为他们提供无缝的用户体验。 为了与企业搜索兼容并充分利用 Kibana 中提供的连接器功能,连接器必须遵守该协议。

关于连接器协议你需要了解的内容

该文档页面提供了该协议的良好概述。 以下是你需要了解的内容:

  • 连接器和系统其他部分之间的所有通信都通过 Elasticsearch 索引异步进行
  • 连接器将其状态传达给 Elasticsearch 和 Kibana,以便用户可以为其提供配置并诊断任何问题
  • 这允许简单、开发人员友好的连接器部署。 connectors 服务是无状态的,并且不关心你的 Elastic 部署在哪里运行,只要它可以通过网络连接到它就可以正常工作。 该服务还具有容错能力,可以在重新启动或发生故障后在不同的主机上恢复操作。 一旦与 Elasticsearch 重新建立连接,它将继续正常运行。
  • 在底层,该协议使用 Elasticsearch 索引来跟踪连接器状态
    • .elastic-connectors 和 .elastic-connectors-sync-jobs (在上面链接的文档中描述)

托管自定义连接器的位置

连接器本身不依赖于 Elasticsearch,它可以托管在你自己的环境中

如果你有 Elasticsearch 部署,无论它是自我管理还是位于 Elastic Cloud 中:

  • 作为开发人员/公司,你可以为你的数据源编写自定义连接器
  • 在你自己的基础设施上管理连接器并根据你的需求配置连接器服务
  • 只要连接器可以通过网络发现 Elasticsearch,它就能够对数据建立索引
  • 作为管理员,你可以通过 Kibana 控制连接器

示例:使用连接器框架的 Google Drive 连接器

我们使用连接器框架为 Google Drive 编写了一个简单的连接器。 我们通过创建一个新类来实现新的源,该类的职责是将文档从目标源发送到 Elasticsearch。

注意:本教程与 Elastic stack 版本 8.10 兼容。 对于更高版本,请务必检查连接器发行说明以获取更新并参考 Github 存储库。

我们从具有 BaseDataSource 预期方法签名的 GoogleDriveDataSource 类开始,以配置数据源、检查其可用性(ping)并检索文档。 为了使这个连接器发挥作用,我们需要实现这些方法。

class GoogleDriveDataSource(BaseDataSource):"""Google Drive"""name = "Google Drive"service_type = "google_drive"@classmethoddef get_default_configuration(cls):"""Returns a dict with a default configuration"""raise NotImplementedErrorasync def ping(self):"""When called, pings the backendIf the backend has an issue, raises an exception"""raise NotImplementedErrorasync def get_docs(self, filtering=None):"""Returns an iterator on all documents present in the backendEach document is a tuple with:- a mapping with the data to index- a coroutine to download extra data (attachments)The mapping should have least an `id` fieldand optionally a `timestamp` field in ISO 8601 UTCThe coroutine is called if the document needs to be syncedand has attachments. It needs to return a mapping to index.It has two arguments: doit and timestampIf doit is False, it should return None immediately.If timestamp is provided, it should be used in the mapping.Example:async def get_file(doit=True, timestamp=None):if not doit:returnreturn {'TEXT': 'DATA', 'timestamp': timestamp,'id': 'doc-id'}"""raise NotImplementedError

这个 GoogleDriveDataSource 类是编写 Google Drive 源代码的起点。 通过执行以下步骤,你将实现与 Google Drive 同步数据所需的逻辑:

  • 我们需要将此文件添加到 connectors/sources 中
  • 设置新的连接器名称和 service_type,例如 Google Drive 作为名称,google_drive 作为服务类型 (service type)
  • 要从源获取连接器同步数据,你需要实现:
    • get_default_configuration - 此函数应返回 RichConfigurableFields 的集合。 这些字段允许你从 Kibana UI 配置连接器。 这包括传递身份验证详细信息、凭据和其他特定于源的设置。 Kibana 巧妙地呈现这些配置。 例如,如果你将某个字段标记为 "sensitive": true, Kibana 会出于安全原因屏蔽它。
    • ping - 对数据源的简单调用,验证其状态,将其视为健康检查。
    • get_docs - 此方法需要实现实际从源获取数据的逻辑。 此函数应返回一个异步迭代器,该迭代器返回一个包含以下内容的元组:(document, lazy_download),其中:
      • document - 是远程源中项目的 JSON 表示形式。 (如 name, location, table, author, size 等)
      • lazy_download - 是一个协程,用于下载框架处理的内容提取的对象/附件(例如从 PDF 文档中提取文本)

BaseDataSource 类中还有其他抽象方法。 请注意,如果你只想支持内容同步(例如从谷歌驱动器获取所有数据),则不需要实现这些方法。 它们指的是其他连接器功能,例如:

  • 文档级安全性(get_access_control、access_control_query)
  • 高级过滤规则(advanced_rules_validators)
  • 增量同步 (get_docs_incrementally)
  • 将来可能会添加其他功能

我们如何编写官方 Elasticsearch Google Drive 连接器

首先实现 BaseDataSource 类所需的方法

我们需要实现方法 get_default_configuration、ping 和 get_docs 以使连接器同步数据。 因此,让我们更深入地了解实现。

首先要考虑的是:如何与Google Drive “对话” 来获取数据?

Google 提供了官方的 python 客户端,但它是同步的,因此同步内容可能会很慢。 我们认为更好的选择是 aiogoogle 库,它提供了用异步 python 编写的完整客户端功能。 一开始这可能并不直观,但使用异步操作来提高性能非常重要。 因此,在此示例中,我们选择不使用官方谷歌库,因为它不支持异步模式。

如果你在异步框架中使用同步或阻塞代码,可能会对性能产生重大影响。 任何异步框架的核心都是事件循环。 事件循环允许通过连续轮询已完成的任务并调度新任务来并发执行异步任务。 如果引入阻塞操作,它将停止循环的执行,从而阻止它管理其他任务。 这本质上否定了异步架构提供的并发优势。

下一个问题是连接器身份验证

我们将 Google Drive 连接器验证为服务帐户。 有关身份验证的更多信息可以在这些连接器文档页面中找到。

  • 服务帐户可以使用密钥进行身份验证
  • 我们通过 Elasticsearch 中的 Kibana UI 将身份验证密钥传递给服务帐户

让我们看一下 get_default_configuration 实现,它允许最终用户传递凭证密钥,该凭证密钥将存储在索引中以在同步期间进行身份验证:

class GoogleDriveDataSource(BaseDataSource):"""Google Drive"""name = "Google Drive"service_type = "google_drive"{...}@classmethoddef get_default_configuration(cls):"""Get the default configuration for Google Drive.Returns:dict: Default configuration."""return {"service_account_credentials": {"display": "textarea","label": "Google Drive service account JSON","sensitive": True,"order": 1,"tooltip": "This connectors authenticates as a service account to synchronize content from Google Drive.","type": "str","value": "",},}

接下来我们来实现一个简单的 ping 方法

我们将对 google Drive api 进行简单的调用,例如 /about 端点。

对于此步骤,我们考虑 GoogleDriveClient 的简化表示。 我们的主要目标是指导你完成连接器创建,因此我们不关注 Google Drive 客户端的实现细节。 然而,最少的客户端代码对于连接器的操作至关重要,因此我们将依赖 GoogleDriveClient 类表示的伪代码。

class GoogleDriveClient(GoogleAPIClient):"""A google drive client to handle api calls made to Google Drive API."""{... google drive client implementation}async def ping(self):return await self.api_call(resource="about", method="get", fields="kind")class GoogleDriveDataSource(BaseDataSource):"""Google Drive"""name = "Google Drive"service_type = "google_drive"{...}@cached_propertydef google_drive_client(self):"""Initialize and return the GoogleDriveClientReturns:GoogleDriveClient: An instance of the GoogleDriveClient."""self._validate_service_account_json()json_credentials = json.loads(self.configuration["service_account_credentials"])return GoogleDriveClient(json_credentials=json_credentials)async def ping(self):"""Verify the connection with Google Drive"""try:await self.google_drive_client.ping()self._logger.info("Successfully connected to the Google Drive.")except Exception:self._logger.exception("Error while connecting to the Google Drive.")raise

异步 iterator 从 google drive 返回文件以进行内容提取

下一步是编写 get_docs 异步迭代器,该迭代器将从 Google drive 和协程返回文件以下载它们以进行内容提取。 根据个人经验,开始将 get_docs 作为一个简单的独立 python 脚本来实现并获取一些数据通常更简单。 一旦 get_docs 代码正常工作,我们就可以将其移动到数据源类。

我们看一下 api 文档,我们可以:

  • 使用文 files/list 端点通过分页迭代 drive 中的文档
  • 使用 files/get 和 files/export 下载文件(或将 google 文档导出为特定文件格式)
class GoogleDriveDataSource(BaseDataSource):"""Google Drive"""name = "Google Drive"service_type = "google_drive"{...}async def get_content(self, file, timestamp=None, doit=None):"""Extracts the content from a file file.Args:file (dict): Formatted file document.timestamp (timestamp, optional): Timestamp of file last_modified. Defaults to None.doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.Returns:dict: Content document with id, timestamp & text"""# Code details have been omitted here for brevity. For a complete implementation,# please refer to the connector code on GitHub.async def get_docs(self, filtering=None):"""Executes the logic to fetch Google Drive objects in an async manner.Args:filtering (optional): Advenced filtering rules. Defaults to None.Yields:dict, partial: dict containing meta-data of the Google Drive objects,partial download content function"""async for files_page in self.google_drive_client.list_files():async for file in self.prepare_files(files_page=files_page):yield file, partial(self.get_content, file)

那么这段代码中发生了什么?

  • list_files 对驱动器中的文件进行分页。
  • prepare_files 将文件元数据格式化为预期模式
  • get_content 是一个下载文件并对其内容进行 Base64 编码的协程(内容提取的兼容格式)

为了简洁起见,省略了一些代码细节。 有关完整的实现,请参阅 GitHub 上的当前连接器实现。

让我们运行连接器!

要将自定义连接器集成到框架中,你需要注册其实现。 通过在 connectors/config.py 的源部分中添加自定义连接器的条目来执行此操作。 对于 Google Drive 示例,添加内容将如下所示:

"sources": {...,"google_drive": "connectors.sources.google_drive:GoogleDriveDataSource",...
}

现在在 Kibana 界面中:

  • 转到 Search -> Indices -> Create a new index -> Use a Connector
  • 选择 Customized connector(使用自定义连接器时)
  • 配置你的连接器。 生成 Elasticsearch API 密钥和连接器 ID,并按照说明将这些详细信息放入 config.yml 中,然后启动连接器。

此时,Kibana 应该检测到您的连接器! 安排定期数据同步或只需单击 “Sync” 即可开始完全同步。

连接器可以配置为使用 Elasticsearch 的摄取管道在将数据存储到索引之前对数据执行转换。 一个常见的用例是通过机器学习丰富文档。 例如,你可以:

  • 使用文本嵌入模型分析文本字段,该模型将生成数据的密集向量表示
  • 运行文本分类以进行情感分析
  • 使用命名实体识别 (NER) 从文本中提取关键信息

同步完成后,你的数据将在搜索优化的 Elasticsearch 索引中可用。 此时,你可以深入构建搜索体验或深入分析。

你想创建并贡献一个新的连接器吗?

如果你为可能对 Elasticsearch 社区有所帮助的源创建自定义连接器,请考虑贡献它。 以下是使定制连接器成为 Elastic 支持的连接器的升级路径指南。

贡献连接器的验收标准

此外,在开始花一些时间开发连接器之前,你应该创建一个问题并寻求有关连接器及其将使用哪些库的一些初步反馈。 一旦你的连接器想法得到一些初步反馈,请确保您的项目满足一些验收标准:

  • 在 connectors/sources 中添加模块或目录
  • 实现一个类,该类实现 connectors.source.BaseDataSource 中描述的所有方法
  • 在 connectors/sources/tests 中添加覆盖率 +90% 的单元测试
  • 在源部分的 connectors/config.py 中声明你的连接器
  • 在 requirements.txt 中声明你的依赖项。 确保固定这些依赖项
  • 对于你要添加的每个依赖项(包括间接依赖项),列出所有许可证并在补丁中提供该列表。
  • 确保你的源使用异步库。 如果不可能,请确保没有阻塞循环
  • 如果可能,请提供运行后端服务的 docker 映像,以便我们测试连接器。 如果您无法提供 Docker 映像,请提供针对在线服务运行所需的凭据。
  • 由于 Elasticsearch 分页的默认大小限制为 10k,测试后端需要返回超过 10k 的文档。 从测试后端返回超过 10k 文档将有助于测试连接器

用于测试连接器的支持工具

我们还有一些支持工具来分析连接器代码并运行性能测试。 你可以在这里找到这些资源:

  • Perf8 - 性能库和仪表板,用于分析 python 代码的质量,以评估资源利用率并检测阻塞调用
  • E-2-E 功能测试,利用 perf8 库来分析每个连接器

总结

我们希望这个博客和示例对你有用。

以下是 Elasticsearch 可用的 native connectors 和 connector clients 的完整列表。 如果你没有找到列出的数据源,是否可以创建一个自定义连接器?

以下是与本文相关的一些有用资源:

  • 连接器 GitHub 存储库和文档页面
  • 异步 Python 学习课程
  • 新的自定义连接器社区指南
  • Elastic 连接器框架的许可详细信息(在此链接中搜索 'Connector Framework')

如果你没有 Elastic 帐户,你可以随时启动试用帐户来开始!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/115659.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【深蓝学院】手写VIO第7章--VINS初始化和VIO系统--作业

0. 内容 1. T1 1. 下载EuRoc数据集(optional) 因为作业主要使用Ch2生成的数据,所以这一步也是可选的,但是为了整个系统的bring up,可以先用EuRoc数据集跑起来。 下载EuRoc数据集,SLAM相关数据集链接 2.…

【LeetCode刷题(数据结构与算法)】:数据结构中的常用排序实现数组的升序排列

现在我先将各大排序的动图和思路以及代码呈现给大家 插入排序 直接插入排序是一种简单的插入排序法,其基本思想是: 把待排序的记录按其关键码值的大小逐个插入到一个已经排好序的有序序列中,直到所有的记录插入完为 止,得到一个…

c语言练习91:合并两个有序链表

合并两个有序链表 力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台 代码1: /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/typedef struct ListNode ListNode; struct Li…

Appium+python+unittest搭建UI自动化框架!

阅读本小节,需要读者具备如下前提条件: 1. 掌握一种编程语言基础,如java、python等。 2. 掌握一种单元测试框架,如java语言的testng框架、python的unittest框架。 3. 掌握目前主流的UI测试框架,移动端APP测试框架Appiu…

应用3之Like运算符的应用

《VBA经典应用69例》(10178981),是我推出的第九套教程,教程是专门针对初级、中级学员在学习VBA过程中可能遇到的案例展开,这套教程案例众多,紧贴“实战”,并做“战术总结”,以便大家…

医药电商行业想要精准获客?媒介盒子分享三大技巧

随着医疗需求的不断增长,健康成为社会关注的重点,消费者对医药保健产品和需求正在不断增长,数字化时代的来临使医药行业逐渐电商化,线上零售渠道成为医药行业销售额的主要来源,那当下医药电商行业如何抓住机遇&#xf…

AB试验(六)A/B实验常见知识点的Python计算

AB试验(六)A/B实验常见知识点的Python计算 前面理论知识上提到了很多的知识点需要计算,作为一个实用主义的博主,怎么可以忍受空谈呢?所以本期就给大家分享如何利用Python对这些知识点进行计算。 均值类指标 import …

一起学数据结构(11)——快速排序及其优化

上篇文章中,解释了插入排序、希尔排序、冒泡排序、堆排序及选择排序的原理及具体代码实现本片文章将针对快速排序,快速排序的几种优化方法、快速排序的非递归进行解释。 目录 1. 快速排序原理解析以及代码实现: 2. 如何保证相遇位置的值一…

嵌入式硬件库的基本操作方式与分析

本次要介绍的开源软件是 c-periphery: https://github.com/vsergeev/c-periphery一个用 C 语言编写的硬件外设访问库。 我们可以用它来读写 Serial、SPI、I2C 等,非常适合在嵌入式产品上使用。 我们可以基于它优秀的代码框架,不断地扩展出更…

Prometheus接入AlterManager配置邮件告警(基于K8S环境部署)

文章目录 一、配置AlterManager告警发送至邮箱二、Prometheus接入AlterManager配置三、部署PrometheusAlterManager(放到一个Pod中)四、测试告警 注意:请基于 PrometheusGrafana监控K8S集群(基于K8S环境部署)文章之上做本次实验。 一、配置AlterManager告警发送至邮…

C++——特殊类设计

目录 一.不能被拷贝的类 1.C98做法 2.C11做法 二.只能在堆上实例化的类 1.实现方式一 2.实现方式二 三.只能在栈上创建的对象 四.不能被继承的类 1.C98方式 2.C11方式 五.只能创建一个对象的类 1.设计模式 2.单例模式 一.不能被拷贝的类 拷贝只会放在两个场景中&a…

visual studio Qt 开发环境中手动添加 Q_OBJECT 导致编译时出错的问题

问题简述 创建项目的时候,已经添加了类文件,前期认为不需要信号槽,就没有添加宏Q_OBJECT,后面项目需要,又加入了宏Q_OBJECT,但是发现只是添加了一个宏Q_OBJECT,除此之外没有改动其它的代码,原本…

基于springboot实现地方废物回收机构平台管理系统【项目源码+论文说明】

基于springboot实现地方废物回收机构管理系统演示 摘要 网络的广泛应用给生活带来了十分的便利。所以把地方废物回收机构管理与现在网络相结合,利用java技术建设地方废物回收机构管理系统,实现地方废物回收机构的信息化。则对于进一步提高地方废物回收机…

如何提高广告投放转化率?Share Creators 资产库与Appsflyer营销数据的全面结合

如何提高广告投放转化率?Share Creators 资产库与Appsflyer营销数据的全面结合 全球经济进入了低迷期。 营销成本越来越高, 营销需要更务实,注重投入产出比。众所周知,除了渠道、客群画像以外, 优秀的广告设计图&#…

c进阶测试题

选择题 1.请问该程序的输出是多少&#xff08;C&#xff09; #include<stdio.h> int main(){unsigned char i 7;int j 0;for(;i > 0;i - 3){ j;} printf("%d\n", j);return 0; }A. 2 B. 死循环 C. 173 D. 172 首先unsigned char型是不会为负数&#xff…

flask入门(四)前后端数据传输

文章目录 1、flask后端接收来自前端的数据1&#xff09;如果前端提交的方法为POST2&#xff09;如果前段提交的方法是GET 2、flask后端向前端传数据3、案例参考文献 1、flask后端接收来自前端的数据 1&#xff09;如果前端提交的方法为POST 后端接收时的代码&#xff1a; xx…

我试图扯掉这条 SQL 的底裤。只能扯一点点,不能扯多了

之前不是写分页嘛,分页肯定就要说到 limit 关键字嘛。 然后我啪的一下扔了一个链接出来: https://dev.mysql.com/doc/refman/8.0/en/limit-optimization.html 这个链接就是 MySQL 官方文档,这一章节叫做“对 Limit 查询的优化”,针对 limit 和 order by 组合的场景进行了较…

【MySQL】存储引擎

存储引擎 查看存储引擎设置表的存储引擎创建表时指定存储引擎修改表的存储引擎 引擎介绍InnoDB引擎: 具备外键支持的十五存储引擎MyISAM引擎: 主要的非事务处理存储引擎Archive引擎: 用于数据存档Blackhole引擎: 丢弃写操作,读操作返回空内容CSV引擎: 读取数据时,以逗号分隔各个…

redis底层数据结构

总所周知&#xff0c;redis支持五种数据类型String、Hash、List、Set、ZSet。在支持这些复杂数据结构的同时&#xff0c;redis不仅需要保证读写的性能&#xff0c;还能提供各种微操作&#xff0c;比如直接修改Hash字典中的某个field的值&#xff0c;或者直接往ZSet中插入某个值…

Failed to start The nginx HTTP and reverse proxy server.

本章教程主要分享一下&#xff0c;当nginx 启动时&#xff0c;遇到报这个错误时的一个解决问题思路。 目录 1、观察报错信息 2、尝试性解决 1、观察报错信息 根据日志的信息&#xff0c;我们至少可以知道2个比较信息。 1、操作用户执行命令是在非root权限下进行操作的。 2、Ad…