将 OneLake 数据索引到 Elasticsearch - 第 1 部分

作者:来自 Elastic Gustavo Llermaly

学习配置 OneLake,使用 Python 消费数据并在 Elasticsearch 中索引文档,然后运行语义搜索。

OneLake 是一款工具,可让你连接到不同的 Microsoft 数据源,例如 Power BI、Data Activator 和 Data factory 等。它支持将数据集中在 DataLakes 中,DataLakes 是支持全面数据存储、分析和处理的大容量存储库。

在本文中,我们将学习如何配置 OneLake、使用 Python 消费数据以及在 Elasticsearch 中索引文档,然后运行语义搜索。

有时,你可能希望在非结构化数据和来自不同来源和软件提供商的结构化数据中运行搜索,并使用 Kibana 创建可视化。对于这种任务,在 Elasticsearch 中索引文档作为中央存储库会变得非常有用。

在这个例子中,我们将使用一家名为 Shoestic 的虚拟公司,这是一家在线鞋店。我们在结构化文件 (CSV) 中列出了产品列表,而一些产品的数据表则采用非结构化格式 (DOCX)。这些文件存储在 OneLake 中。

你可以在此处找到包含完整示例(包括测试文档)的笔记本。

步骤

  • OneLake 初始配置
  • 使用 Python 连接到 OneLake
  • 索引文档
  • 查询

OneLake 初始配置

OneLake 架构可以总结如下:

要使用 OneLake 和 Microsoft Fabric,我们需要一个 Office 365 帐户。如果你没有,可以在此处创建一个试用帐户。

使用你的帐户登录 Microsoft Fabric。然后,创建一个名为 “ShoesticWorkspace” 的工作区。进入新创建的工作区后,创建一个 Lakehouse 并将其命名为“ShoesticDatalake”。最后一步是在 “Files” 中创建一个新文件夹。单击 “new subfolder” 并将其命名为 “ProductsData”。

完成了!我们准备开始提取数据了。

使用 Python 连接到 OneLake

配置完 OneLake 后,我们现在可以准备 Python 脚本。Azure 有处理凭据并与 OneLake 通信的库。

pip install azure-identity elasticsearch==8.14 azure-storage-file-datalake azure-cli python-docx

“azure-identity azure-storage-file-datalake” 库让我们可以与 OneLake 交互,同时 “azure-cli” 可以访问凭据并授予权限。为了读取文件内容以便稍后将其索引到 Elasticsearch,我们使用 python-docx。

在我们的本地环境中保存 Microsoft 凭据

我们将使用 “az login” 进入我们的 Microsoft 帐户并运行:

 az login --allow-no-subscriptions

标志 “ --allow-no-subscriptions”允许我们在没有有效订阅的情况下向 Microsoft Azure 进行身份验证。

此命令将打开一个浏览器窗口,你必须在其中访问你的帐户,然后选择你帐户的订阅号。

现在我们可以开始编写代码了!

创建一个名为 onelake.py 的文件并添加以下内容:

_onelake.py_

# Importing dependencies 
import chardet 
from azure.identity import DefaultAzureCredential 
from docx import Document 
from azure.storage.filedatalake import DataLakeServiceClient # Initializing the OneLake client 
ONELAKE_ACCOUNT_NAME = "onelake" 
ONELAKE_WORKSPACE_NAME = "ShoesticWorkspace" 
# Path in format <DataLake>.Lakehouse/files/<Folder path> 
ONELAKE_DATA_PATH = "shoesticDatalake.Lakehouse/Files/ProductsData" # Microsoft token 
token_credential = DefaultAzureCredential() # OneLake services 
service_client = DataLakeServiceClient( account_url=f"https://{ONELAKE_ACCOUNT_NAME}.dfs.fabric.microsoft.com", credential=token_credential, 
) 
file_system_client = service_client.get_file_system_client(ONELAKE_WORKSPACE_NAME) 
directory_client = file_system_client.get_directory_client(ONELAKE_DATA_PATH) # OneLake functions   # Upload a file to a LakeHouse directory 
def upload_file_to_directory(directory_client, local_path, file_name): file_client = directory_client.get_file_client(file_name) with open(local_path, mode="rb") as data: file_client.upload_data(data, overwrite=True) print(f"File: {file_name} uploaded to the data lake.") # Get directory contents from your lake folder 
def list_directory_contents(file_system_client, directory_name): paths = file_system_client.get_paths(path=directory_name) for path in paths: print(path.name + "\n") # Get a file by name from your lake folder 
def get_file_by_name(file_name, directory_client): return directory_client.get_file_client(file_name) # Decode docx 
def get_docx_content(file_client): download = file_client.download_file() file_content = download.readall() temp_file_path = "temp.docx" with open(temp_file_path, "wb") as temp_file: temp_file.write(file_content) doc = Document(temp_file_path) text = [] for paragraph in doc.paragraphs: text.append(paragraph.text) return "\n".join(text) # Decode csv 
def get_csv_content(file_client): download = file_client.download_file() file_content = download.readall() result = chardet.detect(file_content) encoding = result["encoding"] return file_content.decode(encoding) 

