Airflow:解码Airflow执行日期

执行日期是Apache Airflow(用于编排复杂数据管道的开源平台)的关键概念。掌握执行日期的概念及其对工作流的影响对于构建高效、可靠和可维护的数据管道至关重要。在本实用指南中,我们将深入研究执行日期在气流中的作用,它们的目的,以及如何在您的工作流中处理它们,并提供示例和解释。

执行日期

执行日期定义

执行日期代表了任务实例(Task Instance)逻辑上应该执行的时间点,并不是任务实际开始或结束的时间。它主要用于数据处理的时间边界界定、任务依赖关系的确定以及数据分区等场景,为工作流中的任务提供了一个统一的时间参考基准。
在这里插入图片描述

与调度间隔的关系

Airflow 根据调度间隔(Schedule Interval)来确定执行日期序列。调度间隔定义了 DAG(有向无环图)运行的频率,比如每天、每小时等。例如,一个 DAG 的调度间隔设置为@daily,表示每天运行一次。如果以 2024 年 1 月 1 日为起始日期,那么执行日期序列就是 2024-01-01、2024-01-02、2024-01-03 等,每个执行日期对应一次 DAG 的运行实例。

在任务中的作用

  • 数据分区:在处理大规模数据时,常根据执行日期对数据进行分区。比如,有一个每天处理用户订单数据的任务,可按照执行日期将数据存储在不同的分区中,如/data/orders/year=2024/month=01/day=01对应执行日期为 2024-01-01 的任务数据。
  • 任务依赖:任务之间的依赖关系可以基于执行日期来确定。例如,任务 B 依赖于任务 A 在同一执行日期的数据处理结果,只有当任务 A 在 2024-01-01 这个执行日期完成后,任务 B 才会在相同执行日期开始执行。

理解执行日期

执行日期是一个时间戳,表示DAG运行的逻辑开始时间。它用于:

  1. 定义DAG内的任务处理数据的时间段或间隔。
  2. 控制DAG运行的执行顺序。
  3. 作为内置Airflow变量{{ds}}、{{prev_ds}}、{{next_ds}}的基础。

{{ds}}:代表当前任务实例的执行日期,格式通常为YYYY-MM-DD。它是根据 DAG 的调度间隔和启动时间来确定的。比如一个 DAG 的调度间隔为@daily,从 2024 年 1 月 1 日开始启动,那么在 2024 年 1 月 2 日执行的任务实例中,{{ds}}的值就是2024-01-02

{{prev_ds}}:表示当前执行日期的前一个日期。在上述例子中,2024 年 1 月 2 日执行的任务实例中,{{prev_ds}}的值为2024-01-01。它常用于需要依赖上一个执行日期数据或任务结果的场景。

{{next_ds}}:表示当前执行日期的下一个日期。在 2024 年 1 月 2 日执行的任务实例中,{{next_ds}}的值为2024-01-03。虽然在实际的任务执行中,下一个执行日期的任务会在未来时间点执行,但{{next_ds}}可以用于提前规划或设置一些与未来执行日期相关的参数。

必须注意的是,执行日期不是DAG运行的实际开始时间。实际的开始时间由调度器决定,可能晚于执行日期,具体取决于资源的可用性和DAG的计划。

在流程中处理执行日期

在工作流中,Airflow提供了几种处理执行日期的方法:

  • 内置变量:可以在任务参数、模板或Jinja表达式中使用内置变量{{ds}}、{{prev_ds}}和{{next_ds}}来引用执行日期和周围日期。
from airflow import DAG 
from airflow.operators.dummy import DummyOperator dag = DAG( dag_id='example_dag', start_date=datetime(2025, 1, 1), schedule_interval='@daily' 
) task = DummyOperator( task_id='example_task', dag=dag, execution_timeout='{{ prev_ds }}' ) 

在本例中,DummyOperator的execution_timeout参数将被设置为上一个执行日期,从而允许任务根据执行日期调整超时时间。

  • 任务上下文:使用execution_date键通过任务上下文访问执行日期,这在使用PythonOperator任务或自定义操作符时很有用。
