我读了以下SO thead,现在我想了解它.这是我的例子:
import dask.dataframe as dd import pandas as pd from dask.multiprocessing import get import random df = pd.DataFrame({'col_1':random.sample(range(10000), 10000), 'col_2': random.sample(range(10000), 10000) }) def test_f(col_1, col_2): return col_1*col_2 ddf = dd.from_pandas(df, npartitions=8) ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)
它会在下面生成以下错误.我究竟做错了什么?另外我不清楚如何将其他参数传递给函数map_partitions
?
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) ~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname) 136 try: --> 137 yield 138 except Exception as e: ~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs) 3130 with raise_on_meta_error(funcname(func)): -> 3131 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True)) 3132 TypeError: test_f() got an unexpected keyword argument 'columns' During handling of the above exception, another exception occurred: ValueError Traceback (most recent call last)in () ----> 1 ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get) ~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(self, func, *args, **kwargs) 469 >>> ddf.map_partitions(func).clear_divisions() # doctest: +SKIP 470 """ --> 471 return map_partitions(func, self, *args, **kwargs) 472 473 @insert_meta_param_description(pad=12) ~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(func, *args, **kwargs) 3163 3164 if meta is no_default: -> 3165 meta = _emulate(func, *args, **kwargs) 3166 3167 if all(isinstance(arg, Scalar) for arg in args): ~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs) 3129 """ 3130 with raise_on_meta_error(funcname(func)): -> 3131 return func(*_extract_meta(args, True), **_extract_meta(kwargs, True)) 3132 3133 ~\AppData\Local\conda\conda\envs\tensorflow\lib\contextlib.py in __exit__(self, type, value, traceback) 75 value = type() 76 try: ---> 77 self.gen.throw(type, value, traceback) 78 except StopIteration as exc: 79 # Suppress StopIteration *unless* it's the same exception that ~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname) 148 ).format(" in `{0}`".format(funcname) if funcname else "", 149 repr(e), tb) --> 150 raise ValueError(msg) 151 152 ValueError: Metadata inference failed in `test_f`. Original error is below: ------------------------ TypeError("test_f() got an unexpected keyword argument 'columns'",) Traceback: --------- File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py", line 137, in raise_on_meta_error yield File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py", line 3131, in _emulate return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
Primer.. 12
在map_partitions
docs中有一个例子可以实现完全正在尝试做的事情:
ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
当您调用时map_partitions
(就像您调用.apply()
时一样pandas.DataFrame
),您尝试map
(或apply
)的函数将被赋予数据帧作为第一个参数.
在dask.dataframe.map_partitions
这种情况下,第一个参数将是一个分区,如果是pandas.DataFrame.apply
- 整个数据帧.
这意味着您的函数必须接受数据帧(分区)作为第一个参数,并且在您的情况下可能如下所示:
def test_f(df, col_1, col_2): return df.assign(result=df[col_1] * df[col_2])
请注意,在您调用之前,会发生在这种情况下分配新列(即计划发生).compute()
.
在您的示例中,您在调用之后指定了列.compute()
,哪种类型会失败使用dask的目的.即在你调用之后.compute()
,如果有足够的空间用于那些结果,那么该操作的结果将被加载到内存中(如果不是你就得到MemoryError
).
因此,对于您工作的示例,您可以:
1)使用函数(列名作为参数):
def test_f(df, col_1, col_2): return df.assign(result=df[col_1] * df[col_2]) ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2') # Here is good place to do something with BIG ddf_out dataframe before calling .compute() result = ddf_out.compute(get=get) # Will load the whole dataframe into memory
2)使用lambda
(在函数中使用硬编码的列名称):
ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2)) # Here is good place to do something with BIG ddf_out dataframe before calling .compute() result = ddf_out.compute(get=get) # Will load the whole dataframe into memory
更新:
要逐行应用函数,以下是您链接的帖子的引用:
map
/apply
您可以在一个系列中逐行映射函数
map
df.mycolumn.map(func)您可以在数据框中逐行映射函数
apply
df.apply(func, axis=1)
即你的问题中的示例函数,它可能看起来像这样:
def test_f(dds, col_1, col_2): return dds[col_1] * dds[col_2]
由于您将逐行应用它,因此函数的第一个参数将是一个序列(即数据帧的每一行都是一个序列).
要应用此功能,您可以这样调用它:
dds_out = ddf.apply( test_f, args=('col_1', 'col_2'), axis=1, meta=('result', int) ).compute(get=get)
这将返回一个名为的系列'result'
.
我想你也可以.apply
用一个函数调用每个分区,但它看起来不再高效,然后.apply
直接调用dataframe.但可能是你的测试会证明不然.
在map_partitions
docs中有一个例子可以实现完全正在尝试做的事情:
ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
当您调用时map_partitions
(就像您调用.apply()
时一样pandas.DataFrame
),您尝试map
(或apply
)的函数将被赋予数据帧作为第一个参数.
在dask.dataframe.map_partitions
这种情况下,第一个参数将是一个分区,如果是pandas.DataFrame.apply
- 整个数据帧.
这意味着您的函数必须接受数据帧(分区)作为第一个参数,并且在您的情况下可能如下所示:
def test_f(df, col_1, col_2): return df.assign(result=df[col_1] * df[col_2])
请注意,在您调用之前,会发生在这种情况下分配新列(即计划发生).compute()
.
在您的示例中,您在调用之后指定了列.compute()
,哪种类型会失败使用dask的目的.即在你调用之后.compute()
,如果有足够的空间用于那些结果,那么该操作的结果将被加载到内存中(如果不是你就得到MemoryError
).
因此,对于您工作的示例,您可以:
1)使用函数(列名作为参数):
def test_f(df, col_1, col_2): return df.assign(result=df[col_1] * df[col_2]) ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2') # Here is good place to do something with BIG ddf_out dataframe before calling .compute() result = ddf_out.compute(get=get) # Will load the whole dataframe into memory
2)使用lambda
(在函数中使用硬编码的列名称):
ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2)) # Here is good place to do something with BIG ddf_out dataframe before calling .compute() result = ddf_out.compute(get=get) # Will load the whole dataframe into memory
更新:
要逐行应用函数,以下是您链接的帖子的引用:
map
/apply
您可以在一个系列中逐行映射函数
map
df.mycolumn.map(func)您可以在数据框中逐行映射函数
apply
df.apply(func, axis=1)
即你的问题中的示例函数,它可能看起来像这样:
def test_f(dds, col_1, col_2): return dds[col_1] * dds[col_2]
由于您将逐行应用它,因此函数的第一个参数将是一个序列(即数据帧的每一行都是一个序列).
要应用此功能,您可以这样调用它:
dds_out = ddf.apply( test_f, args=('col_1', 'col_2'), axis=1, meta=('result', int) ).compute(get=get)
这将返回一个名为的系列'result'
.
我想你也可以.apply
用一个函数调用每个分区,但它看起来不再高效,然后.apply
直接调用dataframe.但可能是你的测试会证明不然.
您test_f
有两个参数:col_1
和col_2
。您传递了一个参数ddf
。
尝试类似
In [5]: dd.map_partitions(test_f, ddf['col_1'], ddf['col_2']) Out[5]: Dask Series Structure: npartitions=8 0 int64 1250 ... ... 8750 ... 9999 ... dtype: int64 Dask Name: test_f, 32 tasks