I am trying to run queries on Apache Spark SQL. The first query works fine, but the second query also removes null values.
Code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def main(args: Array[String]): Unit = {
  val sc = new SparkContext("local[*]", "Spark")
  val sqlContext = new SQLContext(sc)
  val pageViewsDF = getDataframe(sc, sqlContext)
  println("RUNNING SQL QUERIES ")
  sqlContext.sql("select name , count(*) from pageviews_by_second group by name").show(10)
  sqlContext.sql("select name , count(*) from pageviews_by_second where name not in (\"Rose\") group by name").show(10)
}

def getDataframe(sc: SparkContext, sqlContext: SQLContext): DataFrame = {
  // Silence Spark and Akka logging
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
  val dataArray = List(
    List("David", null), List("David", null), List("Charlie", "23"), List("Rose", null),
    List("Ben", null), List("Harry", "43"), List(null, "25"), List(null, "21"),
    List("David", "15"), List("Rose", null), List("Alan", "26"))
  val separator = ","
  // Create an RDD
  val dataRDD = sc.parallelize(dataArray)
  // The schema is encoded in a string
  val header = "name,age"
  // Generate the schema from the header string: both columns are nullable strings
  val schema = StructType(
    header.split(separator).map { fieldName => StructField(fieldName, StringType, true) })
  val rowRDD = dataRDD.map(p => Row(p(0), p(1)))
  // Apply the schema to the RDD and register it for SQL queries
  val df = sqlContext.createDataFrame(rowRDD, schema)
  df.registerTempTable("pageviews_by_second")
  df
}
The result of the first query is:
+-------+---+
|   name|_c1|
+-------+---+
|   Alan|  1|
|    Ben|  1|
|  David|  3|
|Charlie|  1|
|   Rose|  2|
|  Harry|  1|
|   null|  2|
+-------+---+
The output of the second query:
+-------+---+
|   name|_c1|
+-------+---+
|   Alan|  1|
|    Ben|  1|
|  David|  3|
|Charlie|  1|
|  Harry|  1|
+-------+---+
In the second query I exclude only "Rose", but "null" is excluded as well.
If my query is wrong, please help me with the correct query.
It happens because NULL in SQL means "unknown". As a result, any comparison with NULL other than IS NULL / IS NOT NULL is undefined and returns NULL.
import sqlContext.implicits._  // provides .toDF on an RDD of case classes
case class Record(id: Integer, value: String)
val df = sc.parallelize(Seq(Record(1, "foo"), Record(2, null))).toDF
df.registerTempTable("df")
sqlContext.sql("""SELECT value = "foo" FROM df""").show
// +----+
// | _c0|
// +----+
// |true|
// |null|
// +----+
sqlContext.sql("""SELECT value != "foo" FROM df""").show
// +-----+
// | _c0|
// +-----+
// |false|
// | null|
// +-----+
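To make the three-valued rule concrete without a Spark cluster, here is a minimal plain-Scala sketch, assuming `None` stands for SQL NULL and `Option[Boolean]` for a predicate whose result may be unknown. The names `ThreeValuedCompare`, `eq3`, `neq3` and `isNull` are hypothetical and only mirror the two queries above; this is an illustration of the semantics, not how Spark evaluates expressions internally.

```scala
// Minimal model of SQL three-valued comparison: None plays the role of NULL ("unknown").
object ThreeValuedCompare {
  // SQL `a = b`: unknown (None) if either side is NULL, otherwise an ordinary comparison.
  def eq3(a: Option[String], b: Option[String]): Option[Boolean] =
    for { x <- a; y <- b } yield x == y

  // SQL `NOT p`: negating unknown still gives unknown.
  def not3(p: Option[Boolean]): Option[Boolean] = p.map(!_)

  // SQL `a != b` is NOT (a = b), so it is unknown whenever `a = b` is.
  def neq3(a: Option[String], b: Option[String]): Option[Boolean] = not3(eq3(a, b))

  // SQL `a IS NULL` (like IS NOT NULL) never returns unknown.
  def isNull(a: Option[String]): Boolean = a.isEmpty

  def main(args: Array[String]): Unit = {
    val foo = Some("foo")
    // The two rows of the answer's df: value = "foo" and value = NULL
    for (v <- Seq(Some("foo"), None))
      println(s"$v: (= foo) ${eq3(v, foo)}, (!= foo) ${neq3(v, foo)}, IS NULL ${isNull(v)}")
  }
}
```

The `Some(true)` / `Some(false)` / `None` results line up with the `true` / `false` / `null` rows printed by the two queries.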
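For the same reason, IN / NOT IN is undefined when the value is NULL. Since a WHERE clause keeps only the rows whose predicate is true, a row with a NULL name is filtered out by NOT IN just as it would be by IN: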
sqlContext.sql("""SELECT value IN ("foo", "bar") FROM df""").show
// +----+
// | _c0|
// +----+
// |true|
// |null|
// +----+
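The same model extends to IN. This is a minimal sketch, assuming the usual SQL reading of `x IN (v1, v2, ...)` as `x = v1 OR x = v2 OR ...` and of NOT IN as the negation of IN. The object and method names are hypothetical.

```scala
// Minimal model of IN / NOT IN under SQL three-valued logic.
// None stands for NULL ("unknown"); an illustration only, not Spark's code.
object ThreeValuedIn {
  def eq3(a: Option[String], b: Option[String]): Option[Boolean] =
    for { x <- a; y <- b } yield x == y

  // Three-valued OR: any TRUE wins, otherwise any UNKNOWN makes the result UNKNOWN.
  def or3(ps: Seq[Option[Boolean]]): Option[Boolean] =
    if (ps.contains(Some(true))) Some(true)
    else if (ps.contains(None)) None
    else Some(false)

  // `x IN (v1, v2, ...)` behaves like `x = v1 OR x = v2 OR ...`.
  def in3(x: Option[String], list: Seq[Option[String]]): Option[Boolean] =
    or3(list.map(v => eq3(x, v)))

  // `x NOT IN (...)` is NOT (x IN (...)), and NOT of unknown stays unknown.
  def notIn3(x: Option[String], list: Seq[Option[String]]): Option[Boolean] =
    in3(x, list).map(!_)

  def main(args: Array[String]): Unit = {
    val list = Seq(Some("foo"), Some("bar"))
    for (v <- Seq(Some("foo"), None))
      println(s"$v: IN ${in3(v, list)}, NOT IN ${notIn3(v, list)}")
  }
}
```

For a NULL input both `in3` and `notIn3` return `None`, which is exactly why switching from IN to NOT IN does not bring the NULL rows back.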
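This is standard SQL behavior, and any system that implements the SQL standard correctly should behave the same way. If you want to filter and keep NULLs, you have to do it explicitly: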
sqlContext.sql(
"""SELECT value IN ("foo", "bar") OR value IS NULL FROM df""").show
// +----+
// | _c0|
// +----+
// |true|
// |true|
// +----+
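Applied to the question, the fix is to add the same explicit `IS NULL` branch to the second query: `select name, count(*) from pageviews_by_second where name not in ("Rose") or name is null group by name`. The plain-Scala sketch below replays both filters on the question's `name` column to show which groups survive. It assumes, as standard SQL specifies, that WHERE keeps a row only when the predicate is true (false and unknown both drop it); `NotInOnQuestionData` and its functions are hypothetical names.

```scala
// Replays the question's second query, with and without `OR name IS NULL`, in plain Scala.
// None stands for NULL; WHERE keeps a row only when the predicate is Some(true).
object NotInOnQuestionData {
  // The `name` column of pageviews_by_second, in the question's order.
  val names: Seq[Option[String]] = Seq(
    Some("David"), Some("David"), Some("Charlie"), Some("Rose"), Some("Ben"),
    Some("Harry"), None, None, Some("David"), Some("Rose"), Some("Alan"))

  // name NOT IN ("Rose"): with a single list element this is NOT (name = "Rose").
  def notInRose(name: Option[String]): Option[Boolean] = name.map(_ != "Rose")

  // name NOT IN ("Rose") OR name IS NULL: IS NULL is never unknown, so neither is this.
  def notInRoseOrNull(name: Option[String]): Option[Boolean] =
    if (name.isEmpty) Some(true) else notInRose(name)

  // SELECT name, count(*) ... WHERE pred GROUP BY name
  def countWhere(pred: Option[String] => Option[Boolean]): Map[Option[String], Int] =
    names.filter(n => pred(n) == Some(true))
      .groupBy(identity)
      .map { case (k, vs) => k -> vs.size }

  def main(args: Array[String]): Unit = {
    println(countWhere(notInRose))       // no None key: the NULL group is dropped
    println(countWhere(notInRoseOrNull)) // the None group (count 2) is kept
  }
}
```

With the plain NOT IN filter the result has five groups and no `None` key, matching the second output in the question; with the `OR ... IS NULL` branch the NULL group with count 2 comes back and only "Rose" is excluded.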