I have loaded some JSON as a DataFrame with the following schema:
root
 |-- data: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- field2: string (nullable = true)
 |-- moreData: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- more1: string (nullable = true)
 |    |    |-- more2: string (nullable = true)
 |    |    |-- more3: string (nullable = true)
I want to get the following RDD out of this DataFrame:
RDD[(more1, more2, more3, field1, field2)]
How can I do this? I think I have to use flatMap for the nested JSON?
A combination of explode and dot syntax should do the trick:
// Assumes spark-shell, where sc and the implicits behind toDF and $"..." are already in scope.
import org.apache.spark.sql.functions.explode

case class Data(field1: String, field2: String)
case class MoreData(more1: String, more2: String, more3: String)

val df = sc.parallelize(Seq(
  (Data("foo", "bar"), Array(MoreData("a", "b", "c"), MoreData("d", "e", "f")))
)).toDF("data", "moreData")

df.printSchema
// root
//  |-- data: struct (nullable = true)
//  |    |-- field1: string (nullable = true)
//  |    |-- field2: string (nullable = true)
//  |-- moreData: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- more1: string (nullable = true)
//  |    |    |-- more2: string (nullable = true)
//  |    |    |-- more3: string (nullable = true)

// Dot syntax reaches into struct fields; after explode, moreData is a single struct per row.
val columns = Seq(
  $"moreData.more1", $"moreData.more2", $"moreData.more3",
  $"data.field1", $"data.field2")

val aRDD = df.withColumn("moreData", explode($"moreData"))
  .select(columns: _*)
  .rdd

aRDD.collect
// Array[org.apache.spark.sql.Row] = Array([a,b,c,foo,bar], [d,e,f,foo,bar])
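To make the reshaping done by explode more concrete, here is a minimal plain-Scala sketch (no Spark required) that models the same step with flatMap on ordinary collections. It is only an illustration of the semantics, not how Spark implements explode. The Record case class and the ExplodeSketch object are hypothetical names introduced for this example; Data and MoreData mirror the case classes above.

```scala
object ExplodeSketch {
  case class Data(field1: String, field2: String)
  case class MoreData(more1: String, more2: String, more3: String)

  // Hypothetical container standing in for one DataFrame row.
  case class Record(data: Data, moreData: Seq[MoreData])

  val records: Seq[Record] = Seq(
    Record(Data("foo", "bar"), Seq(MoreData("a", "b", "c"), MoreData("d", "e", "f")))
  )

  // One output tuple per element of moreData, with the parent's fields repeated.
  val flattened: Seq[(String, String, String, String, String)] =
    records.flatMap { r =>
      r.moreData.map(m => (m.more1, m.more2, m.more3, r.data.field1, r.data.field2))
    }

  def main(args: Array[String]): Unit =
    flattened.foreach(println)
}
```

In this sketch a record whose moreData is empty contributes no tuples, which matches explode dropping rows with an empty array.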
Depending on your requirements, you can then map over aRDD to extract the values from each row:
import org.apache.spark.sql.Row

aRDD.map { case Row(m1: String, m2: String, m3: String, f1: String, f2: String) =>
  (m1, m2, m3, f1, f2)
}
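Since the question asks about flatMap: the same tuples can probably also be produced by dropping to the RDD first and flattening the nested rows by hand. The following is a sketch under stated assumptions, not a replacement for the explode approach. It assumes Spark 2.x or later (SparkSession) running in local mode; the object name FlatMapNested and the toTuples helper are hypothetical, and a null data struct is not handled.

```scala
import org.apache.spark.sql.{Row, SparkSession}

object FlatMapNested {
  case class Data(field1: String, field2: String)
  case class MoreData(more1: String, more2: String, more3: String)

  // Hypothetical helper: turns one (data, moreData) row into zero or more tuples.
  // Assumes the row carries its schema (as rows from df.rdd do) and that data is not null.
  def toTuples(row: Row): Seq[(String, String, String, String, String)] = {
    val moreIdx = row.fieldIndex("moreData")
    if (row.isNullAt(moreIdx)) {
      Seq.empty // a null array yields no output rows
    } else {
      val data = row.getStruct(row.fieldIndex("data"))
      val f1 = data.getAs[String]("field1")
      val f2 = data.getAs[String]("field2")
      row.getSeq[Row](moreIdx).map { m =>
        (m.getAs[String]("more1"), m.getAs[String]("more2"), m.getAs[String]("more3"), f1, f2)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("flatMapNested")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      (Data("foo", "bar"), Seq(MoreData("a", "b", "c"), MoreData("d", "e", "f")))
    ).toDF("data", "moreData")

    // flatMap on the underlying RDD[Row] gives an RDD of 5-tuples directly.
    val tuples = df.rdd.flatMap(toTuples)
    tuples.collect().foreach(println)

    spark.stop()
  }
}
```

The explode version keeps the work inside the DataFrame API, where the optimizer can see it, so the hand-written flatMap is mainly useful when you need an RDD of tuples anyway.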
See also: Querying Spark SQL DataFrame with complex types