用Airflow调度数仓(CK)的ETL脚本

安装


前提:安装了python,我这里是python3

下载

按照官网:
pip3 install apache-airflow
包太多,下载太慢
改为清华的镜像:
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple apache-airflow
又有包找不到
最后用豆瓣的镜像(注意要加trusted):
pip3 install apache-airflow -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
下载非常快。
【报错】
unable to execute 'gcc': No such file or directory
error: command 'gcc' failed with exit status 1

【解决办法】

yum install -y zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make

【报错】
psutil/_psutil_common.c:9:20: fatal error: Python.h: No such file or directory
#include <Python.h>
^
compilation terminated.
error: command 'gcc' failed with exit status 1

【解决办法】

yum install python3-devel

修改后台数据库

  • 首次运行
    安装完以后,先运行一次:
    airflow initdb
    这时会创建好默认的配置文件,在~/airflow.cfg。
  • 修改配置
    将~/airflow.cfg中的:
1.executor = LocalExecutor
2.sql_alchemy_conn =  mysql://root:XX66xxx123@192.168.11.100/airflow?charset=utf8mb4
(**注意:需要提前在mysql中把数据库airflow创建好**)
(如果用pgsql,连接串为:postgresql+psycopg2://user:password@hostname/database_name)
  • 然后再次initdb
    airflow initdb
    【报错】
    ModuleNotFoundError: No module named 'MySQLdb'
    【解决办法】
    pip3 install mysqlclient
    【又报错】
    OSError: mysql_config not found
    【解决办法】

yum install mysql-devel
pip install mysqlclient

启动web GUI

airflow webserver -p 8090 -D
(因为8080被占用了)
![启动成功](https://upload-images.jianshu.io/upload_images/20370486-6ddfcde76c5087fe.png?
imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
这时候就可以在web页面管理了:登录http://127.0.0.1:8090

GUI

启动调度器

airflow scheduler -D

airflow scheduler

启停


重启webserver和scheduler

su airflow
ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -9
rm -rf /root/airflow/airflow-scheduler.pid
airflow webserver -p 8080 -D
airflow scheduler -D
tail -f /root/airflow/airflow-scheduler.err

重启worker
su airflow
ps -ef|egrep 'serve_logs|celeryd'|grep -v grep
rm -rf /home/airflow/airflow/airflow-worker.pid
airflow worker -D
tail -f /home/airflow/airflow/airflow-worker.err 什么也不打印就是没有问题

command layout: command subcommand dag_id task_id date
命令格式: 命令 子命令 dagid 命令id 日期

创建第一个DAG


我的数仓是建设在clickhouse上面的,所以这里我就用一个ETL任务来实验。

  • 注意到在配置文件中,有如下配置
[core]
dags_folder = /root/airflow/dags

所以在/root/airflow/目录下创建dags这个目录,然后把脚本放进去(包括python脚本和shell,sql等都放进去,方便调度):


dags目录结构
  • 照着官方的教学编写第一个dag python,我这里取名叫dag_jw.py:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](https://airflow.apache.org/tutorial.html)
"""
# [START ]
from datetime import timedelta,datetime

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 2, 25, tzinfo=local_tz),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'dag_jw',
    default_args=default_args,
    description='clickhouse ETL jobs',
    schedule_interval=timedelta(days=1),
    tags=['order'],
)
# [END instantiate_dag]

# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
    task_id='imp_ods_dwd_jw_querydoc_order_ps_d',
    bash_command='sh ./scripts/imp_ods_dwd_jw_querydoc_order_ps_d.sh %s'%datetime.now(tz=local_tz).strftime("%Y-%m-%d"),
    dag=dag,
)

t2 = BashOperator(
    task_id='imp_dwd_dws_jw_order_ps_d_querydoc',
    depends_on_past=False,
    bash_command='sh ./scripts/imp_dwd_dws_jw_order_ps_d_querydoc.sh %s'%datetime.now(tz=local_tz).strftime("%Y-%m-%d"),
    retries=3,
    dag=dag,
)
# [END basic_task]

# [START documentation]
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
订单类别:querydoc  
step:ods层导入dwd
"""
# [END documentation]

t1 >> [t2]
# [END ]

在这个脚本中,我创建了两个task:t1和t2 , 分别调用了./script下面的两个shell。
shell脚本类似于:

#!/usr/bin/bash
dt=$1
echo ${dt}
clickhouse-client --host=192.168.11.100 --user=default --password=XX66xxx123 -m --multiquery -q"
insert into dws.v_dws_jw_order_ps_d(
    dt                   
    ,order_type          
    ,order_id 
...
"

我这里贴出来主要是为了备忘CK的命令行执行SQL的参数(-m -n -q "")。
然后这两个脚本的依赖关系是t2依赖于t1(因为先要从ODS层导入数据到DWD层,再从DWD导数据到DWS,这只是个简化流程,用于测试)
另外请忽略我在脚本中直接使用明文密码。

  • 编好python脚本后,就执行它:
    python3 dag_jw.py
    【报错】
    注意,如果这里遇到报错说
    无法导入from airflow.operators.bash import BashOperator
    就照着我上面改:
    from airflow.operators.bash_operator import BashOperator

执行完毕后,可以用:
airflow list dags看到刚刚创建的dag:

airflow list dags

这样就创建成功了。
当然,去web上面看更方便:
image.png

注意到,这里显示的时间跟我在py文件中定义的时间相差8小时,没关系,因为官方文档上说目前web server只支持UTC时区。 所以看的时候心里面+8小时就行了。

调试

非常尴尬的是,正在我写此文的时候,作业跑完了,但是failed了。我去看看先。
在/root/airflow/logs下面可以找到相关日志(当然在web上也可以):


出错日志

原来是我在python中使用的相对目录找不到。看来是我理解错误,那个.py脚本仅仅是用来创建DAG的,并不是实际运行的,所以我还是改成绝对目录试试。

  • 首先在web上把failed的dag删了,然后重新执行python3 dag_jw.py:


    完美收官

好了,Airflow初次体验,完美收官。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容