我有一个Cassandra表,为简单起见,看起来像:
key: text jsonData: text blobData: blob
我可以使用spark和spark-cassandra-connector为此创建一个基本数据框:
val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load()
我正在努力将JSON数据扩展到其底层结构中.我最终希望能够根据json字符串中的属性进行过滤并返回blob数据.像jsonData.foo ="bar"之类的东西并返回blobData.这目前可能吗?
Spark> = 2.4
如果需要,可以使用schema_of_json
函数确定模式:
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json} val schema = schema_of_json(lit(df.select($"jsonData").as[String].first)) df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]()))
Spark> = 2.1
你可以使用from_json
功能:
import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.types._ val schema = StructType(Seq( StructField("k", StringType, true), StructField("v", DoubleType, true) )) df.withColumn("jsonData", from_json($"jsonData", schema))
Spark> = 1.6
您可以使用get_json_object
哪个采用列和路径:
import org.apache.spark.sql.functions.get_json_object val exprs = Seq("k", "v").map( c => get_json_object($"jsonData", s"$$.$c").alias(c)) df.select($"*" +: exprs: _*)
并将字段提取到单个字符串,这些字符串可以进一步转换为预期类型.
该path
参数使用点语法$.
表示,前导表示文档根(因为上面的代码使用字符串插值$
必须进行转义,因此$$.
).
Spark <= 1.5:
这目前可能吗?
据我所知,这不是直接可能的.你可以尝试类似的东西:
val df = sc.parallelize(Seq( ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") )).toDF("key", "jsonData", "blobData")
我假设该blob
字段不能用JSON表示.否则你的出租车省略拆分和加入:
import org.apache.spark.sql.Row val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") val jsons = sqlContext.read.json(df.drop("blobData").map{ case Row(key: String, json: String) => s"""{"key": "$key", "jsonData": $json}""" }) val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") parsed.printSchema // root // |-- jsonData: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: double (nullable = true) // |-- key: long (nullable = true) // |-- blobData: string (nullable = true)
另一种(更便宜,但更复杂)的方法是使用UDF来解析JSON并输出一个struct
或map
列.例如这样的事情:
import net.liftweb.json.parse case class KV(k: String, v: Int) val parseJson = udf((s: String) => { implicit val formats = net.liftweb.json.DefaultFormats parse(s).extract[KV] }) val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) parsed.show // +---+--------------------+------------------+----------+ // |key| jsonData| blobData|parsedJSON| // +---+--------------------+------------------+----------+ // | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| // | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| // +---+--------------------+------------------+----------+ parsed.printSchema // root // |-- key: string (nullable = true) // |-- jsonData: string (nullable = true) // |-- blobData: string (nullable = true) // |-- parsedJSON: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: integer (nullable = false)