使用 Logstash 及 enrich processor 实现数据丰富自动化

在我之前的文章:

  • Elasticsearch:enrich processor (7.5发行版新功能)

  • Elasticsearch:使用 Elasticsearch ingest pipeline 丰富数据

通过上面的两篇文章的介绍,我们应该充分掌握了如何使用 enrich processor 来丰富数据了。特别是在上面的第二篇文章中,我们需要使用手动来一个一个地通过 Kibana 的界面来写入数据。我们感觉还是比较麻烦。如果我们能够实现自动化来完成整个的操作,那将是非常好的。在今天的文章中,我们将结合 enrich processor 和 Logstash 来实现数据的丰富自动化。我们可以利用 Linux 所提供的脚本来完成数据摄入的自动化。

在一下的展示中,我将使用如下的架构来进行展示:

数据描述

在进行我们的练习之前,我们下载所需要的数据及相关文档:

git clone https://github.com/evermight/elasticsearch-ingest
arallels@ubuntu2004:~/data/elasticsearch-ingest/part-3$ pwd
/home/parallels/data/elasticsearch-ingest/part-3
parallels@ubuntu2004:~/data/elasticsearch-ingest/part-3$ tree -L 3
.
├── 01-zip_geo.sh
├── 02-customer.sh
├── 03-product.sh
├── 04-order_item.sh
├── 05-order.sh
├── data
│   ├── customer
│   │   ├── data.csv
│   │   └── readme.txt
│   ├── mysql
│   │   ├── load.sql
│   │   └── readme.md
│   ├── order
│   │   ├── data.csv
│   │   └── data.xlsx
│   ├── order_item
│   │   ├── data.csv
│   │   └── data.xlsx
│   ├── product
│   │   └── data.csv
│   └── zip_geo
│       ├── data.csv
│       └── data.xlsx
├── env.sample
├── logstash
│   ├── customer.conf
│   ├── order.conf
│   ├── order_item.conf
│   ├── product.conf
│   └── zip_geo.conf
├── mapping
│   ├── customer.json
│   ├── order.json
│   └── zip_geo.json
├── part-3.pdf
├── part-3.pptx
├── pipeline
│   ├── customer.json
│   ├── order_item.json
│   └── order.json
├── policy
│   ├── customer.json
│   ├── order_item.json
│   ├── product.json
│   └── zip_geo.json
├── readme.md
├── run.sh
└── teardown.sh

如上所示,我们的文档结构如上所示。我们的数据结构如下:

我们有如上的几个表格。它们之间的数据是相互关联的。我们知道在 Elasticsearch 中的数据,它不像传统的关系数据库,在查询的时候,我们可以通过 join 来丰富数据,而且为了能够提高数据的查询速度,我们最好把数据实现扁平化,这也就是的数据的非规范化(denormalization)。我们可以详细阅读文章 “Elasticsearch:Elasticsearch 中索引映射的非规范化”。在摄入数据的时候,我们希望把相关的内容最终能丰富到最后的文档中。我们希望实现如下的内容:

从上面的最终结果,我们可以看出来,我们需要的数据来自不同的表格。这个需要我们使用 enrich processor 来帮我们完成。

文件目录描述

在项目的目录(part-3)下面,我们可以看到如下的几个子目录:

  • data:在这个目录里它含有我们需要的各个数据以及它们的来源
  • mapping:在这个目录中,它含有各个表格数据的 mapping。通常我们并不需要预先定义数据的类型。我们可以让 Elasticsearch 帮我们自动识别数据的类型,但这往往不是最佳的。通过定义相应数据的 mapping,一方面它可以帮忙明确地定义数据字段的类型,比如 geo_point 数据类型,另一方面,通过设置 mapping,也可以提高数据的摄入速度
  • policy:在这个目录中,它定义了使用 enrich processor 时所需要的 policies。
  • pipeline:在这个目录里,它定义了在 enrich 时,我们需要使用到的 enrich processor
  • logstash:在这个目录里,它定义了 Logstash 需要使用到的配置文件

写入文档的顺序

由于我们的数据是一个关系数据表格,在我们写入数据的时候,我们先从上面图中的右边开始写入数据,这是因为左边的表格依赖于右边的表格。只有它们的数据是准备好的状态,那么我们才可以利用它们来丰富左边的表格。这也就是我们看到的如下的脚本:

如上图所示,我们可以看到  

