我尝试根据DataFrame 在分区列中具有N
(假设N=3
)不同值的列重新分区DataFrame x
,例如:
val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data
我想实现的是repartiton myDF
通过x
不产生空分区.有没有比这样做更好的方法?
val numParts = myDF.select($"x").distinct().count.toInt myDF.repartition(numParts,$"x")
(如果我没有指定numParts
的repartiton
,大多数我的分区是空的(如repartition
创建200个分区)...)
我想到了迭代df
分区并在其中获取记录计数以找到非空分区的解决方案.
val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart") df.foreachPartition(partition => if (partition.length > 0) nonEmptyPart.add(1))
当我们得到非空分区(nonEmptyPart
)时,我们可以使用coalesce()
(检查coalesce()vs reparation())清理空分区.
val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type
它可能是也可能不是最好的,但是这个解决方案将避免因我们不使用而改组repartition()
val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x") val nonEmptyPart = sc.longAccumulator("nonEmptyPart") df1.foreachPartition(partition => if (partition.length > 0) nonEmptyPart.add(1)) val finalDf = df1.coalesce(nonEmptyPart.value.toInt) println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}") println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}") println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")
产量
nonEmptyPart => 3 df.rdd.partitions.length => 200 finalDf.rdd.partitions.length => 3