当前位置:  开发笔记 > 编程语言 > 正文

由于长RDD沿袭导致的Stackoverflow

如何解决《由于长RDD沿袭导致的Stackoverflow》经验,为你挑选了1个好方法。



1> zero323..:

通常,您可以使用检查点来打破长谱系.一些或多或少类似的应该工作:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

val checkpointInterval: Int = ???

def loadAndFilter(path: String) = sc.textFile(path)
  .filter(_.startsWith("#####"))
  .map((path, _))

def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
  (acc: RDD[T], xi: (RDD[T], Int)) = {
    if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint
    else xi._1.union(acc)
  }

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]
fileList.map(loadAndFilter).zipWithIndex
  .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))

在这种特殊情况下,一个更简单的解决方案应该是使用SparkContext.union方法:

val masterRDD = sc.union(
  fileList.map(path => sc.textFile(path)
    .filter(_.startsWith("#####"))
    .map((path, _))) 
)

当您查看loop /生成的DAG时,这些方法之间的区别应该是显而易见的reduce:

在此输入图像描述

和一个union:

在此输入图像描述

当然,如果文件很小,你可以结合wholeTextFiles使用flatMap和读取所有文件一次:

sc.wholeTextFiles(fileList.mkString(","))
  .flatMap{case (path, text) =>  
    text.split("\n").filter(_.startsWith("#####")).map((path, _))}

推荐阅读
吻过彩虹的脸_378
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有