airflow-kubernetes_executor source code analysis

The previous post analyzed the scheduling part. Today we continue with the executor.

KubernetesExecutor

/Users/yangxue.chen/code/python/airflow/airflow/contrib/executors/kubernetes_executor.py

    def start(self):
        """Starts the executor"""
        self.log.info('Start Kubernetes executor')
        # Queues that hand tasks to, and results back from, AirflowKubernetesScheduler
        self.task_queue = self._manager.Queue()
        self.result_queue = self._manager.Queue()
        self.kube_client = get_kube_client()
        self.kube_scheduler = AirflowKubernetesScheduler(
            self.kube_config, self.task_queue, self.result_queue,
            self.kube_client, self.worker_uuid
        )

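start() does the initialization. It creates task_queue and result_queue, a Kubernetes client, and the AirflowKubernetesScheduler that will launch and track the worker pods.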

/Users/yangxue.chen/code/python/airflow/airflow/executors/base_executor.py

PARALLELISM = conf.getint('core', 'PARALLELISM')

    def heartbeat(self):
        # Triggering new jobs
        if not self.parallelism:
            open_slots = len(self.queued_tasks)
        else:
            open_slots = self.parallelism - len(self.running)

        num_running_tasks = len(self.running)
        num_queued_tasks = len(self.queued_tasks)

        self.log.debug("%s running task instances", num_running_tasks)
        self.log.debug("%s in queue", num_queued_tasks)
        self.log.debug("%s open slots", open_slots)

        self.trigger_tasks(open_slots)

        # Calling child class sync method
        self.log.debug("Calling the %s sync method", self.__class__)
        self.sync()

    def trigger_tasks(self, open_slots):
        """
        Trigger tasks

        :param open_slots: Number of open slots
        :return:
        """
        # Highest priority first (x[1][1] is the task's priority)
        sorted_queue = sorted(
            [(k, v) for k, v in self.queued_tasks.items()],
            key=lambda x: x[1][1],
            reverse=True)
        for _ in range(min((open_slots, len(self.queued_tasks)))):
            key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
            self.queued_tasks.pop(key)
            self.running[key] = command
            self.execute_async(key=key,
                               command=command,
                               queue=queue,
                               executor_config=simple_ti.executor_config)

    def execute_async(self, key, command, queue=None, executor_config=None):
        """Executes task asynchronously"""
        # Patch kube_scheduler worker, so we can mount a dmp bucket when pod start
        kube_executor_config = KubernetesExecutorConfig.from_dict(executor_config)
        self.kube_scheduler.worker_configuration = WorkerConfiguration(kube_config=self.kube_config)

        # Nothing is launched here: the task just waits in task_queue until sync()
        self.task_queue.put((key, command, kube_executor_config))
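
  1. heartbeat() works out the number of open slots. With a parallelism setting (AIRFLOW__CORE__PARALLELISM), that is parallelism minus the number of running tasks; a value of 0 means unlimited.
  2. trigger_tasks() moves up to that many queued tasks, highest priority first, into running. It then calls KubernetesExecutor.execute_async, which only puts each task on task_queue to wait for execution.
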
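The slot arithmetic and priority ordering above can be sketched in isolation. This is a minimal, hypothetical model, not Airflow code:

- plain dicts and a list stand in for BaseExecutor's queued_tasks, running and the executor's task_queue;
- string keys replace the real (dag_id, task_id, execution_date, try_number) tuples.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical model of BaseExecutor's bookkeeping. Each queued entry is
# key -> (command, priority, queue, executor_config); string keys stand in
# for the real (dag_id, task_id, execution_date, try_number) tuples.
Queued = Dict[str, Tuple[str, int, str, Any]]


def open_slots(parallelism: int, running: Dict[str, str], queued: Queued) -> int:
    """parallelism == 0 means unlimited: every queued task gets a slot."""
    if not parallelism:
        return len(queued)
    return parallelism - len(running)


def trigger_tasks(slots: int, running: Dict[str, str], queued: Queued,
                  task_queue: List[Tuple[str, str, Any]]) -> None:
    """Move up to `slots` tasks, highest priority first, from queued to running."""
    ordered = sorted(queued.items(), key=lambda kv: kv[1][1], reverse=True)
    for key, (command, _priority, _queue, executor_config) in ordered[:max(slots, 0)]:
        queued.pop(key)
        running[key] = command
        # Like KubernetesExecutor.execute_async: only enqueue, no pod yet.
        task_queue.append((key, command, executor_config))


running = {"t0": "airflow run t0"}
queued = {
    "low": ("airflow run low", 1, "default", None),
    "high": ("airflow run high", 10, "default", None),
    "mid": ("airflow run mid", 5, "default", None),
}
task_queue: List[Tuple[str, str, Any]] = []
slots = open_slots(3, running, queued)  # 3 - 1 running = 2
trigger_tasks(slots, running, queued, task_queue)
print([key for key, _, _ in task_queue])  # ['high', 'mid']
print(list(queued))                        # ['low']
```

Sorting into a separate list before popping keeps the loop safe while queued is being mutated.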
    def sync(self):
        """Synchronize task state."""
        if self.running:
            self.log.debug('self.running: %s', self.running)
        if self.queued_tasks:
            self.log.debug('self.queued: %s', self.queued_tasks)

        self.kube_scheduler.sync()

        while True:
            try:
                results = self.result_queue.get_nowait()
                try:
                    key, state, pod_id, resource_version = results
                    self.log.info('Changing state of %s to %s', results, state)
                    self._change_state(key, state, pod_id)
                finally:
                    self.result_queue.task_done()
            except Empty:
                break

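        # Launch at most worker_pods_creation_batch_size pods per sync() call
        for _ in range(self.kube_config.worker_pods_creation_batch_size):
            try:
                task = self.task_queue.get_nowait()
            except Empty:
                break
            try:
                self.kube_scheduler.run_next(task)
            finally:
                self.task_queue.task_done()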
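
  1. Call AirflowKubernetesScheduler.sync().
  2. Drain every available result from result_queue.
  3. Update each task's state from its result via _change_state.
  4. Take up to worker_pods_creation_batch_size tasks from task_queue and hand them to the scheduler's run_next.
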
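Both halves of sync() follow the same non-blocking queue pattern:

- drain whatever is available with get_nowait() until queue.Empty is raised;
- cap the number of pod launches per call.

Below is a minimal sketch using the standard library's queue.Queue. The helper names drain and launch_batch are hypothetical.

```python
import queue
from typing import Any, Callable


def drain(q: "queue.Queue[Any]", handle: Callable[[Any], None]) -> int:
    """Consume everything currently in q without blocking, like the
    `while True: ... get_nowait() ... except Empty: break` loop in sync()."""
    handled = 0
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        try:
            handle(item)
            handled += 1
        finally:
            q.task_done()  # balance every successful get
    return handled


def launch_batch(task_queue: "queue.Queue[Any]", run_next: Callable[[Any], None],
                 batch_size: int) -> int:
    """Start at most batch_size tasks per call (cf. worker_pods_creation_batch_size),
    stopping early if the queue runs dry."""
    launched = 0
    for _ in range(batch_size):
        try:
            task = task_queue.get_nowait()
        except queue.Empty:
            break
        try:
            run_next(task)
            launched += 1
        finally:
            task_queue.task_done()
    return launched


results: "queue.Queue[Any]" = queue.Queue()
tasks: "queue.Queue[Any]" = queue.Queue()
for r in [("k1", "success"), ("k2", "failed")]:
    results.put(r)
for t in ["t1", "t2", "t3"]:
    tasks.put(t)

seen, started = [], []
n_results = drain(results, seen.append)
n_started = launch_batch(tasks, started.append, 2)
print(n_results, n_started, tasks.qsize())  # 2 2 1
```

The leftover task simply waits for the next sync() call. This is what keeps pod creation rate-limited per heartbeat.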
def _change_state(self, key, state, pod_id):
    if state != State.RUNNING:
        if self.kube_config.delete_worker_pods:
            self.kube_scheduler.delete_pod(pod_id)
            self.log.info('Deleted pod: %s', str(key))
        try:
            self.running.pop(key)
        except KeyError:
            self.log.debug('Could not find key: %s', str(key))
    self.event_buffer[key] = state

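_change_state handles any task that is no longer RUNNING. It deletes the task's pod (if delete_worker_pods is enabled) and removes the task from running. Every state change is recorded in event_buffer.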
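Here is a simplified model of _change_state. It makes a few assumptions:

- states are plain strings, with RUNNING as a stand-in for State.RUNNING;
- a callback replaces kube_scheduler.delete_pod;
- the class name is hypothetical.

```python
from typing import Callable, Dict, Hashable, List, Optional

RUNNING = "running"  # stand-in for airflow.utils.state.State.RUNNING


class ExecutorStateSketch:
    """Hypothetical, simplified mirror of KubernetesExecutor._change_state."""

    def __init__(self, delete_worker_pods: bool,
                 delete_pod: Callable[[str], None]) -> None:
        self.delete_worker_pods = delete_worker_pods
        self.delete_pod = delete_pod
        self.running: Dict[Hashable, str] = {}
        self.event_buffer: Dict[Hashable, Optional[str]] = {}

    def change_state(self, key: Hashable, state: Optional[str], pod_id: str) -> None:
        if state != RUNNING:
            # Any non-running state (failed, or None for a succeeded pod) ends the task.
            if self.delete_worker_pods:
                self.delete_pod(pod_id)
            # pop with a default replaces the try/except KeyError in the original
            self.running.pop(key, None)
        # Record the state change for whoever consumes event_buffer.
        self.event_buffer[key] = state


deleted: List[str] = []
ex = ExecutorStateSketch(delete_worker_pods=True, delete_pod=deleted.append)
ex.running["k1"] = "airflow run ..."
ex.change_state("k1", "failed", "pod-1")
ex.change_state("k2", RUNNING, "pod-2")
print(deleted)          # ['pod-1']
print(ex.running)       # {}
print(ex.event_buffer)  # {'k1': 'failed', 'k2': 'running'}
```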

AirflowKubernetesScheduler

def sync(self):
    """
    The sync function checks the status of all currently running kubernetes jobs.
    If a job is completed, it's status is placed in the result queue to
    be sent back to the scheduler.

    :return:

    """
    while True:
        try:
            task = self.watcher_queue.get_nowait()
            try:
                self.process_watcher_task(task)
            finally:
                self.watcher_queue.task_done()
        except Empty:
            break
   
def process_watcher_task(self, task):
    """Process the task by watcher."""
    pod_id, state, labels, resource_version = task
    self.log.info(
        'Attempting to finish pod; pod_id: %s; state: %s; labels: %s',
        pod_id, state, labels
    )
    key = self._labels_to_key(labels=labels)
    if key:
        self.log.debug('finishing job %s - %s (%s)', key, state, pod_id)
        self.result_queue.put((key, state, pod_id, resource_version))
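
  1. Take each pod state change from watcher_queue.
  2. Convert the pod's labels to a task key and put (key, state, pod_id, resource_version) on result_queue.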

This closes the loop with the caller: KubernetesExecutor.sync reads these results back from result_queue.
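The hand-off from watcher_queue to result_queue can be simulated with standard-library queues. The label names and the labels_to_key helper below are assumptions for illustration. The real _labels_to_key is not shown in this post and also decodes the execution_date label.

```python
import queue
from typing import Dict, Optional, Tuple

Key = Tuple[str, str, str, int]


def labels_to_key(labels: Dict[str, str]) -> Optional[Key]:
    """Hypothetical, simplified decoder: label values are used as-is,
    whereas the real _labels_to_key also decodes the execution date."""
    try:
        return (labels["dag_id"], labels["task_id"],
                labels["execution_date"], int(labels["try_number"]))
    except (KeyError, ValueError):
        return None  # not a recognisable task pod: drop the event


def process_watcher_task(task, result_queue: "queue.Queue") -> None:
    pod_id, state, labels, resource_version = task
    key = labels_to_key(labels)
    if key:
        result_queue.put((key, state, pod_id, resource_version))


watcher_queue: "queue.Queue" = queue.Queue()
result_queue: "queue.Queue" = queue.Queue()
labels = {"dag_id": "etl", "task_id": "load",
          "execution_date": "2019-01-01T00_00_00", "try_number": "1"}
watcher_queue.put(("etl-load-abc", "failed", labels, "1234"))
watcher_queue.put(("stray-pod", None, {"app": "other"}, "1235"))

# Same draining loop as AirflowKubernetesScheduler.sync
while True:
    try:
        item = watcher_queue.get_nowait()
    except queue.Empty:
        break
    try:
        process_watcher_task(item, result_queue)
    finally:
        watcher_queue.task_done()

first = result_queue.get_nowait()
print(first)  # (('etl', 'load', '2019-01-01T00_00_00', 1), 'failed', 'etl-load-abc', '1234')
print(result_queue.empty())  # True
```

The stray pod's labels do not decode to a key, so only the task pod's event reaches result_queue.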

def run_next(self, next_job):
    """
    The run_next command will check the task_queue for any un-run jobs.
    It will then create a unique job-id, launch that job in the cluster,
    and store relevant info in the current_jobs map so we can track the job's
    status
    """
    self.log.info('Kubernetes job is %s', str(next_job))
    key, command, kube_executor_config = next_job
    dag_id, task_id, execution_date, try_number = key
    self.log.debug("Kubernetes running for command %s", command)
    self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image)

    pod = self.worker_configuration.make_pod()
    
    # the watcher will monitor pods, so we do not block.
    self.launcher.run_pod_async(pod, **self.kube_config.kube_client_request_args)
    
def run_pod_async(self, pod, **kwargs):
    req = self.kube_req_factory.create(pod)
    self.log.debug('Pod Creation Request: \n%s', json.dumps(req, indent=2))
    resp = self._client.create_namespaced_pod(body=req, namespace=pod.namespace, **kwargs)
    self.log.debug('Pod Creation Response: %s', resp)

    return resp

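run_next builds a pod for the task and submits it. Note that creation does not block. The pod's lifecycle is tracked afterwards through the Kubernetes list-watch mechanism.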
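Creation only submits the pod, so what ties it back to the executor is its labels. Below is a hypothetical helper that builds a minimal pod manifest as a plain dict. It is not Airflow's WorkerConfiguration.make_pod. It shows the airflow-worker label that the watcher's label_selector matches on. The image name and the extra labels are placeholders.

```python
from typing import Dict, List


def make_worker_pod_body(worker_uuid: str, pod_name: str, image: str,
                         command: List[str], labels: Dict[str, str],
                         namespace: str = "default") -> dict:
    """Hypothetical helper (not WorkerConfiguration.make_pod): a minimal
    Pod manifest as a plain dict."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": pod_name,
            "namespace": namespace,
            # The watcher only sees pods carrying this label.
            "labels": {"airflow-worker": worker_uuid, **labels},
        },
        "spec": {
            "restartPolicy": "Never",  # run the task once; retries are Airflow's job
            "containers": [{"name": "base", "image": image, "command": command}],
        },
    }


body = make_worker_pod_body(
    "uuid-1", "etl-load-abc", "my-airflow-image:latest",
    ["airflow", "run", "etl", "load", "2019-01-01"],
    {"dag_id": "etl", "task_id": "load"})

# The watcher's selector, as built in KubernetesJobWatcher._run
label, value = "airflow-worker={}".format("uuid-1").split("=")
print(body["metadata"]["labels"][label] == value)  # True

# With the official client (not run here), submission would look like this.
# The call returns once the API server accepts the pod, not when it finishes:
# kubernetes.client.CoreV1Api().create_namespaced_pod(namespace="default", body=body)
```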

KubernetesJobWatcher

The watcher relies on the Kubernetes list-watch mechanism:

def _run(self, kube_client, resource_version, worker_uuid, kube_config):
    watcher = watch.Watch()

    kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)}
    if resource_version:
        kwargs['resource_version'] = resource_version
    if kube_config.kube_client_request_args:
        for key, value in kube_config.kube_client_request_args.items():
            kwargs[key] = value

    last_resource_version = None
    for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
                                **kwargs):
        task = event['object']
        self.log.info(
            'Event: %s had an event of type %s',
            task.metadata.name, event['type']
        )
        if event['type'] == 'ERROR':
            return self.process_error(event)
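        self.process_status(
            task.metadata.name, task.status.phase, task.metadata.labels,
            task.metadata.resource_version
        )
        # Remember the latest version so a restarted watch resumes from here
        last_resource_version = task.metadata.resource_version

    return last_resource_version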
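The loop in _run can be exercised without a cluster by feeding it a list of events. In this sketch the events are plain dicts shaped like the output of kubernetes.watch.Watch().stream(...). That is an assumption made for testability: real events carry V1Pod objects accessed through attributes (task.metadata.name, ...). The run_watch name is hypothetical.

```python
from typing import Callable, Iterable, Optional


def run_watch(stream: Iterable[dict],
              on_status: Callable[[str, str, dict, str], None],
              on_error: Callable[[dict], Optional[str]]) -> Optional[str]:
    """Simplified KubernetesJobWatcher._run over an already-open event stream."""
    last_resource_version = None
    for event in stream:
        if event["type"] == "ERROR":
            return on_error(event)
        pod = event["object"]
        meta = pod["metadata"]
        on_status(meta["name"], pod["status"]["phase"], meta["labels"],
                  meta["resource_version"])
        # Remember where we are so a restarted watch can resume from here.
        last_resource_version = meta["resource_version"]
    return last_resource_version


def pod_event(kind: str, name: str, phase: str, rv: str) -> dict:
    """Build a fake watch event (hypothetical test helper)."""
    return {"type": kind, "object": {
        "metadata": {"name": name, "labels": {"airflow-worker": "uuid-1"},
                     "resource_version": rv},
        "status": {"phase": phase}}}


seen = []
events = [pod_event("ADDED", "p1", "Pending", "100"),
          pod_event("MODIFIED", "p1", "Running", "101"),
          pod_event("MODIFIED", "p1", "Succeeded", "102")]
rv = run_watch(events, lambda *a: seen.append(a[:2]), lambda e: None)
print(rv)    # 102
print(seen)  # [('p1', 'Pending'), ('p1', 'Running'), ('p1', 'Succeeded')]
```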

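Pods are filtered with {'label_selector': 'airflow-worker={}'.format(worker_uuid)}, so the watcher only sees pods labelled with this executor's worker_uuid. The two status-handling functions deserve the most attention: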

    def process_error(self, event):
        """Process error response"""
        self.log.error(
            'Encountered Error response from k8s list namespaced pod stream => %s',
            event
        )
        raw_object = event['raw_object']
        if raw_object['code'] == 410:
            self.log.info(
                'Kubernetes resource version is too old, must reset to 0 => %s',
                (raw_object['message'],)
            )
            # Return resource version 0
            return '0'
        raise AirflowException(
            'Kubernetes failure for %s with code %s and message: %s' %
            (raw_object['reason'], raw_object['code'], raw_object['message'])
        )

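There is not much to note in process_error. A 410 (resource version too old) makes _run return '0', so the next watch starts over from resource version 0. Any other error raises an AirflowException.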
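The following sketch shows how a 410 turns into a restart:

- handle_error mirrors process_error, with a hypothetical WatchError standing in for AirflowException.
- watch_loop is a hypothetical, bounded version of the watcher's outer loop. That loop is assumed to keep calling _run with whatever resource version the previous call returned.

```python
from typing import Callable, List, Optional


class WatchError(Exception):
    """Hypothetical stand-in for AirflowException."""


def handle_error(event: dict) -> str:
    raw = event["raw_object"]
    if raw["code"] == 410:
        # 410 Gone: the requested resource_version is too old; restart from "0".
        return "0"
    raise WatchError("Kubernetes failure for %s with code %s and message: %s"
                     % (raw["reason"], raw["code"], raw["message"]))


def watch_loop(run_once: Callable[[Optional[str]], Optional[str]],
               max_rounds: int) -> Optional[str]:
    """Each round resumes from the version the previous round returned.
    max_rounds only keeps this sketch finite."""
    resource_version = None
    for _ in range(max_rounds):
        resource_version = run_once(resource_version)
    return resource_version


calls: List[Optional[str]] = []


def fake_run_once(rv: Optional[str]) -> Optional[str]:
    """Hypothetical stand-in for _run: first call hits a 410, later ones advance."""
    calls.append(rv)
    if rv is None:
        return handle_error({"raw_object": {"code": 410, "reason": "Gone",
                                            "message": "too old resource version"}})
    return str(int(rv) + 5)


final = watch_loop(fake_run_once, 3)
print(final)  # 10
print(calls)  # [None, '0', '5']
```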

    def process_status(self, pod_id, status, labels, resource_version):
        """Process status response"""
        if status == 'Pending':
            self.log.info('Event: %s Pending', pod_id)
        elif status == 'Failed':
            self.log.info('Event: %s Failed', pod_id)
            self.watcher_queue.put((pod_id, State.FAILED, labels, resource_version))
        elif status == 'Succeeded':
            self.log.info('Event: %s Succeeded', pod_id)
            self.watcher_queue.put((pod_id, None, labels, resource_version))
        elif status == 'Running':
            self.log.info('Event: %s is Running', pod_id)
        else:
            self.log.warning(
                'Event: Invalid state: %s on pod: %s with labels: %s with '
                'resource_version: %s', status, pod_id, labels, resource_version
            )

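Focus on Failed and Succeeded. Only these two terminal phases are pushed onto watcher_queue: Failed as State.FAILED, and Succeeded with a state of None. This ties back to AirflowKubernetesScheduler.sync, which reads from watcher_queue as shown earlier.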
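The phase handling in process_status reduces to a small lookup table. This is a hypothetical sketch, with FAILED as a stand-in string for State.FAILED. The code shown does not explain why success is reported as None; presumably the worker process records the final state itself.

```python
from typing import Dict, Optional, Tuple

FAILED = "failed"  # stand-in for airflow.utils.state.State.FAILED

# Terminal phases forwarded to watcher_queue, and the state each one carries.
# Succeeded is reported with state None, as in process_status.
TERMINAL: Dict[str, Optional[str]] = {"Failed": FAILED, "Succeeded": None}


def phase_to_watcher_item(pod_id: str, phase: str, labels: dict,
                          resource_version: str) -> Optional[Tuple]:
    """Return the tuple process_status would put on watcher_queue, or None
    if the phase is only logged (Pending, Running, or unknown)."""
    if phase not in TERMINAL:
        return None
    return (pod_id, TERMINAL[phase], labels, resource_version)


items = [phase_to_watcher_item("p1", ph, {}, "1")
         for ph in ["Pending", "Running", "Failed", "Succeeded", "Unknown"]]
print(items)
# [None, None, ('p1', 'failed', {}, '1'), ('p1', None, {}, '1'), None]
```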

© Copyright belongs to the author. For reprints or content cooperation, please contact the author.