如何通过 Apache Airflow 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

了解如何通过 Apache Airflow 将数据导入 Elasticsearch。

Apache Airflow

Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个主要支柱:

  • 动态:管道以 Python 定义,允许动态灵活地生成工作流。
  • 可扩展:Airflow 可以与各种环境集成,可以创建自定义运算符,并可以根据需要执行特定代码。
  • 优雅:管道以干净明确的方式编写。
  • 可扩展:其模块化架构使用消息队列来编排任意数量的工作器。

在实践中,Airflow 可用于以下场景:

  • 数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
  • 日志监控:管理日志文件的收集和处理,然后在 Elasticsearch 中进行分析以识别错误或异常。
  • 多种数据源集成:将来自不同系统(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜索和报告。

DAG:Directed Acyclic Graphs - 有向无环图

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特征是:

  • 由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
  • 排序:任务的执行顺序在 DAG 中明确定义。
  • 可重用性:DAG 旨在重复执行,促进流程自动化。

Airflow 的主要组件

Airflow 生态系统由多个组件组成,它们共同协作以协调任务:

  • 调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
  • 执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
  • Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
  • Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
  • 元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。

Apache Airflow 和 Elasticsearch

我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包含电影数据库,用户可以在其中进行评分和分配评级。想象一个每天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将每天执行,负责检索新的合并评级并更新索引中的记录。

在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责计算分数的机制的方法检索评级,以更新索引中电影的评级字段。

使用 Apache Airflow 和 Elasticsearch 以及 Docker

要创建容器化环境,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的说明实际设置 Airflow。

至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引,其中电影数据已编入索引。这些电影的 “rating” 字段将被更新。

创建 DAG

通过 Docker 安装后,将创建一个文件夹结构,其中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。

在此之前,我们需要确保安装了必要的依赖项。以下是此项目的依赖项:

pip install apache-airflow apache-airflow-providers-elasticsearch

我们将创建文件 update_ratings_movies.py 并开始编写任务代码。

现在,让我们导入必要的库:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。

接下来,我们定义 DAG,并指定其主要参数:

  • dag_id:DAG 的名称。
  • start_date:DAG 的启动时间。
  • schedule:定义周期(在我们的例子中是每日)。
  • doc_md:将导入并显示在 Airflow 界面中的文档。

定义任务

现在,让我们定义 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。

get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task
)

接下来,我们需要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果传递给验证函数。

validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证成功,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果传递给索引函数。

index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证表明失败,DAG 将继续执行失败通知任务。在此示例中,我们只是打印一条消息,但在实际场景中,我们可以配置警报来通知失败。

failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.')
)

最后,我们定义任务依赖关系,确保它们以正确的顺序执行:

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

以下是我们 DAG 的完整代码:

"""
DAG update Rating Movies
"""
import ast
import randomfrom airflow import DAG
from datetime import datetimefrom airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHookdef index_movie_ratings_task(movies):es_hook = ElasticsearchPythonHook(hosts=None,es_conn_args={"cloud_id": "cloud_id""api_key": "api-key"})es_client = es_hook.get_connactions = []for movie in ast.literal_eval(movies):actions.append({"update": {"_id": movie["id"],"_index": "movies"}})actions.append({"doc": {"rating": movie["rating"]},"doc_as_upsert": True})result = es_client.bulk(operations=actions)print(f"Ingestion completed.")print(result)return Truedef get_movie_ratings_task():movies = [{"id": i, "rating": round(random.uniform(1, 10), 1)}for i in range(1, 100)]return moviesdef validate_result(result):if not result:return 'failed_get_rating_operator'else:return 'index_movie_ratings'with DAG(dag_id="update_ratings_movies_2024",start_date=datetime(2024, 12, 29),schedule="@daily",doc_md=__doc__,
):get_ratings_operator = PythonOperator(task_id='get_movie_ratings',python_callable=get_movie_ratings_task)validate_result = BranchPythonOperator(task_id='validate_result',python_callable=validate_result,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],provide_context=True)index_ratings_operator = PythonOperator(task_id='index_movie_ratings',python_callable=index_movie_ratings_task,op_args=["{{ task_instance.xcom_pull(task_ids='get_movie_ratings') }}"])failed_get_rating_operator = PythonOperator(task_id='failed_get_rating_operator',python_callable=lambda: print('Ratings were False, skipping indexing.'))get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