01-zip_geo.sh 
02-customer.sh
03-product.sh 
04-order_item.sh
05-order.sh  

这个其实就是我们执行脚本的顺序。我们需要按照上面的顺序从上到下来进行执行。

摄入数据

我们知道在我们摄入数据的时候,我们可以使用 Logstash 来写入 CSV 文档。Logstash 的好处是,它含有丰富的 filters 来供我们对数据进行处理。

针对 Elastic Stack 8.x 的安装来说,在默认的情况下,Elasticsearch 是带有安全的。针对自签名的集群来说,它通常还含有证书。针对带有安全的集群,我们可以参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群”。下面,我们以摄入 zip_geo 为例来进行展示。在摄入数据的时候,我们需要使用到 fingerprint。我们可以参考文章 “Beats:使用 fingerprint 来连接 Beats/Logstash 和 Elasticsearch”。

在 logstash 目录下,我们可以看到如下的 zip_geo.conf 文档:

zip_geo.conf

input {file {path => "##PROJECTPATH##/data/zip_geo/data.csv"start_position => "beginning"sincedb_path => "/dev/null" mode => "read"exit_after_read => truefile_completed_action => "log"file_completed_log_path => "##PROJECTPATH##/.logstash-status"}
}filter {csv {autodetect_column_names => true}mutate {convert => {"zip" => "integer""point" => "string"}}
}output {elasticsearch {hosts => ["##ELASTICHOST##"]ssl => ##ELASTICSSL##user => "##ELASTICUSER##"password => "##ELASTICPASS##"index => "zip_geo"ssl => trueca_trusted_fingerprint => "##FINGERPRINT##"}
}

这是一个标准的 Logstash 配置文件。在上面,我们可以看到一下奇奇怪怪的的像 ##PROJECTPATH## 这样的占位符号。这个需要在哪里配置呢?

我们回到项目的根目录下(part-3),我们可以看到一个叫做 env.sample 的文档。我们通过如下的命令来来创建一个叫做 .env 的文件:

cp env.sample .env

我们可以使用我们喜欢的编辑器来编辑这个 .env 文件:

vi .env
PROJECTPATH="/home/parallels/data/elasticsearch-ingest/part-3"
ELASTICHOST="192.168.0.3:9200"
ELASTICSSL="true"
ELASTICUSER="elastic"
ELASTICPASS="h6y=vgnen2vkbm6D+z6-"
FINGERPRINT="bd0a26dc646ef1cb3cb5e132e77d6113e1b46d56ee390dd3c6f0b2d2b16962c4"
LOGSTASHPATH="/home/parallels/elastic/logstash-8.8.2"

我们根据自己的配置填入上面的信息。其中 FINGERPRINT 最为简单的办法就是通过 Kibana 的配置文件 config/kibana.yml 文件来获得。我们保存好上面的文件。这里其实就是定义的环境变量。我们接下来查看 1-zip_geo.sh 文件:

1-zip_geo.sh

#!/bin/bashsource ./.envhostprotocol="http"
if [ "$ELASTICSSL" = "true" ]; thenhostprotocol="https"
ficurl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/zip_geo"
curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/zip_geo/_mapping" \
-H "Content-Type: application/json" \
-d @$PROJECTPATH/mapping/zip_geo.jsonlogstashconf=`cat ${PROJECTPATH}/logstash/zip_geo.conf`
logstashconf="${logstashconf//\#\#PROJECTPATH\#\#/"$PROJECTPATH"}"
logstashconf="${logstashconf//\#\#ELASTICHOST\#\#/"$ELASTICHOST"}"
logstashconf="${logstashconf//\#\#ELASTICSSL\#\#/"$ELASTICSSL"}"
logstashconf="${logstashconf//\#\#ELASTICUSER\#\#/"$ELASTICUSER"}"
logstashconf="${logstashconf//\#\#ELASTICPASS\#\#/"$ELASTICPASS"}"
logstashconf="${logstashconf//\#\#FINGERPRINT\#\#/"$FINGERPRINT"}"
$LOGSTASHPATH/bin/logstash -e "$logstashconf"curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/_enrich/policy/zip_geo_policy" \
-H "Content-Type: application/json" \
-d @$PROJECTPATH/policy/zip_geo.jsonsleep 30
curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/_enrich/policy/zip_geo_policy/_execute"

上面的代码看起来很负责,一下子看不太明白。在开始的部分,我们从环境变量里得到 ELASTICSSL 的值。如果 Elasticsearch 集群的访问是 https 访问的,那么这个值应该设置为 true。这个在接下来的 curl 指令中需要用到。值得注意的是:由于我们的集群是自签名的,我们使用 -k 选项来绕开证书的配置,尽管我们也可以通过设置来配置证书的访问。

记下来,我们使用 curl 指令来创建 zip_geo 索引。它的指令的格式有点类似:

curl -k -u elastic:h6y=vgnen2vkbm6D+z6- https://localhost:9200/zip_geo

如果是在 Kibana 中的 Dev Tools 中进行操作,它相当于:

PUT zip_geo

上述指令创建一个叫做 zip_geo 的指令。

接下来的指令,它相当于:

curl -k -X PUT -u elastic:h6y=vgnen2vkbm6D+z6- ”https://localhost:9200/zip_geo/_mapping" \
-H "Content-Type: application/json" \
-d /Users/liuxg/data/elasticsearch-ingest/part-3/mapping/zip_geo.json

上述命令相当于在 Kibana 中打入如下的命令:

PUT zip_geo/_mapping
{"properties": {"zip": {"type": "long"},"point": {"type": "geo_point"}}
}

下面的代码:

logstashconf=`cat ${PROJECTPATH}/logstash/zip_geo.conf`
logstashconf="${logstashconf//\#\#PROJECTPATH\#\#/"$PROJECTPATH"}"
logstashconf="${logstashconf//\#\#ELASTICHOST\#\#/"$ELASTICHOST"}"
logstashconf="${logstashconf///\#\#ELASTICSSL\#\#/"$ELASTICSSL"}"
logstashconf="${logstashconf//\#\#ELASTICUSER\#\#/"$ELASTICUSER"}"
logstashconf="${logstashconf//\#\#ELASTICPASS\#\#/"$ELASTICPASS"}"
logstashconf="${logstashconf//\#\#FINGERPRINT\#\#/"$FINGERPRINT"}"
./bin/logstash -e "$logstashconf"

这部分代码的真正意思是替换 zip_geo,conf 里含有 “## ... ##" 部分的字符串进行替换。如果你对这个不是很熟悉的话,请参阅网上的链接。在上面的最后部分,我们使用 Logstash 来运行在 logstashconf 变量里的管道。

下面的代码:

curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/_enrich/policy/zip_geo_policy" \
-H "Content-Type: application/json" \
-d @$PROJECTPATH/policy/zip_geo.json

它用来运行 zip_geo_policy 以生成相应的 .enrich_zip_geo_policy,,,,, 索引。它想到于如下的命令:

curl -k -X PUT -u elastic:h6y=vgnen2vkbm6D+z6- "https://localhost:9200/_enrich/policy/zip_geo_policy" \
-H "Content-Type: application/json" \
-d @$PROJECTPATH/policy/zip_geo.json

在 Kibana 中,我们可以打入如下的命令来实现同样的功能:

PUT /_enrich/policy/zip_geo_policy
{"match": {"indices": "zip_geo","match_field": "zip","enrich_fields": ["point"]}
}

由于生成丰富索引需要一定的时间,在脚本的部分,我们挂起 30 秒的时间,当然这个依赖于数据量的多少。

在最后的部分,我们执行:

curl -k -X PUT -u $ELASTICUSER:$ELASTICPASS "$hostprotocol://$ELASTICHOST/_enrich/policy/zip_geo_policy/_execute"

它相当于执行:

curl -k -X PUT -u elastic:h6y=vgnen2vkbm6D+z6- "https://localhost:9200/_enrich/policy/zip_geo_policy/_execute"

在 Kibana 中,我们可以通过如下的命令来完成相应的功能:


PUT /_enrich/policy/zip_geo_policy/_execute

好了,让我们来执行第一个脚本:

运行完,我们的第一个脚本后,我们可以在 Kibana 中进行查看:

我们按照同样的套路依次执行如下的脚本:

02-customer.sh
03-product.sh 
04-order_item.sh
05-order.sh  

在运行完 02-customer.sh 后,我们可以看到:

我们接着运行 02-product.sh 脚本。我们可以查看到 product 索引的文档:

我们再接着运行 04-order_item.sh 脚本:

我们接下来运行 05-order.sh:

从上面,我们可以看到我们最终想要的结果。

为了能删除所有之前创建的资源,我们可以一键删除:

./teardown.sh

然后,我们可以再使用一个命令来完成所有的运行:

parallels@ubuntu2004:~/data/elasticsearch-ingest/part-3$ cat run.sh
./01-zip_geo.sh
./02-customer.sh
./03-product.sh
./04-order_item.sh
./05-order.sh
./run.sh

特别注意的一点是,我们的 enrich processor 是在 ingest pipeline 里被调用的,比如:

output {elasticsearch {hosts => ["##ELASTICHOST##"]ssl => ##ELASTICSSL##user => "##ELASTICUSER##"password => "##ELASTICPASS##"index => "customer"pipeline => "customer_pipeline"ca_trusted_fingerprint => "##FINGERPRINT##"    }
}

你可以在地址下载所有的代码:GitHub - evermight/elasticsearch-ingest

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/7242.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp使用自定义导航栏和手机自带的状态栏重叠

【问题界面】&#xff1a; 【正常界面】&#xff1a; 【解决方法】&#xff1a; 在页面顶部添加代码<!-- #ifndef H5 --> <statusBar></statusBar> <!-- #endif --> 2.引入占位条并注册 import statusBar from "/uni_modules/uni-nav-bar/c…

IDEA使用lombok实体类加上@Data注解后无法找到get和set方法

文章目录 一、问题原因二、解决方法1.File→Settings2.Plugins→搜索"lombok"→Install3.Restart IDE&#xff08;重启IDEA&#xff09; 一、问题原因 IDEA没有安装lombok插件 二、解决方法 1.File→Settings 2.Plugins→搜索"lombok"→Install 3.Restart…

IDEA安装热部署插件JRebel详解

JRebel 简介 JRebel是一套JavaEE开发工具。JRebel允许开发团队在有限的时间内完成更多的任务修正更多的问题&#xff0c;发布更高质量的软件产品。 JRebel是收费软件&#xff0c;用户可以在JRebel官方站点下载30天的评估版本。 Jrebel 可快速实现热部署&#xff0c;节省了大量重…

STM32MX配置EEPROM(AT24C02)------保姆级教程

———————————————————————————————————— ⏩ 大家好哇&#xff01;我是小光&#xff0c;嵌入式爱好者&#xff0c;一个想要成为系统架构师的大三学生。 ⏩最近在开发一个STM32H723ZGT6的板子&#xff0c;使用STM32CUBEMX做了很多驱动&#x…

vue3 项目打包后白屏

根据Vue3.x文档&#xff0c;在 vue.config.js/vite.config.ts 统一对webpack、跨域、端口号等属性进行配置。 1.在 vue.config.js/vite.config.ts添加publicPath属性并将值更改成 ‘./’ 在这里插入图片描述 2.若还没有解决就去路由中将history模式设置成默认的Hash模式&…

探寻智能化未来:AI与Web3共创金融领域巨大潜力

人工智能&#xff08;AI&#xff09;和Web3技术的迅猛发展为我们带来了许多新的机遇和影响。在数字经济和社会的浪潮中&#xff0c;结合了AI的智能化能力和Web3的去中心化与区块链技术&#xff0c;我们将进入一个智能化的Web3时代。人工智能和Web3技术是开拓生产力极限和重新定…

【git学习2】多人合作中git的使用

提交代码 中间打勾&#xff1a;commit提交代码 最后点击向上的箭头 push到远程仓库。 团队开发中git的使用 第一步先从远程仓库中某个分支拉下来代码&#xff0c;比如下图只有一个分支master 新建文件夹&#xff0c;存放这个拉下来的项目&#xff0c;克隆项目地址&#xff…

抖音seo矩阵系统源码保姆式开发部署指导

抖音seo霸屏&#xff0c;是一种专为抖音视频创作者和传播者打造的视频批量剪辑&#xff0c;批量分发产品。使用抖音seo霸屏软件&#xff0c;可以帮助用户快速高效的制作出高质量的优质视频。 使用方法&#xff1a;1. 了解用户的行为习惯 2. 充分利用自身资源进行开发 3. 不…

Matlab 点云平面特征提取

文章目录 一、简介二、实现代码2.1基于k个邻近点2.2基于邻近半径参考资料一、简介 点云中存在这各种各样的几何特征,这里基于每个点的邻域协方差来获取该点的所具有的基础几何特征(如下图所示),这样的做法虽然不能很好的提取出点云中的各个部分,但却是可以作为一种数据预处…

视频超分新方法--助力实现高清wav2lip数字人

文章目录 前言一、解决方案详解总结前言 ` 随着人工智能的不断发展,数字人技术也越来越重要,很多人都开启了学习模型 但是使用神级模型wav2lip生成的数字人嘴部不清晰怎么办。 很影响使用效果,接下来教大家如何优化这个问题,如下图所示: 一、解决方案详解 因为wav2lip是…

Xshell使用sftp传输文件

单击工具栏新建回话图标&#xff0c;在弹出的新建回话窗口中协议选择SFTP&#xff0c;输入主机名或ip地址&#xff0c;端口号22&#xff0c;单击连接&#xff0c;输入用户名和密码完成创建连接。 本地/远程目录设置&#xff1a;新建会话时在下图中SFTP中设置文件上传下载的本地…

【Nodejs】Express模板使用

1.Express脚手架的安装 安装Express脚手架有两种方式&#xff1a; 使用express-generator安装 使用命令行进入项目目录&#xff0c;依次执行&#xff1a; cnpm i -g express-generator可通过express -h查看命令行的指令含义 express -hUsage: express [options] [dir] Optio…

Zia和ChatGPT如何协同工作?

有没有集成ChatGPT的CRM系统推荐&#xff1f;Zoho CRM已经正式与ChatGPT集成。下面我们将从使用场景、使用价值和使用范围等方面切入讲述CRMAI的应用和作用。 Zia和ChatGPT如何协同工作&#xff1f; Zia和ChatGPT是不同的人工智能模型&#xff0c;在CRM中呈现出共生的关系。 …

JAVA基础-Stream流

引言 Java 8 版本新增的Stream&#xff0c;配合同版本出现的Lambda &#xff0c;给我们操作集合&#xff08;Collection&#xff09;提供了极大的 便利。Stream流是JDK8新增的成员&#xff0c;允许以声明性方式处理数据集合&#xff0c;可以把Stream流看作是遍历数据集 合的一个…

java多线程常见面试题

1、线程和进程的区别 本质区别&#xff1a; 进程是一个程序的实例&#xff0c;是操作系统资源分配的最小单位&#xff1b;&#xff0c;是任务调度与执行的最小单位包含关系&#xff1a; 进程至少由一个线程组成&#xff0c;线程可看做轻量级进程资源开销&#xff1a; 进程有自…

通用文字识别OCR 之实现自动化办公

摘要 随着技术的发展&#xff0c;通用文字识别&#xff08;OCR&#xff09;已经成为现代办公环境中不可或缺的工具之一。OCR技术可以将印刷或手写文本转换为可编辑或可搜索的数字文本&#xff0c;极大地提高了办公效率并实现了自动化办公。本文将深入探讨OCR技术在实现自动化办…

一百三十五、Azkaban——AzkabanWebServer服务开启后秒退

一、问题 Azkaban的AzkabanWebServer服务开启后秒退&#xff0c;造成Azkaban的页面登录不上 AzkabanWebServer服务开启后&#xff0c;第一个jps里面有AzkabanWebServer&#xff0c;随后第二个jps里面没有AzkabanWebServer 二、问题原因 MySQL中azkaban数据库的表executors的…

Spring,SpringBoot,Spring MVC的区别是什么

1.Spring是什么 我们通常所说的 Spring 指的是 Spring Framework&#xff08;Spring 框架&#xff09;&#xff0c;它是⼀个开源框架&#xff0c;有着活跃⽽庞⼤的社区&#xff0c;这就是它之所以能⻓久不衰的原因。Spring ⽀持⼴泛的应⽤场景&#xff0c;它可以让 Java 企业级…

遇到了一个存在XSS(存储型)漏洞的网站

第一个漏洞self xss&#xff08;存储型&#xff09; 存在漏洞的网站是https://www.kuangstudy.com/ 然后点击个人设置 在编辑主页中&#xff0c;我们可以用最简单的script语句进行注入&#xff0c;提交&#xff1b; 出现弹窗&#xff0c;说明它已经把代码进行解析&#x…

LLM - Chinese-Llama-2-7b 初体验

目录 一.引言 二.模型下载 三.快速测试 四.训练数据 五.总结 一.引言 自打 LLama-2 发布后就一直在等大佬们发布 LLama-2 的适配中文版&#xff0c;也是这几天蹲到了一版由 LinkSoul 发布的 Chinese-Llama-2-7b&#xff0c;其共发布了一个常规版本和一个 4-bit 的量化版本…