将文件上传到 OneLake

在此示例中,我们将使用一个 CSV 文件和一些包含有关我们鞋店产品信息的 .docx 文件。虽然你可以使用 UI 上传它们,但我们将使用 Python 来完成。在此处下载文件。

我们将文件放在文件夹 /data 中,位于名为 upload_files.py 的新 Python 脚本旁边:

# upload_files.py # Importing dependencies 
from azure.identity import DefaultAzureCredential 
from azure.storage.filedatalake import DataLakeServiceClient from functions import list_directory_contents, upload_file_to_directory 
from onelake import ONELAKE_DATA_PATH, directory_client, file_system_client csv_file_name = "products.csv" 
csv_local_path = f"./data/{csv_file_name}" docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"] 
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files] # Upload files to Lakehouse 
upload_file_to_directory(directory_client, csv_local_path, csv_file_name) for docx_local_path in docx_local_paths: docx_file_name = docx_local_path.split("/")[-1] upload_file_to_directory(directory_client, docx_local_path, docx_file_name) # To check that the files have been uploaded, run "list_directory_contents" function to show the contents of the /ProductsData folder in our Datalake: 
print("Upload finished, Listing files: ") 
list_directory_contents(file_system_client, ONELAKE_DATA_PATH) 

运行上传脚本:

python upload_files.py

结果应该是:

Upload finished, Listing files: 
shoesticDatalake.Lakehouse/Files/ProductsData/beach-flip-flops.docx 
shoesticDatalake.Lakehouse/Files/ProductsData/classic-loafers.docx 
shoesticDatalake.Lakehouse/Files/ProductsData/products.csv 
shoesticDatalake.Lakehouse/Files/ProductsData/sport-sneakers.docx 

现在我们已经准备好文件了,让我们开始使用 Elasticsearch 分析和搜索我们的数据!

索引文档

我们将使用 ELSER 作为向量数据库的嵌入提供程序,以便我们可以运行语义查询。

我们选择 ELSER 是因为它针对 Elasticsearch 进行了优化,在域外检索方面胜过大多数竞争对手,这意味着按原样使用模型,而无需针对你自己的数据进行微调。

配置 ELSER

首先创建推理端点:

