Elasticsearch:将数据从 Snowflake 摄取到 Elasticsearch

作者:来自 Elastic Ashish Tiwari

为了利用 Elasticsearch® 提供的强大搜索功能,许多企业在 Elasticsearch 中保留可搜索数据的副本。 Elasticsearch 是一种经过验证的技术,适用于传统文本搜索以及用于语义搜索用例的向量搜索。 Elasticsearch Relevance EngineTM (ESRE) 使你能够在专有数据上添加语义搜索,这些数据可以与生成式 AI 技术集成以构建现代搜索体验。

Snowflake 是一种完全托管的 SaaS(软件即服务),为数据仓库、数据湖、数据工程、数据科学、数据应用程序开发以及实时/共享数据的安全共享和消费提供单一平台。

在本博客中,我们将了解如何使用以下方法将 snowflake 数据导入 Elasticsearch:

  1. 使用 Logstash®(定期同步)
  2. 使用 Snowflake Elasticsearch Python 脚本(一次性同步)

先决条件

Snowflake 凭证

注册后你将收到以下所有凭据,或者你可以从 Snowflake 面板获取它们。

  • 账户用户名
  • 账号密码
  • 账户标识符

Elastic® 凭证

  1. 访问 https://cloud.elastic.co 并注册。
  2. 单击 Create deployment。 在弹出窗口中,你可以更改设置或保留默认设置。
  3. 下载或复制部署凭据(用户名和密码)。
  4. 同时复制 Cloud ID。
  5. 准备好部署后,单击 “Continue”(或单击 “Open Kibana”)。 它会将你重定向到 Kibana® 仪表板。

使用 Logstash

Logstash 是一个免费且开放的 ETL 工具,你可以在其中提供多个源作为输入、转换(修改)它并推送到你最喜欢的存储。 Logstash 的著名用例之一是从文件中读取日志并将其推送到 Elasticsearch。 我们还可以使用 filter 插件动态修改数据,它会将更新的数据推送到输出。

我们将使用 JDBC input plugin 从 Snowflake 中提取数据,并使用 Elasticsearch output plugin 将数据推送到 Elasticsearch。

  1. 参考文档安装 Logstash。
  2. 转到 Maven 中央存储库并下载:https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc。
  3. 单击你需要的版本的目录并下载 snowflake-jdbc-#.#.#.jar 文件。 就我而言,我已经下载了snowflake-jdbc-3.9.2.jar。 (有关 Snowflake JDBC 驱动程序的更多信息,请参阅官方文档。)
  4. 通过创建文件 sf-es.conf 创建管道。 添加以下代码片段并替换所有凭证。

sf-es.conf

input {jdbc {jdbc_driver_library => "/usr/share/logstash/logstash_external_configs/driver/snowflake-jdbc-3.9.2.jar"jdbc_driver_class => "net.snowflake.client.jdbc.SnowflakeDriver"jdbc_connection_string => "jdbc:snowflake://<account_identifier>.snowflakecomputing.com/?db=SNOWFLAKE_SAMPLE_DATA&warehouse=COMPUTE_WH&schema=TPCH_SF1"jdbc_user => "<snowflake_username>"jdbc_password => "<snowflake_password>"schedule => "* * * * *"statement => "select * from customer limit 10;"}
}filter {}output {elasticsearch {cloud_id => "<elastic cloud_id>"cloud_auth => "<elastic_username>:<elastic_password>"index => "sf_customer"}
}

jdbc_connection_string

db=SNOWFLAKE_SAMPLE_DATA
warehouse=COMPUTE_WH
schema=TPCH_SF1

Schedule:你可以在此处计划使用 cron 语法定期运行此流程。 每次运行时,你的数据都会增量移动。 你可以查看有关 scheduling 的更多信息。

请根据你的要求进行更改。

JDBC 分页(可选):这将导致一条 sql 语句被分解为多个查询。 每个查询将使用限制和偏移量来共同检索完整的结果集。 你可以使用它在一次运行中移动所有数据。

通过添加以下配置来启用 JDBC 分页:

jdbc_paging_enabled => true,
jdbc_paging_mode => "explicit",
jdbc_page_size => 100000

        5. 运行 Logstash

bin/logstash -f sf-es.conf

Snowflake-Elasticsearch Python 脚本

如果 Logstash 当前尚未到位或尚未实现,我编写了一个小型 Python 实用程序(可在 GitHub 上找到)从 Snowflake 提取数据并将其推送到 Elasticsearch。 这将一次性提取你的所有数据。 因此,如果你有少量数据需要非定期迁移,则可以使用此实用程序。

注意:这不是官方 Elastic 连接器的一部分。 弹性连接器为各种数据源提供支持。 如果你需要从任何受支持的数据源同步数据,则可以使用此连接器。

