使用AirFlow调度MaxCompute

简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

背景

airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

一、环境准备

  • Python 2.7.5  PyODPS支持Python2.6以上版本
  • Airflow apache-airflow-1.10.7

1.安装MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10  # 可选,安装后能加速Tunnel上传。

pip install cython>=0.19.0  # 可选,不建议Windows用户安装。

pip install pyodps

注意:如果requests包冲突,先卸载再安装对应的版本

2.执行如下命令检查安装是否成功

python -c "from odps import ODPS"

二、开发步骤

1.在Airflow家目录编写python调度脚本Airiflow_MC.py

# -*- coding: UTF-8 -*-

import sys

import os

from odps import ODPS

from odps import options

from airflow import DAG

from airflow.operators.python_operator import PythonOperator

from datetime import datetime, timedelta

from configparser import ConfigParser

import time

reload(sys)

sys.setdefaultencoding('utf8')

#修改系统默认编码。

# MaxCompute参数设置

options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}

cfg = ConfigParser()

cfg.read("odps.ini")

print(cfg.items())

odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

default_args = {

   'owner': 'airflow',

   'depends_on_past': False,

   'retry_delay': timedelta(minutes=5),

   'start_date':datetime(2020,1,15)

   # 'email': ['airflow@example.com'],

   # 'email_on_failure': False,

   # 'email_on_retry': False,

   # 'retries': 1,

   # 'queue': 'bash_queue',

   # 'pool': 'backfill',

   # 'priority_weight': 10,

   # 'end_date': datetime(2016, 1, 1),

}

