Python - Airflow任务调度系统初识

1. 概述

Apache Airflow

Airflow是airbnb开源的基于DAG(有向无环图)的用Python开发的任务管理系统。最简单的理解就是一个高级版的crontab,它解决了crontab无法解决的任务依赖问题。

项目于2014年启动,于2015年春季开源,于2016年加入Apache软件基金会的孵化计划。

Airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。

有向无环图

1.1. 应用场景

  • ETL场景,可以管理数据Extract、Transform、Load等操作,以及各个Task之间的依赖,且便于完成Debug、监控和Backfill操作
  • 系统运维工作,可以管理服务器端Crontab作业,以及作业之间的依赖,从而简化运维工作的复杂度,提高运维效率
  • 大数据平台的任务流管理,包含数据分析、数据交换、数据报表生成与发送,等

1.2. 优势

  • 自带web管理界面,易上手
  • 业务代码和调度代码完全解耦
  • 通过python代码定义任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求
  • python开源项目,支持扩展operate等插件,便于二次开发

1.3. 劣势

  • 对分布式部署并不友好,例如node之间的dag和task同步问题

2. Concepts

2.1. 术语说明

2.1.1. airflow.DAG,实例化之后,称之为Dag RunDag Instance

DAG(Directed Acyclic Graph)是有向无环图,也称为有向无循环图。在Airflow中,一个DAG定义了一个完整的作业。同一个DAG中的所有Task拥有相同的调度时间。

代码样例
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner' : 'airflow' ,
    'depends_on_past' : False ,
    'start_date' : datetime ( 2015 , 6 , 1 ),
    'email' : [ 'airflow@example.com' ],
    'email_on_failure' : False ,
    'email_on_retry' : False ,
    'retries' : 1 ,
    'retry_delay' : timedelta( minutes = 5 ),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_dag', default_args=default_args)
参数说明
  • dag_id: 唯一识别DAG,方便日后管理
  • default_args: 默认参数,如果当前DAG实例的作业没有配置相应参数,则采用DAG实例的default_args中的相应参数
  • schedule_interval: 配置DAG的执行周期,可采用crontab语法
  • start_date,作业开始调度时间
  • end_date,作业结束调度时间
  • concurrency, the number of task instances allowed to run concurrently
  • max_active_runs, maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs
  • on_failure_callback, A function to be called when a DagRun of this dag fails.
  • on_success_callback, A function to be called when a DagRun of this dag succeeds.
  • backfill,填充任务,手动重跑过去失败的任务(指定日期)
  • catchup,如果历史任务出错,调度器尝试按调度顺序重跑历史任务(而不是按照当前时间执行当前任务)。可以在dag中设置dag.catchup = False或者参数文件中设置catchup_by_default = False来禁用这个功能

2.1.2. airflow.operators,实例化之后,称之为Task

操作器,定义任务以哪种方式执行。airflow有多种operator,如BashOperator、DummyOperator、MySqlOperator、HiveOperator以及社区贡献的operator等,其中BaseOperator是所有operator的基础operator。

Operator Name Description
BaseOperator 基础operator,设置baseoperator会影响所有的operator
BashOperator executes a bash command
DummyOperator 空操作
PythonOperator calls an arbitrary Python function
EmailOperator sends an email
HTTPOperator sends an HTTP request
SqlOperator executes a SQL command
DockerOperator execute a command inside a docker container
Sensor waits for a certain time, file, database row, S3 key, etc…
代码样例
from airflow.operators.bash_operator import BashOperator

t1 = BashOperator (
    task_id = 'print_date' ,
    bash_command = 'date' ,
    dag = dag )

t2 = BashOperator (
    task_id = 'sleep' ,
    bash_command = 'sleep 5' ,
    retries = 3 ,
    dag = dag )

t1 >> t2
# t2.set_upstream(t1)
# t1.set_downstream(t2)

Task为DAG中具体的作业任务,依赖于DAG,也就是必须存在于某个DAG中。

Task在DAG中可以配置依赖关系(当然也可以配置跨DAG依赖,但是并不推荐。跨DAG依赖会导致DAG图的直观性降低,并给依赖管理带来麻烦)。

参数说明:

  • dag: 传递一个DAG实例,以使当前作业属于相应DAG
  • task_id: 给任务一个标识符(名字),方便日后管理
  • owner: 任务的拥有者,方便日后管理
  • start_date: 任务的开始时间,即任务将在这个时间点之后开始调度
  • retries: 失败后重试次数
  • trigger_rule

all_success: (default) all parents have succeeded 父task全success

all_failed: all parents are in a failed or upstream_failed state 父task全failed或者upstream_failed状态

all_done: all parents are done with their execution 父task全执行过,不管success or failed

one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done 当父task中有一个是failed状态时执行,不必等到所有的父task都执行

one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 当父task中有一个是success状态时执行,不必等到所有的父task都执行

dummy: dependencies are just for show, trigger at will 无条件执行

2.1.3. airflow.executor,即调度方式

在配置文件config/airflow.cfg中,修改executor变量。

  • SequentialExecutor,顺序调度执行,默认执行器,通常只用于测试
  • LocalExecutor,多进程本地调度执行
  • CeleryExecutor,分布式调度执行,生产常用
  • DaskExecutor,在Dask分布式群集中运行Airflow任务,主要用于数据分析

2.1.4. 其他

  • JOB,最上层的工作,分为 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 创建,BackfillJob 由 Backfill 创建,LocalTaskJob 由前面两种 Job 创建
    在早期版本 Airflow 中,DAG 执行主要有两种完全独立的执行途径:SchedulerJob 和 BackfillJob。在一次较大的重构中增加了 DagRun 方式,以跟踪 DAG 的执行状态

2.2. 服务构成

单节点部署

Webserver

Airflow提供了一个可视化的Web界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。

image.png

Scheduler

整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。

Worker

一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。

Flower

Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。

image.png

3. 单机部署与测试

3.1. Install

pip install apache-airflow

如果遇到ImportError: cannot import name 'resolve_types',解决办法:

pip3 install cattrs==1.0.0

参考:https://github.com/apache/airflow/issues/11965

3.2. 启动

# 在airflow目录初始化数据库和airflow配置
airflow db init
# 启动 airflow web
airflow webserver
# 开始调度
airflow scheduler
  • 其他常用命令
# 测试任务,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10
# 查看生效的 DAGs
airflow list_dags -sd $AIRFLOW_HOME/dags
# 开始运行任务(同 web 界面点 trigger 按钮)
airflow trigger_dag test_task    
# 暂停任务
airflow pause dag_id      
# 取消暂停,等同于在 web 管理界面打开 off 按钮
airflow unpause dag_id     
# 查看 task 列表
airflow list_tasks dag_id  查看task列表
# 清空任务状态
airflow clear dag_id       
# 运行task
airflow run dag_id task_id execution_date

3.3. 测试

  • 通过webserver,可以查看样例dagtutorial的源码
# [START tutorial]
# [START import_module]
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)
# [END instantiate_dag]

# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
# [END basic_task]

# [START documentation]
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# [END documentation]

# [START jinja_template]
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)
# [END jinja_template]

t1 >> [t2, t3]
# [END tutorial]
  • 通过Web页面启动tutorialdag

  • ~/airflow/logs可以查看对应dag的log

  • 通过webserver查看task log,和task执行情况

4. 分布式部署与测试

分布式部署

采用docker部署airflow分布式调度系统,编排方式可以是k8s、swarm,这里采用docker-compose简单实现:
https://github.com/puckel/docker-airflow

$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
$ docker-compose -f docker-compose-CeleryExecutor.yml scale worker=2

5. 参考资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容