1. Scheduler start and stop commands
1.1 Scheduler start command
We usually start the Airflow Scheduler with a command like the following:
airflow scheduler \
--pid /data/bdetl/airflow/pids/airflow-scheduler.pid \
--stdout /data/bdetl/logs/airflow/airflow-scheduler.out \
--stderr /data/bdetl/logs/airflow/airflow-scheduler.out \
-l /data/bdetl/logs/airflow/airflow-scheduler.log \
-D
For more options, see the scheduler parameters:
Parameter | Description |
---|---|
-sd, --subdir | Look for DAG files under the given path. Defaults to '[AIRFLOW_HOME]/dags', where [AIRFLOW_HOME] is the value we set for 'AIRFLOW_HOME' in 'airflow.cfg'. |
-r, --run-duration | How long (in seconds) the scheduler loop runs before it exits. |
-n, --num_runs | How many times every DAG file is parsed before the scheduler exits. |
-p, --do_pickle | Whether to serialize (pickle) DAG objects and send them to the worker nodes for execution. |
1.2 Scheduler stop command
cat /data/bdetl/airflow/pids/airflow-scheduler.pid | xargs kill -15
Running the command above kills the scheduler process and removes the airflow-scheduler.pid file.
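The same thing can be done from Python; the sketch below is equivalent to the shell pipeline above and uses the pid file path from section 1.1:

import os
import signal

# read the scheduler's pid from the pid file written at startup
with open("/data/bdetl/airflow/pids/airflow-scheduler.pid") as f:
    pid = int(f.read().strip())

# SIGTERM (signal 15) asks the scheduler to shut down gracefully
os.kill(pid, signal.SIGTERM)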
2. Scheduler source code
In the rest of this article:
ti stands for task_instance, i.e. a task instance;
tis stands for task_instances;
the code shown is from airflow 1.10.11.
2.1 cli.scheduler(): handles the airflow scheduler CLI command
It builds a SchedulerJob from the given arguments and then calls the job's run method.
@cli_utils.action_logging
def scheduler(args):
    py2_deprecation_waring()
    print(settings.HEADER)
    # build a SchedulerJob from the CLI arguments
    job = jobs.SchedulerJob(
        dag_id=args.dag_id,
        subdir=process_subdir(args.subdir),
        run_duration=args.run_duration,
        num_runs=args.num_runs,
        do_pickle=args.do_pickle)

    # daemon mode
    if args.daemon:
        # set up the pid file and log paths
        pid, stdout, stderr, log_file = setup_locations("scheduler",
                                                        args.pid,
                                                        args.stdout,
                                                        args.stderr,
                                                        args.log_file)
        handle = setup_logging(log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')

        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            files_preserve=[handle],
            stdout=stdout,
            stderr=stderr,
        )
        # run the SchedulerJob inside the daemon context
        with ctx:
            job.run()

        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        signal.signal(signal.SIGQUIT, sigquit_handler)
        job.run()
2.2 BaseJob.run(): add a SchedulerJob record to the job table and call the subclass logic
Calling job.run() above invokes the run method of BaseJob, the parent class of SchedulerJob:
def run(self):
    Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
    # Adding an entry in the DB
    with create_session() as session:
        self.state = State.RUNNING
        # insert a SchedulerJob record in the RUNNING state into the db
        session.add(self)
        session.commit()
        id_ = self.id
        make_transient(self)
        self.id = id_

        try:
            # run the _execute() method implemented by the subclass
            self._execute()
            # In case of max runs or max duration
            self.state = State.SUCCESS
        except SystemExit:
            # In case of ^C or SIGTERM
            self.state = State.SUCCESS
        except Exception:
            self.state = State.FAILED
            raise
        finally:
            # once the job is done, fill in end_date and update the record
            self.end_date = timezone.utcnow()
            session.merge(self)
            session.commit()

    Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1)
As the code shows, this method mainly creates a new scheduler job record in the job table:
- if _execute() (which contains a while loop) finishes normally, the SchedulerJob's state is set to SUCCESS;
- if the program is stopped manually while _execute() is running (ctrl-c or kill -15 pid), the SchedulerJob's state is also set to SUCCESS;
- if an exception is raised while _execute() is running, the SchedulerJob's state is set to FAILED;
- finally, the SchedulerJob's end_date is filled in and the record in the db is updated.
2.3 SchedulerJob._execute(): the SchedulerJob's execution logic
The self._execute() call above dispatches to the following method of the subclass:
SchedulerJob._execute()
def _execute(self):
    self.log.info("Starting the scheduler")

    # DAGs can be pickled for easier remote execution by some executors
    pickle_dags = False
    if self.do_pickle and self.executor.__class__ not in \
            (executors.LocalExecutor, executors.SequentialExecutor):
        pickle_dags = True

    self.log.info("Running execute loop for %s seconds", self.run_duration)
    self.log.info("Processing each file at most %s times", self.num_runs)

    # Build up a list of Python files that could contain DAGs
    self.log.info("Searching for files in %s", self.subdir)
    # look for DAG files under the given self.subdir path
    known_file_paths = list_py_file_paths(self.subdir)
    self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)

    # When using sqlite, we do not use async_mode
    # so the scheduler job and DAG parser don't access the DB at the same time.
    async_mode = not self.using_sqlite

    # AIRFLOW SETTING: timeout for a DagFileProcessor while it parses a DAG file;
    # the processing process is killed if it exceeds this timeout
    processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
    processor_timeout = timedelta(seconds=processor_timeout_seconds)
    """
    Create a file processor agent:
    dag_directory: the default DAG folder, or the user-specified folder self.subdir
    file_paths: the list of DAG file paths found under the dags folder
    max_runs: how many times the scheduler parses each DAG file; -1 (default) means keep parsing forever
    processor_factory: used to create DagFileProcessor processes (subclasses of AbstractDagFileProcessor)
    processor_timeout: timeout for a DagFileProcessor process
    async_mode: whether to start DagFileProcessorManager in async mode; async is the default unless the db is sqlite
    """
    self.processor_agent = DagFileProcessorAgent(self.subdir,
                                                 known_file_paths,
                                                 self.num_runs,
                                                 type(self)._create_dag_file_processor,
                                                 processor_timeout,
                                                 self.dag_ids,
                                                 pickle_dags,
                                                 async_mode)

    try:
        self._execute_helper()
    except Exception:
        self.log.exception("Exception when executing execute_helper")
    finally:
        self.processor_agent.end()
        self.log.info("Exited execute loop")
_execute() is the scheduler's main method; it runs the main logic of the scheduling system and consists of the following parts:
2.3.1 list_py_file_paths(self.subdir): find the DAG files under the given path
# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
# look for DAG files under the given self.subdir path
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
list_py_file_paths(self.subdir) walks the self.subdir folder, collects the files that may contain DAGs, and returns a list of their paths.
The later steps then parse the DAG files that were found.
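For illustration, calling it directly returns something like the following (this assumes the 1.10.x layout where list_py_file_paths lives in airflow.utils.dag_processing; the folder and file names are made up):

from airflow.utils.dag_processing import list_py_file_paths

known_file_paths = list_py_file_paths("/data/bdetl/airflow/dags")
print(known_file_paths)
# e.g. ['/data/bdetl/airflow/dags/example_dag_a.py',
#       '/data/bdetl/airflow/dags/example_dag_b.py']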
2.3.2 Create a DagFileProcessorAgent to parse the DAG files that were found
# AIRFLOW SETTING: timeout for a DagFileProcessor while it parses a DAG file;
# the processing process is killed if it exceeds this timeout
processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
processor_timeout = timedelta(seconds=processor_timeout_seconds)
"""
Create a file processor agent:
dag_directory: the default dags folder, or the user-specified dags folder
file_paths: the list of .py file paths under the dags folder
max_runs: how many times the scheduler parses each .py file; -1 (default) means keep parsing forever
processor_factory: the method used to create DagFileProcessor processes (subclasses of AbstractDagFileProcessor)
processor_timeout: timeout for a DagFileProcessor process
async_mode: whether to start DagFileProcessorManager in async mode; async is the default unless the db is sqlite
"""
self.processor_agent = DagFileProcessorAgent(self.subdir,
                                             known_file_paths,
                                             self.num_runs,
                                             type(self)._create_dag_file_processor,
                                             processor_timeout,
                                             self.dag_ids,
                                             pickle_dags,
                                             async_mode)
This involves an airflow configuration option: the timeout for a DagFileProcessor while it parses a DAG file; the processing process is killed if it exceeds this timeout. The relevant airflow.cfg setting is:
# How long before timing out a DagFileProcessor, which processes a dag file
dag_file_processor_timeout = 50
- DagFileProcessorAgent
The agent in charge of processing DAG files; it takes care of all DAG-parsing-related work during scheduling. DagFileProcessorAgent creates a DagFileProcessorManager as a child process of the scheduler, and DagFileProcessorManager creates one DagFileProcessor process per DAG file to parse that file and collect its parsing results. While the DAG files are being parsed, the manager reports the processing results back to the scheduler main process through inter-process communication.
The figure below shows how DagFileProcessorAgent, DagFileProcessorManager, DagFileProcessor, and their DAG files map to one another.
2.3.3 SchedulerJob._execute_helper(): the main loop of the scheduler
The following is the core of the whole scheduler; its code is shown below:
def _execute_helper(self):
    """
    The actual scheduler loop. The main steps in the loop are:
    #. Harvest DAG parsing results through DagFileProcessorAgent
    #. Find and queue executable tasks
    #. Change task instance state in DB
    #. Queue tasks in executor
    #. Heartbeat executor
    #. Execute queued tasks in executor asynchronously
    #. Sync on the states of running tasks
    Following is a graphic representation of these steps.
    .. image:: ../docs/img/scheduler_loop.jpg
    :rtype: None
    """
    # start the configured executor
    self.executor.start()

    self.log.info("Resetting orphaned tasks for active dag runs")
    # when the scheduler starts, reset SCHEDULED/QUEUED tis that belong to non-backfill
    # RUNNING DagRuns back to state None, so they can be scheduled again later
    self.reset_state_for_orphaned_tasks()

    # Start after resetting orphaned tasks to avoid stressing out DB.
    # start the processor_agent, which launches the DagFileProcessorManager
    # and begins the DAG file parsing loop
    self.processor_agent.start()

    execute_start_time = timezone.utcnow()

    # Last time that self.heartbeat() was called.
    last_self_heartbeat_time = timezone.utcnow()

    # For the execute duration, parse and schedule DAGs
    # keep receiving parsing results from DagFileProcessorManager and scheduling the
    # corresponding dags and tis; the while loop stops when:
    # 1. run_duration is set and the loop has been running for run_duration seconds (default -1, i.e. never);
    # 2. num_runs is set and every DAG file has been processed num_runs times.
    while (timezone.utcnow() - execute_start_time).total_seconds() < \
            self.run_duration or self.run_duration < 0:
        self.log.debug("Starting Loop...")
        loop_start_time = time.time()

        if self.using_sqlite:
            self.processor_agent.heartbeat()
            # For the sqlite case w/ 1 thread, wait until the processor
            # is finished to avoid concurrent access to the DB.
            self.log.debug(
                "Waiting for processors to finish since we're using sqlite")
            self.processor_agent.wait_until_finished()

        # collect the parsed DAG file information; this calls self.processor_agent.harvest_simple_dags()
        self.log.debug("Harvesting DAG parsing results")
        simple_dags = self._get_simple_dags()
        self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))

        # Send tasks for execution if available
        simple_dag_bag = SimpleDagBag(simple_dags)

        # schedule the tis that were parsed:
        # 1. process the parsing results, put the tis into the executor's queued_tasks
        #    and move their state SCHEDULED -> QUEUED;
        # 2. call the executor's heartbeat, which asynchronously runs the tis in
        #    queued_tasks and syncs their states;
        # 3. for tis in queued_tasks that were not executed because of limits
        #    (pool, slots, concurrency, ...), clear queued_tasks and set those tis back to SCHEDULED;
        # 4. handle the results of the tis that the executor ran asynchronously.
        if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
            continue

        # Heartbeat the scheduler periodically
        # time elapsed since the last scheduler heartbeat
        time_since_last_heartbeat = (timezone.utcnow() -
                                     last_self_heartbeat_time).total_seconds()
        # self.heartrate is the scheduler heartbeat interval from the config file
        if time_since_last_heartbeat > self.heartrate:
            self.log.debug("Heartbeating the scheduler")
            # the heartbeat() method:
            # 1. updates the SchedulerJob heartbeat time in the job table to now;
            # 2. kills the current job if its state has been changed to SHUTDOWN.
            self.heartbeat()
            # record when this heartbeat happened
            last_self_heartbeat_time = timezone.utcnow()

        is_unit_test = conf.getboolean('core', 'unit_test_mode')
        loop_end_time = time.time()
        # how long this iteration of the while loop took
        loop_duration = loop_end_time - loop_start_time
        self.log.debug(
            "Ran scheduling loop in %.2f seconds",
            loop_duration)

        if not is_unit_test:
            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
            # if _processor_poll_interval is set, sleep for that long
            time.sleep(self._processor_poll_interval)

        if self.processor_agent.done:
            self.log.info("Exiting scheduler loop as all files"
                          " have been processed {} times".format(self.num_runs))
            # every file has been processed num_runs times; stop the while loop
            break

        if loop_duration < 1 and not is_unit_test:
            sleep_length = 1 - loop_duration
            self.log.debug(
                "Sleeping for {0:.2f} seconds to prevent excessive logging"
                .format(sleep_length))
            # if the iteration took less than 1 second, sleep (1 - loop_duration) seconds,
            # i.e. the while loop runs at most once per second
            sleep(sleep_length)

    # Stop any processors
    # the loop is done; send a termination signal to DagFileProcessorManager
    # so it stops all DagFileProcessor processes
    self.processor_agent.terminate()

    # Verify that all files were processed, and if so, deactivate DAGs that
    # haven't been touched by the scheduler as they likely have been
    # deleted.
    if self.processor_agent.all_files_processed:
        self.log.info(
            "Deactivating DAGs that haven't been touched since %s",
            execute_start_time.isoformat()
        )
        # set is_active=False for dag records that were not touched by the scheduler
        models.DAG.deactivate_stale_dags(execute_start_time)

    # shut down the executor
    self.executor.end()

    settings.Session.remove()
2.3.3.1 self.executor.start(): start the task executor
The executor is set in BaseJob's constructor:
self.executor = executor or executors.get_default_executor()
The executor is obtained from the configuration file:
def get_default_executor():
    """Creates a new instance of the configured executor if none exists and returns it"""
    global DEFAULT_EXECUTOR

    if DEFAULT_EXECUTOR is not None:
        return DEFAULT_EXECUTOR

    # read which executor to use from airflow.cfg
    executor_name = conf.get('core', 'EXECUTOR')

    DEFAULT_EXECUTOR = _get_executor(executor_name)

    log.info("Using executor %s", executor_name)

    return DEFAULT_EXECUTOR
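For example, assuming executor = CeleryExecutor in airflow.cfg, it can be called like this:

from airflow.executors import get_default_executor

executor = get_default_executor()   # returns a CeleryExecutor instance in this setup
print(type(executor).__name__)      # CeleryExecutor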
The airflow.cfg setting; in production this is usually set to CeleryExecutor:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = CeleryExecutor
CeleryExecutor's start method only logs one line showing how many processes the executor uses to sync task metadata:
def start(self):
self.log.debug(
'Starting Celery Executor using %s processes for syncing',
self._sync_parallelism
)
2.3.3.2 self.reset_state_for_orphaned_tasks(): reset tis in certain states after the scheduler starts
When the scheduler starts, the tis in the SCHEDULED or QUEUED state that belong to RUNNING DagRuns without the backfill prefix are reset to state None, so the scheduler can schedule these tis normally later on.
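The following is a much-simplified, illustrative sketch of what this reset roughly does; it is not the actual BaseJob.reset_state_for_orphaned_tasks implementation, which does more bookkeeping (for example around tasks already known to the executor):

from airflow.jobs import BackfillJob
from airflow.models import DagRun
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def reset_orphaned_tis_sketch(session=None):
    """Illustrative only: put SCHEDULED/QUEUED tis of running, non-backfill
    DagRuns back to state=None so the scheduler can pick them up again."""
    resettable_states = [State.SCHEDULED, State.QUEUED]
    running_runs = (session.query(DagRun)
                    .filter(DagRun.state == State.RUNNING)
                    .filter(DagRun.run_id.notlike(BackfillJob.ID_PREFIX + '%'))
                    .all())
    for dag_run in running_runs:
        for ti in dag_run.get_task_instances(state=resettable_states, session=session):
            ti.state = State.NONE
            session.merge(ti)
    session.commit()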
2.3.3.3 self.processor_agent.start(): start DagFileProcessorManager and begin the DAG parsing loop
# Start after resetting orphaned tasks to avoid stressing out DB.
# call the processor_agent's start method to launch DagFileProcessorManager and start the DAG file parsing loop
self.processor_agent.start()
The DagFileProcessorAgent.start() method:
def start(self):
    """
    Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
    """
    if six.PY2:
        context = multiprocessing
    else:
        mp_start_method = self._get_multiprocessing_start_method()
        context = multiprocessing.get_context(mp_start_method)

    # two-way communication pipe between the scheduler and the DagFileProcessorManager process
    self._parent_signal_conn, child_signal_conn = context.Pipe()
    # create a process; target is what it runs, args are the arguments passed to target
    self._process = context.Process(
        target=type(self)._run_processor_manager,
        args=(
            self._dag_directory,
            self._file_paths,
            self._max_runs,
            # getattr prevents error while pickling an instance method.
            # the processor factory, used to create DagFileProcessor processes
            getattr(self, "_processor_factory"),
            self._processor_timeout,
            # the pipe end the child process DagFileProcessorManager uses to talk to
            # the scheduler process (DagFileProcessorAgent)
            child_signal_conn,
            self._dag_ids,
            self._pickle_dags,
            self._async_mode,
        )
    )
    # calling process.start() runs the target in the new child process
    self._process.start()
    self.log.info("Launched DagFileProcessorManager with pid: %s", self._process.pid)
The interaction logic between DagFileProcessorAgent and DagFileProcessorManager is as follows:
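Conceptually, the agent (parent) and the manager (child) talk over a multiprocessing.Pipe. The standalone sketch below mimics that pattern; the signal names are made up and are not Airflow's actual DagParsingSignal values:

import multiprocessing


def manager_loop(child_conn):
    # child side: wait for control signals and report results back
    while True:
        signal = child_conn.recv()
        if signal == "HEARTBEAT":
            child_conn.send({"parsing_stats": {}, "simple_dags": []})
        elif signal == "TERMINATE":
            child_conn.send("TERMINATED")
            break


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()   # two-way pipe, as in DagFileProcessorAgent.start()
    manager = multiprocessing.Process(target=manager_loop, args=(child_conn,))
    manager.start()

    parent_conn.send("HEARTBEAT")          # agent side: ask for the latest parsing results
    print(parent_conn.recv())              # {'parsing_stats': {}, 'simple_dags': []}
    parent_conn.send("TERMINATE")          # like processor_agent.terminate()
    print(parent_conn.recv())              # 'TERMINATED'
    manager.join()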
2.3.3.4 The core of the scheduler: the while loop
The while loop in _execute_helper is the heart of the whole scheduler; it takes the parsed DAG information and makes scheduling decisions, as shown in the figure below.
The main steps of the loop are:
- Harvest the DAG file parsing results through DagFileProcessorAgent
- Find and queue the executable tasks
  - change the tis' state in the DB;
  - queue the tasks in the executor
- Heartbeat the executor
  - asynchronously execute the queued tasks in the executor (by calling the executor's trigger_tasks method)
  - sync the states of the running tasks (by calling the sync method)
2.3.3.4.1 self._get_simple_dags(): collect the DAG file parsing results
# collect the parsed DAG file information; this calls self.processor_agent.harvest_simple_dags()
self.log.debug("Harvesting DAG parsing results")
simple_dags = self._get_simple_dags()
self.log.debug("Harvested {} SimpleDAGs".format(len(simple_dags)))
self._get_simple_dags() ultimately calls self.processor_agent.harvest_simple_dags():
- DagFileProcessorAgent keeps statistics about the DAG files, and other code acts on the latest information kept by the agent;
- the parsed simple_dags (only DAGs that can actually be run are returned here, and their corresponding tis in the db have already been set to the SCHEDULED state) are then handed to the scheduler for task scheduling.
2.3.3.4.2 SimpleDagBag(simple_dags): wrap all the collected simple_dags into a SimpleDagBag
# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)
class SimpleDagBag(BaseDagBag):
"""
A collection of SimpleDag objects with some convenience methods.
"""
def __init__(self, simple_dags):
"""
Constructor.
:param simple_dags: SimpleDag objects that should be in this
:type list(airflow.utils.dag_processing.SimpleDagBag)
"""
self.simple_dags = simple_dags
self.dag_id_to_simple_dag = {}
for simple_dag in simple_dags:
self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag
@property
def dag_ids(self):
"""
:return: IDs of all the DAGs in this
:rtype: list[unicode]
"""
return self.dag_id_to_simple_dag.keys()
def get_dag(self, dag_id):
"""
:param dag_id: DAG ID
:type dag_id: unicode
:return: if the given DAG ID exists in the bag, return the BaseDag
corresponding to that ID. Otherwise, throw an Exception
:rtype: airflow.utils.dag_processing.SimpleDag
"""
if dag_id not in self.dag_id_to_simple_dag:
raise AirflowException("Unknown DAG ID {}".format(dag_id))
return self.dag_id_to_simple_dag[dag_id]
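A tiny usage example of this class, with made-up stand-ins for SimpleDag objects (only dag_id matters here):

from collections import namedtuple

# stand-in for airflow.utils.dag_processing.SimpleDag; only dag_id is needed for this demo
FakeSimpleDag = namedtuple("FakeSimpleDag", ["dag_id"])

bag = SimpleDagBag([FakeSimpleDag("example_dag_a"), FakeSimpleDag("example_dag_b")])
print(list(bag.dag_ids))            # ['example_dag_a', 'example_dag_b']
print(bag.get_dag("example_dag_a")) # FakeSimpleDag(dag_id='example_dag_a')
bag.get_dag("missing_dag")          # raises AirflowException: Unknown DAG ID missing_dag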
2.3.3.4.3 self._validate_and_run_task_instances(): validate and run the tis
SchedulerJob._validate_and_run_task_instances():
def _validate_and_run_task_instances(self, simple_dag_bag):
    if len(simple_dag_bag.simple_dags) > 0:
        try:
            # 1. process the parsing results: put the tis into the executor's
            #    queued_tasks and move their state SCHEDULED -> QUEUED
            self._process_and_execute_tasks(simple_dag_bag)
        except Exception as e:
            self.log.error("Error queuing tasks")
            self.log.exception(e)
            return False

    # Call heartbeats
    self.log.debug("Heartbeating the executor")
    # 2. call the executor's heartbeat:
    #    1) executor.trigger_tasks(open_slots) asynchronously runs the tis in queued_tasks;
    #    2) executor.sync() syncs the tis' metadata
    self.executor.heartbeat()

    # 3. for tis left in the executor's queued_tasks that were not executed,
    #    clear queued_tasks and set those tis back to SCHEDULED
    self._change_state_for_tasks_failed_to_execute()

    # Process events from the executor
    # 4. handle the results of the tis that the executor ran asynchronously
    self._process_executor_events(simple_dag_bag)
    return True
- self._process_and_execute_tasks(simple_dag_bag)
  - Handle the tis whose dag_run is not in the RUNNING state: if a ti is in up_for_retry but its dag_run is not running, set the ti to FAILED so it will not be scheduled again;
  - if a ti is in scheduled/queued/up_for_reschedule but its dag_run is not running, set its state to None so it will not be scheduled again;
  - Prepare to run the tis that meet the scheduling criteria:
    - find the executable tis: tis in the SCHEDULED state that satisfy the priority/concurrency/max_active_runs/pool limits;
    - their dag is not paused, their dag_run is not a backfill run, and the tis are in the SCHEDULED state;
    - change the state of these executable tis to QUEUED in the db;
    - in the executor, build an airflow run XXX command for each of these tis and enqueue the commands into the executor's queued_tasks dict; each entry of queued_tasks looks like this (see the sketch after this list):
      - key: (self.dag_id, self.task_id, self.execution_date, self.try_number)
      - value: (command, priority, queue, simple_task_instance)
- self.executor.heartbeat()
  - Based on the number of open slots, send the airflow run xxx commands to the remote workers through Celery for execution;
  - fetch the execution state of the tasks from the remote workers and keep it in the executor on the scheduler node; the results are stored in the event_buffer dict (same key shape, see the sketch after this list):
    - key: (self.dag_id, self.task_id, self.execution_date, self.try_number)
    - value: State.FAILED or State.SUCCESS
  - the task states in the event_buffer dict are consumed later by self._process_executor_events.
- self._change_state_for_tasks_failed_to_execute()
  - For the tis in the executor's queued_tasks that were not executed, find the corresponding QUEUED tis in the db, set them back to SCHEDULED, and clear the queued_tasks dict;
- self._process_executor_events(simple_dag_bag)
  - If the executor reports FAILED or SUCCESS for a task it ran, but the ti is still QUEUED in the db, the task was probably killed externally, and the ti's state in the db is changed to FAILED.
When we use external_task_sensor with a very short reschedule interval, this exact problem can occur.
The main reason is that on its first poke the external_task_sensor does not meet its condition and is put into reschedule; the scheduler then re-parses the DAG file and queues the ti again very quickly, so its state becomes QUEUED. The executor's event for the previous attempt is only handled when self._process_executor_events runs, at which point SUCCESS != QUEUED, so the ti in the db ends up FAILED.
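To make the queued_tasks and event_buffer structures mentioned above concrete, here is a small illustrative sketch (not Airflow code; the dag/task names, dates, and the abbreviated command list are made up):

from datetime import datetime
from airflow.utils.state import State

# key shared by both dicts: (dag_id, task_id, execution_date, try_number)
key = ("example_dag", "example_task", datetime(2020, 7, 1), 1)

# entry shape in executor.queued_tasks (the command list is abbreviated)
queued_tasks = {
    key: (
        ["airflow", "run", "example_dag", "example_task", "2020-07-01T00:00:00"],  # command
        1,            # priority
        "default",    # queue
        None,         # SimpleTaskInstance for this ti (placeholder here)
    ),
}

# entry shape in executor.event_buffer once the worker has finished the task
event_buffer = {key: State.SUCCESS}   # or State.FAILED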
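As an illustration of the kind of configuration that can hit the race described above, a hypothetical DAG with an ExternalTaskSensor in reschedule mode and a very short poke interval might look like this (dag/task ids are made up):

from datetime import datetime
from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

with DAG(dag_id="downstream_dag",
         start_date=datetime(2020, 7, 1),
         schedule_interval="@daily") as dag:
    wait_for_upstream = ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="upstream_dag",
        external_task_id="upstream_task",
        mode="reschedule",   # the sensor frees its slot and is re-scheduled between pokes
        poke_interval=5,     # very short interval: the ti is re-queued almost immediately
        timeout=3600,
    )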
2.3.3.4.4 Periodically run the SchedulerJob heartbeat
# Heartbeat the scheduler periodically
# time elapsed since the last scheduler heartbeat
time_since_last_heartbeat = (timezone.utcnow() -
                             last_self_heartbeat_time).total_seconds()
# self.heartrate is the scheduler heartbeat interval set by scheduler_heartbeat_sec in the config file
if time_since_last_heartbeat > self.heartrate:
    self.log.debug("Heartbeating the scheduler")
    # the heartbeat() method:
    # 1. kills the current job if its state has been changed to SHUTDOWN;
    # 2. sleeps if the job heartbeat interval (job_heartbeat_sec) has not yet elapsed;
    # 3. updates the job's heartbeat time in the job table to now.
    self.heartbeat()
    # record when this heartbeat happened
    last_self_heartbeat_time = timezone.utcnow()
The scheduler_heartbeat_sec option in airflow.cfg:
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
scheduler_heartbeat_sec = 5
The job_heartbeat_sec option in airflow.cfg:
# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5
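The throttling described in the comments above can be pictured with this rough sketch; it is illustrative only and not the actual BaseJob.heartbeat code:

import time

from airflow.utils import timezone
from airflow.utils.state import State


def heartbeat_sketch(job, session, job_heartbeat_sec=5):
    # illustrative only: kill the job if it was asked to shut down,
    # throttle to at most one heartbeat per job_heartbeat_sec,
    # then record the new heartbeat time in the job table
    if job.state == State.SHUTDOWN:
        job.kill()
        return
    elapsed = (timezone.utcnow() - job.latest_heartbeat).total_seconds()
    time.sleep(max(0, job_heartbeat_sec - elapsed))
    job.latest_heartbeat = timezone.utcnow()
    session.merge(job)
    session.commit()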
2.3.3.4.5 self._processor_poll_interval: the polling interval
if not is_unit_test:
self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
# if _processor_poll_interval is set, sleep for that long
time.sleep(self._processor_poll_interval)
The processor_poll_interval option in airflow.cfg:
# The number of seconds to wait between consecutive DAG file processing
processor_poll_interval = 1
2.3.3.4.6 self.num_runs: exit the while loop once every DAG file has been processed the specified number of times
if self.processor_agent.done:
self.log.info("Exiting scheduler loop as all files"
" have been processed {} times".format(self.num_runs))
# every file has been processed num_runs times; stop the while loop
break
The num_runs option in airflow.cfg:
# The number of times to try to schedule each DAG file
# -1 indicates unlimited number
num_runs = -1
2.3.3.4.7 Enforce a minimum interval for the while loop
if loop_duration < 1 and not is_unit_test:
sleep_length = 1 - loop_duration
self.log.debug(
"Sleeping for {0:.2f} seconds to prevent excessive logging"
.format(sleep_length))
# if the loop iteration took less than 1 second, sleep (1 - loop_duration) seconds,
# i.e. the while loop runs at most once per second
sleep(sleep_length)
2.3.3.5 self.processor_agent.terminate(): send the termination signal to DagFileProcessorManager
# Stop any processors
# send a termination signal to DagFileProcessorManager so it stops all DagFileProcessor processes
self.processor_agent.terminate()
The DagFileProcessorAgent.terminate() method:
def terminate(self):
    """
    Send termination signal to DAG parsing processor manager
    and expect it to terminate all DAG file processors.
    """
    if self._process and self._process.is_alive():
        self.log.info("Sending termination message to manager.")
        try:
            # send the TERMINATE_MANAGER signal to DagFileProcessorManager
            # through the scheduler process's end of the communication pipe
            self._parent_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
        except ConnectionError:
            pass
2.3.3.6 models.DAG.deactivate_stale_dags(execute_start_time): deactivate stale DAGs
# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
# deleted.
if self.processor_agent.all_files_processed:
self.log.info(
"Deactivating DAGs that haven't been touched since %s",
execute_start_time.isoformat()
)
# set is_active=False for dag records in the dag table that were not processed
models.DAG.deactivate_stale_dags(execute_start_time)
2.3.3.7 self.executor.end(): shut down the executor
# call the executor's end method to shut it down
self.executor.end()
The CeleryExecutor.end() method:
def end(self, synchronous=False):
if synchronous:
while any([
task.state not in celery_states.READY_STATES
for task in self.tasks.values()]):
time.sleep(5)
self.sync()
2.3.4 self.processor_agent.end(): shut down DagFileProcessorManager
Shut down DagFileProcessorManager, the scheduler's child process:
def end(self):
"""
Terminate (and then kill) the manager process launched.
:return:
"""
if not self._process:
self.log.warning('Ending without manager process.')
return
reap_process_group(self._process.pid, log=self.log)
self._parent_signal_conn.close()