🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客
🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。
🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频
PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。
关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation
python_callable(python callable):调用的python函数op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。
PythonOperator调度案例
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator# python中 * 关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):print(a)print(b)print("hello airflow1")# 返回的值只会打印到日志中return{"sss1":"xxx1"}def print__hello2(random_base):print(random_base)print("hello airflow2")# 返回的值只会打印到日志中return{"sss2":"xxx2"}default_args = {'owner':'maliu','start_date':datetime(2021, 10, 1),'retries': 1, # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_pythoncode',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=PythonOperator(task_id='first',#填写 print__hello1 方法时,不要加上“()”python_callable=print__hello1,# op_args 对应 print_hello1 方法中的a参数op_args=[1,2,3,"hello","world"],# op_kwargs 对应 print__hello1 方法中的b参数op_kwargs={"id":"1","name":"zs","age":18},dag = dag
)second=PythonOperator(task_id='second',#填写 print__hello2 方法时,不要加上“()”python_callable=print__hello2,# random_base 参数对应 print_hello2 方法中参数“random_base”op_kwargs={"random_base":random.randint(0,9)},dag=dag
)first >> second