当前位置:  开发笔记 > 编程语言 > 正文

注册Celery基于类的任务

如何解决《注册Celery基于类的任务》经验,为你挑选了2个好方法。

Python 3.x,Celery 4.x ......

我有一个基于类的任务.

myproj/celery.py

from celery import Celery

# django settings stuff...

app = Celery('myproj')
app.autodiscover_tasks()

app1/tasks.py

import celery

class EmailTask(celery.Task):
    def run(self, *args, **kwargs):
        self.do_something()

如果我做:

$ celery worker -A myproj -l info
[tasks]
  . app2.tasks.debug_task
  . app2.tasks.test

因此,芹菜装饰器用于注册任务,但基于类的任务未注册.

如何让基于类的任务注册?

更新1:

如果我添加以下行 app1/tasks.py

from myproj.celery import app
email_task = app.tasks[EmailTask.name]

.

$ celery worker -A myproj -l info
  File "myproj/app1/tasks.py", line 405, in 
    email_task = app.tasks[EmailTask.name]
  File "/usr/local/lib/python3.5/site-packages/celery/app/registry.py", line 19, in __missing__
    raise self.NotRegistered(key)
celery.exceptions.NotRegistered

更新2:

我可以run通过包装器同步执行我的任务().但是,我无法运行任务异步,即通过delay.

app1/tasks.py

@app.task
def email_task():
    """
    Wrapper to call class based task
    """
    task = EmailTask()
    # task.delay()  # Won't work!!!
    task.run()

.

$./manage.py shell
> from app1.tasks import EmailTask
> task1 = EmailTask()
> task1.run() # a-okay
> task2 = EmailTask()
> task2.delay() # nope
  

# And on the worker...
[2017-01-22 08:07:28,120: INFO/PoolWorker-1] Task app1.tasks.email_task[41e5bc7d-058a-400e-9f73-c853c0f60a2a] succeeded in 0.0701281649817247s: None
[2017-01-22 08:10:31,909: ERROR/MainProcess] Received unregistered task of type None.
The message has been ignored and discarded.

Oleksandr Da.. 15

你可以在这里找到完整的描述,但对我来说它足以添加

from myapp.celery import app
app.tasks.register(MyTaskTask())


dtk.. 5

随着celery==4.2.1我不得不使用的返回值Celery.register_task()的任务实例调用delay()上:

# my_app/tasks.py
import celery

from my_app.celery import app

class MyTask(celery.Task):
    def run(self):
        [...]

MyTask = app.register_task(MyTask())

然后使用它:

# my_app/app.py
from my_app.tasks import MyTask

[...]

MyTask.delay()

该解决方案已在Github问题中进行了描述,并在此处进行了说明。

HTH,DTK



1> Oleksandr Da..:

你可以在这里找到完整的描述,但对我来说它足以添加

from myapp.celery import app
app.tasks.register(MyTaskTask())



2> dtk..:

随着celery==4.2.1我不得不使用的返回值Celery.register_task()的任务实例调用delay()上:

# my_app/tasks.py
import celery

from my_app.celery import app

class MyTask(celery.Task):
    def run(self):
        [...]

MyTask = app.register_task(MyTask())

然后使用它:

# my_app/app.py
from my_app.tasks import MyTask

[...]

MyTask.delay()

该解决方案已在Github问题中进行了描述,并在此处进行了说明。

HTH,DTK

推荐阅读
k78283381
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有