dag = DAG(

   'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))

def read_sql(sqlfile):

   with io.open(sqlfile, encoding='utf-8', mode='r') as f:

       sql=f.read()

   f.closed

   return sql

def get_time():

   print '当前时间是{}'.format(time.time())

   return time.time()

def mc_job ():

   project = odps.get_project()  # 取到默认项目。

   instance=odps.run_sql("select * from long_chinese;")

   print(instance.get_logview_address())

   instance.wait_for_success()

   with instance.open_reader() as reader:

       count = reader.count

   print("查询表数据条数:{}".format(count))

   for record in reader:

       print record

   return count

t1 = PythonOperator (

   task_id = 'get_time' ,

   provide_context = False ,

   python_callable = get_time,

   dag = dag )

t2 = PythonOperator (

   task_id = 'mc_job' ,

   provide_context = False ,

   python_callable = mc_job ,

   dag = dag )

t2.set_upstream(t1)

2.提交

python Airiflow_MC.py

3.进行测试

# print the list of active DAGs

airflow list_dags

# prints the list of tasks the "tutorial" dag_id

airflow list_tasks Airiflow_MC

# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks Airiflow_MC --tree

#测试task

airflow test Airiflow_MC get_time 2010-01-16

airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务

登录到web界面点击按钮运行

03.png

5.查看任务运行结果

1.点击view log

04.png

2.查看结果

 原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/512726.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css让背景图片显示透明遮罩_CSS项目测试(支持深色模式)

*事先声明:本文章教程教学、文章封面来源自[CSS]聚光灯项目 by CodingStartup起码课,且已经CodingStartup起码课授权允许转载!为了保持原有风格,本文始终保持与CodingStartup起码课的视频风格一致*在出现同样的代码块时&#xff0…

一文读懂阿里云直播技术是如何实现的

简介: 东京奥运会已落下帷幕。比赛期间,全球亿万观众蜂拥至各大转播平台观看奥运赛事,平台直播能力显的尤为重要。阿里云作为视频直播平台的技术提供商,凭借在产品技术、资源带宽、服务保障等方面优势,可为各大转播平台…

低代码发展专访系列之七:低代码的火爆需要不一样的声音么?

编辑 | 曹芊芊话题:低代码发展系列专访前言:2019年开始,低代码爆火。有人认为它是第四代编程语言,有人认为它是开发模式的颠覆,也有人认为是企业管理模式的变革……有很多声音,社区讨论很热烈。CSDN随后展开…

启动、内存、卡顿三大分析,用户体验就用它?

简介: 启动分析支持通过预置采集和个性化自定义两种方式定义启动阶段,可以分别查询首次启动、冷启动、热启动的情况效果,并可以与设备、系统、版本、地域等维度做交叉筛选查询。 随着大量应用涌入市场加入“App内卷之战”,终端用…

adb echo shell 覆盖_一次写shell脚本的经历记录

点击上方“我的小碗汤”,选择“置顶公众号”精品文章,第一时间送达redis在容器化的过程中,涉及到纵向扩pod实例cpu、内存以及redis实例的maxmemory值,statefulset管理的pod需要重启。所以把redis集群的状态检查放到了健康检查中&a…

当新零售遇上 Serverless

简介: Serverless 的出现给传统企业数字化转型带了更多机遇。 某零售商超行业的龙头企业,其主要业务涵盖购物中心、大卖场、综合超市、标准超市、精品超市、便利店及无人值守智慧商店等零售业态,涉及全渠道零售、仓储物流、餐饮、消费服务、…

如果还不懂如何使用 Consumer 接口,就来看这篇!

作者 | 阿Q来源 | 阿Q说代码背景在开发过程中我遇到这么一个问题:表结构:一张主表A ,一张关联表B ,表 A 中存储着表 B 记录的状态。场景:第一步创建主表数据,插入A表;第二步调用第三方接口插入B…

京东:Flink SQL 优化实战

简介: 本文着重从 shuffle、join 方式的选择、对象重用、UDF 重用等方面介绍了京东在 Flink SQL 任务方面做的优化措施。 本文作者为京东算法服务部的张颖和段学浩,并由 Apache Hive PMC,阿里巴巴技术专家李锐帮忙校对。主要内容为&#xff1…

Spring Boot参数校验以及分组校验的使用

简介: 做web开发基本上每个接口都要对参数进行校验,如果参数比较少,还比较容易处理,一但参数比较多了的话代码中就会出现大量的if-else语句。虽然这种方式简单直接,但会大大降低开发效率和代码可读性。所以我们可以使用…

长文解析:作为容器底层技术的半壁江山, cgroup如何突破并发创建瓶颈?

简介: io_uring 作为一种新型高性能异步编程框架,代表着 Linux 内核未来的方向,当前仍处于快速发展中。阿里云联合 InfoQ 发起《io_uring 介绍及应用实践》的技术公开课,围绕 OpenAnolis 龙蜥社区 Anolis OS 8 全方位解析高性能存…

Orion:谷歌的新一代SDN控制器

作者 | 魏煌松来源 | 鲜枣课堂时至今日,谷歌在2015年公布的成果,“利用SDN将广域网带宽利用率提升至接近100%”,仍然是SDN的一个标杆案列,也是难以逾越的巅峰。但事实上,当时使用的SDN控制器Onix,早已退出了…

移动云正式发布基于龙蜥 Anolis OS 的 BC-Linux V8.2 通用版操作系统

简介: 2020年12月CentOS项目组宣布CentOS 8将于2021年12月31日结束支持,这意味着从2022年开始,使用CentOS 8的用户,将无法得到来自官方的新硬件支持、bug修复和安全补丁。针对这一情况,移动云大云操作系统团队基于国内…

干掉讨厌的 CPU 限流,让容器跑得更快

简介: 让人讨厌的 CPU 限流影响容器运行,有时人们不得不牺牲容器部署密度来避免 CPU 限流出现。本文介绍的 CPU Burst 技术可以帮助您既能保证容器运行服务质量,又不降低容器部署密度。文章分为上下两篇,该文为上篇,下…

微弱信号检测_机动车检测线常用传感器介绍

机动车检测线中经常会运用到各种传感器,这些传感器相当于车辆检测系统的“眼睛”、“鼻子”和“耳朵”,通过台体装置和装在台体中的传感器,能够把车辆的性能数据转换成计算机系统能够识别的信号,供计算机处理和计算,最…

赋能开发者,英特尔发布oneAPI 2022工具包

英特尔发布了oneAPI 2022工具包。此次发布的最新增强版工具包扩展了跨架构开发的特性,为开发者提供更强的实用性和更丰富的架构选择,用以加速计算。 英特尔公司首席技术官、高级副总裁、软件和先进技术事业部总经理 Greg Lavender表示:“我十…

Quick BI V4.0功能“炸弹”来袭,重磅推出即席分析、模板市场、企业微信免密登录等强势功能

简介: 2021年7月,Quick BI公共云版本迭代新功能:重磅推出即席分析、模板市场,分析门槛再降低;推出企业微信无缝对接,移动端类目个性配置及管理提升多端能力;数据建模配置交互升级至拖拽模式提升…

打印速度快点的打印机_瞒着领导偷偷给你们发两台打印机

前几次小粉笔组织的活动都被“投诉”!说我们打印机太少~小粉笔心领神会,在这个月的活动预算费用上悄咪咪加了【两台打印机】~(看小粉笔多疼你们!)希望知道的笔芯不要把这条推文转发给我领导(要不然你们以后就没有打印机了~哼!)现在…

数据库误操作后悔药来了:AnalyticDB PostgreSQL教你实现分布式一致性备份恢复

简介: 本文将介绍AnalyticDB PostgreSQL版备份恢复的原理与使用方法。 一、背景 AnalyticDB PostgreSQL版(简称ADB PG)是阿里云数据库团队基于PostgreSQL内核(简称PG)打造的一款云原生数据仓库产品。在数据实时交互式…

与变异风险词赛跑 阿里探索AI治理网络风险

最近,阿里安全一线风控小二可粒发现,在禁售的风险防控库里,有人试图“上新”新品种,不法份子借助在社交媒体上走红的“魔法改运”等说辞,引人入玄学骗局。 尽量提前发现风险问题,提早布防是阿里安全风控部…

高效研发运维体系构建的流程和方法论

简介: 云计算产品大多都会与云原生发生关联,云原生正在重塑整个软件的生命周期。但到底什么是云原生?云原生带来的最大技术创新和未来机会是什么?围绕云原生,是否可以构建出一套云上的开发&运维体系,打…