1) 安装

git clone https://github.com/ashishtiwari1993/snowflake-elasticsearch-connector.git
cd snowflake-elasticsearch-connector

2)安装依赖项

git clone https://github.com/ashishtiwari1993/snowflake-elasticsearch-connector.git
cd snowflake-elasticsearch-connector

3)修改配置

  • 打开 config/connector.yml。
  • 将凭证替换为以下内容:
snowflake:username: <sf_username>password: <sf_password>account: <sf_account_identifier>database: <db_name>table: <table_name>columns: ""warehouse: ""scheme: ""limit: 50elasticsearch:host: https://localhost:9200username: elasticpassword: elastic@123ca_cert: /path/to/elasticsearch/config/certs/http_ca.crtindex: <sf_customer>

4)运行 Connector

python __main__.py

验证数据

登录 Kibana 并转到 ☰ > Management > Dev Tools。

将以下 API GET 请求复制并粘贴到控制台窗格中,然后单击 ▶(播放)按钮。 这会查询新索引中的所有记录。

GET sf_customer/_search
{"query": {"match_all": {}}
}

结论

我们已成功将数据从 Snowflake 迁移到 Elastic Cloud。 你可以在任何 Elasticsearch 实例上实现相同的目标,无论是在云端还是在本地。

开始在你的数据集上利用全文和语义搜索功能。 你还可以将你的数据与 LLM 连接起来,以构建问答功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/637878.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0121-1-计算机网络安全

计算机网络安全 1.Get 和 Post 的区别 结构&#xff1a;get 有请求体&#xff0c;post没有请求体 应用场景&#xff1a;get 用于获取数据&#xff0c;post用于提交数据&#xff1b; 缓存&#xff1a;get 的缓存保存在浏览器和web服务器日志中&#xff1b; 传输方式&#x…

Centos使用Docker搭建自己的Gitlab(社区版和设置汉化、修改密码、设置SSH秘钥、添加拉取命令端口号)

根据我的经验 部署Gitlab&#xff08;社区版&#xff09; 至少需要2核4g的服务器 带宽3~4M 1. 在自己电脑上安装终端&#xff1a;宝塔ssl终端 或者 FinalShell&#xff0c;根据喜好安装即可 http://www.hostbuf.com/t/988.html http://www.hostbuf.com/downloads/finalshell_w…

基于OpenSSL的SSL/TLS加密套件全解析

概述 SSL/TLS握手时&#xff0c;客户端与服务端协商加密套件是很重要的一个步骤&#xff0c;协商出加密套件后才能继续完成后续的握手和加密通信。而现在SSL/TLS协议通信的实现&#xff0c;基本都是通过OpenSSL开源库&#xff0c;本文章就主要介绍下加密套件的含义以及如何在O…

LC 对角线遍历

LC 对角线遍历 题目描述&#xff1a; 给你一个大小为 m x n 的矩阵 mat &#xff0c;请以对角线遍历的顺序&#xff0c;用一个数组返回这个矩阵中的所有元素。 题目实例&#xff1a; 示例一&#xff1a; 输入&#xff1a;mat [[1,2,3],[4,5,6],[7,8,9]] 输出&#xff1a;[…

前后置、断言、提取变量、数据库操作功能

前置操作和后置操作都是 API 请求在发送和响应过程中执行的脚本&#xff0c;主要用于在发起 API 请求前和获得响应后完成验证或执行某些操作&#xff0c;目的是为了提高 API 调试和测试的效率&#xff0c;并确保接口的正确性。 前置操作​ 前置操作是在 API 请求之前执行的脚本…

Redis(五)

1、布隆过滤 1.1、简介 由一个初值都为零的bit数组和多个哈希函数构成&#xff0c;可以用来快速判断集合中是否存在某个元素&#xff0c;减少占用内存&#xff0c;不保存数据信息&#xff0c;只是在内存中做出一个标记。 它实际上是一个很长的二进制数组(00000000)一系列随机h…

【数据结构与算法】归并排序详解:归并排序算法,归并排序非递归实现

一、归并排序 归并排序是一种经典的排序算法&#xff0c;它使用了分治法的思想。下面是归并排序的算法思想&#xff1a; 递归地将数组划分成较小的子数组&#xff0c;直到每个子数组的长度为1或者0。将相邻的子数组合并&#xff0c;形成更大的已排序的数组&#xff0c;直到最…

OpenCV读取摄像头窗口变大且很卡的解决方法

