Airflow基本,2022-11-12

(2022.11.12)
(airflow==2.4.2)
Airflow是Apache协议下用于开发、schedule、监控批(处理工作)模式的工作流(workflow)平台。工作流在Airflow中被表达为有向无环图(DAG, directed acyclic graph),其中包含若干任务(tasks),还包括依赖和数据流。


airflow workflow demo

DAG表示出了任务间的依赖关系和执行顺序。任务描述了工作内容,可能 包括获取数据、执行分析、触发系统等。

其可扩展的python框架可连接任何技术。提供的web页面用于管理工作流。部署方式多样,可单机部署,也可分布部署。

Components

Airflow包括如下组件

  • a scheduler:用于触发预定的workflow,提交任务以执行等
  • an executor:用于执行任务。默认安装中,executor运行scheduler中的每个任务,而开发环境(production-suitable executors)多数executor将任务执行推送给workers执行
  • a webserver:提供UI用于监视(inspect)、触发和对DAGs与任务的debug,Flask
  • DAG文件夹:由scheduler、executor和workers读取
  • metadata数据库:scheduler、executor和webserver存储数据用
airflow architecture diagram basic

多数executor通常会引入其他组件用于和workers通信,比如任务队列(task queue),但仍然可将executor和workers当做airflow中的同一个逻辑组件,管理(handle)任务执行。

Airflow本身对用户运行的内容不可知(agnostic),可运行用户设置的任何任务。

Scheduler

scheduler执行两个特定任务

  • schedule和触发workflow
  • 将scheduled workflows提交到executor

scheduler在触发task时,为了判断是否触发,会运行一个子线程(subprocess)用于监控DAG文件夹,也就是包括DAGs在内的Python脚本所在的文件夹,遍历DAGs文件夹并执行DAG。默认情况下,scheduler每分钟执行一次查询,可在config文件中修改。

scheduler使用executor运行待运行的task。

Executor

executor负责运行tasks。Airflow安装时每次只有一个executor。executor被定义为Airflowconfig文件(airflow.cfg)的核心部分。

[core]
executor = KubernetesExecutor

executor分两种,local executor和remote executor。local包括DebugExecutor,LocalExecutor和SequentialExecutor。remote executor包括CeleryExecutor和KubernetesExecutor。local用于在本地执行任务,在scheduler线程内部。remote executor在远程执行任务,比如在Kubernets cluster的一个pod中,且常通过一系列workers/worker pool实现。

Metadata Database

元数据库用于保存executor、scheduler和web server的相关数据。默认使用SQLite,Airflow可使用任何SQLAlchemy支持的数据库。一般情况用于常选择PostgreSQL。

Queue

有的DAG中还会借用消息队列/任务队列。一旦识别出即将被激活的任务,scheduler将该任务推送金任务队列等待执行。

Airflow workers从队列中获取任务并执行。

另有web service用Flask实现


airflow DAGs UI

Workloads

DAG中标出了一系列任务,共三类常见任务

  • Operators:预定义任务,可与DAG中的大多数任务绑定(string)
  • Sensors:Operators的特殊子集,仅等待外部事件发生
  • TaskFlow:用@task装饰,定制的Python函数,装饰成一个Task

尽管在Airflow内部,这些其实是BaseOperator的子集(subclass),task和operator的概念也经常混用不做区分,但尽量把他们当做独立概念。Operators和Sensors堪比末班,当你在DAG文件中调用其中一个,都会创建一个task。

Operators最常用的是BashOperatorPythonOperator,分别执行bash命令的任务和Python任务。另DummyOperator,该operator什么都不做,用于组织DAG中的任务,该任务被scheduler evaluated,但不被executor处理。

特点

所有Airflow的工作流都在Python中定义,工作流如同代码一样工作,这种设计的目的包括

  • dynamic:airflow的pipeline被设置为python代码,允许动态pipeline生成
  • extensible:架构包括operators用于连接各种技术。所有airflow的组件都可轻易扩展到适应环境
  • flexible:workflow参数化(parameterization)利用了内建的Jinja模板引擎(workflow parameterization is built-in leveraging the Jinja templating engine)

优势:

  • airflow有operator,可以连接扩展新技术。如果workflow有清晰的起止日期且有规律的间隔,则可以编码为Airflow的DAG(directed acyclic graph)。
  • Airflow工作流需要开发者编码,用Python开发
  • 保存为version control,因此可以roll back回到之前的版本
  • workflow可多人同时协作完成
  • 可测试
  • 组件可扩展
  • 丰富的schedule和执行语法使开发者能轻松定义复杂的pipeline,并定期执行。backfilling回填允许用户在修改逻辑之后,基于历史数据重新运行或运行pipeline。而能重新运行部分pipeline也可以提高效率。
  • UI提供的内容包括pipeline的深度视图、单个任务、pipeline随着时间变化的总览。从UI上可以监控logs和管理任务,比如任务失败后的重新运行。
  • 其开源特性使得你在组件上的提交和贡献可以被外部使用者继续开发、测试和使用。活跃的社区为开发者提供了沟通的平台。

缺点:

  • batch workflow,由CLI和REST API触发,不适用event-based workflows。不是streaming solution,但streaming system比如Kafka经常与airflow同时用于完成任务。Kafka用于数据ingestion和实时处理,事件数据写入存储位,airflow用于周期性的执行批数据处理。

