当前位置:  开发笔记 > 编程语言 > 正文

气流中的Python脚本调度

如何解决《气流中的Python脚本调度》经验,为你挑选了2个好方法。

嗨,大家好,

我需要使用airflow来安排我的python 文件(其中包含从sql和一些连接中提取数据).我已经成功地将气流安装到我的linux服务器中,我可以使用气流网络服务器.但即使在完成文档后,我也不清楚我需要在哪里编写脚本以进行调度以及该脚本如何在airflow webserver中可用,这样我才能看到状态

就配置而言,我知道dag文件夹在我的主目录中的位置以及示例dags所在的位置.

注意:请不要将此标记为重复与如何在Airflow中运行bash脚本文件,因为我需要运行位于不同位置的python文件.

请在Airflow网络服务器中找到以下配置:

在此输入图像描述

下面是AIRFLOW_HOME目录中dag文件夹的屏幕截图

在此输入图像描述

还可以在下面找到DAG创建屏幕截图和Missing DAG错误的屏幕截图

在此输入图像描述

在此输入图像描述

在我选择简单的 DAG之后,填充了丢失DAG的错误

在此输入图像描述



1> postrational..:

您应该使用它PythonOperator来调用您的函数.如果你想在其他地方定义这个功能,你可以直接从模块中导入它,只要它可以在你的模块中访问PYTHONPATH.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from my_script import my_python_function

dag = DAG('tutorial', default_args=default_args)

PythonOperator(dag=dag,
               task_id='my_task_powered_by_python',
               provide_context=False,
               python_callable=my_python_function,
               op_args=['arguments_passed_to_callable'],
               op_kwargs={'keyword_argument':'which will be passed to function'})

如果您的函数my_python_function在脚本文件中/path/to/my/scripts/dir/my_script.py

然后在启动Airflow之前,您可以将脚本的路径添加到PYTHONPATH类似的内容中,以便:

export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH

更多信息请访问:https: //airflow.incubator.apache.org/code.html#airflow.operators.PythonOperator

默认args和教程中的其他注意事项:https://airflow.incubator.apache.org/tutorial.html



2> liferacer..:

您还可以使用bashoperator在Airflow中执行python脚本.您可以将脚本放在DAG文件夹中的文件夹中.如果您的脚本位于其他位置,只需提供这些脚本的路径即可.

    from airflow import DAG
    from airflow.operators import BashOperator,PythonOperator
    from datetime import datetime, timedelta

    seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                      datetime.min.time())

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': seven_days_ago,
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
      )

    dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
    task_id='testairflow',
    bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
    dag=dag)

推荐阅读
sx-March23
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有