当前位置:  开发笔记 > 编程语言 > 正文

如何使用Spark DataFrames查询JSON数据列?

如何解决《如何使用SparkDataFrames查询JSON数据列?》经验,为你挑选了1个好方法。

我有一个Cassandra表,为简单起见,看起来像:

key: text
jsonData: text
blobData: blob

我可以使用spark和spark-cassandra-connector为此创建一个基本数据框:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

我正在努力将JSON数据扩展到其底层结构中.我最终希望能够根据json字符串中的属性进行过滤并返回blob数据.像jsonData.foo ="bar"之类的东西并返回blobData.这目前可能吗?



1> zero323..:

Spark> = 2.4

如果需要,可以使用schema_of_json函数确定模式:

import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]()))

Spark> = 2.1

你可以使用from_json功能:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

Spark> = 1.6

您可以使用get_json_object哪个采用列和路径:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

并将字段提取到单个字符串,这些字符串可以进一步转换为预期类型.

path参数使用点语法$.表示,前导表示文档根(因为上面的代码使用字符串插值$必须进行转义,因此$$.).

Spark <= 1.5:

这目前可能吗?

据我所知,这不是直接可能的.你可以尝试类似的东西:

val df = sc.parallelize(Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

我假设该blob字段不能用JSON表示.否则你的出租车省略拆分和加入:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
  case Row(key: String, json: String) =>
    s"""{"key": "$key", "jsonData": $json}"""
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: long (nullable = true)
//  |-- blobData: string (nullable = true)

另一种(更便宜,但更复杂)的方法是使用UDF来解析JSON并输出一个structmap列.例如这样的事情:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)

推荐阅读
手机用户2402851335
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有