视频讲解 OpenCV读取摄像头窗口变大且很卡的解决方法 测试过程 读取摄像头窗口变大且很卡的代码 import cv2 cap cv2.VideoCapture(0) if not cap.isOpened():print("Cannot open camera")exit() while True:ret, frame cap.read()if not ret:print("no str…

Arduino开发实例-SDS011粉尘检测传感器驱动

SDS011粉尘检测传感器驱动 文章目录 SDS011粉尘检测传感器驱动1、SDS011介绍2、硬件准备及接线3、代码实现在本文中,将介绍如何使用 Arduino 动粉尘传感器 SDS011 制作空气质量监测系统。 1、SDS011介绍 粉尘本身根据它们的大小分为两类。 直径在2.5至10微米之间的称为粗颗粒…

conda install命令无法安装pytorch

由于网络问题&#xff0c;直接采用conda install命令可能无法直接下载对应的cuda包。 方法一&#xff1a;采用pip命令替代 步骤1&#xff1a;切换pip的源为国内源&#xff1a; 若只是临时切换&#xff1a; pip install -i https://pypi.tuna.tsinghua.edu.cn/simple some-p…

SpringMVC-.xml的配置

文章目录 一、对pom.xml的配置二、对web.xml1.第一种方式2. 第二种方式 三、对SpringMVC.xml的配置 一、对pom.xml的配置 <!-- 打包成war包--><packaging>war</packaging> <dependencies><!-- SpringMVC--><dependency><gro…

Minio文件分片上传实现

资源准备 MacM1Pro 安装Parallels19.1.0请参考 https://blog.csdn.net/qq_41594280/article/details/135420241 MacM1Pro Parallels安装CentOS7.9请参考 https://blog.csdn.net/qq_41594280/article/details/135420461 部署Minio和整合SpringBoot请参考 https://blog.csdn.net/…

k8s-ingress一

Comfigmap&#xff1a;存储数据 Date&#xff1a; Key&#xff1a;value 挂载的方式&#xff0c;把配置信息传给容器 生产当中的yml文件很长&#xff1a; 有deployment 容器的探针 资源限制 Configmap 存储卷 Service Ingress K8s的对外服务&#xff0c;ingress Se…

【Linux学习】进程信号

目录 十七.进程信号 导言 17.1 linux中的信号列表 17.2 标准信号与实时信号 17.3 信号的产生 17.3.1 通过终端按键产生信号 17.3.2 调用系统函数产生信号 17.3.3 软件条件产生信号 17.3.4 硬件异常产生信号 17.3.5 【补充】核心转储 Core Dump 17.4 信号的阻塞 17.4.1 信号相关…

将字符串中可能被视为正则表达式的特殊字符进行转义re.escape()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 将字符串中可能被视为 正则表达式的特殊字符 进行转义 re.escape() [太阳]选择题 请问以下代码最后输出的结果是&#xff1f; import re s [a-z] print("【显示】s ",s) print(&q…

python24.1.21面向对象编程

面向对象编程&#xff1a;创建对象&#xff0c;定义对象的方法和属性 封装&#xff1a;隐藏内部实现细节&#xff0c;只通过外部接口访问使用 继承&#xff1a;允许创建有层次的类&#xff08;子类&#xff0c;父类&#xff09; 多态&#xff1a;同样接口&#xff0c;对象具体…

JVM:Java类加载机制

Java类加载机制的全过程&#xff1a; 加载、验证、准备、初始化和卸载这五个阶段的顺序是确定的&#xff0c;类型的加载过程必须按照这种顺序按部就班地开始&#xff0c;而解析阶段则不一定&#xff1a;它在某些情况下可以在初始化阶段之后再开始&#xff0c; 这是为了支持Java…

QWidget: Must construct a QApplication before a QWidget 20:10:25: 程序异常结束。

如果你在Windows上混合并匹配了Release和Debug的dll&#xff0c;则会导致这种情况。我的链接的库是release的版本&#xff0c;也就是qwt.dll&#xff0c;但是点击Qt Creator的运行按钮默认是debug启动&#xff0c;所以报错了&#xff0c;Qt Creator运行按钮里选择release就可以…

重构改善既有代码的设计-学习(一):封装

1、封装记录&#xff08;Encapsulate Record&#xff09; 一些记录性结构&#xff08;例如hash、map、hashmap、dictionary等&#xff09;&#xff0c;一条记录上持有什么字段往往不够直观。如果其使用范围比较宽&#xff0c;这个问题往往会造成许多困扰。所以&#xff0c;记录…

【Linux系统编程二十八】基于条件变量的阻塞队列(生产消费模型)

【Linux系统编程二十八】基于条件变量的阻塞队列(生产消费模型&#xff09; 一.同步问题二.条件变量1.实现原理2.等待的前提3.使用接口①.【定义条件变量】②.【初始化条件变量】③.【让线程去条件变量下等待】④.【为什么第二个参数是锁&#xff1f;】条件变量和锁的关系是什么…