Apache Zeppelin结合Apache Airflow使用1

Apache Zeppelin结合Apache Airflow使用1

文章目录

  • Apache Zeppelin结合Apache Airflow使用1
  • 前言
  • 一、安装Airflow
  • 二、使用步骤
    • 1.目标
    • 2.编写DAG
    • 2.加载、执行DAG
  • 总结


前言

之前学了Zeppelin的使用,今天开始结合Airflow串任务。

Apache Airflow和Apache Zeppelin是两个不同的工具,各自用于不同的目的。Airflow用于编排和调度工作流,而Zeppelin是一个交互式数据分析和可视化的笔记本工具。虽然它们有不同的主要用途,但可以结合使用以满足一些复杂的数据处理和分析需求。

下面是一些结合使用Airflow和Zeppelin的方式:

  1. Airflow调度Zeppelin Notebooks:

    • 使用Airflow编写调度任务,以便在特定时间或事件触发时运行Zeppelin笔记本。
    • 在Airflow中使用Zeppelin的REST API或CLI命令来触发Zeppelin笔记本的执行。
  2. 数据流管道:

    • 使用Airflow编排数据处理和转换任务,例如从数据源提取数据、清理和转换数据。
    • 在Zeppelin中创建笔记本,用于进一步的数据分析、可视化和报告生成。
    • Airflow任务完成后,触发Zeppelin笔记本执行以基于最新数据执行分析。
  3. 参数传递:

    • 通过Airflow参数传递,将一些参数值传递给Zeppelin笔记本,以便在不同任务之间共享信息。
    • Zeppelin笔记本可以从Airflow任务中获取参数值,以适应特定的数据分析需求。
  4. 日志和监控:

    • 使用Airflow监控工作流的运行情况,查看任务的日志和执行状态。
    • 在Zeppelin中记录和可视化Airflow工作流的关键指标,以获得更全面的工作流性能洞察。
  5. 整合数据存储:

    • Airflow可以用于从不同数据源中提取数据,然后将数据传递给Zeppelin进行进一步的分析。
    • Zeppelin可以使用Airflow任务生成的数据,进行更深入的数据挖掘和分析。

结合使用Airflow和Zeppelin能够充分发挥它们各自的优势,实现更全面、可控和可视化的数据处理和分析工作流。


一、安装Airflow

安装参考:
https://airflow.apache.org/docs/apache-airflow/stable/start.html

CentOS 7.9安装后启动会报错,还需要配置下sqlite,参考:https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

[root@slas bin]# airflow standalone
Traceback (most recent call last):File "/root/.pyenv/versions/3.9.10/bin/airflow", line 5, in <module>from airflow.__main__ import mainFile "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/__init__.py", line 52, in <module>from airflow import configuration, settingsFile "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 2326, in <module>conf.validate()File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 718, in validateself._validate_sqlite3_version()File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 824, in _validate_sqlite3_versionraise AirflowConfigException(
airflow.exceptions.AirflowConfigException: error: SQLite C library too old (< 3.15.0). See https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

二、使用步骤

1.目标

我想做个简单的demo,包括两个节点,实现如图所示功能,读取csv,去重:
在这里插入图片描述
csv文件输入在airflow上实现,去重在zeppelin上实现。

2.编写DAG

先实现extract_data_script.py,做个简单的读取csv指定列数据写入新的csv文件。

import argparse
import pandas as pddef extract_and_write_data(date, output_csv, columns_to_extract):# 读取指定列的数据csv_file_path = f"/home/works/datasets/data_{date}.csv"df = pd.read_csv(csv_file_path, usecols=columns_to_extract)# 将数据写入新的 CSV 文件df.to_csv(output_csv, index=False)if __name__ == "__main__":parser = argparse.ArgumentParser()parser.add_argument("--date", type=str, required=True, help="Date parameter passed by Airflow")args = parser.parse_args()# 输出 CSV 文件路径(替换为实际的路径)output_csv_path = "/home/works/output/extracted_data.csv"# 指定要提取的列columns_to_extract = ['column1', 'column2', 'column3']# 调用函数进行数据提取和写入extract_and_write_data(args.date, output_csv_path, columns_to_extract)

然后在 Zeppelin 中创建一个 Python 笔记本(Notebook),其中包含被 Airflow DAG 调用的代码。加载先前从 output/extracted_data.csv 文件中提取的数据:

%python# 导入必要的库
import pandas as pd# 加载先前从 CSV 文件中提取的数据
csv_file_path = "/home/works/output/extracted_data.csv"
# 读取 CSV 文件
df = pd.read_csv(csv_file_path)# 过滤掉 column1 为空的行
df = df[df['column1'].notnull()]# 去重,以 column2、column3 字段为联合去重依据
deduplicated_df = df.drop_duplicates(subset=["column2", "column3"])# 保存去重后的结果到新的 CSV 文件
deduplicated_df.to_csv("/home/works/output/dd_data.csv", index=False)

