当前位置:  开发笔记 > 编程语言 > 正文

SparkError:XXXX任务的序列化结果总大小(2.0 GB)大于spark.driver.maxResultSize(2.0 GB)

如何解决《SparkError:XXXX任务的序列化结果总大小(2.0GB)大于spark.driver.maxResultSize(2.0GB)》经验,为你挑选了0个好方法。

错误:

ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)

目标:获取使用该模型的所有用户的建议,并与每个用户测试数据重叠并生成重叠率.

我使用spark mllib构建了一个推荐模型.我评估每个用户的测试数据和每个用户的推荐项目的重叠比率,并生成平均重叠率.

  def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {

    val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey
    val n = testData.count

    val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20)
      .mapValues(_.map(r => r.product))

    val overlaps = testData.join(recommendations).map(x => {
      val moviesPerUserInRecs = x._2._2.toSet
      val moviesPerUserInTest = x._2._1.toSet
      val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest)
      if(localHitRatio.size > 0)
        1
      else
        0
    }).filter(x => x != 0).count

    var r = 0.0
    if (overlaps != 0)
      r = overlaps / n

    return r

  }

但这里的问题是它最终会抛出maxResultSize错误.在我的火花配置中,我做了以下增加maxResultSize.

val conf = new SparkConf()
conf.set("spark.driver.maxResultSize", "6g")

但这并没有解决问题,我几乎接近我分配驱动程序内存的数量,但问题没有得到解决.虽然代码正在执行,但我仍然关注我的火花工作,我看到的有点令人费解.

[Stage 281:==>   (47807 + 100) / 1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)

在高于阶段代码执行MatrixFactorization代码在火花mllib recommendForAll周围line 277(未完全确定的行号).

  private def recommendForAll(
      rank: Int,
      srcFeatures: RDD[(Int, Array[Double])],
      dstFeatures: RDD[(Int, Array[Double])],
      num: Int): RDD[(Int, Array[(Int, Double)])] = {
    val srcBlocks = blockify(rank, srcFeatures)
    val dstBlocks = blockify(rank, dstFeatures)
    val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
      case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
        val m = srcIds.length
        val n = dstIds.length
        val ratings = srcFactors.transpose.multiply(dstFactors)
        val output = new Array[(Int, (Int, Double))](m * n)
        var k = 0
        ratings.foreachActive { (i, j, r) =>
          output(k) = (srcIds(i), (dstIds(j), r))
          k += 1
        }
        output.toSeq
    }
    ratings.topByKey(num)(Ordering.by(_._2))
  }

recommendForAll方法从recommendProductsForUsers方法调用.

但看起来这种方法正在剥离1M任务.获得的数据来自2000个部分文件,所以我很困惑它开始吐出1M任务,我认为这可能是问题所在.

我的问题是如何才能真正解决这个问题.没有使用这种方法,它很难计算overlap ratiorecall@K.这是火花1.5(cloudera 5.5)

推荐阅读
小白也坚强_177
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有