如果我们有一个由一列类别和一列值组成的Pandas数据框,我们可以通过执行以下操作删除每个类别中的均值:
df["DemeanedValues"] = df.groupby("Category")["Values"].transform(lambda g: g - numpy.mean(g))
据我所知,Spark数据帧不直接提供这种分组/转换操作(我在Spark 1.5.0上使用PySpark).那么,实现这种计算的最佳方法是什么?
我尝试过如下使用group-by/join:
df2 = df.groupBy("Category").mean("Values") df3 = df2.join(df)
但它很慢,因为据我所知,每个类别都需要对DataFrame进行全面扫描.
我认为(但尚未验证)如果我将group-by/mean的结果收集到字典中,然后在UDF中使用该字典,我可以加快速度,如下所示:
nameToMean = {...} f = lambda category, value: value - nameToMean[category] categoryDemeaned = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType()) df = df.withColumn("DemeanedValue", categoryDemeaned(df.Category, df.Value))
在不牺牲性能的情况下,是否存在表达此类操作的惯用方法?
据我所知,每个类别都需要对DataFrame进行全面扫描.
不,不.DataFrame聚合使用类似于的逻辑执行aggregateByKey
.请参阅DataFrame groupBy行为/优化较慢的部分join
需要排序/改组.但它仍然不需要每组扫描.
如果这是一个确切的代码,你使用它很慢,因为你没有提供连接表达式.因此,它只是执行笛卡尔积.所以它不仅效率低下而且不正确.你想要这样的东西:
from pyspark.sql.functions import col means = df.groupBy("Category").mean("Values").alias("means") df.alias("df").join(means, col("df.Category") == col("means.Category"))
我认为(但尚未验证)如果我将group-by/mean的结果收集到字典中,然后在UDF中使用该字典,我可以加快速度
虽然性能会因具体情况而有所不同,但仍有可能.使用Python UDF的一个问题是它必须将数据移入和移出Python.不过,这绝对值得一试.你应该考虑使用广播变量nameToMean
.
在不牺牲性能的情况下,是否存在表达此类操作的惯用方法?
在PySpark 1.6中你可以使用broadcast
函数:
df.alias("df").join( broadcast(means), col("df.Category") == col("means.Category"))
但它在<= 1.5时不可用.