将这个 Zeppelin 笔记本保存,并记住笔记本的paragraph ID, Airflow DAG 需要使用这个 ID 来调用 Zeppelin 笔记本。

接下来,用VSCode编写zeppelin_integration.py代码如下,上传到$AIRFLOW_HOME/dags目录下:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedeltadefault_args = {'owner': 'airflow','depends_on_past': False,'start_date': datetime(2024, 1, 1),'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),
}dag = DAG('zeppelin_integration',default_args=default_args,schedule=timedelta(days=1),
)extract_data_task = BashOperator(task_id='extract_data',bash_command='python /home/works/z/extract_data_script.py --date {{ ds }}',dag=dag,
)run_zeppelin_notebook_task = BashOperator(task_id='run_zeppelin_notebook',bash_command='curl -X POST -HContent-Type:application/json http://IP:PORT/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359',dag=dag,
)# Set the task dependencies
extract_data_task >> run_zeppelin_notebook_task

2.加载、执行DAG

如下命令进行测试,先执行下代码看看语法是否都正确,然后list出tasks,并逐一test:

# python zeppelin_integration.py # airflow tasks list zeppelin_integration
extract_data
run_zeppelin_notebook# airflow tasks test zeppelin_integration extract_data 20240122
[2024-01-22T08:57:45.805+0800] {dagbag.py:538} INFO - Filling up the DagBag from /root/airflow/dags
[2024-01-22T08:57:47.853+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: zeppelin_integration.extract_data __airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__ [None]>
[2024-01-22T08:57:47.860+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: zeppelin_integration.extract_data __airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__ [None]>
[2024-01-22T08:57:47.861+0800] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-22T08:57:47.861+0800] {taskinstance.py:2250} WARNING - cannot record queued_duration for task extract_data because previous state change time has not been saved
[2024-01-22T08:57:47.862+0800] {taskinstance.py:2192} INFO - Executing <Task(BashOperator): extract_data> on 2024-01-20T00:00:00+00:00
[2024-01-22T08:57:47.900+0800] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='zeppelin_integration' AIRFLOW_CTX_TASK_ID='extract_data' AIRFLOW_CTX_EXECUTION_DATE='2024-01-20T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__'
[2024-01-22T08:57:47.904+0800] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-22T08:57:47.905+0800] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'python /home/works/z/extract_data_script.py --date 2024-01-20']
[2024-01-22T08:57:47.914+0800] {subprocess.py:86} INFO - Output:
[2024-01-22T08:57:48.553+0800] {subprocess.py:97} INFO - Command exited with return code 0
[2024-01-22T08:57:48.632+0800] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=zeppelin_integration, task_id=extract_data, execution_date=20240120T000000, start_date=, end_date=20240122T005748# airflow tasks test zeppelin_integration run_zeppelin_notebook 20240122
[2024-01-22T09:01:43.665+0800] {dagbag.py:538} INFO - Filling up the DagBag from /root/airflow/dags
[2024-01-22T09:01:45.835+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: zeppelin_integration.run_zeppelin_notebook __airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__ [None]>
[2024-01-22T09:01:45.843+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: zeppelin_integration.run_zeppelin_notebook __airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__ [None]>
[2024-01-22T09:01:45.844+0800] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-22T09:01:45.844+0800] {taskinstance.py:2250} WARNING - cannot record queued_duration for task run_zeppelin_notebook because previous state change time has not been saved
[2024-01-22T09:01:45.845+0800] {taskinstance.py:2192} INFO - Executing <Task(BashOperator): run_zeppelin_notebook> on 2024-01-22T00:00:00+00:00
[2024-01-22T09:01:45.904+0800] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='zeppelin_integration' AIRFLOW_CTX_TASK_ID='run_zeppelin_notebook' AIRFLOW_CTX_EXECUTION_DATE='2024-01-22T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__'
[2024-01-22T09:01:45.909+0800] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-22T09:01:45.910+0800] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'curl -X POST -HContent-Type:application/json http://100.100.30.220:8181/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359']
[2024-01-22T09:01:45.921+0800] {subprocess.py:86} INFO - Output:
[2024-01-22T09:01:45.931+0800] {subprocess.py:93} INFO -   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
[2024-01-22T09:01:45.931+0800] {subprocess.py:93} INFO -                                  Dload  Upload   Total   Spent    Left  Speed
100    50  100    50    0     0      8      0  0:00:06  0:00:06 --:--:--    12
[2024-01-22T09:01:52.003+0800] {subprocess.py:93} INFO - {"status":"OK","body":{"code":"SUCCESS","msg":[]}}
[2024-01-22T09:01:52.003+0800] {subprocess.py:97} INFO - Command exited with return code 0
[2024-01-22T09:01:52.098+0800] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=zeppelin_integration, task_id=run_zeppelin_notebook, execution_date=20240122T000000, start_date=, end_date=20240122T010152

