airflow-scheduler_job source code analysis

Basic concepts

https://airflow.apache.org/docs/stable/concepts.html#

  • DAG: The work (tasks), and the order in which work should take place (dependencies), written in Python.
  • DAG Run: An instance of a DAG for a particular logical date and time.
  • Operator: A class that acts as a template for carrying out some work.
  • Task: Defines work by implementing an operator, written in Python.
  • Task Instance: An instance of a task - that has been assigned to a DAG and has a state associated with a specific DAG run (i.e for a specific execution_date).
  • execution_date: The logical date and time for a DAG Run and its Task Instances.
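
To make these terms concrete, here is a minimal DAG definition (a sketch with illustrative names, not code taken from the scheduler):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# The DAG: the work (tasks) and the order in which it should run, written in Python
dag = DAG(
    dag_id='example_dag',
    start_date=datetime(2020, 1, 1),
    schedule_interval='@daily',  # every DAG Run gets a logical execution_date
)

# Tasks: work defined by instantiating Operators and assigning them to the DAG
extract = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
load = BashOperator(task_id='load', bash_command='echo load', dag=dag)

extract >> load  # dependency: load runs only after extract succeeds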

State machine

[Figure: task_lifecycle_diagram.png (task instance lifecycle)]
  1. No status (scheduler created empty task instance)
  2. Scheduled (scheduler determined task instance needs to run)
  3. Queued (scheduler sent task to executor to run on the queue)
  4. Running (worker picked up a task and is now running it)
  5. Success (task completed)
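
These lifecycle stages correspond to the State constants used throughout the scheduler code below; a minimal sketch (values as of Airflow 1.10.x):

from airflow.utils.state import State

print(State.NONE)       # None        (1) no status: empty task instance
print(State.SCHEDULED)  # 'scheduled' (2) scheduler decided the task needs to run
print(State.QUEUED)     # 'queued'    (3) sent to the executor's queue
print(State.RUNNING)    # 'running'   (4) a worker picked it up and is running it
print(State.SUCCESS)    # 'success'   (5) task completed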

code

The code below is slightly trimmed; only the key code & logs are kept.
/Users/yangxue.chen/code/python/airflow/airflow/jobs/scheduler_job.py

[Figure: scheduler_loop.jpg (scheduler main loop)]
"""
This SchedulerJob runs for a specific time interval and schedules the jobs
that are ready to run. It figures out the latest runs for each
task and sees if the dependencies for the next schedules are met.
If so, it creates appropriate TaskInstances and sends run commands to the
executor. It does this for each task in each DAG and repeats.
"""

    def _execute(self):
        self.log.info("Starting the scheduler")

        def processor_factory(file_path, zombies):
            return DagFileProcessor()        
        
        self.processor_agent = DagFileProcessorAgent(processor_factory)        
        self._execute_helper()
    
    def _execute_helper(self):
        """
        The actual scheduler loop. The main steps in the loop are:
            #. Harvest DAG parsing results through DagFileProcessorAgent
            #. Find and queue executable tasks
                #. Change task instance state in DB
                #. Queue tasks in executor
            #. Heartbeat executor
                #. Execute queued tasks in executor asynchronously
                #. Sync on the states of running tasks 
        """
        self.executor.start()
        
        self.processor_agent.start()
        
        # For the execute duration, parse and schedule DAGs
        execute_start_time = timezone.utcnow()
        while (timezone.utcnow() - execute_start_time).total_seconds() < \
                self.run_duration or self.run_duration < 0:
            
            # Send tasks for execution if available
            if not self._validate_and_run_task_instances():
                continue   
  • Start the executor
  • Start the DAG file processor agent
  • Send runnable task instances for execution

DagFileProcessorAgent

/Users/yangxue.chen/code/python/airflow/airflow/utils/dag_processing.py

"""
Agent for DAG file processing. It is responsible for all DAG parsing
related jobs in scheduler process. Mainly it can spin up DagFileProcessorManager
in a subprocess, collect DAG parsing results from it and communicate
signal/DAG parsing stat with it.

This class runs in the main `airflow scheduler` process.
"""
    def start(self):
        """
        Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
        """
        self._parent_signal_conn, child_signal_conn = multiprocessing.Pipe()
        self._process = multiprocessing.Process(
            target=type(self)._run_processor_manager,
            args=(child_signal_conn,)
        )
        self._process.start()
    
    @staticmethod
    def _run_processor_manager():
        processor_manager = DagFileProcessorManager()
        processor_manager.start()

DagFileProcessorManager

"""
Given a list of DAG definition files, this kicks off several processors
in parallel to process them and put the results to a multiprocessing.Queue
for DagFileProcessorAgent to harvest. The parallelism is limited and as the
processors finish, more are launched. The files are processed over and
over again, but no more often than the specified interval.
"""
    def start(self):
        """
        Use multiple processes to parse and generate tasks for the
        DAGs in parallel. By processing them in separate processes,
        we can get parallelism and isolation from potentially harmful
        user code.
        """
        while True:
            self._refresh_dag_dir()
            self._find_zombies()
            
            simple_dags = self.heartbeat()
        # How often to scan the DAGs directory for new files. Default to 5 minutes.
        self.dag_dir_list_interval = conf.getint('scheduler',
                                                 'dag_dir_list_interval')

def _refresh_dag_dir(self):
    """
    Refresh file paths from dag dir if we haven't done it for too long.
    """
    now = timezone.utcnow()
    elapsed_time_since_refresh = (now - self.last_dag_dir_refresh_time).total_seconds()
    if elapsed_time_since_refresh > self.dag_dir_list_interval:
        self._file_paths = list_py_file_paths(self._dag_directory)
        
        self.last_dag_dir_refresh_time = now

        self.set_file_paths(self._file_paths)

The DAG file directory is rescanned periodically, governed by the dag_dir_list_interval setting.
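
The directory scan itself is done by list_py_file_paths, the helper called in the excerpt above. A hedged usage sketch ('/path/to/dags' is a placeholder, not a value from the source):

from airflow.utils.dag_processing import list_py_file_paths

# returns the .py files under the dags folder that the scheduler will treat as DAG files
dag_files = list_py_file_paths('/path/to/dags')
print(dag_files)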

    self._parallelism = conf.getint('scheduler', 'max_threads')

def heartbeat(self):
    """
    This should be periodically called by the manager loop. This method will
    kick off new processes to process DAG definition files and read the
    results from the finished processors.
    """
    simple_dags = self.collect_results()
    
    if len(self._file_path_queue) == 0:
        files_paths_to_queue = list(set(self._file_paths) -
                                    set(file_paths_in_progress) -
                                    set(file_paths_recently_processed) -
                                    set(files_paths_at_run_limit))

    self.log.debug(
         "Queuing the following files for processing:\n\t%s",
         "\n\t".join(files_paths_to_queue)
    )            
    
    while (self._parallelism - len(self._processors) > 0 and
               len(self._file_path_queue) > 0):       

        file_path = self._file_path_queue.pop(0)
        processor = self._processor_factory(file_path, self._zombies)
        processor.start()        
        
    return simple_dags  

DAG files are parsed in parallel, with the degree of parallelism bounded by the max_threads setting.

DagFileProcessor

"""
Helps call SchedulerJob.process_file() in a separate process
"""
def start(self):
    self._parent_channel, _child_channel = multiprocessing.Pipe()
    self._process = multiprocessing.Process(
        target=type(self)._run_file_processor,
        args=()
    )
    self._process.start()
    
def _run_file_processor():
    scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
    result = scheduler_job.process_file()    
    result_channel.send(result)   
def process_file(self, file_path, zombies, pickle_dags=False, session=None):
    """
    Process a Python file containing Airflow DAGs.

    This includes:

    1. Execute the file and look for DAG objects in the namespace.
    2. Pickle the DAG and save it to the DB (if necessary).
    3. For each DAG, see what tasks should run and create appropriate task
    instances in the DB.
    4. Record any errors importing the file into ORM
    5. Kill (in ORM) any task instances belonging to the DAGs that haven't
    issued a heartbeat in a while.

    Returns a list of SimpleDag objects that represent the DAGs found in
    the file
    """    
    dagbag = models.DagBag(file_path, include_examples=False)  

    # Save individual DAGs in the ORM and update DagModel.last_scheduled_time
    for dag in dagbag.dags.values():
       dag.sync_to_db() 
    
    paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values() if dag.is_paused]
    
    for dag_id in dagbag.dags:
        # Only return DAGs that are not paused
        if dag_id not in paused_dag_ids:
            dag = dagbag.get_dag(dag_id)
            simple_dags.append(SimpleDag(dag))
    
    ti_keys_to_schedule = []
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
    
    for ti_key in ti_keys_to_schedule:
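        # (trimmed here: in the full source, ti is a TaskInstance reconstructed from ti_key)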
        # Only schedule tasks that have their dependencies met, e.g. to avoid
        # a task that recently got its state changed to RUNNING from somewhere
        # other than the scheduler from getting its state overwritten.
        if ti.are_dependencies_met():
            # Task starts out in the scheduled state. All tasks in the
            # scheduled state will be sent to the executor
            ti.state = State.SCHEDULED

The tasks that need to run are found and their state is set to SCHEDULED; they then wait to be sent to the executor for execution: State.SCHEDULED

def _process_dags(self, dagbag, dags, tis_out):
    """
    Iterates over the dags and processes them. Processing includes:

    1. Create appropriate DagRun(s) in the DB.
    2. Create appropriate TaskInstance(s) in the DB.
    3. Send emails for tasks that have missed SLAs.
    """
    for dag in dags:
        if dag.is_paused:
            continue        
        
        dag_run = self.create_dag_run(dag)   
        self._process_task_instances(dag, tis_out) 
def create_dag_run(self, dag, session=None):
    """
    This method checks whether a new DagRun needs to be created
    for a DAG based on scheduling interval.
    Returns DagRun if one is scheduled. Otherwise returns None.
    """
    next_run = dag.create_dagrun()
    return next_run

def create_dagrun():
    run = DagRun()
    session.add(run)
    session.commit()
       
    # create the associated task instances
    # state is None at the moment of creation
    run.verify_integrity(session=session)
    run.refresh_from_db()
    return run

def verify_integrity(self, session=None):
    """
    Verifies the DagRun by checking for removed tasks or tasks that are not in the
    database yet. It will set state to removed or add the task if required.
    """

    # check for removed or restored tasks
    task_ids = []
    for ti in tis:
        task_ids.append(ti.task_id)
        
        try:
            task = dag.get_task(ti.task_id)
        except AirflowException:
            if ti.state == State.REMOVED:
                pass  # ti has already been removed, just ignore it
            elif self.state is not State.RUNNING and not dag.partial:
                ti.state = State.REMOVED
        
        is_task_in_dag = task is not None
        should_restore_task = is_task_in_dag and ti.state == State.REMOVED
        if should_restore_task:
            ti.state = State.NONE
            
    # check for missing tasks
    # all tasks -> TIs: every task instance for the DAG is created at once, with state None
    for task in six.itervalues(dag.task_dict):
        if task.start_date > self.execution_date and not self.is_backfill:
            continue

        if task.task_id not in task_ids:
            ti = TaskInstance(task, self.execution_date)
            session.add(ti)

    session.commit()

The dag_run is created and the integrity of its task instances is verified; newly created task instances start with no state: State.NONE

# In order to be able to get queued a task must have one of these states
SCHEDULEABLE_STATES = {
    State.NONE,
    State.UP_FOR_RETRY,
    State.UP_FOR_RESCHEDULE,
}

def _process_task_instances(self, dag, task_instances_list, session=None):
    """
    This method schedules the tasks for a single DAG by looking at the
    active DAG runs and adding task instances that should run to the
    queue.
    """
    dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
    active_dag_runs = []
    for run in dag_runs:   
        # todo: preferably the integrity check happens at dag collection time
        run.verify_integrity(session=session)
        
        if run.state == State.RUNNING:
            active_dag_runs.append(run)    
            
    for run in active_dag_runs:
        tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
        
        # this loop is quite slow as it uses are_dependencies_met for
        # every task (in ti.is_runnable). This is also called in
        # update_state above which has already checked these tasks        
        for ti in tis:
            if ti.are_dependencies_met():
                task_instances_list.append(ti.key)
                
def are_dependencies_met():
    """
    Returns whether or not all the conditions are met for this task instance to be run
    given the context for the dependencies (e.g. a task instance being force run from
    the UI will ignore some dependencies).
    """

Find the task instances in SCHEDULEABLE_STATES under RUNNING dag_runs and verify that their execution dependencies are met.

exec

/Users/yangxue.chen/code/python/airflow/airflow/jobs/scheduler_job.py

def _validate_and_run_task_instances(self, simple_dag_bag):
    self._process_and_execute_tasks(simple_dag_bag)

    # Call heartbeats
    self.log.debug("Heartbeating the executor")
    self.executor.heartbeat()

    self._change_state_for_tasks_failed_to_execute()

    # Process events from the executor
    self._process_executor_events(simple_dag_bag)
    return True

def _process_and_execute_tasks(self, simple_dag_bag):
    # Handle cases where a DAG run state is set (perhaps manually) to
    # a non-running state. Handle task instances that belong to
    # DAG runs in those states
    # If a task instance is up for retry but the corresponding DAG run
    # isn't running, mark the task instance as FAILED so we don't try
    # to re-run it.
    self._change_state_for_tis_without_dagrun()
    # If a task instance is scheduled or queued or up for reschedule,
    # but the corresponding DAG run isn't running, set the state to
    # NONE so we don't try to re-run it.
    self._change_state_for_tis_without_dagrun()
    
    self._execute_task_instances(simple_dag_bag,(State.SCHEDULED,))

def _execute_task_instances(simple_dag_bag,states):
    """
    Attempts to execute TaskInstances that should be executed by the scheduler.

    There are three steps:
    1. Pick TIs by priority with the constraint that they are in the expected states
    and that we do not exceed max_active_runs or pool limits.
    2. Change the state for the TIs above atomically.
    3. Enqueue the TIs in the executor.    
    """
    executable_tis = self._find_executable_task_instances(states)
    
    def query(result, items):
        simple_tis_with_state_changed = \
            self._change_state_for_executable_task_instances(items,states)

        self._enqueue_task_instances_with_queued_state(
                simple_dag_bag,
                simple_tis_with_state_changed)
        session.commit()
        return result + len(simple_tis_with_state_changed)    

    return helpers.reduce_in_chunks(query, executable_tis, 0, self.max_tis_per_query)

def _change_state_for_executable_task_instances(task_instances,acceptable_states):
    """
    Changes the state of task instances in the list with one of the given states
    to QUEUED atomically, and returns the TIs changed in SimpleTaskInstance format.
    """
    
    # set TIs to queued state
    for task_instance in tis_to_set_to_queued:
        task_instance.state = State.QUEUED
        task_instance.queued_dttm = timezone.utcnow()
    
    simple_task_instances = [SimpleTaskInstance(ti) for ti in tis_to_set_to_queued]
    self.log.info("Setting the following %s tasks to queued state:\n\t%s")    
    return simple_task_instances

def _enqueue_task_instances_with_queued_state(simple_task_instances):
    """
    Takes task_instances, which should have been set to queued, and enqueues them
    with the executor.
    """
    TI = models.TaskInstance
    # actually enqueue them
    for simple_task_instance in simple_task_instances:
        command = TI.generate_command()
        log.info("Sending %s to executor with priority %s and queue %s")

        self.executor.queue_command()

SCHEDULED tasks are found, their state is set to QUEUED, and they are handed off for execution: State.QUEUED

/Users/yangxue.chen/code/python/airflow/airflow/executors/base_executor.py

    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
        key = simple_task_instance.key
        if key not in self.queued_tasks and key not in self.running:
            self.log.info("Adding to queue: %s", command)
            self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
        else:
            self.log.info("could not queue task %s", key)

So "sending a task to the executor" is implemented by simply adding the task to the executor's queued_tasks dict, where it waits in line.
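
Below is a standalone sketch (not Airflow code) of this enqueue-then-drain pattern: queue_command only records the command in a dict keyed by the task instance key (roughly dag_id, task_id, execution_date, try_number), and nothing actually runs until the executor's next heartbeat drains the queue in priority order:

queued_tasks = {}
running = {}

def queue_command(key, command, priority=1, queue=None):
    # mirrors the guard above: never double-queue a key that is already queued or running
    if key not in queued_tasks and key not in running:
        queued_tasks[key] = (command, priority, queue)

def heartbeat(open_slots=2):
    # drain the highest-priority entries first, up to the number of free slots
    by_priority = sorted(queued_tasks.items(), key=lambda kv: kv[1][1], reverse=True)
    for key, (command, priority, queue) in by_priority[:open_slots]:
        queued_tasks.pop(key)
        running[key] = command
        print("would execute_async:", key, command)

queue_command(('example_dag', 'extract', '2020-01-01T00:00:00', 1), 'airflow run ...', priority=2)
queue_command(('example_dag', 'load', '2020-01-01T00:00:00', 1), 'airflow run ...', priority=1)
heartbeat()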