可视化 DAG 执行

在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,并且它已成功完成。

在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在问题。

结论

在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何配置 DAG、定义负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。

这种方法可以轻松适应不同类型的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。

参考资料:

Apache AirFlow

  • https://airflow.apache.org/

使用 Docker 安装 Apache Airflow

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python Hook

  • https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python 运算符

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在吗的本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/893135.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过图形界面展现基于本地知识库构建RAG应用

1. 客户需求 快速完成概念验证(PoC)通过图形界面快速完成演示本地私有数据对比不同模型和成本,决定如何部署 2. 阿里云基于本地知识库构建RAG应用 参考方案: 百炼本地知识库方案 解决方案: FastAPI Gradio Llamaindex qwen-plus 主要三大…

TP4056锂电池充放电芯片教程文章详解·内置驱动电路资源!!!

目录 TP4056工作原理 TP4056引脚详解 TP4056驱动电路图 锂电池充放电板子绘制 编写不易,仅供学习,感谢理解。 TP4056工作原理 TP4056是专门为单节锂电池或锂聚合物电池设计的线性充电器,充电电流可以用外部电阻设定,最大充电…

【Vim Masterclass 笔记21】S09L39:Vim 设置与 vimrc 文件的用法示例(二)

文章目录 S09L39 Vim Settings and the Vimrc File - Part 21 Vim 的配色方案与 color 命令2 map 命令3 示例:用 map 命令快速生成 HTML 代码片段4 Vim 中的 Leader 键5 用 mkvimrc 命令自动生成配置文件 写在前面 本篇为 Vim 自定义配置的第二部分。当中的每个知识…

论文速读|ParGo: Bridging Vision-Language with Partial and Global Views.AAAI25

论文地址:https://arxiv.org/abs/2408.12928 代码地址:https://github.com/bytedance/ParGo bib引用: misc{wang2025pargobridgingvisionlanguagepartial,title{ParGo: Bridging Vision-Language with Partial and Global Views}, author{An…

2024年博客之星年度评选—创作影响力评审入围名单公布

2024年博客之星活动地址https://www.csdn.net/blogstar2024 TOP 300 榜单排名 用户昵称博客主页 身份 认证 评分 原创 博文 评分 平均 质量分评分 互动数据评分 总分排名三掌柜666三掌柜666-CSDN博客1001002001005001wkd_007wkd_007-CSDN博客1001002001005002栗筝ihttps:/…

20250118拿掉荣品pro-rk3566开发板上Android13下在uboot和kernel启动阶段的Rockchip这个LOGO标识

20250118拿掉荣品pro-rk3566开发板上Android13下在uboot和kernel启动阶段的Rockchip这个LOGO标识 2025/1/18 15:12 缘起:做飞凌OK3588-C开发板/核心板【Linux R4】的时候,测试/生产要求没有开机LOGO【飞凌/Rockchip】 要求:黑屏或者中性界面。…

【转】厚植根基,同启新程!一文回顾 2024 OpenHarmony 社区年度工作会议精彩瞬间

在数字化浪潮奔腾不息的今天,开源技术已成为推动科技创新与产业发展的强大引擎。2025年1月10日-11日,OpenAtom OpenHarmony(开放原子开源鸿蒙,以下简称“OpenHarmony”或“开源鸿蒙”)社区2024年度工作会议于深圳盛大启…

Mybatis 进阶 / Mybatis—Puls (详细)

目录 一.动态SQL 1.1标签 1.2 标签 1.3标签 1.4标签 1.5标签 1.6标签 mybatis总结: 二.Mybatis-Puls 2.1准备工作 2.2CRUD单元测试 2.2.1创建UserInfo实体类 2.2.2编写Mapper接⼝类 2.2.3 测试类 2.3 常见注解 2.3.1TableName 2.3.2TableField 2.4打印日…

Go 切片:用法和本质

