使用 AirFlow 调度 MaxCompute,


背景

airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

一、环境准备

Python 2.7.5 PyODPS支持Python2.6以上版本
Airflow apache-airflow-1.10.7

1.安装MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10 # 可选,安装后能加速Tunnel上传。

pip install cython>=0.19.0 # 可选,不建议Windows用户安装。

pip install pyodps

注意:如果requests包冲突,先卸载再安装对应的版本

2.执行如下命令检查安装是否成功

python -c "from odps import ODPS"

二、开发步骤

1.在Airflow家目录编写python调度脚本Airiflow_MC.py

  1. # -*- coding: UTF-8 -*- 
  2.  
  3. import sys 
  4.  
  5. import os 
  6.  
  7. from odps import ODPS 
  8.  
  9. from odps import options 
  10.  
  11. from airflow import DAG 
  12.  
  13. from airflow.operators.python_operator import PythonOperator 
  14.  
  15. from datetime import datetime, timedelta 
  16.  
  17. from configparser import ConfigParser 
  18.  
  19. import time 
  20.  
  21. reload(sys) 
  22.  
  23. sys.setdefaultencoding('utf8') 
  24.  
  25. #修改系统默认编码。 
  26.  
  27. # MaxCompute参数设置 
  28.  
  29. options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True} 
  30.  
  31. cfg = ConfigParser() 
  32.  
  33. cfg.read("odps.ini") 
  34.  
  35. print(cfg.items()) 
  36.  
  37. odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint")) 

  1. default_args = { 
  2.  
  3. 'owner': 'airflow', 
  4.  
  5. 'depends_on_past': False, 
  6.  
  7. 'retry_delay': timedelta(minutes=5), 
  8.  
  9. 'start_date':datetime(2020,1,15) 
  10.  
  11. # 'email': ['airflow@example.com'], 
  12.  
  13. # 'email_on_failure': False, 
  14.  
  15. # 'email_on_retry': False, 
  16.  
  17. # 'retries': 1, 
  18.  
  19. # 'queue': 'bash_queue', 
  20.  
  21. # 'pool': 'backfill', 
  22.  
  23. # 'priority_weight': 10, 
  24.  
  25. # 'end_date': datetime(2016, 1, 1), 
  26.  
  27.  
  28. dag = DAG( 
  29.  
  30. 'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30)) 
  31.  
  32. def read_sql(sqlfile): 
  33.  
  34. with io.open(sqlfile, encoding='utf-8', mode='r') as f: 
  35.  
  36. sql=f.read() 
  37.  
  38. f.closed 
  39.  
  40. return sql 
  41.  
  42. def get_time(): 
  43.  
  44. print '当前时间是{}'.format(time.time()) 
  45.  
  46. return time.time() 
  47.  
  48. def mc_job (): 
  49.  
  50. project = odps.get_project() # 取到默认项目。 
  51.  
  52. instance=odps.run_sql("select * from long_chinese;") 
  53.  
  54. print(instance.get_logview_address()) 
  55.  
  56. instance.wait_for_success() 
  57.  
  58. with instance.open_reader() as reader: 
  59.  
  60. count = reader.count 
  61.  
  62. print("查询表数据条数:{}".format(count)) 
  63.  
  64. for record in reader: 
  65.  
  66. print record 
  67.  
  68. return count 
  69.  
  70. t1 = PythonOperator ( 
  71.  
  72. task_id = 'get_time' , 
  73.  
  74. provide_context = False , 
  75.  
  76. python_callable = get_time, 
  77.  
  78. dag = dag ) 
  79.  
  80. t2 = PythonOperator ( 
  81.  
  82. task_id = 'mc_job' , 
  83.  
  84. provide_context = False , 
  85.  
  86. python_callable = mc_job , 
  87.  
  88. dag = dag ) 
  89.  
  90. t2.set_upstream(t1) 

2.提交

  1. python Airiflow_MC.py 

3.进行测试

  1. # print the list of active DAGs 
  2.  
  3. airflow list_dags 
  4.  
  5. # prints the list of tasks the "tutorial" dag_id 
  6.  
  7. airflow list_tasks Airiflow_MC 
  8.  
  9. # prints the hierarchy of tasks in the tutorial DAG 
  10.  
  11. airflow list_tasks Airiflow_MC --tree 
  12.  
  13. #测试task 
  14.  
  15. airflow test Airiflow_MC get_time 2010-01-16 
  16.  
  17. airflow test Airiflow_MC mc_job 2010-01-16 

4.运行调度任务

登录到web界面点击按钮运行

5.查看任务运行结果

1.点击view log

2.查看结果

评论关闭