我使用芹菜来更新我的新闻聚合网站中的RSS提要.我为每个feed使用一个@task,事情看起来效果很好.
有一个细节,我不能确定处理得好:所有的feed都是每分钟用@periodic_task更新一次,但是如果一个feed仍在从最后一个周期性任务中更新时新的一个怎么办?(例如,如果Feed非常慢,或者离线且任务保持在重试循环中)
目前我存储任务结果并检查其状态如下:
import socket from datetime import timedelta from celery.decorators import task, periodic_task from aggregator.models import Feed _results = {} @periodic_task(run_every=timedelta(minutes=1)) def fetch_articles(): for feed in Feed.objects.all(): if feed.pk in _results: if not _results[feed.pk].ready(): # The task is not finished yet continue _results[feed.pk] = update_feed.delay(feed) @task() def update_feed(feed): try: feed.fetch_articles() except socket.error, exc: update_feed.retry(args=[feed], exc=exc)
也许有一种更复杂/更健壮的方法可以使用我错过的一些芹菜机制来实现相同的结果?
基于MattH的答案,您可以使用这样的装饰器:
def single_instance_task(timeout): def task_exc(func): @functools.wraps(func) def wrapper(*args, **kwargs): lock_id = "celery-single-instance-" + func.__name__ acquire_lock = lambda: cache.add(lock_id, "true", timeout) release_lock = lambda: cache.delete(lock_id) if acquire_lock(): try: func(*args, **kwargs) finally: release_lock() return wrapper return task_exc
然后,像这样使用它......
@periodic_task(run_every=timedelta(minutes=1)) @single_instance_task(60*10) def fetch_articles() yada yada...
从官方文档:确保任务一次只执行一个.
使用https://pypi.python.org/pypi/celery_once似乎非常好,包括报告错误和针对某些参数测试唯一性.
你可以这样做:
from celery_once import QueueOnce from myapp.celery import app from time import sleep @app.task(base=QueueOnce, once=dict(keys=('customer_id',))) def start_billing(customer_id, year, month): sleep(30) return "Done!"
只需要在项目中进行以下设置:
ONCE_REDIS_URL = 'redis://localhost:6379/0' ONCE_DEFAULT_TIMEOUT = 60 * 60 # remove lock after 1 hour in case it was stale
如果你正在寻找一个不使用Django的例子,那么试试这个例子(警告:使用Redis代替,我已经使用过).
装饰器代码如下(完全归功于文章的作者,去看看)
import redis REDIS_CLIENT = redis.Redis() def only_one(function=None, key="", timeout=None): """Enforce only one celery task at a time.""" def _dec(run_func): """Decorator.""" def _caller(*args, **kwargs): """Caller.""" ret_value = None have_lock = False lock = REDIS_CLIENT.lock(key, timeout=timeout) try: have_lock = lock.acquire(blocking=False) if have_lock: ret_value = run_func(*args, **kwargs) finally: if have_lock: lock.release() return ret_value return _caller return _dec(function) if function is not None else _dec