我使用aioredis编写了异步服务,该服务将在某个频道上侦听并以异步方式运行一些命令。
基本上,我从示例页面中获取了一个代码来编写一个小型测试应用程序,并删除了不必要的部分:
import asyncio import aioredis async def reader(ch): while (await ch.wait_message()): msg = await ch.get_json() print('Got Message:', msg) i = int(msg['sleep_for']) print('Sleep for {}'.format(i)) await asyncio.sleep(i) print('End sleep') async def main(): sub = await aioredis.create_redis(('localhost', 6379)) res = await sub.subscribe('chan:1') ch1 = res[0] tsk = await reader(ch1) loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
还有另一个测试应用程序,它发布带有sleep_for
字段的json blob ,然后在订阅者应用程序中使用该字段reader
使用sleep
语句来模拟协程内部的某些工作。
我希望“睡眠”以“并行”方式运行,但实际上它们以同步的方式出现在屏幕上,一个接一个。
我的猜测是,只要打到await ch.get_json(..)
(或什至await ch.wait_message()
)行,我就应该能够处理下一条消息。在实践中,它像同步代码一样运行。我哪里错了?这可以使用连接池来处理,但这意味着存在一些不异步的问题,也不知道确切的含义。
我的猜测是,只要等待a。ch.get_json(..)(甚至是a.ch.wait_message())这一行,我就应该能够处理下一条消息。
async/await
语法不是这样工作的。每次await
在协同程序中命中a 时,该协同程序都会被“暂停”,从而控制了所谓的协同程序。如果正在休眠,它不会自动处理下一条消息。
您应该使用的ensure_future
是在单独的协程中处理每个消息:
import asyncio import aioredis async def handle_msg(msg): print('Got Message:', msg) i = int(msg['sleep_for']) print('Sleep for {}'.format(i)) await asyncio.sleep(i) print('End sleep') async def reader(ch): while (await ch.wait_message()): msg = await ch.get_json() asyncio.ensure_future(handle_msg(msg)) async def main(): sub = await aioredis.create_redis(('localhost', 6379)) res = await sub.subscribe('chan:1') ch1 = res[0] tsk = await reader(ch1) loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()