将 OneLake 数据索引到 Elasticsearch - 第二部分

作者:来自 Elastic Gustavo Llermaly 及 Jeffrey Rengifo

本文分为两部分,第二部分介绍如何使用自定义连接器将 OneLake 数据索引并搜索到 Elastic 中。

在本文中,我们将利用第 1 部分中学到的知识来创建 OneLake 自定义 Elasticsearch 连接器。

我们已经上传了一些 OneLake 文档并将其索引到 Elasticsearch 中以供搜索。但是,这仅适用于一次性上传。如果我们想要同步数据,那么我们需要开发一个更复杂的系统。

幸运的是,Elastic 有一个连接器框架可用于开发满足我们需求的自定义连接器:

我们现在将根据本文制作一个 OneLake 连接器:如何为 Elasticsearch 创建自定义连接器。

步骤

  • 连接器引导
  • 实现 BaseDataSource 类
  • 身份验证
  • 运行连接器
  • 配置计划

连接器引导

背景信息:Elastic 连接器分为两种类型:

  1. Elastic 托管连接器:完全由 Elastic Cloud 托管和运行。
  2. 自托管连接器:由用户自行托管,必须部署在你的基础设施中。

自定义连接器属于 “连接器客户端” 类别,因此我们需要下载并部署连接器框架。

首先,克隆连接器的代码库:

git clone https://github.com/elastic/connectors

现在在 requirements/framework.txt 文件末尾添加你将使用的依赖项。在本例中:

azure-identity==1.19.0
azure-storage-file-datalake==12.17.0

这样,存储库就完成了,我们可以开始编码了。

实现 BaseDataSource 类

你可以在此存储库中找到完整的工作代码。

我们将介绍 onelake.py 文件中的核心部分。

在导入和类声明之后,我们必须定义将捕获配置参数的 __init__ 方法。

"""OneLake connector to retrieve data from datalakes"""from functools import partialfrom azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClientfrom connectors.source import BaseDataSourceACCOUNT_NAME = "onelake"class OneLakeDataSource(BaseDataSource):"""OneLake"""name = "OneLake"service_type = "onelake"incremental_sync_enabled = True# Here we can enter the data that we'll later need to connect our connector to OneLake.def __init__(self, configuration):"""Set up the connection to the azure base clientArgs:configuration (DataSourceConfiguration): Object of DataSourceConfiguration class."""super().__init__(configuration=configuration)self.tenant_id = self.configuration["tenant_id"]self.client_id = self.configuration["client_id"]self.client_secret = self.configuration["client_secret"]self.workspace_name = self.configuration["workspace_name"]self.data_path = self.configuration["data_path"]

然后,你可以配置 UI 将显示的表单,使用返回配置字典的 get_default_configuration 方法填充这些参数。

    # Method to generate the Enterprise Search UI fields for the variables we need to connect to OneLake.@classmethoddef get_default_configuration(cls):"""Get the default configuration for OneLakeReturns:dictionary: Default configuration"""return {"tenant_id": {"label": "OneLake tenant id","order": 1,"type": "str",},"client_id": {"label": "OneLake client id","order": 2,"type": "str",},"client_secret": {"label": "OneLake client secret","order": 3,"type": "str","sensitive": True, # To hide sensitive data like passwords or secrets},"workspace_name": {"label": "OneLake workspace name","order": 4,"type": "str",},"data_path": {"label": "OneLake data path","tooltip": "Path in format <DataLake>.Lakehouse/files/<Folder path>","order": 5,"type": "str",},"account_name": {"tooltip": "In the most cases is 'onelake'","default_value": ACCOUNT_NAME,"label": "Account name","order": 6,"type": "str",},}

然后我们配置下载方法,并从 OneLake 文档中提取内容。

