使用 Logstash 丰富你的 Elasticsearch 文档

作者:来自 Elastic David Pilato

我们在上一篇文章中看到,我们可以使用摄取管道中的 Elasticsearch Enrich Processor 在 Elasticsearch® 中进行数据丰富。 但有时,你需要执行更复杂的任务,或者你的数据源不是 Elasticsearch,而是另一个源。 或者,你可能希望存储在 Elasticsearch 和第三方系统中,在这种情况下,将管道的执行转移到 Logstash® 很有意义。

使用 Elasticsearch 丰富 Elasticsearch 数据

使用 Logstash,使用类似于以下的管道,这非常容易:

input {# Read all documents from Elasticsearchelasticsearch {hosts => ["${ELASTICSEARCH_URL}"]user => "elastic"password => "${ELASTIC_PASSWORD}"index => "kibana_sample_data_logs"docinfo => trueecs_compatibility => "disabled"}
}filter {# Enrich every document with Elasticsearchelasticsearch {hosts => ["${ELASTICSEARCH_URL}"]user => "elastic"password => "${ELASTIC_PASSWORD}"index => "vip"query => "ip:%{[clientip]}"sort => "ip:desc"fields => {"[name]" => "[name]""[vip]" => "[vip]"}}mutate { remove_field => ["@version", "@timestamp"] }
}output {if [name] {# Write all modified documents to Elasticsearchelasticsearch {manage_template => falsehosts => ["${ELASTICSEARCH_URL}"]user => "elastic"password => "${ELASTIC_PASSWORD}"index => "%{[@metadata][_index]}"document_id => "%{[@metadata][_id]}"}}
}

总共,我们有 14074 个事件需要解析。 虽然不是很多,但对于这个演示来说已经足够了。 这是一个示例事件:

{"agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24","bytes": 1831,"clientip": "30.156.16.164","extension": "","geo": {"srcdest": "US:IN","src": "US","dest": "IN","coordinates": {"lat": 55.53741389,"lon": -132.3975144}},"host": "elastic-elastic-elastic.org","index": "kibana_sample_data_logs","ip": "30.156.16.163","machine": {"ram": 9663676416,"os": "win xp"},"memory": 73240,"message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"","phpmemory": 73240,"referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra","request": "/wp-login.php","response": 404,"tags": ["success","info"],"timestamp": "2023-03-18T12:43:49.756Z","url": "https://elastic-elastic-elastic.org/wp-login.php","utc_time": "2023-03-18T12:43:49.756Z","event": {"dataset": "sample_web_logs"}
}

正如我们在上一篇文章中看到的,vip 索引包含有关我们客户的信息:

{ "ip" : "30.156.16.164", "vip": true, "name": "David P" 
}

我们可以通过以下方式运行管道:

docker run \--name=logstash \--rm -it \-v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \-e XPACK_MONITORING_ENABLED=false \-e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \-e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \docker.elastic.co/logstash/logstash:8.12.0

丰富的文档现在看起来像这样:

{"agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24","bytes": 1831,"clientip": "30.156.16.164","extension": "","geo": {"srcdest": "US:IN","src": "US","dest": "IN","coordinates": {"lat": 55.53741389,"lon": -132.3975144}},"host": "elastic-elastic-elastic.org","index": "kibana_sample_data_logs","ip": "30.156.16.163","machine": {"ram": 9663676416,"os": "win xp"},"memory": 73240,"message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"","phpmemory": 73240,"referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra","request": "/wp-login.php","response": 404,"tags": ["success","info"],"timestamp": "2023-03-18T12:43:49.756Z","url": "https://elastic-elastic-elastic.org/wp-login.php","utc_time": "2023-03-18T12:43:49.756Z","event": {"dataset": "sample_web_logs"},"vip": true,"name": "David P"
}

实际上很简单,但有一个问题:速度很慢。 通过网络进行查找,尽管 Elasticsearch 速度极快,但仍然会减慢整个管道的速度。

使用静态 JDBC 过滤器

我最近在 ParisJUG 遇到了 Laurent,他来自令人惊叹的 Elastic Consulting 团队,我们讨论了这个问题。 他告诉我,他的一位客户必须面对这个问题。 他建议改用 Logstash 中的 Elasticsearch 缓存。

问题是:Logstash 中没有这样的过滤器缓存插件。 他找到了一种非常聪明的方法来解决该问题,即利用静态 JDBC 过滤器插件和 Elasticsearch JDBC 驱动程序。

请注意,这需要拥有白金许可证(或试用版)。

添加 Elasticsearch JDBC 驱动程序

我们首先需要将 JDBC 驱动程序添加到 Logstash 实例中。

mdir -p logstash-config/lib
wget https://artifacts.elastic.co/maven/org/elasticsearch/plugin/x-pack-sql-jdbc/8.12.0/x-pack-sql-jdbc-8.12.0.jar
mv x-pack-sql-jdbc-8.12.0.jar logstash-config/lib

我们只需要与 Logstash docker 实例共享此目录:

time docker run \--name=logstash \--rm -it \-v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \-v $(pwd)/logstash-config/lib/:/tmp/lib/ \-e XPACK_MONITORING_ENABLED=false \-e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \-e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \docker.elastic.co/logstash/logstash:8.12.0

更新管道

input 部分不变。 但现在,我们要在内存中创建一个名为 vip 的临时表(为了保持一致性)。 该表结构是使用 local_db_objects 参数定义的:

jdbc_static {local_db_objects => [ {name => "vip"index_columns => ["ip"]columns => [["name", "VARCHAR(255)"],["vip", "BOOLEAN"],["ip", "VARCHAR(64)"]]} ]
}

当 jdbc_static 启动时,我们要首先从 Elasticsearch vip索引中读取所有数据集。 这是在 loaders 选项中完成的:

jdbc_static {loaders => [ {query => "select name, vip, ip from vip"local_table => "vip"} ]jdbc_user => "elastic"jdbc_password => "${ELASTIC_PASSWORD}"jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
}

每次我们需要进行查找时,我们都希望使用以下语句来执行它:

SELECT name, vip FROM vip WHERE ip = "THE_IP"

这可以使用 local_lookups 参数定义:

jdbc_static {local_lookups => [ {query => "SELECT name, vip FROM vip WHERE ip = :ip"parameters => { "ip" => "clientip" }target => "vip"} ]
}

如果没有找到数据,我们可以使用 default_hash 选项提供默认值:

jdbc_static {local_lookups => [ {query => "SELECT name, vip FROM vip WHERE ip = :ip"parameters => { "ip" => "clientip" }target => "vip" default_hash => {name => nilvip => false}} ]
}

最后,这将在事件中生成 vip.name 和 vip.vip 字段。

我们现在可以定义我们想要对这些临时字段执行的操作:

jdbc_static {add_field => { name => "%{[vip][0][name]}" }add_field => { vip => "%{[vip][0][vip]}" }remove_field => ["vip"]
}

这给出了以下过滤器:

filter {# Enrich every document with Elasticsearch via static JDBCjdbc_static {loaders => [ {query => "select name, vip, ip from vip"local_table => "vip"} ]local_db_objects => [ {name => "vip"index_columns => ["ip"]columns => [["name", "VARCHAR(255)"],["vip", "BOOLEAN"],["ip", "VARCHAR(64)"]]} ]local_lookups => [ {query => "SELECT name, vip FROM vip WHERE ip = :ip"parameters => { "ip" => "clientip" }target => "vip" default_hash => {name => nilvip => false}} ]add_field => { name => "%{[vip][0][name]}" }add_field => { vip => "%{[vip][0][vip]}" }remove_field => ["vip"]jdbc_user => "elastic"jdbc_password => "${ELASTIC_PASSWORD}"jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"}mutate { remove_field => ["@version", "@timestamp"] }
}

将修改后的文档写入Elasticsearch

在第一个管道中,我们测试事件中是否确实存在名称字段:

if [name] {# Index to Elasticsearch
}

我们仍然可以使用类似的东西,但因为我们提供了默认值,以防在 Elasticsearch vip 索引中找不到 ip,所以现在它会在标签表中生成一个新的 _jdbcstaticdefaultsused 标签。

我们可以用它来知道我们是否发现了某些东西,如果是前者,则将我们的数据发送到 Elasticsearch:

output {if "_jdbcstaticdefaultsused" not in [tags] {# Write all the modified documents to Elasticsearchelasticsearch {manage_template => falsehosts => ["${ELASTICSEARCH_URL}"]user => "elastic"password => "${ELASTIC_PASSWORD}"index => "%{[@metadata][_index]}"document_id => "%{[@metadata][_id]}"}}
}

更快吗?

因此,当我们在这个小数据集上运行测试时,我们可以看到,使用 Elasticsearch 过滤器方法,需要两分钟多一点的时间来丰富我们的数据集:

real    2m3.146s
user    0m0.077s
sys     0m0.042s

当使用 JDBC 静态过滤器方法运行管道时,现在只需不到一分钟:

real    0m48.575s
user    0m0.064s
sys     0m0.039s

正如我们所看到的,我们显着减少了该丰富管道的执行时间(增益约为 60%)。

如果你有一个可以轻松放入 Logstash JVM 内存的小型 Elasticsearch 索引,你可以尝试此策略(或类似的策略)。 如果你有数亿个文档,你仍然应该使用 Elasticsearch Filter Plugin。

结论

在这篇文章中,我们了解了当我们需要在 Elasticsearch 中执行一些查找时,如何使用 JDBC 静态过滤器插件来加速数据丰富管道。 在下一篇文章中,我们将了解如何使用 Elastic Agent 在边缘进行类似的丰富。

本文中描述的任何特性或功能的发布和时间安排均由 Elastic 自行决定。 当前不可用的任何特性或功能可能无法按时交付或根本无法交付

更多阅读:

  • Logstash:Jdbc static filter plugin 介绍

  • Logstash:运用 jdbc_streaming 来丰富我们的数据

原文:Enrich your Elasticsearch documents with Logstash | Elastic Blog

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/734719.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python的scripts文件夹作用

Windows系统: Scripts文件夹通常位于Python的安装目录下,如C:\Python\Scripts。该文件夹内包含了各种有用的工具,例如pip、virtualenv等,这些工具有助于管理和配置Python环境和依赖包。 Linux系统: 在Linux系统中&…

集简云新增通义千问qwen 72b chat、qwen1.5 等多种大语言模型,提升多语言支持能力

通义千问再开源!继发布多模态模型后,通义千问 1.5 版本也在春节前上线。 此次大模型包括六个型号:0.5B、1.8B、4B、7B、14B 和 72B,性能评测基础能力在在语言理解、代码生成、推理能力等多项基准测试中均展现出优异的性能&#x…

大话设计模式——5.代理模式(Proxy Pattern)

1.定义 为其他具体对象提供一种代理用以控制对这个对象的访问,属于结构型模式。 UML图: 2.示例 生活中有许多的代理,如房产中介,房主出售的房子挂在中介处,中介帮忙寻找需要的客户,客户不需要直接接触房…

linux tar分卷压缩与windows合并解压

linux tar分卷压缩 tar -czf - Shadowsocks | split -b 1000k -d - shadowsocks.tar.gz tar -czf - 文件夹名 | split -b 1000k -d - 输出文件 如有如下几个tar分卷:logs.tar.gza1、logs.tar.gza2、logs.tar.gza3,在Windows下如何进行合并呢&#xff…

【备战蓝桥杯系列】单源最短路径Dijkstra算法模板

Dijkstra算法模板 蓝桥杯中也是会考到图论最短路的,一旦考到,基本是不会太难的,只要知道板子就基本能拿分了。 两个板子如下 朴素Dijkstra算法 适应情况:稠密图,正权边 时间复杂度 O(n^2 m) int dijkst(){memse…

银河麒麟服务器ky10 server wvp镜像制作

在线安装docker yum install docker -y cat >/etc/docker/daemon.json<<EOF{"registry-mirrors": ["https://registry.docker-cn.com","https://dockerhub.azk8s.cn","https://hub-mirror.c.163.com"]} EOF systemctl start …

php集成修改数据库的字段

1.界面效果 2.代码 <?phpecho <form action"" method"post"><label for"table">表名:</label><input type"text" id"table" name"table"><br><div id"fieldsContaine…

js【详解】async await

为什么要使用 async await async await 实现了使用同步的语法实现异步&#xff0c;不再需要借助回调函数&#xff0c;让代码更加易于理解和维护。 (async function () {// await 必须放在 async 函数中try {// 加载第一张图片const img1 await loadImg1()// 加载第二张图片co…

P1002 [NOIP2002 普及组] 过河卒

题目 原题目链接 题目描述 棋盘上 A A A 点有一个过河卒&#xff0c;需要走到目标 B B B 点。卒行走的规则&#xff1a;可以向下、或者向右。同时在棋盘上 C C C 点有一个对方的马&#xff0c;该马所在的点和所有跳跃一步可达的点称为对方马的控制点。因此称之为“马拦过河…

比较两组二维平面结构的演化

假设1个6*6的二维平面空间&#xff0c;这个空间的行和列只能按照1-2-3-4-5-6-1的顺序变换。这个平面上的物体只能平移。在这个空间里有力&#xff0c;在这些力的作用下&#xff0c;两个点按照 1-7的顺序运动。 - - - - - - - - - - - - - - - A - - - - - …

序列化相关知识总结

目录 一、序列化1.1 基本概念1.1.1 序列化1.1.2 反序列化1.1.3 数据结构、对象与二进制串1.1.4 序列化/反序列化的目的 1.2 几种常见的序列化和反序列化协议1.2.1 XML&SOAP1.2.2 JSON&#xff08;Javascript Object Notation&#xff09;1.2.3 Protobuf 二、安卓下的序列化…

React 开发者完全指南:React.FC()、函数组件 和更多

前言 React.FC 是一个 TypeScript 类型&#xff0c;用于 React 函数组件。FC 代表 Functional Component&#xff08;函数组件&#xff09;。这个类型的使用有助于在 TypeScript 项目中编写类型安全的 React 组件。使用 React.FC 为组件定义类型就可以享受到 TypeScript 提供的…

什么是IP白名单?为什么要设置IP白名单?

在互联网的世界里&#xff0c;IP地址是每个设备与网络进行通信的关键标识。然而&#xff0c;并不是所有的IP地址都可以无限制地访问所有网络资源。为了保障网络安全和资源管理&#xff0c;很多网站和服务会设置IP白名单。本文将详细介绍IP白名单的定义、作用以及为什么要设置IP…

ARMv8/ARMv9架构入门到精通-学习方法

目录 1、学习ARM基础知识2、学习ARM异常(中断)3、学习MMU4、学习Cache5、学习Trustzone和安全架构6、学习ARM架构和各类IP推荐 本文转自 周贺贺&#xff0c;baron&#xff0c;代码改变世界ctw&#xff0c;Arm精选&#xff0c; 资深安全架构专家&#xff0c;11年手机安全/SOC底层…

SpringMVC06、数据处理

6、数据处理 6.1、处理提交数据 1、提交的域名称和处理方法的参数名一致 提交数据 : http://localhost:8080/hello?namekuangshen 处理方法 : RequestMapping("/hello") public String hello(String name){System.out.println(name);return "hello";…

Flask基于配置文件添加项目config配置

文章目录 1. 直接在app文件中添加配置2. 基于配置config文件添加配置2.1 直接在配置文件中定义2.2 调用配置文件中的类2.3 基于字典类实现多种环境配置 Flask 项目中&#xff0c;我们会加载很多配置&#xff0c;比如设置数据库连接信息&#xff0c;设置日志所在路径等等。配置的…

【PTA】L1-011 L1-012 L1-013 L1-014 L1-015(C)第三天

L1-011 A-B 分数 20 作者 陈越 单位 浙江大学 本题要求你计算A−B。不过麻烦的是&#xff0c;A和B都是字符串 —— 即从字符串A中把字符串B所包含的字符全删掉&#xff0c;剩下的字符组成的就是字符串A−B。 输入格式&#xff1a; 输入在2行中先后给出字符串A和B。两字符…

javase day02笔记

第二天课堂笔记 源文件的组成部分★★ 源文件外部结构 class 类名{}main方法 public static void main(String [] args){}main方法可有可无 没有main的情况&#xff0c;编译成功&#xff0c;运行失败&#xff0c;没有程序入口 多个main情况&#xff0c;编译报错&#xff0c;…

半监督 伪标签

什么是半监督学习 半监督学习也是一类更接近于人类学习方法的机器学习范式。试想这样一个场景&#xff0c;我们小时候学习识别小猫、小狗、汽车等等物品时&#xff0c;往往只需要父母进行一两次的指导&#xff0c;我们就能很准确地辨认出什么是猫狗。这背后有一个重要原因是&am…

抖音素材网站去哪下载?给你推荐六个抖音自媒体网站

各位抖音视频创作达人们&#xff0c;是否在苦苦寻觅那些能够点燃观众热情&#xff0c;让视频内容跃然屏上的素材宝库呢&#xff1f;此刻&#xff0c;你们的寻觅之旅将迎来终点&#xff01;我将向你们隆重推荐10个精心挑选的视频素材库&#xff0c;它们定能让你们的抖音视频如同…