当前位置:  开发笔记 > 编程语言 > 正文

如何在使用Airflow实现的工作流中等待DAG任务中的异步事件?

如何解决《如何在使用Airflow实现的工作流中等待DAG任务中的异步事件?》经验,为你挑选了1个好方法。

我使用Airflow实现的工作流包含任务A,B,C和D.我希望工作流在任务C等待事件.在Airflow中,传感器用于通过轮询某个状态来检查某些条件,如果该条件为真,则会触发工作流中的下一个任务.我的要求是避免投票.这里有一个答案提到了一个rest_api_plugin的气流,它创建了rest_api端点来触发气流CLI - 使用这个插件,我可以在工作流程中触发任务.但是,在我的工作流程中,我希望实现一个等待休息API调用(异步事件)而不进行轮询的任务,一旦收到其余的API请求,任务就会被触发并恢复Airflow工作流程.

避免轮询的原因:效率低下,不按照我们的要求进行扩展.

更新

我按照@Daniel Huang的回答中提到的建议,创建了一个返回False的传感器.这个传感器在task:start_evaluating_cycle中实现,现在这个传感器任务没有检测到任何东西,但总是返回False:

class WaitForEventSensor(BaseSensorOperator):

    def poke(self, context):
        return False

start_evaluating_cycle = WaitForEventSensor(
    task_id="start_evaluating_cycle",
    dag=dag,
    poke_interval=60*60 # any number will do here, because it not polling just returning false
)

在此输入图像描述

我配置了rest_api_plugin并使用插件我试图标记任务:start_evaluating_cyle完成以继续工作流程.

在此输入图像描述

rest_api_plugin成功执行任务,我可以看到任务是使用flower运行的:

在此输入图像描述

但在工作流程中,任务:start_evaluating_cycle仍处于运行状态:

项目清单

rest_api_plugin正在运行独立于工作流的任务.如何让rest_api_plugin在工作流程内运行任务 - 而不是独立于工作流程.

但是,当我从气流UI管理员中选择任务并标记成功时:

项目清单

我需要这个网址:http:// localhost:8080/admin/airflow/success?task_id = start_evaluating_cycle&dag_id = faculty_evaluation_workflow&upstream = false&downstream = false&future = false&past = false&execution_date = 2017-11-26T06:48:54.297289&origin = http%3A% 2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Fgraph%3Fexecution_date%3D2017-11-26T06%253A48%253A54.297289%26arrange%3DTB%26root%3D%26dag_id%3Dfaculty_evaluation_workflow%26_csrf_token%3DImM3NmU4ZTVjYTljZTQzYWJhNGI4Mzg2MmZmNDU5OGYxYWY0ODAxYWMi.DPv1Ww.EnWS6ffVLNcs923y6eVRV_8R-X8

当我确认时,工作流程会进一步发展 - 这就是我想要的,但我需要在其余的API调用中标记成功.

我担心的是:

    如何使用
    rest_api_plugin 将工作流内运行的任务标记为成功?

    是否可以使用气流管理员创建的URL,通过从外部系统调用任务来将任务标记为成功?

Daniel Huang.. 6

一种可能的解决方案是使用一个传感器,该传感器永远等待,直到外部设备手动将其状态设置为成功为止。

因此,您将拥有某种虚拟传感器:

class DummySensor(BaseSensorOperator):

    def poke(self, context):
        return False

像这样初始化:

task_c = DummySensor(
    dag=dag,
    task_id='task_c',
    interval=600,  # something fairly high since we're not polling for anything, just to check when to timeout
    timeout=3600,  # if nothing externally sets state to success in 1 hour, task will fail so task D does not run
)

当任务C启动时,它将仅处于RUNNING状态。然后,您可以使用REST API插件在满足条件时将任务C的状态设置为SUCCESS。然后将开始任务D和其他下游任务。

不利的一面是虚拟传感器在等待任何操作时仍保持在工作槽中。



1> Daniel Huang..:

一种可能的解决方案是使用一个传感器,该传感器永远等待,直到外部设备手动将其状态设置为成功为止。

因此,您将拥有某种虚拟传感器:

class DummySensor(BaseSensorOperator):

    def poke(self, context):
        return False

像这样初始化:

task_c = DummySensor(
    dag=dag,
    task_id='task_c',
    interval=600,  # something fairly high since we're not polling for anything, just to check when to timeout
    timeout=3600,  # if nothing externally sets state to success in 1 hour, task will fail so task D does not run
)

当任务C启动时,它将仅处于RUNNING状态。然后,您可以使用REST API插件在满足条件时将任务C的状态设置为SUCCESS。然后将开始任务D和其他下游任务。

不利的一面是虚拟传感器在等待任何操作时仍保持在工作槽中。

推荐阅读
手机用户2402852387
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有