PUT _inference/sparse_embedding/onelake-inference-endpoint 
{ "service": "elser", "service_settings": { "num_allocations": 1, "num_threads": 1 } 

在后台加载模型时,如果你以前没有使用过 ELSER,则可能会收到 502 Bad Gateway 错误。在 Kibana 中,你可以在 “Machine Learning” > “Trained Models” 中检查模型状态。等到模型部署完成后再继续执行后续步骤。

索引数据

现在,由于我们同时拥有结构化数据和非结构化数据,因此我们将在 Kibana DevTools 控制台中使用具有不同映射的两个不同索引。

对于我们的结构化销售,让我们创建以下索引:

PUT shoestic-products 
{ "mappings": { "properties": { "product_id": { "type": "keyword" }, "product_name": { "type": "text" }, "amount": { "type": "float" }, "tags": { "type": "keyword" } } } 
} 

为了索引我们的非结构化数据(产品数据表),我们将使用:

PUT shoestic-products-descriptions 
{ "mappings": { "properties": { "title": { "type": "text", "analyzer": "english" }, "super_body": { "type": "semantic_text", "inference_id": "onelake-inference-endpoint" }, "body": { "type": "text", "copy_to": "super_body" } } } 
} 

注意:使用带有 copy_to 的字段很重要,这样还可以运行全文搜索,而不仅仅是在正文字段上运行语义搜索。

读取 OneLake 文件

在开始之前,我们需要使用这些命令(使用你自己的云 ID 和 API 密钥)初始化我们的 Elasticsearch 客户端。

创建一个名为 indexing.py 的 Python 脚本并添加以下几行:

# Importing dependencies 
import csv 
from io import StringIO from onelake import directory_client 
from elasticsearch import Elasticsearch, helpers from functions import get_csv_content, get_docx_content, get_file_by_name 
from upload_files_to_onelake import csv_file_client ELASTIC_CLUSTER_ID = "your-cloud-id" 
ELASTIC_API_KEY = "your-api-key" # Elasticsearch client 
es_client = Elasticsearch( cloud_id=ELASTIC_CLUSTER_ID, api_key=ELASTIC_API_KEY, 
) docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"] 
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files] csv_file_client = get_file_by_name("products.csv", directory_client) 
docx_files_clients = [] for docx_file_name in docx_files: docx_files_clients.append(get_file_by_name(docx_file_name, directory_client)) # We use these functions to extract data from the files: 
csv_content = get_csv_content(csv_file_client) 
reader = csv.DictReader(StringIO(csv_content)) 
docx_contents = [] for docx_file_client in docx_files_clients: docx_contents.append(get_docx_content(docx_file_client)) print("CSV FILE CONTENT: ", csv_content) 
print("DOCX FILE CONTENT: ", docx_contents) # The CSV tags are separated by commas (,). We'll turn these tags into an array: 
rows = csv_content.splitlines() 
reader = csv.DictReader(rows) 
modified_rows = [] for row in reader: row["tags"] = row["tags"].replace('"', "").split(",") modified_rows.append(row) print(row["tags"]) # We can now index the files into Elasticsearch 
reader = modified_rows 
csv_actions = [{"_index": "shoestic-products", "_source": row} for row in reader] docx_actions = [ { "_index": "shoestic-products-descriptions", "_source": {"title": docx_file_name, "body": docx}, } for docx_file_name, docx in zip(docx_files, docx_contents) 
] helpers.bulk(es_client, csv_actions) 
print("CSV data indexed successfully.") 
helpers.bulk(es_client, docx_actions) 
print("DOCX data indexed successfully.") 

现在运行脚本:

python indexing.py

查询

在 Elasticsearch 中对文档进行索引后,我们就可以测试语义查询了。在本例中,我们将在某些产品(tag)中搜索唯一术语。我们将针对结构化数据运行关键字搜索,针对非结构化数据运行语义搜索。

1. 关键字搜索

GET shoestic-products/_search 
{ "query": { "term": { "tags": "summer" } } 
} 

结果:

"_source": { "product_id": "P-118", "product_name": "Casual Sandals", "amount": "128.22", "tags": [ "casual", "summer" ] } 

2. 语义搜索:

GET shoestic-products-descriptions/_search 
{ "_source": { "excludes": [ "*embeddings", "*chunks" ] }, "query": { "semantic": { "field": "super_body", "query": "summer" } } 
} 

*我们排除了嵌入和块只是为了便于阅读。

结果:

"hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 4.3853106, "hits": [ { "_index": "shoestic-products-descriptions", "_id": "P2Hj6JIBF7lnCNFTDQEA", "_score": 4.3853106, "_source": { "super_body": { "inference": { "inference_id": "onelake-inference-endpoint", "model_settings": { "task_type": "sparse_embedding" } } }, "title": "beach-flip-flops.docx", "body": "Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing in the sun." } } ] } 

如你所见,当使用关键字搜索时,我们会得到与其中一个标签的完全匹配,相反,当我们使用语义搜索时,我们会得到与描述中的含义匹配的结果,而无需完全匹配。