案例分析

ETL案例

从外部数据源获取数据存入GCP的Big query, i.e., data warehouse,并执行一系列转换操作。可构建DAG,其中含两个任务

  • 从外部数据源拷贝数据到BigQuery
  • 执行转换

第二个任务的执行取决于第一个任务是否顺利。

这个case可以用Airflow实现。


airflow arch

Web Server与workers、DAG文件夹、metadata db通信,用于获取任务执行logs,DAG结构和任务的状态。

workers和DAG文件夹通信来推测DAGs的结构和执行任务,也与metadata db通信用于读并存储连接信息、变量和XCOM。

XComs (“Cross-communications”), a system where you can have tasks push and pull small bits of metadata.

Scheduler与DAG文件夹通信以推测DAGs的结构并调度任务,与metadata db通信写入DAG运行和任务的相关信息。与任务队列通信将即将触发的任务推入其中。

案例代码

典型代码案例如下

from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")
    @task()
    def airflow():
        print("airflow")
    # Set dependencies between tasks
    hello >> airflow() # redefine the action of symbol >>

TODO:代码分析

Airflow一些初始化设置

(2022.11.20 Sun)
安装好Airflow之后,i.e.,pip install apache-airflow[==x],首先修改环境变量用于存储配置文件airflow.cfg的路径。
进入文件~/.bashrc,修改环境变量AIRFLOW_HOME为指定路径用于存放配置文件。

export AIRFLOW_HOME=/home/airflow

修改配置文件中,DAGs文件的存放路径。此外,还可将默认的SQLite数据库换成MySQL或PostgreSQL。在更换默认数据库时,注意在对应的数据库中创建相应的账户、db和table以及相应权限。

>> nano /home/airflow/airflow.cfg

dags_folder = /home/airflow/dags
# sql_alchemy_conn = sqlite:////home/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://airflow:airflow@localhost:3306/airflow

数据库初始化

>> airflow db init
[2022-11-20 11:56:02,860] {migration.py:204} INFO - Context impl SQLiteImpl.
[2022-11-20 11:56:02,860] {migration.py:207} INFO - Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision  -> b0d31815b5a6
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
Initialization done

直到出现Initialisation done则表示初始化数据库成功。

创建用户,赋Admin权限。

>> airflow users create 
--role Admin 
--username admin 
--email admin 
--firstname admin 
--lastname admin 
--password admin

[2022-11-20 12:05:12,797] {manager.py:824} WARNING - No user yet created, use flask fab command to do it.
[2022-11-20 12:05:15,293] {manager.py:212} INFO - Added user admin
User "admin" created with role "Admin"

启动Airflow

做好如上的配置,可启动Airflow,分别需要启动webserverscheduler。加入选项-D设置为守护进程。

>> airflow webserver -D
>> airflow scheduler -D

启动之后可在浏览器打开Airflow的UI,http://0.0.0.0:8080/home,按之前设定的admin账户密码登录,即可看到Airflow的UI界面。

Airflow UI

关闭webserver和scheduler,因webserver开启多个gunicorn进程,分别关闭相对低效,使用如下命令关闭

# 停止airflow webserver
ps -ef | grep 'airflow' | grep 'webserver' | awk '{print $2}' | xargs kill -9 
cd $AIRFLOW_HOME  
rm -rf airflow-webserver.pid
rm -rf airflow-webserver-monitor.pid

# 停止airflow scheduler  
ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs kill -9  
cd $AIRFLOW_HOME  
rm -rf airflow-scheduler.pid

Airflow的常用命令

(2022.12.04 Sun)

# 测试任务,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10

# 查看生效的 dags
airflow list_dags -sd $AIRFLOW_HOME/dags

# 开始运行任务(同web界面点trigger按钮)
airflow trigger_dag test_task    

# 暂停任务
airflow pause dag_id      

# 取消暂停,等同于在web管理界面打开off按钮
airflow unpause dag_id     

# 查看task列表
airflow list_tasks dag_id  查看task列表

# 清空任务状态
airflow clear dag_id       

# 运行task
airflow run dag_id task_id execution_date

Reference

1 Apache airflow official website
2 towardsdatascience, Apache Airflow architecture, Giorgos Myrianthous

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容

  • 声明:本文转自我的个人博客,有兴趣的可以查看原文。转发请注明来源。 最近工作需要,使用airflow搭建了公司的E...
    此星爷非彼星爷阅读 39,835评论 3 19
  • [TOC] Airflow简介 Apache Airflow是一个提供基于DAG(有向无环图)来编排工作流的、可视...
    端碗吹水阅读 2,251评论 0 4
  • 在快速启动部分中设置很简单,构建生产级环境需要更多的工作,下面来了解一下。 1. 设置配置选项 第一次运行Airf...
    路小漫阅读 9,447评论 0 3
  • 概述 Xxl-Job 简述 XXL-JOB是一个国内轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、...
    centychen阅读 11,266评论 1 12
  • 架构 airflow的架构如下图所示: Workers:执行指定的任务(tasks)。 Scheduler(调度器...
    似水之星阅读 3,247评论 0 0