我正在使用以下代码读取文件目录:
val data = sc.textFile("/mySource/dir1/*")
现在我的 data
rdd包含目录中所有文件的所有行(对吧?)
我现在想要为每行添加一个包含源文件名的列,我该怎么做?
我尝试的其他选项是使用wholeTextFile但我不断出现内存异常.5台服务器24核24 GB(执行器 - 核心5执行器 - 内存5G)任何想法?
您可以使用此代码.我用Spark 1.4和1.5进行了测试.
它从获取文件名inputSplit
,并使用其添加到每一行iterator
使用mapPartitionsWithInputSplit
的NewHadoopRDD
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat} import org.apache.spark.rdd.{NewHadoopRDD} import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text val sc = new SparkContext(new SparkConf().setMaster("local")) val fc = classOf[TextInputFormat] val kc = classOf[LongWritable] val vc = classOf[Text] val path :String = "file:///home/user/test" val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration) val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]] .mapPartitionsWithInputSplit((inputSplit, iterator) => { val file = inputSplit.asInstanceOf[FileSplit] iterator.map(tup => (file.getPath, tup._2)) } ) linesWithFileNames.foreach(println)