当前位置:  开发笔记 > 编程语言 > 正文

返回动态数据类型的Apache Spark UDF

如何解决《返回动态数据类型的ApacheSparkUDF》经验,为你挑选了1个好方法。

我有UDF处理JSON并返回每行的动态数据结果.在我的情况下,我需要这个来验证数据并返回验证数据.

架构对于每一行都是灵活的.这意味着我无法case class为每种情况创建(我的一些数据可以嵌套).

我试图从我的UDF函数返回元组,但我也没有运气(因为我需要从列表转换为元组),我没有找到一个优雅的解决方案.

我正在返回的数据类型String,Integer,Double,DateTime,在不同的顺序.

我曾试图map在DataFrame 上使用,但我的架构有问题.

import spark.implicits._

def processData(row_type: String) = {
  /*
  completely random output here. Tuple/List/Array of 
  elements with a type Integer, String, Double, DateType.
  */

  // pseudo-code starts here

  if row_type == A
     (1, "second", 3)
  else
     (1, "second", 3, 4)
}

val processDataUDF = udf((row_type: String) => processData(row_type))

val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()

结果

+------------+
|      UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+

我该如何处理这个问题?我们有不同的处理结果row_type.所有row_type的都是动态设置的.我可以很好地Schema为每个人row_type,但我不能使用不同的模式返回相同的UDF返回结果.

使用map是唯一的方法吗?



1> user6910411..:

Spark Dataset是一个柱状数据结构,这里没有灵活架构的地方.Schema必须是同类的(所有行必须具有相同的通用结构)并且已知为upfront(如果使用UDF,则必须返回定义良好的SQL类型).

您可以通过以下方式实现一些灵

定义表示所有可能字段的超集的模式,并将各列标记为nullable.只有在没有类型冲突的情况下才有可能(如果Row包含字段foo,则始终使用相同的SQL类型表示).

使用集合类型(MapType,ArrayType)表示可变大小的字段.所有值和/或键必须属于同一类型.

将原始数据重塑为可通过固定架构实际表示的点.Spark包含作为其依赖项,json4s它提供了一组用于合并,区分和查询 JSON数据的工具.如果需要,它可用于应用相对复杂的转换.

如果这不实用,我建议保持JSON字段"按原样"并仅按需解析它以提取特定值.您可以使用get_json_object和显式类型转换.这允许测试不同的场景:

coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

不假设单个文档结构.

你可以通过二进制Encoders(Encoders.kryo,Encoders.java)或RDDAPI 获得更多的灵活性,它可以用来存储联合类型(甚至是Any),但如果你真的希望完全随机输出,它会提出一些严重的设计或数据建模问题.即使您可以存储已分析的数据,也很难使用它.

推荐阅读
乐韵答题
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有