当前位置:  开发笔记 > 编程语言 > 正文

SPARK DataFrame:如何基于相同的列值为每个组有效地拆分数据框

如何解决《SPARKDataFrame:如何基于相同的列值为每个组有效地拆分数据框》经验,为你挑选了1个好方法。

我有一个生成的DataFrame,如下所示:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value").alias("TotalValue"))
  .sort($"Hour".asc,$"TotalValue".desc))

结果如下:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+

我想根据每个独特的价值作出新的dataframes col("Hour"),即

对于小时== 0的组

对于小时== 1的组

对于小时== 2的组,依此类推...

因此,所需的输出将是:

df0 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
+----+--------+----------+

df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
+----+--------+----------+

同样

df2 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
+----+--------+----------+

非常感谢您的帮助。

编辑1:

我尝试过的

df.foreach(
  row => splitHour(row)
  )

def splitHour(row: Row) ={
    val Hour=row.getAs[Long]("Hour")

    val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))

    val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")

    val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))

    mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
  }

此策略的问题:

当它在df具有超过一百万行的数据帧上运行时,花了8个小时,并且在单个节点上为大约10 GB的RAM提供了火花作业。因此,join事实证明效率很低。

注意事项:我必须将每个数据帧编写mydf为镶木地板,该镶木具有需要维护(而不是扁平化)的嵌套架构。



1> Denny Lee..:

如我的评论中所述,解决该问题的一种可能的简便方法是使用:

df.write.partitionBy("hour").saveAsTable("myparquet")

如前所述,文件夹结构将是myparquet/hour=1myparquet/hour=2......,myparquet/hour=24而不是myparquet/1myparquet/2......, myparquet/24

要更改文件夹结构,您可以

    可能hcat.dynamic.partitioning.custom.pattern在显式的HiveContext中使用Hive配置设置;更多信息,请参见HCatalog DynamicPartitions。

    另一种方法是在执行df.write.partitionBy.saveAsTable(...)命令后直接更改文件系统,例如从文件夹名称中for f in *; do mv $f ${f/${f:0:5}/} ; done删除Hour=文本。

重要的是要注意,通过更改文件夹的命名模式,当您spark.read.parquet(...)在该文件夹中运行时,Spark不会自动了解动态分区,因为它缺少partitionKey(即Hour)信息。

推荐阅读
有风吹过best
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有