当前位置:  开发笔记 > 编程语言 > 正文

简单的dask map_partitions示例

如何解决《简单的daskmap_partitions示例》经验,为你挑选了2个好方法。

我读了以下SO thead,现在我想了解它.这是我的例子:

import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random

df = pd.DataFrame({'col_1':random.sample(range(10000), 10000), 'col_2': random.sample(range(10000), 10000) })

def test_f(col_1, col_2):
    return col_1*col_2

ddf = dd.from_pandas(df, npartitions=8)

ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)

它会在下面生成以下错误.我究竟做错了什么?另外我不清楚如何将其他参数传递给函数map_partitions

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
    136     try:
--> 137         yield
    138     except Exception as e:

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
   3130     with raise_on_meta_error(funcname(func)):
-> 3131         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3132 

TypeError: test_f() got an unexpected keyword argument 'columns'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
 in ()
----> 1 ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(self, func, *args, **kwargs)
    469         >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP
    470         """
--> 471         return map_partitions(func, self, *args, **kwargs)
    472 
    473     @insert_meta_param_description(pad=12)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(func, *args, **kwargs)
   3163 
   3164     if meta is no_default:
-> 3165         meta = _emulate(func, *args, **kwargs)
   3166 
   3167     if all(isinstance(arg, Scalar) for arg in args):

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
   3129     """
   3130     with raise_on_meta_error(funcname(func)):
-> 3131         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3132 
   3133 

~\AppData\Local\conda\conda\envs\tensorflow\lib\contextlib.py in __exit__(self, type, value, traceback)
     75                 value = type()
     76             try:
---> 77                 self.gen.throw(type, value, traceback)
     78             except StopIteration as exc:
     79                 # Suppress StopIteration *unless* it's the same exception that

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
    148                ).format(" in `{0}`".format(funcname) if funcname else "",
    149                         repr(e), tb)
--> 150         raise ValueError(msg)
    151 
    152 

ValueError: Metadata inference failed in `test_f`.

Original error is below:
------------------------
TypeError("test_f() got an unexpected keyword argument 'columns'",)

Traceback:
---------
  File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py", line 137, in raise_on_meta_error
    yield
  File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py", line 3131, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

Primer.. 12

map_partitionsdocs中有一个例子可以实现完全正在尝试做的事情:

ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))

当您调用时map_partitions(就像您调用.apply()时一样pandas.DataFrame),您尝试map(或apply)的函数将被赋予数据帧作为第一个参数.

dask.dataframe.map_partitions这种情况下,第一个参数将是一个分区,如果是pandas.DataFrame.apply- 整个数据帧.

这意味着您的函数必须接受数据帧(分区)作为第一个参数,并且在您的情况下可能如下所示:

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])

请注意,在您调用之前,会发生在这种情况下分配新列(即计划发生).compute().

在您的示例中,您在调用之后指定了列.compute(),哪种类型会失败使用dask的目的.即在你调用之后.compute(),如果有足够的空间用于那些结果,那么该操作的结果将被加载到内存中(如果不是你就得到MemoryError).

因此,对于您工作的示例,您可以:

1)使用函数(列名作为参数):

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])


ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2')

# Here is good place to do something with BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory

2)使用lambda(在函数中使用硬编码的列名称):

ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2))

# Here is good place to do something with BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory

更新:

要逐行应用函数,以下是您链接的帖子的引用:

map/apply

您可以在一个系列中逐行映射函数 map

df.mycolumn.map(func)

您可以在数据框中逐行映射函数 apply

df.apply(func, axis=1)

即你的问题中的示例函数,它可能看起来像这样:

def test_f(dds, col_1, col_2):
    return dds[col_1] * dds[col_2]

由于您将逐行应用它,因此函数的第一个参数将是一个序列(即数据帧的每一行都是一个序列).

要应用此功能,您可以这样调用它:

dds_out = ddf.apply(
    test_f, 
    args=('col_1', 'col_2'), 
    axis=1, 
    meta=('result', int)
).compute(get=get)

这将返回一个名为的系列'result'.

我想你也可以.apply用一个函数调用每个分区,但它看起来不再高效,然后.apply直接调用dataframe.但可能是你的测试会证明不然.



1> Primer..:

map_partitionsdocs中有一个例子可以实现完全正在尝试做的事情:

ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))

当您调用时map_partitions(就像您调用.apply()时一样pandas.DataFrame),您尝试map(或apply)的函数将被赋予数据帧作为第一个参数.

dask.dataframe.map_partitions这种情况下,第一个参数将是一个分区,如果是pandas.DataFrame.apply- 整个数据帧.

这意味着您的函数必须接受数据帧(分区)作为第一个参数,并且在您的情况下可能如下所示:

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])

请注意,在您调用之前,会发生在这种情况下分配新列(即计划发生).compute().

在您的示例中,您在调用之后指定了列.compute(),哪种类型会失败使用dask的目的.即在你调用之后.compute(),如果有足够的空间用于那些结果,那么该操作的结果将被加载到内存中(如果不是你就得到MemoryError).

因此,对于您工作的示例,您可以:

1)使用函数(列名作为参数):

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])


ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2')

# Here is good place to do something with BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory

2)使用lambda(在函数中使用硬编码的列名称):

ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2))

# Here is good place to do something with BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory

更新:

要逐行应用函数,以下是您链接的帖子的引用:

map/apply

您可以在一个系列中逐行映射函数 map

df.mycolumn.map(func)

您可以在数据框中逐行映射函数 apply

df.apply(func, axis=1)

即你的问题中的示例函数,它可能看起来像这样:

def test_f(dds, col_1, col_2):
    return dds[col_1] * dds[col_2]

由于您将逐行应用它,因此函数的第一个参数将是一个序列(即数据帧的每一行都是一个序列).

要应用此功能,您可以这样调用它:

dds_out = ddf.apply(
    test_f, 
    args=('col_1', 'col_2'), 
    axis=1, 
    meta=('result', int)
).compute(get=get)

这将返回一个名为的系列'result'.

我想你也可以.apply用一个函数调用每个分区,但它看起来不再高效,然后.apply直接调用dataframe.但可能是你的测试会证明不然.



2> TomAugspurge..:

test_f有两个参数:col_1col_2。您传递了一个参数ddf

尝试类似

In [5]: dd.map_partitions(test_f, ddf['col_1'], ddf['col_2'])
Out[5]:
Dask Series Structure:
npartitions=8
0       int64
1250      ...
        ...
8750      ...
9999      ...
dtype: int64
Dask Name: test_f, 32 tasks


您可以通过在上述语句中添加`meta ='dtype'`参数来解决“ ValueError:元数据推断失败。”。其中dtype是您期望的数据类型。
我只是尝试过,所以不起作用。ValueError:元数据推理在test_f中失败。
推荐阅读
雨天是最美
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有