airflow scheduler_job source code analysis
Basic concepts
https://airflow.apache.org/docs/stable/concepts.html#
- DAG: The work (tasks), and the order in which work should take place (dependencies), written in Python.
- DAG Run: An instance of a DAG for a particular logical date and time.
- Operator: A class that acts as a template for carrying out some work.
- Task: Defines work by implementing an operator, written in Python.
- Task Instance: An instance of a task - that has been assigned to a DAG and has a state associated with a specific DAG run (i.e. for a specific execution_date).
- execution_date: The logical date and time for a DAG Run and its Task Instances.
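A minimal, hypothetical DAG file ties these concepts together: the DAG object carries the schedule and dependencies, each operator instantiation is a task, and every run of the DAG yields one task instance per task for that execution_date (a sketch against the 1.10-era API; ids and dates are illustrative):
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
# Hypothetical example DAG: one run per day starting 2020-01-01
dag = DAG(
    dag_id='concept_demo',
    start_date=datetime(2020, 1, 1),
    schedule_interval='@daily',
)
# Each operator instantiation is a task of this DAG
start = DummyOperator(task_id='start', dag=dag)
finish = DummyOperator(task_id='finish', dag=dag)
# Dependency: 'start' must complete before 'finish' runs
start >> finish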
State machine
- No status (scheduler created empty task instance)
- Scheduled (scheduler determined task instance needs to run)
- Queued (scheduler sent task to executor to run on the queue)
- Running (worker picked up a task and is now running it)
- Success (task completed)
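A small sketch of the happy path, using the State constants from airflow.utils.state that the scheduler code below manipulates (failure and retry states are omitted):
from airflow.utils.state import State
HAPPY_PATH = [
    State.NONE,       # "No status": TI row created, nothing decided yet
    State.SCHEDULED,  # scheduler decided the TI should run
    State.QUEUED,     # scheduler handed the TI to the executor
    State.RUNNING,    # a worker picked it up
    State.SUCCESS,    # task completed
]
print(' -> '.join(str(s) for s in HAPPY_PATH))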
code
The excerpts below are slightly trimmed; only the key code & logs are kept.
/Users/yangxue.chen/code/python/airflow/airflow/jobs/scheduler_job.py
"""
This SchedulerJob runs for a specific time interval and schedules the jobs
that are ready to run. It figures out the latest runs for each
task and sees if the dependencies for the next schedules are met.
If so, it creates appropriate TaskInstances and sends run commands to the
executor. It does this for each task in each DAG and repeats.
"""
def _execute(self):
self.log.info("Starting the scheduler")
def processor_factory(file_path, zombies):
return DagFileProcessor()
self.processor_agent = DagFileProcessorAgent(processor_factory)
self._execute_helper()
def _execute_helper(self):
"""
The actual scheduler loop. The main steps in the loop are:
#. Harvest DAG parsing results through DagFileProcessorAgent
#. Find and queue executable tasks
#. Change task instance state in DB
#. Queue tasks in executor
#. Heartbeat executor
#. Execute queued tasks in executor asynchronously
#. Sync on the states of running tasks
"""
self.executor.start()
self.processor_agent.start()
# For the execute duration, parse and schedule DAGs
while (timezone.utcnow() - execute_start_time).total_seconds() < \
self.run_duration or self.run_duration < 0:
# Send tasks for execution if available
if not self._validate_and_run_task_instances():
continue
- Start the executor
- Start the DagFileProcessorAgent
- Send runnable task instances for execution
DagFileProcessorAgent
/Users/yangxue.chen/code/python/airflow/airflow/utils/dag_processing.py
"""
Agent for DAG file processing. It is responsible for all DAG parsing
related jobs in scheduler process. Mainly it can spin up DagFileProcessorManager
in a subprocess, collect DAG parsing results from it and communicate
signal/DAG parsing stat with it.
This class runs in the main `airflow scheduler` process.
"""
def start(self):
"""
Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
"""
self._parent_signal_conn, child_signal_conn = multiprocessing.Pipe()
self._process = multiprocessing.Process(
target=type(self)._run_processor_manager,
args=(child_signal_conn,)
)
self._process.start()
@staticmethod
def _run_processor_manager():
processor_manager = DagFileProcessorManager()
processor_manager.start()
DagFileProcessorManager
"""
Given a list of DAG definition files, this kicks off several processors
in parallel to process them and put the results to a multiprocessing.Queue
for DagFileProcessorAgent to harvest. The parallelism is limited and as the
processors finish, more are launched. The files are processed over and
over again, but no more often than the specified interval.
"""
def start(self):
"""
Use multiple processes to parse and generate tasks for the
DAGs in parallel. By processing them in separate processes,
we can get parallelism and isolation from potentially harmful
user code.
"""
while True:
self._refresh_dag_dir()
self._find_zombies()
simple_dags = self.heartbeat()
# How often to scan the DAGs directory for new files. Default to 5 minutes.
self.dag_dir_list_interval = conf.getint('scheduler',
'dag_dir_list_interval')
def _refresh_dag_dir(self):
"""
Refresh file paths from dag dir if we haven't done it for too long.
"""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_dag_dir_refresh_time).total_seconds()
if elapsed_time_since_refresh > self.dag_dir_list_interval:
self._file_paths = list_py_file_paths(self._dag_directory)
self.last_dag_dir_refresh_time = now
self.set_file_paths(self._file_paths)
Refresh the DAG file directory periodically, as controlled by the dag_dir_list_interval config option.
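A quick way to check what the refresh will see, assuming the 1.10-era layout (conf in airflow.configuration, list_py_file_paths in airflow.utils.dag_processing); this is an illustrative one-off script, not scheduler code:
from airflow.configuration import conf
from airflow.settings import DAGS_FOLDER
from airflow.utils.dag_processing import list_py_file_paths
# Effective refresh interval, default 300 seconds (5 minutes)
print(conf.getint('scheduler', 'dag_dir_list_interval'))
# The .py files under the DAG folder that the manager will queue for parsing
print(list_py_file_paths(DAGS_FOLDER))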
self._parallelism = conf.getint('scheduler', 'max_threads')
def heartbeat(self):
"""
This should be periodically called by the manager loop. This method will
kick off new processes to process DAG definition files and read the
results from the finished processors.
"""
simple_dags = self.collect_results()
if len(self._file_path_queue) == 0:
files_paths_to_queue = list(set(self._file_paths) -
set(file_paths_in_progress) -
set(file_paths_recently_processed) -
set(files_paths_at_run_limit))
self.log.debug(
"Queuing the following files for processing:\n\t%s",
"\n\t".join(files_paths_to_queue)
)
while (self._parallelism - len(self._processors) > 0 and
len(self._file_path_queue) > 0):
file_path = self._file_path_queue.pop(0)
processor = self._processor_factory(file_path, self._zombies)
processor.start()
return simple_dags
Parse DAG files in parallel, with the number of worker processes capped by the max_threads config option.
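The design point is isolation as much as parallelism: each file is parsed in its own process, so a broken or hanging DAG file cannot take down the scheduler. An illustrative sketch of bounded parallel parsing (not Airflow's actual manager, which spawns one DagFileProcessor per file):
import multiprocessing
def parse_file(path):
    # In Airflow this would exec the DAG file; a crash here only kills
    # the worker process, never the scheduler
    return path, 'parsed'
if __name__ == '__main__':
    paths = ['dags/a.py', 'dags/b.py', 'dags/c.py']  # hypothetical files
    max_workers = 2  # plays the role of [scheduler] max_threads
    pool = multiprocessing.Pool(max_workers)  # bounded parallelism
    for path, status in pool.map(parse_file, paths):
        print(path, status)
    pool.close()
    pool.join()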
DagFileProcessor
"""
Helps call SchedulerJob.process_file() in a separate process
"""
def start(self):
self._parent_channel, _child_channel = multiprocessing.Pipe()
self._process = multiprocessing.Process(
target=type(self)._run_file_processor,
args=()
)
self._process.start()
def _run_file_processor():
scheduler_job = SchedulerJob(dag_ids=dag_id_white_list, log=log)
result = scheduler_job.process_file()
result_channel.send(result)
def process_file(self, file_path, zombies, pickle_dags=False, session=None):
"""
Process a Python file containing Airflow DAGs.
This includes:
1. Execute the file and look for DAG objects in the namespace.
2. Pickle the DAG and save it to the DB (if necessary).
3. For each DAG, see what tasks should run and create appropriate task
instances in the DB.
4. Record any errors importing the file into ORM
5. Kill (in ORM) any task instances belonging to the DAGs that haven't
issued a heartbeat in a while.
Returns a list of SimpleDag objects that represent the DAGs found in
the file
"""
dagbag = models.DagBag(file_path, include_examples=False)
# Save individual DAGs in the ORM and update DagModel.last_scheduled_time
for dag in dagbag.dags.values():
dag.sync_to_db()
paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values() if dag.is_paused]
for dag_id in dagbag.dags:
# Only return DAGs that are not paused
if dag_id not in paused_dag_ids:
dag = dagbag.get_dag(dag_id)
simple_dags.append(SimpleDag(dag))
ti_keys_to_schedule = []
self._process_dags(dagbag, dags, ti_keys_to_schedule)
for ti_key in ti_keys_to_schedule:
# Only schedule tasks that have their dependencies met, e.g. to avoid
# a task that recently got its state changed to RUNNING from somewhere
# other than the scheduler from getting its state overwritten.
if ti.are_dependencies_met():
# Task starts out in the scheduled state. All tasks in the
# scheduled state will be sent to the executor
ti.state = State.SCHEDULED
Find the task instances that need to be scheduled and set them to the scheduled state, waiting to be sent to the executor: State.SCHEDULED.
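To see the result of this step, one can query the metadata DB for task instances left in the SCHEDULED state; a hypothetical inspection snippet using the session helper from airflow.utils.db:
from airflow.models import TaskInstance
from airflow.utils.db import create_session
from airflow.utils.state import State
with create_session() as session:
    scheduled = (session.query(TaskInstance)
                        .filter(TaskInstance.state == State.SCHEDULED))
    for ti in scheduled:
        print(ti.dag_id, ti.task_id, ti.execution_date)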
def _process_dags(self, dagbag, dags, tis_out):
"""
Iterates over the dags and processes them. Processing includes:
1. Create appropriate DagRun(s) in the DB.
2. Create appropriate TaskInstance(s) in the DB.
3. Send emails for tasks that have missed SLAs.
"""
for dag in dags:
if dag.is_paused:
continue
dag_run = self.create_dag_run(dag)
self._process_task_instances(dag, tis_out)
def create_dag_run(self, dag, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval.
Returns DagRun if one is scheduled. Otherwise returns None.
"""
next_run = dag.create_dagrun()
return next_run
def create_dagrun():
run = DagRun()
session.add(run)
session.commit()
# create the associated task instances
# state is None at the moment of creation
run.verify_integrity(session=session)
run.refresh_from_db()
return run
def verify_integrity(self, session=None):
"""
Verifies the DagRun by checking for removed tasks or tasks that are not in the
database yet. It will set state to removed or add the task if required.
"""
# check for removed or restored tasks
task_ids = []
for ti in tis:
task_ids.append(ti.task_id)
try:
task = dag.get_task(ti.task_id)
except AirflowException:
if ti.state == State.REMOVED:
pass # ti has already been removed, just ignore it
elif self.state is not State.RUNNING and not dag.partial:
ti.state = State.REMOVED
is_task_in_dag = task is not None
should_restore_task = is_task_in_dag and ti.state == State.REMOVED
if should_restore_task:
ti.state = State.NONE
# check for missing tasks
# every task -> TI: all task instances for this DAG are created in one pass, with state None
for task in six.itervalues(dag.task_dict):
if task.start_date > self.execution_date and not self.is_backfill:
continue
if task.task_id not in task_ids:
ti = TaskInstance(task, self.execution_date)
session.add(ti)
session.commit()
Create the dag_run and verify the integrity of its task instances: State.NONE.
# In order to be able to get queued a task must have one of these states
SCHEDULEABLE_STATES = {
State.NONE,
State.UP_FOR_RETRY,
State.UP_FOR_RESCHEDULE,
}
def _process_task_instances(self, dag, task_instances_list, session=None):
"""
This method schedules the tasks for a single DAG by looking at the
active DAG runs and adding task instances that should run to the
queue.
"""
dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
active_dag_runs = []
for run in dag_runs:
# todo: preferably the integrity check happens at dag collection time
run.verify_integrity(session=session)
if run.state == State.RUNNING:
active_dag_runs.append(run)
for run in active_dag_runs:
tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
# this loop is quite slow as it uses are_dependencies_met for
# every task (in ti.is_runnable). This is also called in
# update_state above which has already checked these tasks
for ti in tis:
if ti.are_dependencies_met():
task_instances_list.append(ti.key)
def are_dependencies_met():
"""
Returns whether or not all the conditions are met for this task instance to be run
given the context for the dependencies (e.g. a task instance being force run from
the UI will ignore some dependencies).
"""
Find the task instances in SCHEDULEABLE_STATES under running dag_runs and check that their execution dependencies are met.
exec
/Users/yangxue.chen/code/python/airflow/airflow/jobs/scheduler_job.py
def _validate_and_run_task_instances(self, simple_dag_bag):
self._process_and_execute_tasks(simple_dag_bag)
# Call heartbeats
self.log.debug("Heartbeating the executor")
self.executor.heartbeat()
self._change_state_for_tasks_failed_to_execute()
# Process events from the executor
self._process_executor_events(simple_dag_bag)
return True
def _process_and_execute_tasks(self, simple_dag_bag):
# Handle cases where a DAG run state is set (perhaps manually) to
# a non-running state. Handle task instances that belong to
# DAG runs in those states
# If a task instance is up for retry but the corresponding DAG run
# isn't running, mark the task instance as FAILED so we don't try
# to re-run it.
self._change_state_for_tis_without_dagrun(
    simple_dag_bag, [State.UP_FOR_RETRY], State.FAILED)
# If a task instance is scheduled or queued or up for reschedule,
# but the corresponding DAG run isn't running, set the state to
# NONE so we don't try to re-run it.
self._change_state_for_tis_without_dagrun(
    simple_dag_bag,
    [State.QUEUED, State.SCHEDULED, State.UP_FOR_RESCHEDULE],
    State.NONE)
self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
def _execute_task_instances(simple_dag_bag, states):
"""
Attempts to execute TaskInstances that should be executed by the scheduler.
There are three steps:
1. Pick TIs by priority with the constraint that they are in the expected states
and that we do not exceed max_active_runs or pool limits.
2. Change the state for the TIs above atomically.
3. Enqueue the TIs in the executor.
"""
executable_tis = self._find_executable_task_instances(states)
def query(result, items):
simple_tis_with_state_changed = \
self._change_state_for_executable_task_instances(items, states)
self._enqueue_task_instances_with_queued_state(
simple_dag_bag,
simple_tis_with_state_changed)
session.commit()
return result + len(simple_tis_with_state_changed)
return helpers.reduce_in_chunks(query, executable_tis, 0, self.max_tis_per_query)
def _change_state_for_executable_task_instances(task_instances, acceptable_states):
"""
Changes the state of task instances in the list with one of the given states
to QUEUED atomically, and returns the TIs changed in SimpleTaskInstance format.
"""
# set TIs to queued state
for task_instance in tis_to_set_to_queued:
task_instance.state = State.QUEUED
task_instance.queued_dttm = timezone.utcnow()
simple_task_instances = [SimpleTaskInstance(ti) for ti in tis_to_set_to_queued]
self.log.info("Setting the following %s tasks to queued state:\n\t%s")
return simple_task_instances
def _enqueue_task_instances_with_queued_state(simple_task_instances):
"""
Takes task_instances, which should have been set to queued, and enqueues them
with the executor.
"""
TI = models.TaskInstance
# actually enqueue them
for simple_task_instance in simple_task_instances:
command = TI.generate_command()
log.info("Sending %s to executor with priority %s and queue %s")
self.executor.queue_command()
Find the task instances in the scheduled state, set them to queued, and hand them to the executor for execution: State.QUEUED.
/Users/yangxue.chen/code/python/airflow/airflow/executors/base_executor.py
def queue_command(self, simple_task_instance, command, priority=1, queue=None):
key = simple_task_instance.key
if key not in self.queued_tasks and key not in self.running:
self.log.info("Adding to queue: %s", command)
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
else:
self.log.info("could not queue task %s", key)
"Sending a task to the executor" is implemented simply by adding the task to the executor's queued_tasks dict, where it waits to be picked up.
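An illustrative executor skeleton (not Airflow's BaseExecutor itself, though it follows the same shape) makes the division of labour clear: queue_command only records the work, while a later heartbeat drains queued_tasks in priority order and actually launches the commands:
class ToyExecutor(object):
    # Minimal sketch of the queue_command / heartbeat split
    def __init__(self, parallelism=32):
        self.parallelism = parallelism
        self.queued_tasks = {}  # key -> (command, priority, queue, simple_ti)
        self.running = {}
    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
        key = simple_task_instance.key
        if key not in self.queued_tasks and key not in self.running:
            # just remember the work; nothing runs yet
            self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
    def heartbeat(self):
        open_slots = max(self.parallelism - len(self.running), 0)
        # drain the queue highest-priority first, up to the free slots
        ordered = sorted(self.queued_tasks.items(),
                         key=lambda kv: kv[1][1], reverse=True)
        for key, (command, _priority, queue, _sti) in ordered[:open_slots]:
            del self.queued_tasks[key]
            self.running[key] = command
            self.execute_async(key, command, queue)
    def execute_async(self, key, command, queue=None):
        # a real executor (Local/Celery/Kubernetes) overrides this to actually
        # run the command; here we only log it
        print('would run:', command)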