我正在使用Google App Engine(Python)实时处理一些事件消息.总之,我有100多个任务,我需要在消息进入时快速运行.我尝试了一些方法(延迟库,线程),我认为最好的解决方案是使用任务队列并异步将这些任务添加到队列中我想要.这是我正在做的一个例子.
tasks = [] task = Task(url=url_for('main.endpoints_worker'),params={'id': id}) tasks.append(task.add_async(queue_name='event-message')) for task in tasks: task.get_result()
当我这样做时,我的大部分时间都花在将这些任务添加到队列中.有没有办法加快速度?有更好的方法吗?
说实话,每次运行时都会遇到很多不同的时间.有时我大约100ms(这可能会很好),但有时我大约1s.
我本以为扩展工作会更快,但批量添加到任务队列执行.以下是我所看到的建议方法:
tasks = [Task(url=url_for('main.endpoints_worker'),params={'id': id}) for id in id_list] rpc = Queue('event-message').add_async(tasks) rpc.get_result()
更新:由于添加到队列时100任务限制,我需要再次检查此问题. 通过批量创建我的任务(100个组),我已经大大提高了代码的吞吐量,但我仍然不明白为什么将多组任务添加到队列这么快就会减慢速度.一个任务queue.add_async运行<40毫秒没问题.当我做2个或更多queue.add_async时,那个时间变慢了.我很想知道为什么?另外我怎么解决这个问题?
当我在没有异步的情况下添加批量任务时,每个任务都需要<40ms.为什么在使用异步时它们需要更长的时间?
另一个更新我认为问题可能与争用相关,但即使我将这些任务中的每一个添加到不同的队列,我也会得到相同的结果.
您可以通过批量排队来节省大量时间.以下内容适用于您:
tasks = [Task(url=url_for('main.endpoints_worker'),params={'id': id}) for id in id_list] rpc = Queue('event-message').add_async(tasks) rpc.wait()
请注意,您无法使用延迟库批量提交任务.