我使用Airflow实现的工作流包含任务A,B,C和D.我希望工作流在任务C等待事件.在Airflow中,传感器用于通过轮询某个状态来检查某些条件,如果该条件为真,则会触发工作流中的下一个任务.我的要求是避免投票.这里有一个答案提到了一个rest_api_plugin的气流,它创建了rest_api端点来触发气流CLI - 使用这个插件,我可以在工作流程中触发任务.但是,在我的工作流程中,我希望实现一个等待休息API调用(异步事件)而不进行轮询的任务,一旦收到其余的API请求,任务就会被触发并恢复Airflow工作流程.
避免轮询的原因:效率低下,不按照我们的要求进行扩展.
更新
我按照@Daniel Huang的回答中提到的建议,创建了一个返回False的传感器.这个传感器在task:start_evaluating_cycle中实现,现在这个传感器任务没有检测到任何东西,但总是返回False:
class WaitForEventSensor(BaseSensorOperator): def poke(self, context): return False start_evaluating_cycle = WaitForEventSensor( task_id="start_evaluating_cycle", dag=dag, poke_interval=60*60 # any number will do here, because it not polling just returning false )
我配置了rest_api_plugin并使用插件我试图标记任务:start_evaluating_cyle完成以继续工作流程.
rest_api_plugin成功执行任务,我可以看到任务是使用flower运行的:
但在工作流程中,任务:start_evaluating_cycle仍处于运行状态:
rest_api_plugin正在运行独立于工作流的任务.如何让rest_api_plugin在工作流程内运行任务 - 而不是独立于工作流程.
但是,当我从气流UI管理员中选择任务并标记成功时:
我需要这个网址:http:// localhost:8080/admin/airflow/success?task_id = start_evaluating_cycle&dag_id = faculty_evaluation_workflow&upstream = false&downstream = false&future = false&past = false&execution_date = 2017-11-26T06:48:54.297289&origin = http%3A% 2F%2Flocalhost%3A8080%2Fadmin%2Fairflow%2Fgraph%3Fexecution_date%3D2017-11-26T06%253A48%253A54.297289%26arrange%3DTB%26root%3D%26dag_id%3Dfaculty_evaluation_workflow%26_csrf_token%3DImM3NmU4ZTVjYTljZTQzYWJhNGI4Mzg2MmZmNDU5OGYxYWY0ODAxYWMi.DPv1Ww.EnWS6ffVLNcs923y6eVRV_8R-X8
当我确认时,工作流程会进一步发展 - 这就是我想要的,但我需要在其余的API调用中标记成功.
我担心的是:
如何使用
rest_api_plugin 将工作流内运行的任务标记为成功?
是否可以使用气流管理员创建的URL,通过从外部系统调用任务来将任务标记为成功?
Daniel Huang.. 6
一种可能的解决方案是使用一个传感器,该传感器永远等待,直到外部设备手动将其状态设置为成功为止。
因此,您将拥有某种虚拟传感器:
class DummySensor(BaseSensorOperator): def poke(self, context): return False
像这样初始化:
task_c = DummySensor( dag=dag, task_id='task_c', interval=600, # something fairly high since we're not polling for anything, just to check when to timeout timeout=3600, # if nothing externally sets state to success in 1 hour, task will fail so task D does not run )
当任务C启动时,它将仅处于RUNNING状态。然后,您可以使用REST API插件在满足条件时将任务C的状态设置为SUCCESS。然后将开始任务D和其他下游任务。
不利的一面是虚拟传感器在等待任何操作时仍保持在工作槽中。
一种可能的解决方案是使用一个传感器,该传感器永远等待,直到外部设备手动将其状态设置为成功为止。
因此,您将拥有某种虚拟传感器:
class DummySensor(BaseSensorOperator): def poke(self, context): return False
像这样初始化:
task_c = DummySensor( dag=dag, task_id='task_c', interval=600, # something fairly high since we're not polling for anything, just to check when to timeout timeout=3600, # if nothing externally sets state to success in 1 hour, task will fail so task D does not run )
当任务C启动时,它将仅处于RUNNING状态。然后,您可以使用REST API插件在满足条件时将任务C的状态设置为SUCCESS。然后将开始任务D和其他下游任务。
不利的一面是虚拟传感器在等待任何操作时仍保持在工作槽中。