最后用命令airflow scheduler将它添加到airflow里。

# airflow scheduler____________       _________________    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2024-01-22T09:28:21.829+0800] {task_context_logger.py:63} INFO - Task context logging is enabled
[2024-01-22T09:28:21.831+0800] {executor_loader.py:115} INFO - Loaded executor: SequentialExecutor
[2024-01-22T09:28:21.868+0800] {scheduler_job_runner.py:808} INFO - Starting the scheduler
[2024-01-22T09:28:21.869+0800] {scheduler_job_runner.py:815} INFO - Processing each file at most -1 times
。。。

页面上会增加一个DAG,如图:
在这里插入图片描述
在Actions里可以点击执行。


总结

以上就是今天要讲的内容,总体来说集成两个工具还是很方便的,期待后面更多的应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/642581.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言数据结构(3)——线性表其二(单链表)

欢迎来到博主的专栏——C语言数据结构 博主id&#xff1a;代码小豪 文章目录 单链表不连续存储的线性表单链表单链表的结构头指针单链表的操作打印单链表 空链表单链表的插入尾插法 头插法 单链表的查找任意位置处的节点插入单链表节点的删除 销毁链表 单链表 顺序表是一个物…

万字长文详解Java线程池面试题

王有志&#xff0c;一个分享硬核 Java 技术的互金摸鱼侠 加入 Java 人的提桶跑路群&#xff1a;共同富裕的Java人 今天是《面霸的自我修养》第 6 篇文章&#xff0c;我们一起来看看面试中会问到哪些关于线程池的问题吧。数据来源&#xff1a; 大部分来自于各机构&#xff08;J…

【K8S】Kubernetes 中滚动发布由浅入深实战

目录 一、Kubernetes中滚动发布的需求背景1.1 滚动发布1.2 滚动发布、蓝绿发布、金丝雀发布的区别 二、Kubernetes中实现滚动发布2.1 定义Kubernetes中的版本2.2 创建 Deployment 资源对象2.2.1 在 Yaml 中定义 Deployment 资源对象2.2.2 执行命令创建 Deployment 资源对象 三、…

Asp.net core 框架入门

概述 appsettings.json&#xff1a;配置文件&#xff0c;数据库连接字符串配置信息 Program.cs&#xff1a;程序入口文件&#xff08;里面有个Main方法&#xff09; Startup.cs&#xff1a;启动配置文件 依赖项&#xff1a;管理项目所依赖的第三方组件的安装&#xff0c;配…

WampServer

开发笔记 推荐链接php无法保存SESSION问题部署SSL时候产生的问题 推荐链接 链接目录 php无法保存SESSION问题 php.ini文件和phpForApache.ini 文件 里面都有 对路径的控制&#xff0c;相关路径问题可能也需要进行修改&#xff0c;打开文件搜索wamp64或wamp 就可以看到了&…

“深入理解RabbitMQ交换机的原理与应用“

深入理解RabbitMQ交换机的原理与应用 引言1. RabbitMQ交换机简介介绍1.1 什么是RabbitMQ&#xff1f;1.1.1 消息中间件的作用1.1.2 RabbitMQ的特点和优势 1.2 RabbitMQ的基本概念1.2.1 队列1.2.2 交换机1.2.3 路由键 1.3 交换机的作用和分类1.3.1 直连交换机&#xff08;direct…

VS Code Json格式化插件-JSON formatter

&#x1f9aa;整个文件格式化 按快捷键Shift Alt F &#x1f96a;仅格式化选择内容 需要选择完整的json段落即&#xff1a;{} 或 [] 括起来的部分&#xff0c;再按快捷键Ctrl K F

社区公益培训系统功能说明

社区公益培训系统功能说明 本系统将用于社区面向居民开展的公益培训课程展示&#xff0c;在线报名&#xff0c;并按班级排课上课&#xff0c;上课时学员要扫码签到&#xff0c;经常旷课的学员将禁止再报名其他课程。 1. 用户注册与登录 - 提供用户注册和登录功能&#xff0c;…

鸿蒙不再兼容安卓,鸿蒙开发薪资高达4w+,程序员是否需转行鸿蒙?

