当前位置:  开发笔记 > 编程语言 > 正文

在Apache Spark中删除空的DataFrame分区

如何解决《在ApacheSpark中删除空的DataFrame分区》经验,为你挑选了1个好方法。

我尝试根据DataFrame 在分区列中具有N(假设N=3)不同值的列重新分区DataFrame x,例如:

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data

我想实现的是repartiton myDF通过x不产生空分区.有没有比这样做更好的方法?

val numParts = myDF.select($"x").distinct().count.toInt
myDF.repartition(numParts,$"x")

(如果我没有指定numPartsrepartiton,大多数我的分区是空的(如repartition创建200个分区)...)



1> mrsrinivas..:

我想到了迭代df分区并在其中获取记录计数以找到非空分区的解决方案.

val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart") 

df.foreachPartition(partition =>
  if (partition.length > 0) nonEmptyPart.add(1))

当我们得到非空分区(nonEmptyPart)时,我们可以使用coalesce()(检查coalesce()vs reparation())清理空分区.

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type

它可能是也可能不是最好的,但是这个解决方案将避免因我们不使用而改组repartition()


解决评论的示例
val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")

df1.foreachPartition(partition =>
  if (partition.length > 0) nonEmptyPart.add(1))

val finalDf = df1.coalesce(nonEmptyPart.value.toInt)

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")

产量

nonEmptyPart => 3
df.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3

推荐阅读
重庆制造漫画社
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有