from datetime import datetime 
from airflow import DAG 
from airflow.operators.python import PythonOperator def print_execution_date(**context): execution_date = context['execution_date'] print(f'The execution date is: {execution_date}') dag = DAG( dag_id='example_dag_with_context',start_date=datetime(2025, 1, 1), schedule_interval='@daily' 
) task = PythonOperator( task_id='example_task_with_context', dag=dag, python_callable=print_execution_date, provide_context=True 
) 

在本例中,Python函数print_execution_date接收任务上下文并打印执行日期。

  • 执行日期算术:使用pendulum 库或Python内置的datetime模块来执行日期算术,例如计算时间范围的结束日期或确定两个日期之间的时间差。
import pendulum 
from datetime import datetime, timedelta 
from airflow import DAG 
from airflow.operators.python import PythonOperator def print_date_range(**context): execution_date = context['execution_date'] start_date = execution_date end_date = execution_date + timedelta(days=1) print(f'Date range: {start_date} - {end_date}') dag = DAG( dag_id='example_dag_with_date_arithmetic', start_date=datetime(2025, 1, 1), schedule_interval='@daily' 
) task = PythonOperator( task_id='example_task_with_date_arithmetic', dag=dag, python_callable=print_date_range, provide_context=True 
) 

在本例中,Python函数print_date_range使用timedelta类根据执行日期计算时间范围的结束日期,并打印日期范围。

完整ETL示例

假设我们有一个简单的 ETL 工作流,用于从数据库中提取销售数据,进行转换后加载到数据仓库中,调度间隔为@daily

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta# 定义默认参数
default_args = {'owner': 'airflow','depends_on_past': False,'start_date': datetime(2025, 1, 1),'retries': 1,'retry_delay': timedelta(minutes=5),
}# 创建DAG实例
dag = DAG('sales_etl_dag',default_args=default_args,schedule_interval='@daily'
)# 定义提取数据的任务
def extract_data(execution_date):# 这里的execution_date就是当前任务的执行日期print(f"Extracting sales data for {execution_date}")# 实际代码中,这里会连接数据库并提取对应执行日期的数据# 定义转换数据的任务
def transform_data(execution_date):print(f"Transforming sales data for {execution_date}")# 实际代码中,这里会对提取的数据进行转换处理# 定义加载数据的任务
def load_data(execution_date):print(f"Loading sales data for {execution_date} to data warehouse")# 实际代码中,这里会将转换后的数据加载到数据仓库# 创建提取数据的任务实例
extract_task = PythonOperator(task_id='extract_task',python_callable=extract_data,op_kwargs={'execution_date': '{{ execution_date }}'},dag=dag
)# 创建转换数据的任务实例
transform_task = PythonOperator(task_id='transform_task',python_callable=transform_data,op_kwargs={'execution_date': '{{ execution_date }}'},dag=dag
)# 创建加载数据的任务实例
load_task = PythonOperator(task_id='load_task',python_callable=load_data,op_kwargs={'execution_date': '{{ execution_date }}'},dag=dag
)# 设置任务依赖关系
extract_task >> transform_task >> load_task

在这个示例中,extract_datatransform_dataload_data函数都接收execution_date参数,用于确定处理数据的时间范围。在 Airflow 的 Web 界面或日志中,可以看到每个执行日期对应的任务实例的执行情况,比如在执行日期为 2025-01-02 时,任务会处理 2025-01-02 的销售数据。

管理执行日期的最佳实践

为确保在工作流程中有效和可维护地处理执行日期,请考虑以下最佳实践:

a.基于时间的操作始终使用执行日期:在处理基于时间的任务或数据处理时,依赖于执行日期,因为它为正在处理的时间段提供了一致和准确的参考。

在本例中,Python函数print_date_range使用timedelta类根据执行日期计算时间范围的结束日期,并打印日期范围。

管理执行日期的最佳实践

为确保在工作流程中有效且可维护地处理执行日期,请考虑以下最佳实践:

  • 基于时间的操作始终使用执行日期:在处理基于时间的任务或数据处理时,依赖于执行日期,因为它为正在处理的时间段提供了一致和准确的参考。
  • 避免使用系统时间:避免在任务中使用系统时间(例如datetime.now()),因为它可能导致数据管道中的不一致和难以调试的问题。
  • 注意时区:在处理执行日期时,始终考虑处理数据所在的时区。如有必要,使用pendulum库或Python的datetime模块将执行日期转换为适当的时区。