async def download_file(self, file_client):"""Download file from OneLakeArgs:file_client (obj): File clientReturns:generator: File stream"""try:download = file_client.download_file()stream = download.chunks()for chunk in stream:yield chunkexcept Exception as e:self._logger.error(f"Error while downloading file: {e}")raiseasync def get_content(self, file_name, doit=None, timestamp=None):"""Obtains the file content for the specified file in `file_name`.Args:file_name (obj): The file name to process to obtain the content.timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None.doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.Returns:str: Content of the file or None if not applicable."""if not doit:returnfile_client = await self._get_file_client(file_name)file_properties = file_client.get_file_properties()file_extension = self.get_file_extension(file_name)doc = {"_id": f"{file_client.file_system_name}_{file_properties.name}",  # workspacename_data_path"name": file_properties.name.split("/")[-1],"_timestamp": file_properties.last_modified,"created_at": file_properties.creation_time,}can_be_downloaded = self.can_file_be_downloaded(file_extension=file_extension,filename=file_properties.name,file_size=file_properties.size,)if not can_be_downloaded:return docextracted_doc = await self.download_and_extract_file(doc=doc,source_filename=file_properties.name.split("/")[-1],file_extension=file_extension,download_func=partial(self.download_file, file_client),)return extracted_doc if extracted_doc is not None else doc

为了让我们的连接器对框架可见,我们需要在 connectors/config.py 文件中声明它。为此,我们将以下代码添加到源中:

 "sources": {..."onelake": "connectors.sources.onelake:OneLakeDataSource",...}

身份验证

在测试连接器之前,我们需要获取 client_id, tenant_id 和 client_secret,我们将使用它们从连接器访问工作区。

我们将使用 service principals 作为身份验证方法。

Azure service principal 是为与应用程序、托管服务和自动化工具一起使用以访问 Azure 资源而创建的身份。

步骤如下:

  1. 创建应用程序并收集 client_id、tenant_id 和 client_secret
  2. 在工作区中启用 service principal
  3. 将 service principal 添加到工作区

你可以逐步遵循本教程。

准备好了吗?现在是测试连接器的时候了!

运行连接器

连接器准备好后,我们现在可以连接到我们的 Elasticsearch 实例。

转到: Search > Content > Connectors > New connector 并选择 Customized Connector

选择要创建的名称,然后选择 “Create and attach an index” 以创建与连接器同名的新索引。

你现在可以使用 Docker 运行它或从源代码运行它。在此示例中,我们将使用 “Run from source”。

单击 “Generate Configuration”,然后将框中的内容粘贴到项目根目录中的 config.yml 文件中。在字段 service_type 上,你必须匹配 Connectors/config.py 中的连接器名称。在本例中,将 changeme 替换为 onelake。

现在,你可以使用以下命令运行连接器:

make install
make run

如果连接器正确初始化,你应该在控制台中看到如下消息:

注意:如果出现兼容性错误,请检查你的连接器/版本文件并与你的 Elasticsearch 集群版本进行比较:与 Elasticsearch 的版本兼容性。我们建议保持连接器版本和 Elasticsearch 版本同步。在本文中,我们使用 Elasticsearch 和连接器版本 8.15。

如果一切顺利,我们的本地连接器将与我们的 Elasticsearch 集群通信,我们将能够使用我们的 OneLake 凭据对其进行配置:

我们现在将索引来自 OneLake 的文档。为此,请单击 Sync > Full Content,运行完整内容同步:

同步完成后,你应该在控制台中看到以下内容:

在企业搜索 UI 中,你可以单击 “Documents” 来查看已索引的文档:

配置计划

你可以根据需要使用 UI 安排定期内容同步,以使索引保持更新并与 OneLake 同步。

要配置计划同步,请转到 “Search > Content > Connectors,然后选择你的连接器。然后单击 “scheduling”:

或者,你可以使用允许 CRON 表达式的更新连接器调度 API。

结论

在第二部分中,我们通过使用 Elastic 连接器框架并开发我们自己的 OneLake 连接器来轻松与我们的 Elastic Cloud 实例通信,将我们的配置更进一步。

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在你的本地机器上试用 Elastic。

原文:Indexing OneLake data into Elasticsearch - Part II - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PMP–一、二、三模–分类–14.敏捷

文章目录 敏捷中的角色职责与3个工件--题干关键词角色职责3个工件 高频考点分析&#xff08;一、过程&#xff1b;二、人员&#xff09;一、过程&#xff1a;1.1 变更管理&#xff1a;1.1.1 瀑布型变更&#xff08;一次交付、尽量限制、确定性需求 &#xff1e;风险储备&#x…

Vue2下篇

插槽&#xff1a; 基本插槽&#xff1a; 普通插槽&#xff1a;父组件向子组件传递静态内容。基本插槽只能有一个slot标签&#xff0c;因为这个是默认的位置&#xff0c;所以只能有一个 <!-- ParentComponent.vue --> <template> <ChildComponent> <p>…

【科研建模】Pycaret自动机器学习框架使用流程及多分类项目实战案例详解

Pycaret自动机器学习框架使用流程及项目实战案例详解 1 Pycaret介绍2 安装及版本需求3 Pycaret自动机器学习框架使用流程3.1 Setup3.2 Compare Models3.3 Analyze Model3.4 Prediction3.5 Save Model4 多分类项目实战案例详解4.1 ✅ Setup4.2 ✅ Compare Models4.3 ✅ Experime…

Linux学习笔记——网络管理命令

一、网络基础知识 TCP/IP四层模型 以太网地址&#xff08;MAC地址&#xff09;&#xff1a; 段16进制数据 IP地址&#xff1a; 子网掩码&#xff1a; 二、接口管命令 ip命令&#xff1a;字符终端&#xff0c;立即生效&#xff0c;重启配置会丢失 nmcli命令&#xff1a;字符…

手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion(原理介绍)

手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion&#xff08;原理介绍&#xff09; 目录 手撕Diffusion系列 - 第九期 - 改进为Stable Diffusion&#xff08;原理介绍&#xff09;DDPM 原理图Stable Diffusion 原理Stable Diffusion的原理解释Stable Diffusion 和 Diffus…

JAVAweb学习日记(八) 请数据库模型MySQL

一、MySQL数据模型 二、SQL语言 三、DDL 详细见SQL学习日记内容 四、DQL-条件查询 五、DQL-分组查询 聚合函数&#xff1a; 分组查询&#xff1a; 六、DQL-分组查询 七、分页查询 八、多表设计-一对多&一对一&多对多 一对多-外键&#xff1a; 一对一&#xff1a; 多…

微信小程序1.1 微信小程序介绍

1.1 微信小程序介绍 内容提要 1.1 什么是微信小程序 1.2 微信小程序的功能 1.3 微信小程序使用场景 1.4 微信小程序能取代App吗 1.5 微信小程序的发展历程 1.6微信小程序带来的机会

音频入门(一):音频基础知识与分类的基本流程

音频信号和图像信号在做分类时的基本流程类似&#xff0c;区别就在于预处理部分存在不同&#xff1b;本文简单介绍了下音频处理的方法&#xff0c;以及利用深度学习模型分类的基本流程。 目录 一、音频信号简介 1. 什么是音频信号 2. 音频信号长什么样 二、音频的深度学习分…

Midjourney中的强变化、弱变化、局部重绘的本质区别以及其有多逆天的功能

开篇 Midjourney中有3个图片“微调”&#xff0c;它们分别为&#xff1a; 强变化&#xff1b;弱变化&#xff1b;局部重绘&#xff1b; 在Discord里分别都是用命令唤出的&#xff0c;但如今随着AI技术的发达在类似AI可人一类的纯图形化界面中&#xff0c;我们发觉这样的逆天…

【Linux】命令为桥,存在为岸,穿越虚拟世界的哲学之道

文章目录 Linux基础入门&#xff1a;探索操作系统的内核与命令一、Linux背景与发展历史1.1 Linux的起源与发展1.2 Linux与Windows的对比 二、Linux的常用命令2.1 ls命令 - "List"&#xff08;列出文件)2.2 pwd命令 - "Print Working Directory"&#xff08…

[护网杯 2018]easy_tornado1

题目 、 依次点击文件查看 /flag.txt flag in /fllllllllllllag /welcome.txt render /hints.txt md5(cookie_secretmd5(filename)) tornado模板注入 报cookie /error?msg{{handler.settings}} cookie_secret: 6647062b-e68d-4406-90d3-06e307fa955c} 使用python脚本…

