我有UDF
处理JSON并返回每行的动态数据结果.在我的情况下,我需要这个来验证数据并返回验证数据.
架构对于每一行都是灵活的.这意味着我无法case class
为每种情况创建(我的一些数据可以嵌套).
我试图从我的UDF函数返回元组,但我也没有运气(因为我需要从列表转换为元组),我没有找到一个优雅的解决方案.
我正在返回的数据类型String
,Integer
,Double
,DateTime
,在不同的顺序.
我曾试图map
在DataFrame 上使用,但我的架构有问题.
import spark.implicits._ def processData(row_type: String) = { /* completely random output here. Tuple/List/Array of elements with a type Integer, String, Double, DateType. */ // pseudo-code starts here if row_type == A (1, "second", 3) else (1, "second", 3, 4) } val processDataUDF = udf((row_type: String) => processData(row_type)) val df = Seq((0, 1), (1, 2)).toDF("a", "b") val df2 = df.select(processDataUDF($"a")) df2.show(5) df2.printSchema()
结果
+------------+ | UDF(a)| +------------+ |[1,second,3]| |[1,second,3]| +------------+
我该如何处理这个问题?我们有不同的处理结果row_type
.所有row_type
的都是动态设置的.我可以很好地Schema
为每个人row_type
,但我不能使用不同的模式返回相同的UDF返回结果.
使用map
是唯一的方法吗?
Spark Dataset
是一个柱状数据结构,这里没有灵活架构的地方.Schema必须是同类的(所有行必须具有相同的通用结构)并且已知为upfront(如果使用UDF,则必须返回定义良好的SQL类型).
您可以通过以下方式实现一些灵
定义表示所有可能字段的超集的模式,并将各列标记为nullable
.只有在没有类型冲突的情况下才有可能(如果Row
包含字段foo
,则始终使用相同的SQL类型表示).
使用集合类型(MapType
,ArrayType
)表示可变大小的字段.所有值和/或键必须属于同一类型.
将原始数据重塑为可通过固定架构实际表示的点.Spark包含作为其依赖项,json4s
它提供了一组用于合并,区分和查询 JSON数据的工具.如果需要,它可用于应用相对复杂的转换.
如果这不实用,我建议保持JSON字段"按原样"并仅按需解析它以提取特定值.您可以使用get_json_object
和显式类型转换.这允许测试不同的场景:
coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar") .map(get_json_object($"json_col", _)): _*).cast(DoubleType)
不假设单个文档结构.
你可以通过二进制Encoders
(Encoders.kryo
,Encoders.java
)或RDD
API 获得更多的灵活性,它可以用来存储联合类型(甚至是Any
),但如果你真的希望完全随机输出,它会提出一些严重的设计或数据建模问题.即使您可以存储已分析的数据,也很难使用它.