import pendulum 
from datetime import datetime 
from airflow import DAG 
from airflow.operators.python import PythonOperator def print_local_execution_date(**context): execution_date_utc = context['execution_date'] local_timezone = pendulum.timezone("America/New_York") local_execution_date = execution_date_utc.in_timezone(local_timezone) print(f'Local execution date: {local_execution_date}') dag = DAG( dag_id='example_dag_with_time_zone', start_date=datetime(2023, 1, 1), schedule_interval='@daily' 
) task = PythonOperator( task_id='example_task_with_time_zone', dag=dag, python_callable=print_local_execution_date, provide_context=True 
) 

在本例中,Python函数print_local_execution_date使用pendulum库将执行日期转换为“America/New_York”时区,并打印本地执行日期。

  • 用不同的执行日期测试工作流:确保任务使用不同的执行日期都能正常工作,特别是在处理跨时间界限的任务时,比如月底或年底。

最后总结

执行日期在Apache Airflow中起着至关重要的作用,它为数据管道中正在处理的时间段提供一致和准确的参考。通过掌握工作流中执行日期的处理,你可以构建高效、可靠和可维护的数据管道,这些管道尊重基于时间的依赖关系,并适应不断变化的需求。不断探索Apache Airflow资源和社区支持的丰富生态系统,以提高你对这个强大的数据流程编排平台的技能和知识。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/69035.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探究 Facebook 隐私安全发展方向,未来走向何方?

随着社交媒体的普及,隐私和数据安全问题成为了全球关注的焦点。Facebook,作为全球最大的社交平台之一,其隐私安全问题尤其引人注目。近年来,随着用户数据泄露事件的不断发生,Facebook 不断调整其隐私政策,探…

ray.rllib 入门实践-2:配置算法

前言: ray.rllib的算法配置方式有多种,网上的不同教程各不相同,有的互不兼容,本文汇总罗列了多种算法配置方式,给出推荐,并在最后给出可运行代码。 四种配置方式 方法1 import os from ray.rllib.algori…

Kaggle入门

title: Kaggle入门 tags: Kaggle abbrlink: 26966 date: 2023-08-19 22:23:36 Kaggle 入门 什么是 Kaggle? Kaggle是一个进行数据挖掘和预测竞赛的在线平台。 从公司的角度,可以提供一些数据,进而提出一个实际需要解决的问题。 从参赛者…

css-设置元素的溢出行为为可见overflow: visible;

1.前言 overflow 属性用于设置当元素的内容溢出其框时如何处理。 2. overflow overflow 属性的一些常见值: 1 visible:默认值。内容不会被剪裁,会溢出元素的框。 2 hidden:内容会被剪裁,不会显示溢出的部分。 3 sc…

状态模式——C++实现

目录 1. 状态模式简介 2. 代码示例 3. 单例状态对象 4. 状态模式与策略模式的辨析 1. 状态模式简介 状态模式是一种行为型模式。 状态模式的定义:状态模式允许对象在内部状态改变时改变它的行为,对象看起来好像修改了它的类。 通俗的说就是一个对象…

Word 中实现方框内点击自动打 √ ☑

注: 本文为 “Word 中方框内点击打 √ ☑ / 打 ☒” 相关文章合辑。 对第一篇增加了打叉部分,第二篇为第一篇中方法 5 “控件” 实现的详解。 在 Word 方框内打 √ 的 6 种技巧 2020-03-09 12:38 使用 Word 制作一些调查表、检查表等,通常…

利用 Three.js 实现 3D 粒子正方体效果

在这篇文章中,我将向大家展示如何使用 Three.js 创建一个带有粒子的 3D 正方体效果。通过这段代码,我们将能够在浏览器中渲染一个 3D 正方体形状,并且该正方体内部填充了大量粒子(可视化效果)。你可以通过鼠标控制视角…

DRF开发避坑指南01

在当今快速发展的Web开发领域,Django REST Framework(DRF)以其强大的功能和灵活性成为了众多开发者的首选。然而,错误的使用方法不仅会导致项目进度延误,还可能影响性能和安全性。本文将从我个人本身遇到的相关坑来给大…

