The following works with Spark 2.0. You can use the map function, available since version 2.0, to combine your columns into a Map.
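For reference, here is a minimal input DataFrame df that the snippets below assume. The rows are reconstructed from the outputs shown later in this answer; the SparkSession setup and app name are illustrative assumptions, not part of the original code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Hypothetical local session, just to make the snippets self-contained
val spark = SparkSession.builder()
  .appName("map-column-example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Rows chosen to match the example outputs below
val df = Seq(
  ("school A", "michael", 7),
  ("school A", "emily", 5),
  ("school B", "cathy", 10),
  ("school B", "shaun", 5)
).toDF("school_name", "name", "age")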
val df1 = df.groupBy(col("school_name"))
  .agg(collect_list(map($"name", $"age")) as "map")
df1.show(false)
This gives you the output below.
+-----------+------------------------------------+
|school_name|map                                 |
+-----------+------------------------------------+
|school B   |[Map(cathy -> 10), Map(shaun -> 5)] |
|school A   |[Map(michael -> 7), Map(emily -> 5)]|
+-----------+------------------------------------+
Each element of the array is a single-entry map, so you can now use a UDF to merge them into one map, as shown below.
import org.apache.spark.sql.functions.udf

val joinMap = udf { values: Seq[Map[String, Int]] => values.flatten.toMap }

val df2 = df1.withColumn("map", joinMap(col("map")))
df2.show(false)
This produces the desired Map[String,Int] output.
+-----------+-----------------------------+
|school_name|map                          |
+-----------+-----------------------------+
|school B   |Map(cathy -> 10, shaun -> 5) |
|school A   |Map(michael -> 7, emily -> 5)|
+-----------+-----------------------------+
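As a side note beyond the original answer: if you are on Spark 2.4 or later, the built-in map_from_entries function can replace the UDF entirely. A sketch, assuming the same df as above:

// Spark 2.4+ alternative: build the map directly from (name, age) structs, no UDF needed
val df2Alt = df.groupBy(col("school_name"))
  .agg(map_from_entries(collect_list(struct($"name", $"age"))) as "map")
df2Alt.show(false)

map_from_entries takes an array of two-field structs and uses the first field as the key and the second as the value, so it yields the same Map[String,Int] column in one pass.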
If you want to convert the column value to a JSON string, Spark 2.1.0 introduced the to_json function.
val df3 = df2.withColumn("map", to_json(struct($"map")))
df3.show(false)
The to_json function returns the following output.
+-----------+-------------------------------+
|school_name|map                            |
+-----------+-------------------------------+
|school B   |{"map":{"cathy":10,"shaun":5}} |
|school A   |{"map":{"michael":7,"emily":5}}|
+-----------+-------------------------------+
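If you later need to parse that JSON string back into a map column, from_json (also available since Spark 2.1.0) reverses the conversion. A minimal sketch, assuming the df3 from the previous step; the schema simply mirrors the struct wrapper that to_json(struct($"map")) produced:

import org.apache.spark.sql.types._

// Schema matching the {"map": {...}} JSON layout shown above
val schema = new StructType()
  .add("map", MapType(StringType, IntegerType))

val df4 = df3.withColumn("parsed", from_json($"map", schema))
df4.select($"school_name", $"parsed.map".as("map")).show(false)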