1. 概述
Airflow是airbnb开源的基于DAG(有向无环图)的用Python开发的任务管理系统。最简单的理解就是一个高级版的crontab,它解决了crontab无法解决的任务依赖问题。
项目于2014年启动,于2015年春季开源,于2016年加入Apache软件基金会的孵化计划。
Airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。
1.1. 应用场景
- ETL场景,可以管理数据Extract、Transform、Load等操作,以及各个Task之间的依赖,且便于完成Debug、监控和Backfill操作
- 系统运维工作,可以管理服务器端Crontab作业,以及作业之间的依赖,从而简化运维工作的复杂度,提高运维效率
- 大数据平台的任务流管理,包含数据分析、数据交换、数据报表生成与发送,等
1.2. 优势
- 自带web管理界面,易上手
- 业务代码和调度代码完全解耦
- 通过python代码定义任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求
- python开源项目,支持扩展operate等插件,便于二次开发
1.3. 劣势
- 对分布式部署并不友好,例如node之间的dag和task同步问题
2. Concepts
2.1. 术语说明
2.1.1. airflow.DAG,实例化之后,称之为Dag Run
或Dag Instance
DAG(Directed Acyclic Graph)是有向无环图,也称为有向无循环图。在Airflow中,一个DAG定义了一个完整的作业。同一个DAG中的所有Task拥有相同的调度时间。
代码样例
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 6 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta( minutes = 5 ),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_dag', default_args=default_args)
参数说明
- dag_id: 唯一识别DAG,方便日后管理
- default_args: 默认参数,如果当前DAG实例的作业没有配置相应参数,则采用DAG实例的default_args中的相应参数
- schedule_interval: 配置DAG的执行周期,可采用crontab语法
- start_date,作业开始调度时间
- end_date,作业结束调度时间
- concurrency, the number of task instances allowed to run concurrently
- max_active_runs, maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs
- on_failure_callback, A function to be called when a DagRun of this dag fails.
- on_success_callback, A function to be called when a DagRun of this dag succeeds.
- backfill,填充任务,手动重跑过去失败的任务(指定日期)
- catchup,如果历史任务出错,调度器尝试按调度顺序重跑历史任务(而不是按照当前时间执行当前任务)。可以在dag中设置dag.catchup = False或者参数文件中设置catchup_by_default = False来禁用这个功能
2.1.2. airflow.operators,实例化之后,称之为Task
操作器,定义任务以哪种方式执行。airflow有多种operator,如BashOperator、DummyOperator、MySqlOperator、HiveOperator以及社区贡献的operator等,其中BaseOperator是所有operator的基础operator。
Operator Name | Description |
---|---|
BaseOperator | 基础operator,设置baseoperator会影响所有的operator |
BashOperator | executes a bash command |
DummyOperator | 空操作 |
PythonOperator | calls an arbitrary Python function |
EmailOperator | sends an email |
HTTPOperator | sends an HTTP request |
SqlOperator | executes a SQL command |
DockerOperator | execute a command inside a docker container |
Sensor | waits for a certain time, file, database row, S3 key, etc… |
代码样例
from airflow.operators.bash_operator import BashOperator
t1 = BashOperator (
task_id = 'print_date' ,
bash_command = 'date' ,
dag = dag )
t2 = BashOperator (
task_id = 'sleep' ,
bash_command = 'sleep 5' ,
retries = 3 ,
dag = dag )
t1 >> t2
# t2.set_upstream(t1)
# t1.set_downstream(t2)
Task为DAG中具体的作业任务,依赖于DAG,也就是必须存在于某个DAG中。
Task在DAG中可以配置依赖关系(当然也可以配置跨DAG依赖,但是并不推荐。跨DAG依赖会导致DAG图的直观性降低,并给依赖管理带来麻烦)。
参数说明:
- dag: 传递一个DAG实例,以使当前作业属于相应DAG
- task_id: 给任务一个标识符(名字),方便日后管理
- owner: 任务的拥有者,方便日后管理
- start_date: 任务的开始时间,即任务将在这个时间点之后开始调度
- retries: 失败后重试次数
- trigger_rule
all_success: (default) all parents have succeeded 父task全success
all_failed: all parents are in a failed or upstream_failed state 父task全failed或者upstream_failed状态
all_done: all parents are done with their execution 父task全执行过,不管success or failed
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done 当父task中有一个是failed状态时执行,不必等到所有的父task都执行
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 当父task中有一个是success状态时执行,不必等到所有的父task都执行
dummy: dependencies are just for show, trigger at will 无条件执行
2.1.3. airflow.executor,即调度方式
在配置文件config/airflow.cfg
中,修改executor
变量。
- SequentialExecutor,顺序调度执行,默认执行器,通常只用于测试
- LocalExecutor,多进程本地调度执行
- CeleryExecutor,分布式调度执行,生产常用
- DaskExecutor,在Dask分布式群集中运行Airflow任务,主要用于数据分析
2.1.4. 其他
- JOB,最上层的工作,分为 SchedulerJob、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 创建,BackfillJob 由 Backfill 创建,LocalTaskJob 由前面两种 Job 创建
在早期版本 Airflow 中,DAG 执行主要有两种完全独立的执行途径:SchedulerJob 和 BackfillJob。在一次较大的重构中增加了 DagRun 方式,以跟踪 DAG 的执行状态
2.2. 服务构成
Webserver
Airflow提供了一个可视化的Web界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。
Scheduler
整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。
Worker
一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。
Flower
Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。
3. 单机部署与测试
3.1. Install
pip install apache-airflow
如果遇到ImportError: cannot import name 'resolve_types'
,解决办法:
pip3 install cattrs==1.0.0
参考:https://github.com/apache/airflow/issues/11965
3.2. 启动
# 在airflow目录初始化数据库和airflow配置
airflow db init
# 启动 airflow web
airflow webserver
# 开始调度
airflow scheduler
- 其他常用命令
# 测试任务,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10
# 查看生效的 DAGs
airflow list_dags -sd $AIRFLOW_HOME/dags
# 开始运行任务(同 web 界面点 trigger 按钮)
airflow trigger_dag test_task
# 暂停任务
airflow pause dag_id
# 取消暂停,等同于在 web 管理界面打开 off 按钮
airflow unpause dag_id
# 查看 task 列表
airflow list_tasks dag_id 查看task列表
# 清空任务状态
airflow clear dag_id
# 运行task
airflow run dag_id task_id execution_date
3.3. 测试
- 通过webserver,可以查看样例dag
tutorial
的源码
# [START tutorial]
# [START import_module]
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# [END import_module]
# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
# [END default_args]
# [START instantiate_dag]
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# [END instantiate_dag]
# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
# [END basic_task]
# [START documentation]
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# [END documentation]
# [START jinja_template]
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
# [END jinja_template]
t1 >> [t2, t3]
# [END tutorial]
通过Web页面启动
tutorial
dag在
~/airflow/logs
可以查看对应dag的log通过webserver查看task log,和task执行情况
4. 分布式部署与测试
采用docker部署airflow分布式调度系统,编排方式可以是k8s、swarm,这里采用docker-compose简单实现:
https://github.com/puckel/docker-airflow
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
$ docker-compose -f docker-compose-CeleryExecutor.yml scale worker=2