Example of using Celery with Falcon in Python

How to use Celery together with Falcon in Python: one recommended answer.

1> Danila Ganch..:

Here is a small example. File structure:

/project
      __init__.py
      app.py # routes, falcon etc.
      tasks.py # celery 
example.py # script demonstrating how it works

app.py:

import falcon
from tasks import add


class StartTask(object):

    def on_get(self, req, resp):
        # start the task; delay() returns immediately with an AsyncResult
        task = add.delay(4, 4)
        resp.status = falcon.HTTP_200
        # return the task_id to the client as JSON
        resp.media = {'task_id': task.id}


class TaskStatus(object):

    def on_get(self, req, resp, task_id):
        # look up the task by task_id through the task's own Celery app,
        # so the Redis result backend configured in tasks.py is used
        task_result = add.AsyncResult(task_id)
        resp.status = falcon.HTTP_200
        resp.media = {'status': task_result.status, 'result': task_result.result}


# falcon.App is the Falcon 3+ name; older releases call it falcon.API
app = falcon.App()

# registration of routes
app.add_route('/start_task', StartTask())
app.add_route('/task_status/{task_id}', TaskStatus())
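
One caveat for TaskStatus: when a task raises, Celery exposes the exception instance as the result, and the JSON encoder cannot serialize it. Below is a minimal sketch of a JSON-safe payload builder. `build_status_payload` is a hypothetical helper, not part of Falcon or Celery, and it takes plain values so it can be tried without a broker.

```python
import json


def build_status_payload(status, result):
    """Return a JSON-serializable dict for a task state and its result.

    `status` is a Celery state name such as 'PENDING' or 'SUCCESS';
    `result` is whatever AsyncResult.result returned (hypothetical helper).
    """
    if isinstance(result, BaseException):
        # a failed task carries the raised exception; send its text instead
        result = '%s: %s' % (type(result).__name__, result)
    return {'status': status, 'result': result}


if __name__ == '__main__':
    print(json.dumps(build_status_payload('SUCCESS', 8)))
    print(json.dumps(build_status_payload('FAILURE', ValueError('bad input'))))
```

In TaskStatus.on_get the response dict could then be built with build_status_payload(task_result.status, task_result.result).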

tasks.py:

from time import sleep

import celery


app = celery.Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


@app.task
def add(x, y):
    """
    :param int x:
    :param int y:
    :return: int
    """
    # sleep just for demonstration
    sleep(5)

    return x + y

Now we need to start the Celery worker. Go to the project folder and run:

celery -A tasks worker --loglevel=info

After that we need to start the Falcon application. Go to the project folder and run:

gunicorn app:app

OK. Everything is ready.

example.py is a small client that helps to understand how it works:

from time import sleep

import requests

# start a new task
task_info = requests.get('http://127.0.0.1:8000/start_task')
task_info = task_info.json()

while True:
    # poll the task status by task_id while the task is running
    result = requests.get('http://127.0.0.1:8000/task_status/' + task_info['task_id'])
    task_status = result.json()

    print(task_status)

    # test the state only, so a falsy result such as 0 still ends the loop
    if task_status['status'] == 'SUCCESS':
        print('Task with id = %s is finished' % task_info['task_id'])
        print('Result: %s' % task_status['result'])
        break
    # sleep and check the status one more time
    sleep(1)

Just run python ./example.py and you should see something like this:

{'status': 'PENDING', 'result': None}
{'status': 'PENDING', 'result': None}
{'status': 'PENDING', 'result': None}
{'status': 'PENDING', 'result': None}
{'status': 'PENDING', 'result': None}
{'status': 'SUCCESS', 'result': 8}
Task with id = 76542904-6c22-4536-99d9-87efd66d9fe7 is finished
Result: 8
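
The client loop polls forever if the task never reaches SUCCESS. Below is a sketch of a bounded variant, assuming a caller-supplied `fetch_status` callable that returns the same dict as /task_status/{task_id}. `wait_for_task`, its parameters, and the fake status source are illustrative names, not part of requests or Celery.

```python
import time


def wait_for_task(fetch_status, interval=1.0, timeout=30.0, sleep=time.sleep):
    """Poll fetch_status() until the task reaches a final state or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        task_status = fetch_status()
        # SUCCESS and FAILURE are final Celery states; stop polling on either
        if task_status['status'] in ('SUCCESS', 'FAILURE'):
            return task_status
        if time.monotonic() >= deadline:
            raise TimeoutError('task did not finish within %s seconds' % timeout)
        sleep(interval)


if __name__ == '__main__':
    # fake status source standing in for the HTTP call, so no server is needed
    replies = iter([
        {'status': 'PENDING', 'result': None},
        {'status': 'PENDING', 'result': None},
        {'status': 'SUCCESS', 'result': 8},
    ])
    print(wait_for_task(lambda: next(replies), interval=0.01))
```

Against the running server, fetch_status could be a lambda that calls requests.get on the task_status URL and returns its .json().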

Hope this helps.
