当前位置:  开发笔记 > 编程语言 > 正文

AIORedis和PUB / SUB不是asnyc

如何解决《AIORedis和PUB/SUB不是asnyc》经验,为你挑选了1个好方法。

我使用aioredis编写了异步服务,该服务将在某个频道上侦听并以异步方式运行一些命令。

基本上,我从示例页面中获取了一个代码来编写一个小型测试应用程序,并删除了不必要的部分:

import asyncio
import aioredis

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        print('Got Message:', msg)
        i = int(msg['sleep_for'])
        print('Sleep for {}'.format(i))
        await asyncio.sleep(i)
        print('End sleep')


async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

还有另一个测试应用程序,它发布带有sleep_for字段的json blob ,然后在订阅者应用程序中使用该字段reader使用sleep语句来模拟协程内部的某些工作。

我希望“睡眠”以“并行”方式运行,但实际上它们以同步的方式出现在屏幕上,一个接一个。

我的猜测是,只要打到await ch.get_json(..)(或什至await ch.wait_message())行,我就应该能够处理下一条消息。在实践中,它像同步代码一样运行。我哪里错了?这可以使用连接池来处理,但这意味着存在一些不异步的问题,也不知道确切的含义。



1> Jashandeep S..:

我的猜测是,只要等待a。ch.get_json(..)(甚至是a.ch.wait_message())这一行,我就应该能够处理下一条消息。

async/await语法不是这样工作的。每次await在协同程序中命中a 时,该协同程序都会被“暂停”,从而控制了所谓的协同程序。如果正在休眠,它不会自动处理下一条消息。

您应该使用的ensure_future是在单独的协程中处理每个消息:

import asyncio
import aioredis

async def handle_msg(msg):
    print('Got Message:', msg)
    i = int(msg['sleep_for'])
    print('Sleep for {}'.format(i))
    await asyncio.sleep(i)
    print('End sleep')

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        asyncio.ensure_future(handle_msg(msg))

async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close() 

推荐阅读
jerry613
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有