airflow 动态创建task
通过http接口获取一个列表结果,遍历列表值,每条记录动态创建一个task
实现方式
动态创建task需要两个dag配合实现:第一个dag auto_rebuild_cube 通过http task获取需要遍历的列表,提取每条记录的name,并以cubeNames为key写入xcom中。
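第二个dag文件(下文的auto_cube,dag_id为build_cube)通过 XCom.get_one 方法读取该列表,调用时需要指定来源dag的dag_id和execution_date。其中execution_date必须传入,所以我这里通过pendulum.now('Asia/Shanghai')直接拿当前时间,并设置include_prior_dates=True,这样取到的是execution_date不晚于当前时间的最近一条cubeNames记录。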
文件:auto_rebuild_cube
# -*- coding: utf-8 -*-
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow import DAG
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.settings import json
import common
from common import china_days_ago
# Module-level scratch list; starts empty each time this file is imported.
result = []

# DAG that fetches the list to iterate over. The cron expression fires at
# minute 9 of every hour; default_args and the start date come from the
# project-local `common` module (not visible here).
dag = DAG(
    dag_id='auto_rebuild_cube',
    description='获取遍历列表',
    schedule_interval="9 * * * *",
    start_date=china_days_ago(1),
    default_args=common.default_args,
)
# GET kylin/api/cubes (limit 100) and push the HTTP response to XCom so the
# downstream task can parse the cube list out of it.
# NOTE(review): the Authorization header embeds a Basic credential directly in
# source; consider keeping it on the Airflow HTTP connection instead.
get_all_cubes = SimpleHttpOperator(
    task_id="get_all_cubes",
    endpoint='kylin/api/cubes',
    headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
    data={"limit": "100"},
    method='GET',
    xcom_push=True,
    http_conn_id=common.global_kylin_http_id,
    # Bind to the DAG explicitly, like every other task in this file, instead
    # of relying on the DAG being inferred when the task is wired with `>>`.
    dag=dag,
)
def multitasking_task(**kwargs):
    """Extract cube names from the HTTP response and publish them via XCom.

    Pulls the XCom value of the ``get_all_cubes`` task, parses it as a JSON
    array of objects, and pushes the list of their ``name`` fields under the
    XCom key ``cubeNames``.

    Args:
        **kwargs: Airflow task context; only ``task_instance`` is read.

    Raises:
        KeyError: If a record in the response has no ``name`` field.
    """
    task_instance = kwargs['task_instance']
    payload = task_instance.xcom_pull(task_ids="get_all_cubes")
    # Build a fresh list on every call. Appending to a module-level list would
    # carry names over (and duplicate them) if the callable runs more than
    # once in the same process.
    cube_names = [cube['name'] for cube in json.loads(payload)]
    task_instance.xcom_push(key="cubeNames", value=cube_names)
# Runs multitasking_task with the task context supplied as keyword arguments
# (the callable reads kwargs['task_instance']).
multi_task = PythonOperator(
    dag=dag,
    task_id='multi_task',
    provide_context=True,
    python_callable=multitasking_task,
)
# Echoes the templated cubeNames XCom value into the task log.
push_task = BashOperator(
    dag=dag,
    task_id='push_task',
    bash_command="echo {{ task_instance.xcom_pull(key='cubeNames') }} ",
)
# Triggers the "build_cube" DAG. Whether the run is actually created is
# decided by common.conditionally_trigger (defined outside this file;
# presumably it inspects params['condition_param'] -- verify there).
trigger_build_cube = TriggerDagRunOperator(
    dag=dag,
    task_id='trigger_build_cube',
    trigger_dag_id="build_cube",
    params={'condition_param': True, 'message': '获取列表成功,即将开始动态创建task'},
    python_callable=common.conditionally_trigger,
)
# Head of the chain, followed by linear wiring:
# latest_only -> get_all_cubes -> multi_task -> push_task -> trigger_build_cube
latest_only = LatestOnlyOperator(dag=dag, task_id='latest_only')
latest_only.set_downstream(get_all_cubes)
get_all_cubes.set_downstream(multi_task)
multi_task.set_downstream(push_task)
push_task.set_downstream(trigger_build_cube)
文件:auto_cube
import json
from airflow import DAG
from airflow.models import XCom
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.http_operator import SimpleHttpOperator
from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum
import common
from common import china_days_ago
# DAG holding the dynamically generated rebuild tasks. It has no schedule of
# its own (schedule_interval=None); default_args and the start date come from
# the project-local `common` module (not visible here).
dag = DAG(
    dag_id='build_cube',
    schedule_interval=None,
    start_date=china_days_ago(1),
    default_args=common.default_args,
)
def get_data():
    """Return the cube names published by the ``auto_rebuild_cube`` DAG.

    Looks up the ``cubeNames`` XCom of DAG ``auto_rebuild_cube`` using the
    current Asia/Shanghai time as the execution date, with
    ``include_prior_dates=True`` so that values from earlier execution dates
    are considered as well.

    Returns:
        The stored value (expected to be a list of cube names), or an empty
        list when no matching XCom exists, so callers can always iterate the
        result.
    """
    execution_date = pendulum.now('Asia/Shanghai')
    # The placeholder must be filled via format(); passing the value as a
    # second print() argument leaves a literal "{}" in the output.
    print("the execution_date is {}".format(execution_date))
    cube_names = XCom.get_one(
        dag_id='auto_rebuild_cube',
        key='cubeNames',
        execution_date=execution_date,
        include_prior_dates=True,
    )
    print("cubeNames is {}".format(cube_names))
    if cube_names is None:
        # Nothing has been published yet (e.g. the upstream DAG has not run);
        # returning None here would break the module-level loop over the
        # result while the DAG file is being parsed.
        return []
    return cube_names
def multitasking_task(data):
    """Create the HTTP task that asks Kylin to rebuild one cube.

    Args:
        data: Cube name; it is interpolated into both the task id
            (``rebuild_cube_<name>``) and the request endpoint
            (``kylin/api/cubes/<name>/rebuild``).

    Returns:
        A ``SimpleHttpOperator`` that issues a PUT with body
        ``{"buildType": "BUILD"}``, attached to this file's ``dag``.
    """
    # NOTE(review): the Authorization header embeds a Basic credential directly
    # in source; consider keeping it on the Airflow HTTP connection instead.
    return SimpleHttpOperator(
        task_id="rebuild_cube_{}".format(data),
        endpoint='kylin/api/cubes/{}/rebuild'.format(data),
        headers={"Content-Type": "application/json", "Authorization": "Basic QURNSU46S1lMSU4="},
        data=json.dumps({"buildType": "BUILD"}),
        method='PUT',
        http_conn_id=common.global_kylin_http_id,
        # Bind to the DAG explicitly, consistent with the start/end tasks,
        # rather than relying on inference when the task is wired with `>>`.
        dag=dag,
    )
# No-op bookend tasks: every generated rebuild task is placed between them.
start = DummyOperator(dag=dag, task_id="start")
end = DummyOperator(dag=dag, task_id="end")
# Fan out: one rebuild task per cube name, each placed between start and end.
# `or []` keeps the file importable when get_data() yields nothing (None),
# which would otherwise raise TypeError while the DAG file is being parsed.
for data in get_data() or []:
    start >> multitasking_task(data) >> end