通常,您可以使用检查点来打破长谱系.一些或多或少类似的应该工作:
import org.apache.spark.rdd.RDD import scala.reflect.ClassTag val checkpointInterval: Int = ??? def loadAndFilter(path: String) = sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _)) def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) (acc: RDD[T], xi: (RDD[T], Int)) = { if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint else xi._1.union(acc) } val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] fileList.map(loadAndFilter).zipWithIndex .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))
在这种特殊情况下,一个更简单的解决方案应该是使用SparkContext.union
方法:
val masterRDD = sc.union(
fileList.map(path => sc.textFile(path)
.filter(_.startsWith("#####"))
.map((path, _)))
)
当您查看loop /生成的DAG时,这些方法之间的区别应该是显而易见的reduce
:
和一个union
:
当然,如果文件很小,你可以结合wholeTextFiles
使用flatMap
和读取所有文件一次:
sc.wholeTextFiles(fileList.mkString(",")) .flatMap{case (path, text) => text.split("\n").filter(_.startsWith("#####")).map((path, _))}