要想更好的了解一个知识点,实战是最好的经历。 题目 我这里放一道题目: package mainimport "fmt"func SliceRise(s []int) {s append(s, 0)for i : range s {s[i]}fmt.Println(s) }func SlicePrint() {s1 : []int{1, 2}s2 : s1s2 append…

如何下载对应城市的地理json文件

这里采用的是阿里地图工具进行查找: DataV.GeoAtlas地理小工具系列 由阿里云DataV数据可视化团队出品,多年深耕数据可视化领域,数据大屏业务开拓者和领航者。致力用震撼而清晰的视觉语言,让更多人读懂大数据,受惠数据驱动的决策方式 第一步打开网站 : …

AI 大爆发时代,音视频未来路在何方?

AI 大模型突然大火了 回顾2024年,计算机领域最大的变革应该就是大模型进一步火爆了。回顾下大模型的发展历程: 萌芽期:(1950-2005) 1956年:计算机专家约翰麦卡锡首次提出“人工智能”概念,标志…

解决wordpress媒体文件无法被搜索的问题

最近,我在wordpress上遇到了一个令人困扰的问题:我再也无法在 WordPress 的媒体库中搜索媒体文件了。之前,搜索媒体非常方便,但现在无论是图片还是其他文件,似乎都无法通过名称搜索到。对于我这样需要频繁使用图片的博主来说,这简直是个大麻烦。 问题源头 一开始,我怀…

代码随想录训练营第五十一天| 99.岛屿数量 深搜 岛屿数量 广搜 100.岛屿的最大面积

99.岛屿数量 深搜 题目链接:99. 岛屿数量 讲解链接:代码随想录 就是dfs模版题目 在dfs里可以先定义方向数组移动 再遍历分别向四个方向移动 同时记得更新当前nextx nexty 再判断是否越界 再执行判断条件 当前位置未走过 visited[i][j] false 一开始jav…

springboot之YAML语法

目录 一、基本语法 写一个端口号和一个路径 Controller里的方法: 然后这样写才能访问到: 这是在yml里面写的,也可以写在properties里 再访问: 二、值的写法 1.普通类型(数字、字符串、布尔) 例子1: 配置文件…

ASP .NET Core 学习 (.NET 9)- 创建 API项目,并配置Swagger及API 分组或版本

本系列为个人学习 ASP .NET Core学习全过程记录,基于.NET 9 和 VS2022 ,实现前后端分离项目基础框架搭建和部署,以简单、易理解为主,注重页面美观度和后台代码简洁明了,可能不会使用过多的高级语法和扩展,后…

LuaJIT Garbage Collector Algorithms

Explain 本篇文章是对Make Pall发表wili内容《LuaJIT 3.0 new Garbage Collector》的翻译和扩展,因为原文是对LuaJIT 2.x GC重要功能的简介和对LuaJIT 3.0 new GC的工作计划,所以它并不是系统性介绍GC的文章。希望以后能有精力系统性的对LuaJIT 2.x GC做…

ChatGPT大模型极简应用开发-CH1-初识 GPT-4 和 ChatGPT

文章目录 1.1 LLM 概述1.1.1 语言模型和NLP基础1.1.2 Transformer及在LLM中的作用1.1.3 解密 GPT 模型的标记化和预测步骤 1.2 GPT 模型简史:从 GPT-1 到 GPT-41.2.1 GPT11.2.2 GPT21.2.3 GPT-31.2.4 从 GPT-3 到 InstructGPT1.2.5 GPT-3.5、Codex 和 ChatGPT1.2.6 …

基于单片机的直流电机控制系统(论文+源码)

1 系统方案设计 本设计基于单片机的直流电机控制系统的总体架构设计如图2.1所示,其采用STM32F103单片机作为控制器,结合ESP8266 WiFi通信模块、L9110电机驱动电路、OLED液晶、按键等构成整个系统。用户在使用时,可以通过按键或者手机APP设定直…

【Linux】Linux入门(2)常见指令

目录 Linux下的文件ls 指令 --- 展示目录pwd指令 --- 显示当前目录cd 指令 --- 改变工作目录touch指令 --- 创建普通文件stat指令 --- 查看文件属性mkdir指令 --- 创建目录rmdir指令 --- 删除目录rm指令 --- 同时删除文件或目录man指令 --- 访问帮助手册cp指令 复制文件或目录m…

《自动驾驶与机器人中的SLAM技术》ch4:基于预积分和图优化的 GINS

前言:预积分图优化的结构 1 预积分的图优化顶点 这里使用 《自动驾驶与机器人中的SLAM技术》ch4:预积分学 中提到的散装的形式来实现预积分的顶点部分,所以每个状态被分为位姿()、速度、陀螺零偏、加计零偏四种顶点&am…