结论

OneLake 使使用来自不同 Microsoft 来源的数据变得更容易,然后索引这些文档 Elasticsearch 允许我们使用高级搜索工具。在第一部分中,我们学习了如何连接到 OneLake 并在 Elasticsearch 中索引文档。在第二部分中,我们将使用 Elastic 连接器框架制作更强大的解决方案。敬请期待!

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训的时间!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在你的本地机器上试用 Elastic。

原文:Indexing OneLake data into Elasticsearch - Part 1 - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/68245.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源项目Umami网站统计MySQL8.0版本Docker+Linux安装部署教程

Umami是什么&#xff1f; Umami是一个开源项目&#xff0c;简单、快速、专注用户隐私的网站统计项目。 下面来介绍如何本地安装部署Umami项目&#xff0c;进行你的网站统计接入。特别对于首次使用docker的萌新有非常好的指导、参考和帮助作用。 Umami的github和docker镜像地…

Java程序基础⑪Java的异常体系和使用

目录 1. 异常的概念和分类 1.1 异常的概念 1.2 异常的分类 2. 异常的体系结构 3. 异常的处理 3.1 异常的抛出 3.2 异常的捕获与处理 3.3 异常的处理流程 4. 自定义异常类 4.1 自定义异常类的规则 4.2 自定义异常案例 1. 异常的概念和分类 1.1 异常的概念 在Java中&…

大话特征工程:1.维数灾难与特征轮回

一、维度深渊 公元 2147 年&#xff0c;人类文明进入了数据驱动的超级智能时代。从金融到医疗&#xff0c;从教育到娱乐&#xff0c;所有决策都仰赖“全维计算网络”&#xff08;高维特征空间&#xff09;。这套系统将全球所有信息抽象成数以亿计的多维特征&#xff08…

libOnvif通过组播不能发现相机

使用libOnvif库OnvifDiscoveryClient类&#xff0c; auto discovery new OnvifDiscoveryClient(QUrl(“soap.udp://239.255.255.250:3702”), cb.Build()); 会有错误&#xff1a; end of file or no input: message transfer interrupted or timed out(30 sec max recv delay)…

JVM常见知识点

在《深入理解Java虚拟机》一书中&#xff0c;介绍了JVM的相关特性。 1、JVM的内存区域划分 在真实的操作系统中&#xff0c;对于地址空间进行了分区域的设计&#xff0c;由于JVM是仿照真实的机器进行设计的&#xff0c;那么也进行了分区域的设计。核心区域有四个&#xff0c;…

Windows系统Tai时长统计工具的使用体验

Windows系统Tai时长统计工具的使用体验 一、Tai介绍1.1 Tai简介1.2 安装环境要求 二、下载及安装Tai2.1 下载Tai2.2 运行Tai工具 三、Tai的使用体验3.1 系统设置3.2 时长统计3.3 分类管理 四、总结 一、Tai介绍 1.1 Tai简介 Tai是一款专为Windows系统设计的开源软件&#xff…

【架构面试】二、消息队列和MySQL和Redis

MQ MQ消息中间件 问题引出与MQ作用 常见面试问题&#xff1a;面试官常针对项目中使用MQ技术的候选人提问&#xff0c;如如何确保消息不丢失&#xff0c;该问题可考察候选人技术能力。MQ应用场景及作用&#xff1a;以京东系统下单扣减京豆为例&#xff0c;MQ用于交易服和京豆服…

HTML一般标签和自闭合标签介绍

在HTML中&#xff0c;标签用于定义网页内容的结构和样式。标签通常分为两类&#xff1a;一般标签&#xff08;也称为成对标签或开放闭合标签&#xff09;和自闭合标签&#xff08;也称为空标签或自结束标签&#xff09;。 以下是这两类标签的详细说明&#xff1a; 一、一般标…

Android GLSurfaceView 覆盖其它控件问题 (RK平台)