STM32+W5500+以太网应用开发+003_TCP服务器添加OLED(u8g2)显示状态

STM32W5500以太网应用开发003_TCP服务器添加OLED&#xff08;u8g2&#xff09;显示状态 实验效果3-TCP服务器OLED1 拷贝显示驱动代码1.1 拷贝源代码1.2 将源代码添加到工程1.3 修改代码优化等级1.4 添加头文件路径1.5 修改STM32CubeMX工程 2 修改源代码2.1 添加头文件2.2 main函…

基于微信小程序的英语学习交流平台设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

ORB-SLAM2源码学习:Initializer.cc⑧: Initializer::CheckRT检验三角化结果

前言 ORB-SLAM2源码学习&#xff1a;Initializer.cc⑦: Initializer::Triangulate特征点对的三角化_cv::svd::compute-CSDN博客 经过上面的三角化我们成功得到了三维点&#xff0c;但是经过三角化成功的三维点并不一定是有效的&#xff0c;需要筛选才能作为初始化地图点。 …

macOS如何进入 Application Support 目录(cd: string not in pwd: Application)

错误信息 cd: string not in pwd: Application 表示在当前目录下找不到名为 Application Support 的目录。可能的原因如下&#xff1a; 拼写错误或路径错误&#xff1a;确保你输入的目录名称正确。目录名称是区分大小写的&#xff0c;因此请确保使用正确的大小写。正确的目录名…

