我有一个python脚本,它执行以下操作:i.它接受数据的输入文件(通常是嵌套的JSON格式)ii.将数据逐行传递给另一个函数,该函数将数据处理成所需的格式iii.最后它将输出写入文件.
这是我目前的简单python线,这样做......
def manipulate(line): # a pure python function which transforms the data # ... return manipulated_json for line in f: components.append(manipulate(ujson.loads(line))) write_to_csv(components)`
这有效,但是python GIL将它限制在服务器上的一个核心,它的速度非常慢,特别是对于大量数据.
我通常处理的数据量约为4 gig gzip压缩,但偶尔我必须处理数百gig gzip压缩的数据.它不是必需的大数据,但仍无法在内存中进行处理,并且Python的GIL处理速度非常慢.
在寻找优化数据处理的解决方案时,我遇到了dask.虽然PySpark在当时似乎是我的明显解决方案,但是dask的承诺和它的简单性让我受益匪浅,我决定尝试一下.
经过对dask的大量研究以及如何使用它,我整理了一个非常小的脚本来复制我当前的过程.该脚本如下所示:
import dask.bag as bag import json bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
这工作并产生与原始非dask脚本相同的结果,但它仍然只在服务器上使用一个CPU.所以,它根本没有帮助.事实上,它的速度较慢.
我究竟做错了什么?我错过了什么吗?我仍然相当新闻,所以如果我忽略了某些事情或者我应该做一些完全不同的事情,请告诉我.
另外,是否有任何替代方法可以使用服务器的全部容量(即所有CPU)来完成我需要做的事情?
谢谢,
Ť