鸿蒙系统的崛起 鸿蒙系统的推出经历了长时间的研发和完善&#xff0c;它是一款自主研发的操作系统&#xff0c;集成了最新的技术和创新理念。该系统具备卓越的安全性、兼容性和扩展性&#xff0c;因此备受关注。最初&#xff0c;鸿蒙系统主要应用于华为手机产品&#xff0c;但…

惬意上手Python —— 装饰器和内置函数

1. Python装饰器 Python中的装饰器是一种特殊类型的函数&#xff0c;它允许用户在不修改原函数代码的情况下&#xff0c;增加或修改函数的行为。 具体来说,装饰器的工作原理基于Python的函数也是对象这一事实&#xff0c;可以被赋值给变量、作为参数传递给其他函数或者作为其他…

比较有创意的网站

有创意的网站通常展示了独特的设计、交互或内容。以下是一些备受赞誉的有创意的网站&#xff0c;你可以参考&#xff1a; Awwwards: Awwwards 是一个评选并展示全球最优秀网站的平台。你可以在这里找到很多有创意的网站设计。 Awwwards CSS Design Awards: 类似于Awwwards&…

3d gaussian splatting笔记(paper部分翻译)

本文为3DGS paper的部分翻译。 基于点的&#x1d6fc;混合和 NeRF 风格的体积渲染本质上共享相同的图像形成模型。 具体来说&#xff0c;颜色 &#x1d436; 由沿射线的体积渲染给出&#xff1a; 其中密度 &#x1d70e;、透射率 &#x1d447; 和颜色 c 的样本是沿着射线以…

VSCode插件 —— Cody AI (免费AI助手!)

之前介绍过一款 阿里云免费的AI开发工具——通义灵码 TONGYI Lingma 本文再推荐一个可以极大提高开发前端开发效率的工具 —— Cody AI &#xff08;Sourcegraph&#xff09;&#xff0c;同样是免费的&#xff01; 不过&#xff0c;使用Cody AI需要有github 或 Google 、 git…

vue3相比vue2的效率提升

1、静态提升 2、预字符串化 3、缓存事件处理函数 4、Block Tree 5、PatchFlag 一、静态提升 在vue3中的app.vue文件如下&#xff1a; 在服务器中&#xff0c;template中的内容会变异成render渲染函数。 最终编译后的文件&#xff1a; 1.静态节点优化 那么这里为什么是两部分…

内网安全管理系统(保密管理系统)

在当今信息化的时代&#xff0c;企业的内网已经成为其核心资产的重要组成部分。 随着企业的快速发展和信息化程度的提升&#xff0c;内网安全问题日益凸显&#xff0c;如何保障内网的安全和机密信息的保密性&#xff0c;已经成为企业亟待解决的问题。 内网安全管理系统(保密管…

现在的小年轻真的卷得过分了,真是完全不把自己当人啊

现在的小年轻真的卷得过分了&#xff0c;真是完全不把自己当人啊 都说00后躺平了&#xff0c;但是有一说一&#xff0c;该卷的还是卷。这不&#xff0c;前段时间我们公司来了个00后&#xff0c;工作都没两年&#xff0c;跳槽到我们公司起薪18K&#xff0c;都快接近我了。后来才…

常用电子器件学习——MOS管

MOS管介绍 MOS&#xff0c;是MOSFET的缩写。MOSFET 金属-氧化物半导体场效应晶体管&#xff0c;简称金氧半场效晶体管&#xff08;Metal-Oxide-Semiconductor Field-Effect Transistor, MOSFET&#xff09;。 一般是金属(metal)—氧化物(oxide)—半导体(semiconductor)场效应晶…

RabbitMQ消息应答与发布

消息应答 RabbitMQ一旦向消费者发送了一个消息,便立即将该消息,标记为删除. 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个很长的任务并仅仅执行了一半就突然挂掉了,在这种情况下,我们将丢失正在处理的消息,后续给消费者发送的消息也就无法接收到了. 为了…

OpenHarmony驱动消息机制管理

驱动消息机制管理 当用户态应用和内核态驱动需要交互时&#xff0c;可以使用HDF框架的消息机制来实现。 消息机制的功能主要有以下两种&#xff1a; 用户态应用发送消息到驱动。 用户态应用接收驱动主动上报事件。 配置管理 HCS&#xff08;HDF Configuration Source&…

深入分析 Linux 网络丢包问题

热门IT课程【视频教程】-华为/思科/红帽/oraclehttps://xmws-it.blog.csdn.net/article/details/134398330 所谓丢包&#xff0c;是指在网络数据的收发过程中&#xff0c;由于种种原因&#xff0c;数据包还没传输到应用程序中&#xff0c;就被丢弃了。这些被丢弃包的数量&#…