I was running through the sample Java code from the spark-csv README, i.e.:

    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.*;
    SQLContext sqlContext = new SQLContext(sc);
    StructType customSchema = new StructType(
        new StructField("year", IntegerType, true),
        new StructField("make", StringType, true),
        new StructField("model", StringType, true),
        new StructField("comment", StringType, true),
        new StructField("blank", StringType, true));

    DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("cars.csv");

    df.select("year", "model").write()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save("newcars.csv");
It didn't compile out of the box, so after some wrangling I got it to compile by changing the incorrect FooType syntax to DataTypes.FooType and passing the StructFields as a new StructField[]. The compiler then requested a fourth argument, metadata, in the StructField constructor, but I've had trouble finding documentation on what it means (the javadocs describe its use case, but not really how to decide what to pass during StructField construction). With the following code, it now runs until it hits any side-effecting method such as collect():

    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // Read features.
    System.out.println("Reading features from " + args[0]);
    StructType featuresSchema = new StructType(new StructField[] {
        new StructField("case_id", DataTypes.StringType, false, null),
        new StructField("foo", DataTypes.DoubleType, false, null)
    });
    DataFrame features = sqlContext.read()
        .format("com.databricks.spark.csv")
        .schema(featuresSchema)
        .load(args[0]);

    for (Row r : features.collect()) {
        System.out.println("Row: " + r);
    }
I get the following exception:
Exception in thread "main" java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202) at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210) at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65) at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74) at scala.collection.immutable.HashSet.$plus(HashSet.scala:56) at scala.collection.immutable.HashSet.$plus(HashSet.scala:59) at scala.collection.immutable.Set$Set4.$plus(Set.scala:127) at scala.collection.immutable.Set$Set4.$plus(Set.scala:121) at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24) at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47) at scala.collection.SetLike$class.map(SetLike.scala:93) at scala.collection.AbstractSet.map(Set.scala:47) at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114) at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105) at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926) at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930) at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384) ...
Any idea what's wrong?
It seems the README is quite out of date and needs some significant edits to its Java examples. I tracked down the actual JIRA that added the metadata field, and it points to the use of Map.empty as the default value in the Scala case; whoever wrote the documentation must have translated the Scala directly into Java, even though Java has no equivalent of that default argument.

In the 1.5 branch of the Spark SQL code we can see that metadata.hashCode() is dereferenced without any null check, which is what causes the NullPointerException. The existence of the Metadata.empty() method, combined with the discussion of using empty maps as the Scala default, suggests that the correct approach is simply to pass Metadata.empty() if you don't care about it. The revised example should be:
    SQLContext sqlContext = new SQLContext(sc);
    StructType customSchema = new StructType(new StructField[] {
        new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
        new StructField("make", DataTypes.StringType, true, Metadata.empty()),
        new StructField("model", DataTypes.StringType, true, Metadata.empty()),
        new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
        new StructField("blank", DataTypes.StringType, true, Metadata.empty())
    });

    DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .schema(customSchema)
        .option("header", "true")
        .load("cars.csv");

    df.select("year", "model").write()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save("newcars.csv");
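As a side note, if you'd rather not spell out the metadata argument at all, the DataTypes factory methods build the same fields and supply Metadata.empty() internally. A minimal sketch of the equivalent schema (same Spark 1.5 / spark-csv setup as above, only the schema construction differs):

    import java.util.Arrays;

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // DataTypes.createStructField(name, type, nullable) fills in Metadata.empty()
    // for you, so the fourth constructor argument never has to be written by hand.
    StructType customSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("year", DataTypes.IntegerType, true),
        DataTypes.createStructField("make", DataTypes.StringType, true),
        DataTypes.createStructField("model", DataTypes.StringType, true),
        DataTypes.createStructField("comment", DataTypes.StringType, true),
        DataTypes.createStructField("blank", DataTypes.StringType, true)));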