平台 涉及主控: RK3566 Android: 11/13 问题 在使用GLSurfaceView播放视频的过程中, 增加了一个播放控制面板, 覆盖在视频上方. 默认隐藏setVisibility(View.INVISIBLE);点击屏幕再显示出来. 然而, 在RK3566上这个简单的功能却无法正常工作. 通过缩小视频窗口可以看到, 实际…

Java Web-Tomcat Servlet

Web服务器-Tomcat Web服务器简介 Web 服务器是一种软件程序&#xff0c;它主要用于在网络上接收和处理客户端&#xff08;如浏览器&#xff09;发送的 HTTP 请求&#xff0c;并返回相应的网页内容或数据。以下是关于 Web 服务器的详细介绍&#xff1a; 功能 接收请求&#…

[Computer Vision]实验二:图像特征点提取

目录 一、实验内容 二、实验过程及结果 2.1 Harris角点检测 2.2 SIFT算法 三、实验小结 一、实验内容 采用Harris与SIFT分别提取特征点及对应的描述子&#xff0c;对比两者的区别&#xff08;特征点数量、分布、描述子维度、图像变化对二者的影响等&#xff09;利用特征匹…

【AI非常道】二零二五年一月,AI非常道

经常在社区看到一些非常有启发或者有收获的话语&#xff0c;但是&#xff0c;往往看过就成为过眼云烟&#xff0c;有时再想去找又找不到。索性&#xff0c;今年开始&#xff0c;看到好的言语&#xff0c;就记录下来&#xff0c;一月一发布&#xff0c;亦供大家参考。 有关AI非…

牛客周赛 Round 78 A-C

A.时间表查询&#xff01; 链接&#xff1a;https://ac.nowcoder.com/acm/contest/100671/A 来源&#xff1a;牛客网 题目描述 今天是2025年1月25日&#xff0c;今年的六场牛客寒假算法基础集训营中&#xff0c;前两场比赛已经依次于 20250121、20250123 举行&#xff1b;而…

网安加·百家讲坛 | 樊山:数据安全之威胁建模

作者简介&#xff1a;樊山&#xff0c;锦联世纪教育能源工业互联网数字安全CSM(新能源运维师)课程特聘培训讲师&#xff0c;哈尔滨工业大学&#xff08;深圳&#xff09;信飞合创数据合规联合实验室特聘专家&#xff0c;武汉赛博网络安全人才研究中心资深专家&#xff1b;近24年…

java后端之登录认证

基础登录功能&#xff1a;根据提供的用户名和密码判断是否存在于数据库 LoginController.java RestController Slf4j public class LoginController {Autowiredprivate UserService userService;PostMapping("/login")public Result login(RequestBody User user) {…

基于SpringBoot的网上考试系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

Elastic Agent 对 Kafka 的新输出:数据收集和流式传输的无限可能性

作者&#xff1a;来 Elastic Valerio Arvizzigno, Geetha Anne 及 Jeremy Hogan 介绍 Elastic Agent 的新功能&#xff1a;原生输出到 Kafka。借助这一最新功能&#xff0c;Elastic 用户现在可以轻松地将数据路由到 Kafka 集群&#xff0c;从而实现数据流和处理中无与伦比的可扩…

【ROS2】RViz2界面类 VisualizationFrame 详解

1、简述 VisualizationFrame 继承自 QMainWindow 和 WindowManagerInterface; 窗口顶部是常规布局:菜单栏 和 工具栏 窗口中心是 RenderPanel,用来渲染3D画面 周围是dock区域,包括:DisplaysPanel、ViewsPanel、TimePanel、SelectionPanel 和 ToolPropertiesPanel Windo…

poi在word中打开本地文件

poi版本 5.2.0 方法1&#xff1a;使用XWPFFieldRun&#xff08;推荐&#xff09; 比如打开当前相对路径的aaaaa.docx XWPFFieldRun run paragraph.createFieldRun();CTRPr ctrPr run.getCTR().addNewRPr();CTFonts font ctrPr.addNewRFonts();// 设置字体font.setAscii(&quo…

PCIE模式配置

对于VU系列FPGA&#xff0c;当DMA/Bridge Subsystem for PCI Express IP配置为Bridge模式时&#xff0c;等同于K7系列中的AXI Memory Mapped To PCI Express IP。