airflow 动态创建task

airflow 动态创建task

通过http接口获取一个列表结果,遍历列表值,每条记录动态创建一个task

实现方式

动态创建task需要写两个dag实现,auto_rebuild_cube通过http的task获取到需要遍历的列表,提取name到xcom中。
第二个dag文件auto_build 通过 XCom.get_one 方法指定dag文件和execution_date,其中execution_date因为需要指定,所以我这里通过pendulum.now('Asia/Shanghai')直接拿的当前时间。

文件:auto_rebuild_cube

# -*- coding: utf-8 -*-
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.http_operator import SimpleHttpOperator

from airflow import DAG
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.settings import json

import common
from common import china_days_ago

result = []

dag = DAG(
    dag_id='auto_rebuild_cube',
    default_args=common.default_args,
    start_date=china_days_ago(1),
    description='获取遍历列表',
    schedule_interval="9 * * * *",
)

get_all_cubes = SimpleHttpOperator(
    task_id="get_all_cubes",
    endpoint='kylin/api/cubes',
    headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
    data={"limit": "100"},
    method='GET',
    xcom_push=True,
    http_conn_id=common.global_kylin_http_id
)


def multitasking_task(**kwargs):
    xcom = kwargs['task_instance'].xcom_pull(task_ids="get_all_cubes")
    for data in json.loads(xcom):
        result.append(data['name'])
    kwargs['task_instance'].xcom_push(key="cubeNames", value=result)


multi_task = PythonOperator(
    task_id='multi_task',
    python_callable=multitasking_task,
    dag=dag,
    provide_context=True
)

push_task = BashOperator(
    task_id='push_task',
    bash_command="echo {{ task_instance.xcom_pull(key='cubeNames') }} ",
    dag=dag
)

trigger_build_cube = TriggerDagRunOperator(
    task_id='trigger_build_cube',
    trigger_dag_id="build_cube",
    python_callable=common.conditionally_trigger,
    params={'condition_param': True, 'message': '获取列表成功,即将开始动态创建task'},
    dag=dag
)
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

latest_only >> get_all_cubes >> multi_task >> push_task >> trigger_build_cube

文件:auto_cube

import json

from airflow import DAG
from airflow.models import XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.http_operator import SimpleHttpOperator
from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum

import common
from common import china_days_ago

dag = DAG(dag_id='build_cube',
          default_args=common.default_args,
          start_date=china_days_ago(1),
          schedule_interval=None)


def get_data():
    execution_date = pendulum.now('Asia/Shanghai')
    print("the execution_date is {}", execution_date)
    cube_names = XCom.get_one(dag_id='auto_rebuild_cube', key='cubeNames', execution_date=execution_date,
                              include_prior_dates=True)
    print("cubeNames is {}", cube_names)
    return cube_names


def multitasking_task(data):
    return SimpleHttpOperator(
        task_id="rebuild_cube_{}".format(data),
        endpoint='kylin/api/cubes/{}/rebuild'.format(data),
        headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
        data=json.dumps({"buildType": "BUILD"}),
        method='PUT',
        http_conn_id=common.global_kylin_http_id
    )


start = DummyOperator(
    task_id="start",
    dag=dag
)

end = DummyOperator(
    task_id="end",
    dag=dag
)

for data in get_data():
    start >> [multitasking_task(data)] >> end

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,941评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,397评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,345评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,851评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,868评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,688评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,414评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,319评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,775评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,945评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,096评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,789评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,437评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,993评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,107评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,308评论 3 372
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,037评论 2 355

推荐阅读更多精彩内容