记录一个连不上docker中的mysql的问题

引言 使用的debian12,不同发行版可能有些许差异&#xff0c;连接使用的工具是navicat lite 本来是毫无思绪的&#xff0c;以前在云服务器上可能是防火墙的问题&#xff0c;但是这个桌面环境我压根没有使用防火墙。 直到 ying192:~$ mysql -h127.0.0.1 -uroot ERROR 1045 (28…

Gradle自定义任务指南 —— 释放构建脚本的无限可能

文章目录 &#x1f50d;Gradle任务⚙️ 自定义任务的5大核心配置项1. 任务注册&#xff08;Registering Tasks&#xff09;2. group & description3. dependsOn4. inputs & outputs5. 类型化任务&#xff08;Task Types&#xff09; 任务常见配置参数传递方式1&#xf…

windows11关闭系统更新详细操作步骤

文章目录 1.打开注册表2.修改注册表内容2.1 新建文件2.2 修改值 3.修改设置 1.打开注册表 winR输入regedit(如下图所示) 2.修改注册表内容 进HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings 2.1 新建文件 右侧界面右键即可 2.2 修改值 重命名为如下…

matlab绘图——彩色螺旋图

代码生成的图形是一个动态的彩色螺旋&#xff0c;展示了如何利用极坐标和颜色映射创建视觉吸引力强的图形。该图形可以用于数据可视化、艺术创作或数学演示&#xff0c;展示了 MATLAB 在图形处理方面的强大能力。通过调整 theta 和 r 的范围&#xff0c;可以创建出不同形状和复…

啥是EPS?

文章目录 1. 什么是EPS?2. 主要构成3. EPS的设计如何符合功能安全?4. 代表性的厂家1. 什么是EPS? EPS(Electric Power Steering,电动助力转向系统)是一种利用电动机提供转向助力的系统,取代了传统的液压助力转向系统(HPS)。EPS通过传感器检测驾驶员的转向意图,并由电…