Parallelizing work with Python 3's concurrent.futures module is fairly easy, as shown below.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to = {executor.submit(do_work, input, 60): input for input in dictionary}
    for future in concurrent.futures.as_completed(future_to):
        data = future.result()
Inserting items into a queue and retrieving them again is also very convenient.
import queue

q = queue.Queue()

# enqueue pending work
for task in tasks:
    q.put(task)

# drain the queue
while not q.empty():
    task = q.get()
I have a script running in the background that listens for updates. Now, hypothetically, when those updates arrive, I would queue them up and process them concurrently with a ThreadPoolExecutor.

Individually, each of these components makes sense in isolation, but how do I use them together? I don't know whether a ThreadPoolExecutor can pull work from the queue in real time, rather than from data that is determined in advance.

In short, all I want to do is receive updates arriving at around 4 messages per second, push them onto a queue, and have concurrent.futures work on them as they come in. If I don't, I'm stuck with a slow, sequential approach.
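For contrast, that slow sequential approach looks roughly like this (a minimal sketch; receive_update is a hypothetical blocking call standing in for the background listener, and do_work is the handler from the first snippet):

def handle_updates_sequentially():
    # Each update is fully processed before the next one is even read,
    # so at ~4 messages per second any handler slower than ~0.25s
    # lets the backlog grow without bound.
    while True:
        update = receive_update()   # hypothetical: blocks until the next message arrives
        do_work(update, 60)         # handler from the first snippet above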
Consider the canonical example from the Python documentation below:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
The list URLS is fixed. Is it possible to feed this list in real time and have the workers process the items as they arrive, perhaps pulling them from a queue for management purposes? I'm confused about whether my approach is actually feasible.
Here is the example from the Python documentation, extended to take its work from a queue. One thing to note is that this code uses concurrent.futures.wait instead of concurrent.futures.as_completed, in order to allow new work to be started while waiting for other work to complete.
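To see why that matters before diving into the full example, here is a rough sketch of the two waiting styles (futures and handle are placeholder names, not part of the example below):

# as_completed blocks until the next future in the fixed set finishes;
# while it is blocked, the loop has no opportunity to submit new work.
for future in concurrent.futures.as_completed(futures):
    handle(future.result())

# wait returns after the first completion or after the timeout, whichever
# comes first, handing back done/not_done sets, so an outer loop gets a
# regular chance to submit freshly queued work before waiting again.
done, not_done = concurrent.futures.wait(
    futures, timeout=0.25,
    return_when=concurrent.futures.FIRST_COMPLETED)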
import concurrent.futures
import urllib.request
import time
import queue

q = queue.Queue()

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def feed_the_workers(spacing):
    """ Simulate outside actors sending in work to do, request each url twice """
    for url in URLS + URLS:
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"

def load_url(url, timeout):
    """ Retrieve a single page and report the URL and contents """
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

    # start a future for a thread which sends work in through the queue
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    while future_to_url:
        # check for status of the futures which are currently working
        done, not_done = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # if there is incoming work, start a new future
        while not q.empty():

            # fetch a url from the queue
            url = q.get()

            # Start the load operation and mark the future with its URL
            future_to_url[executor.submit(load_url, url, 60)] = url

        # process any completed futures
        for future in done:
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                if url == 'FEEDER DONE':
                    print(data)
                else:
                    print('%r page is %d bytes' % (url, len(data)))

            # remove the now completed future
            del future_to_url[future]
Output from fetching each url twice:
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
'http://www.bbc.co.uk/' page is 193780 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://www.foxnews.com/' page is 67574 bytes
'http://www.cnn.com/' page is 136975 bytes
DONE FEEDING
'http://www.bbc.co.uk/' page is 193605 bytes
'http://some-made-up-domain.com/' page is 896 bytes
'http://europe.wsj.com/' page is 874649 bytes
'http://europe.wsj.com/' page is 874649 bytes
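To map this back to the original scenario: feed_the_workers only simulates the outside world, so in a real deployment it could be replaced by the background listener itself, with load_url swapped for do_work and the feeder submitted without the spacing argument. A sketch, again assuming a hypothetical receive_update() that blocks until the next of the ~4 messages per second arrives:

def feed_the_workers():
    """Push real updates onto the queue instead of simulated URLs."""
    while True:
        update = receive_update()   # hypothetical blocking call to the update source
        q.put(update)               # the executor loop above submits a worker for each item

Because this feeder never returns, future_to_url always contains at least the feeder's future, so the outer while loop keeps polling indefinitely, which is what a long-running background worker wants.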