我正在尝试读取和处理1000个文件,但不幸的是,处理文件的时间大约是从磁盘读取文件的3倍,因此我希望在读入时处理这些文件(当我在我继续阅读其他文件).
在一个完美的世界中,我有一个一次读取一个文件的生成器,我想将这个生成器传递给一个工作池,这些工作器在(缓慢)生成时处理来自生成器的项目.
这是一个例子:
def process_file(file_string): ... return processed_file pool = Pool(processes=4) path = 'some/path/' results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
上面代码的唯一问题是在池开始之前所有文件都被读入内存,这意味着我需要等待磁盘读取所有内容,并且还消耗大量内存.
Pool.map
和Pool.map_async
list
IFY的iterable
传递给他们,让您的发电机总是会实现的处理,甚至开始前充分.
各种Pool.imap*
函数似乎将输入作为生成器处理,因此您可以更改:
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
至:
# If you can process outputs one at a time, drop the list wrapper # If you can process outputs without order mattering, imap_unordered will # get you the best results results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))
并且在处理之前得到相同的结果而不是啜饮,但是AFAICT,他们仍然会尽可能快地完全填充队列,这可能会导致大量数据未完成并且内存使用过多; 除此之外,您将在一个进程中读取所有数据,然后通过IPC发送所有数据,这意味着您仍然主要是I/O上的瓶颈.
在你的位置,我将读取移动到任务本身(如果可以,请避免读取整个文件,按行或按块处理,而不是一次读取整个文件).你可以获得并行读取,减少IPC,并且在前几个文件被处理之前你不会冒险啜饮所有文件; 你永远不会有比工人更多的文件.所以最终结果如下:
def process_file(path): with open(path, 'rb') as f: file_string = f.read() ... same as before ... return processed_file pool = Pool(processes=4) path = 'some/path/' results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))