ES设置证书和创建用户,kibana连接es

1、启动好es 2、进入es容器 docker exec -it es /bin/bash 3、生成ca证书 ./bin/elasticsearch-certutil ca 注:两个红方框位置直接回车 4、生成cert证书 ./bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12 注:前两个红框直接回车&am…

一位前端小白的2024总结

目录 简要 一、迷茫点的解决 (1)前端领域该怎么学? (2)旧技术还需要学吗? (3)我该学些什么? 二、折磨点的解决 (1)学技术成果回报太慢怎么…

数据分析学习路线

阶段 1:数学与统计基础 1.1 数学基础 数据分析涉及大量的数学知识,尤其是统计学。虽然你不需要成为数学专家,但一些基本的数学概念对你理解数据分析非常重要。 线性代数: 矩阵运算:理解矩阵乘法、求逆等操作。特征值…

python爬虫 爬取站长素材 (图片)(自学6)

安装 :lxml 地址 : Installing lxml pip install lxml 或者 sudo pip install lxml 下面开始 写代码 下载 站长素材的图片 import urllib.requestfrom lxml import etreeimport osdef create_request(page):if(page 1):url "https://sc.chinaz.…

《OpenCV》——图像透视转换

图像透视转换简介 在 OpenCV 里,图像透视转换属于重要的几何变换,也被叫做投影变换。下面从原理、实现步骤、相关函数和应用场景几个方面为你详细介绍。 原理 实现步骤 选取对应点:要在源图像和目标图像上分别找出至少四个对应的点。这些对…

spring---@Pointcut表达式

spring语法 execution 方法表达式:execution(modifiers-pattern? ret-type-pattern declaring-type-pattern/name-pattern(param-pattern) throws-pattern?) 修饰符匹配(modifier-pattern?):可以省略。代表匹配任意修饰符方法;或者显示…

第十五届蓝桥杯大赛软件赛省赛C/C++ 大学 B 组

第十五届的题目在规定时间内做出了前5道,还有2道找时间再磨一磨。现在把做的一些思路总结如下: 题1:握手问题 问题描述 小蓝组织了一场算法交流会议,总共有 50人参加了本次会议。在会议上,大家进行了握手交流。按照惯例…

Linux - 五种常见I/O模型

I/O操作 (输入/输出操作, Input/Output) 是指计算机与外部设备就行数据交互的过程. 什么是外部设备: 如键盘, 鼠标, 硬盘, 网卡等. 五种常见的 I/O 模型: 阻塞 I/O非阻塞 I/O信号驱动 I/OI/O 多路复用异步 I/O 阻塞 I/O 阻塞 I/O 的特点: 当用户发起 I/O 请求后, 进程/线程就…

问题修复记录:Linux docker 部署 dify,无法调用宿主机本地服务

重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经…

【UE5插件】RuntimeSpeechRecognizer

作用:语音识别 获取途径: Runtime Audio Importer | Fab 示例蓝图: 如何使用插件 |Georgy 开发文档 UE5.3 RuntimeSpeechRecognizer Streaming Example posted by gtreshchev | blueprintUE | PasteBin For Unreal Engine RuntimeSpeechReco…

2025年最新深度学习环境搭建:Win11+ cuDNN + CUDA + Pytorch +深度学习环境配置保姆级教程

本文目录 一、查看驱动版本1.1 查看显卡驱动1.2 显卡驱动和CUDA对应版本1.3 Pytorch和Python对应的版本1.4 Pytorch和CUDA对应的版本 二、安装CUDA三、安装cuDANN四、安装pytorch五、验证是否安装成功 一、查看驱动版本 1.1 查看显卡驱动 输入命令nvidia-smi可以查看对应的驱…

unity插件Excel转换Proto插件-ExcelToProtobufferTool

unity插件Excel转换Proto插件-ExcelToProtobufferTool **ExcelToProtobufTool 插件文档****1. 插件概述****2. 默认配置类:DefaultIProtoPathConfig****属性说明** **3. 自定义配置类****定义规则****示例代码** **4. 使用方式****4.1 默认路径****4.2 自定义路径**…