在spark Dataset.filter中获取此null错误
输入CSV:
name,age,stat abc,22,m xyz,,s
工作代码:
case class Person(name: String, age: Long, stat: String) val peopleDS = spark.read.option("inferSchema","true") .option("header", "true").option("delimiter", ",") .csv("./people.csv").as[Person] peopleDS.show() peopleDS.createOrReplaceTempView("people") spark.sql("select * from people where age > 30").show()
失败的代码(添加以下行返回错误):
val filteredDS = peopleDS.filter(_.age > 30) filteredDS.show()
返回null错误
java.lang.RuntimeException: Null value appeared in non-nullable field: - field (class: "scala.Long", name: "age") - root class: "com.gcp.model.Person" If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
user6910411.. 19
你得到的例外应该解释一切,但让我们一步一步走:
使用csv
数据源加载数据时,所有字段都标记为nullable
:
val path: String = ???
val peopleDF = spark.read
.option("inferSchema","true")
.option("header", "true")
.option("delimiter", ",")
.csv(path)
peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
缺少的字段表示为SQL NULL
peopleDF.where($"age".isNull).show
+----+----+----+
|name| age|stat|
+----+----+----+
| xyz|null| s|
+----+----+----+
接下来转换Dataset[Row]
为Dataset[Person]
使用Long
编码age
字段.Long
在Scala中不可能null
.因为输入模式是nullable
,输出模式保持不变nullable
:
val peopleDS = peopleDF.as[Person]
peopleDS.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
请注意,它as[T]
根本不会影响架构.
当您Dataset
使用SQL 查询(在已注册的表上)或DataFrame
API时,Spark不会反序列化该对象.由于架构仍然nullable
可以执行:
peopleDS.where($"age" > 30).show
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
没有任何问题.这只是一个简单的SQL逻辑,NULL
是一个有效的值.
当我们使用静态类型的Dataset
API时:
peopleDS.filter(_.age > 30)
Spark必须反序列化对象.因为Long
不能null
(SQL NULL
)它会失败,你会看到异常.
如果不是因为你得到了NPE.
更正数据的静态类型表示应使用以下Optional
类型:
case class Person(name: String, age: Option[Long], stat: String)
调整过滤功能:
peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
如果您愿意,可以使用模式匹配:
peopleDS.filter { case Some(age) => age > 30 case _ => false // or case None => false }
请注意,您不必(但无论如何都会建议)为name
和使用可选类型stat
.因为Scala String
只是一个Java,String
它可以null
.当然,如果你采用这种方法,你必须明确检查访问的值是否是null
.
相关的Spark 2.0 Dataset与DataFrame相关
你得到的例外应该解释一切,但让我们一步一步走:
使用csv
数据源加载数据时,所有字段都标记为nullable
:
val path: String = ???
val peopleDF = spark.read
.option("inferSchema","true")
.option("header", "true")
.option("delimiter", ",")
.csv(path)
peopleDF.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
缺少的字段表示为SQL NULL
peopleDF.where($"age".isNull).show
+----+----+----+
|name| age|stat|
+----+----+----+
| xyz|null| s|
+----+----+----+
接下来转换Dataset[Row]
为Dataset[Person]
使用Long
编码age
字段.Long
在Scala中不可能null
.因为输入模式是nullable
,输出模式保持不变nullable
:
val peopleDS = peopleDF.as[Person]
peopleDS.printSchema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- stat: string (nullable = true)
请注意,它as[T]
根本不会影响架构.
当您Dataset
使用SQL 查询(在已注册的表上)或DataFrame
API时,Spark不会反序列化该对象.由于架构仍然nullable
可以执行:
peopleDS.where($"age" > 30).show
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
没有任何问题.这只是一个简单的SQL逻辑,NULL
是一个有效的值.
当我们使用静态类型的Dataset
API时:
peopleDS.filter(_.age > 30)
Spark必须反序列化对象.因为Long
不能null
(SQL NULL
)它会失败,你会看到异常.
如果不是因为你得到了NPE.
更正数据的静态类型表示应使用以下Optional
类型:
case class Person(name: String, age: Option[Long], stat: String)
调整过滤功能:
peopleDS.filter(_.age.map(_ > 30).getOrElse(false))
+----+---+----+
|name|age|stat|
+----+---+----+
+----+---+----+
如果您愿意,可以使用模式匹配:
peopleDS.filter { case Some(age) => age > 30 case _ => false // or case None => false }
请注意,您不必(但无论如何都会建议)为name
和使用可选类型stat
.因为Scala String
只是一个Java,String
它可以null
.当然,如果你采用这种方法,你必须明确检查访问的值是否是null
.
相关的Spark 2.0 Dataset与DataFrame相关