当前位置:  开发笔记 > 编程语言 > 正文

Spark中的嵌套JSON

如何解决《Spark中的嵌套JSON》经验,为你挑选了1个好方法。

我将以下JSON作为DataFrame加载:

root
 |-- data: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- field2: string (nullable = true)
 |-- moreData: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- more1: string (nullable = true)
 |    |    |-- more2: string (nullable = true)
 |    |    |-- more3: string (nullable = true)

我想从这个DataFrame中获取以下RDD:

RDD[(more1, more2, more3, field1, field2)]

我怎样才能做到这一点?我想我必须使用flatMap嵌套的JSON?



1> zero323..:

结合使用explode和点语法应该可以解决问题:

import org.apache.spark.sql.functions.explode

case class Data(field1: String, field2: String)
case class MoreData(more1: String, more2: String, more3: String)

val df = sc.parallelize(Seq(
  (Data("foo", "bar"), Array(MoreData("a", "b", "c"), MoreData("d", "e", "f")))
)).toDF("data", "moreData")

df.printSchema
// root
//  |-- data: struct (nullable = true)
//  |    |-- field1: string (nullable = true)
//  |    |-- field2: string (nullable = true)
//  |-- moreData: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- more1: string (nullable = true)
//  |    |    |-- more2: string (nullable = true)
//  |    |    |-- more3: string (nullable = true)

val columns = Seq(
  $"moreData.more1", $"moreData.more2", $"moreData.more3",
  $"data.field1", $"data.field2")

val aRDD = df.withColumn("moreData", explode($"moreData"))
  .select(columns: _*)
  .rdd

aRDD.collect
// Array[org.apache.spark.sql.Row] = Array([a,b,c,foo,bar], [d,e,f,foo,bar])

根据您的要求,您可以使用map来提取行中的值:

import org.apache.spark.sql.Row

aRDD.map{case Row(m1: String, m2: String, m3: String, f1: String, f2: String) =>
  (m1, m2, m3, f1, f2)}

另请参阅使用复杂类型查询Spark SQL DataFrame

推荐阅读
kikokikolove
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有