I have a dataframe with the following schema:
[visitorId: string, trackingIds: array<string>, emailIds: array<string>]
I'm looking for a way to group (or maybe rollup?) the dataframe by visitorId, with the trackingIds and emailIds columns appended together. So for example, if my initial df looks like:
visitorId |trackingIds |emailIds
+---------+------------+--------
|a158     |[666b]      |[12]
|7g21     |[c0b5]      |[45]
|7g21     |[c0b4]      |[87]
|a158     |[666b, 777c]|[]
I would like my output df to look like this:
visitorId |trackingIds       |emailIds
+---------+------------------+--------
|a158     |[666b, 666b, 777c]|[12, '']
|7g21     |[c0b5, c0b4]      |[45, 87]
I've been trying to use the groupBy and agg operators but haven't had much luck.
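For instance, a minimal sketch of what a first groupBy/agg attempt might look like (this is hypothetical, not code from the question, and assumes the dataframe is bound to df): collect_list on its own nests the arrays per visitor instead of concatenating them.

import org.apache.spark.sql.functions.collect_list

// Hypothetical first attempt: collect_list alone yields array-of-arrays,
// e.g. trackingIds for a158 becomes [[666b], [666b, 777c]]
// rather than the desired [666b, 666b, 777c].
df.groupBy($"visitorId")
  .agg(
    collect_list($"trackingIds").alias("trackingIds"),
    collect_list($"emailIds").alias("emailIds"))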
Spark >= 2.4
You can replace the flatten udf with the built-in flatten function:
import org.apache.spark.sql.functions.flatten
and keep the rest as-is.
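For reference, a minimal end-to-end sketch of the Spark >= 2.4 path (assuming the same df as defined in the section below; the placeholder step shown further down is still needed if you want the '' marker for empty emailIds):

import org.apache.spark.sql.functions.{collect_list, flatten}

// Built-in flatten concatenates the array-of-arrays produced by collect_list.
df
  .groupBy($"visitorId")
  .agg(
    flatten(collect_list($"trackingIds")).alias("trackingIds"),
    flatten(collect_list($"emailIds")).alias("emailIds"))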
Spark >= 2.0, < 2.4
It is possible, but quite expensive. Using the data you've provided:
case class Record(
  visitorId: String, trackingIds: Array[String], emailIds: Array[String])

val df = Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b", "777c"), Array.empty[String])).toDF
and a helper function:
import org.apache.spark.sql.functions.udf

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)
we can fill in the blanks with placeholders:
import org.apache.spark.sql.functions.{array, lit, size, when}

val dfWithPlaceholders = df.withColumn(
  "emailIds",
  when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))
collect_list and flatten:
import org.apache.spark.sql.functions.collect_list

val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")

dfWithPlaceholders
  .groupBy($"visitorId")
  .agg(trackingIds, emailIds)

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
With a statically typed Dataset:
dfWithPlaceholders.as[Record]
  .groupByKey(_.visitorId)
  .mapGroups { case (key, vs) =>
    vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
      case (trackingIds, emailIds) =>
        Record(key, trackingIds.flatten, emailIds.flatten)
    }
  }

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
Spark 1.x
You can convert to an RDD and group:
import org.apache.spark.sql.Row

dfWithPlaceholders.rdd
  .map {
    case Row(
      id: String,
      trcks: Seq[String @unchecked],
      emails: Seq[String @unchecked]) => (id, (trcks, emails))
  }
  .groupByKey
  .map { case (key, vs) => vs.toArray.unzip match {
    case (trackingIds, emailIds) =>
      Record(key, trackingIds.flatten, emailIds.flatten)
  }}
  .toDF

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// |     a158|[666b, 666b, 777c]|  [12, ]|
// +---------+------------------+--------+
@zero323's answer is pretty much complete, but Spark gives us even more flexibility. How about the following solution?
import org.apache.spark.sql.functions._

inventory
  .select($"*", explode($"trackingIds") as "tracking_id")
  .select($"*", explode($"emailIds") as "email_id")
  .groupBy("visitorId")
  .agg(
    collect_list("tracking_id") as "trackingIds",
    collect_list("email_id") as "emailIds")
That, however, leaves out all empty collections, so there's some room for improvement :)
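One possible direction for that improvement, a sketch assuming Spark >= 2.2 where explode_outer is available: explode_outer emits a null for an empty array instead of dropping the row, so the row's other column is still aggregated. Note that collect_list skips those nulls (empty emailIds still won't get a '' placeholder as in the accepted answer), and exploding two arrays on the same row can duplicate values when both contain more than one element.

import org.apache.spark.sql.functions.{collect_list, explode_outer}

// explode_outer keeps rows whose arrays are empty (as a null element),
// so a158's [666b, 777c] is not lost when its emailIds is [].
inventory
  .select($"*", explode_outer($"trackingIds") as "tracking_id")
  .select($"*", explode_outer($"emailIds") as "email_id")
  .groupBy("visitorId")
  .agg(
    collect_list("tracking_id") as "trackingIds",
    collect_list("email_id") as "emailIds")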