注意:您的csv文件在每行中没有相同数量的字段 - 这不能解析为DataFrame.(SchemaRDD已重命名为DataFrame.)如果您的csv文件格式正确,您可以执行以下操作:
使用--packages com.databricks:spark-csv_2.10:1.3.0启动spark-shell或spark-submit,以便轻松解析csv文件(参见此处).在Scala中,您的代码将是,假设您的csv文件有一个标题 - 如果是,则更容易引用列:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", '|').load("/path/to/file.csv") // assume 1st column has name col1 val df1 = df.filter( df("col1") === 1) // 1st DataFrame val df2 = df.filter( df("col1") === 2) // 2nd DataFrame etc...
由于您的文件格式不正确,您必须以不同方式解析每个不同的行,例如,执行以下操作:
val lines = sc.textFile("/path/to/file.csv") case class RowRecord1( col1:Int, col2:Double, col3:String, col4:Int) def parseRowRecord1( arr:Array[String]) = RowRecord1( arr(0).toInt, arr(1).toDouble, arr(2), arr(3).toInt) case class RowRecord2( col1:Int, col2:String, col3:Int, col4:Int, col5:Int, col6:Double, col7:Int) def parseRowRecord2( arr:Array[String]) = RowRecord2( arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt, arr(4).toInt, arr(5).toDouble, arr(8).toInt) val df1 = lines.filter(_.startsWith("1")).map( _.split('|')).map( arr => parseRowRecord1( arr )).toDF val df2 = lines.filter(_.startsWith("2")).map( _.split('|')).map( arr => parseRowRecord2( arr )).toDF