当前位置:  开发笔记 > 编程语言 > 正文

如何使用spark sc.textFile获取文件名?

如何解决《如何使用sparksc.textFile获取文件名?》经验,为你挑选了1个好方法。

我正在使用以下代码读取文件目录:

val data = sc.textFile("/mySource/dir1/*")

现在我的 datardd包含目录中所有文件的所有行(对吧?)

我现在想要为每行添加一个包含源文件名的列,我该怎么做?

我尝试的其他选项是使用wholeTextFile但我不断出现内存异常.5台服务器24核24 GB(执行器 - 核心5执行器 - 内存5G)任何想法?



1> Udy..:

您可以使用此代码.我用Spark 1.4和1.5进行了测试.

它从获取文件名inputSplit,并使用其添加到每一行iterator使用mapPartitionsWithInputSplitNewHadoopRDD

import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

val sc = new SparkContext(new SparkConf().setMaster("local"))

val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]

val path :String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration)

val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
           .mapPartitionsWithInputSplit((inputSplit, iterator) => {
  val file = inputSplit.asInstanceOf[FileSplit]
  iterator.map(tup => (file.getPath, tup._2))
  }
)

linesWithFileNames.foreach(println)

推荐阅读
mobiledu2402851373
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有