当前位置:  开发笔记 > 数据库 > 正文

Parquet读取时内存不足

如何解决《Parquet读取时内存不足》经验,为你挑选了0个好方法。

我试图乘以以镶木地板格式存储的大矩阵,所以小心不要将RDD存储在内存中,但是从镶木地板阅读器中得到OOM错误:

15/12/06 05:23:36 WARN TaskSetManager: Lost task 950.0 in stage 4.0   
(TID 28398, 172.31.34.233): java.lang.OutOfMemoryError: Java heap space
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:755)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:494)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
...

具体来说,矩阵是一个46752×54843120的32位浮点密集矩阵,以镶木地板格式存储(每行约为1.7GB未压缩).

下面的代码将此矩阵作为Spark IndexedRowMatrix加载并将其乘以随机向量(行与关联的字符串标签一起存储,并且浮点数必须转换为双精度,因为IndexedRows只能使用双精度数):

val rows = {
  sqlContext.read.parquet(datafname).rdd.map {
    case SQLRow(rowname: String, values: WrappedArray[Float]) =>
    // DenseVectors have to be doubles
      val vector = new DenseVector(values.toArray.map(v => v.toDouble)) 
      new IndexedRow(indexLUT(rowname), vector)
    }
}

val nrows : Long = 46752
val ncols = 54843120
val A = new IndexedRowMatrix(rows, nrows, ncols)
A.rows.unpersist() // doesn't help avoid OOM

val x = new DenseMatrix(ncols, 1, BDV.rand(ncols).data)
A.multiply(x).rows.collect

我在运行时使用以下选项

--driver-memory 220G
--num-executors 203
--executor-cores 4
--executor-memory 25G
--conf spark.storage.memoryFraction=0

镶木地板文件有25573个分区,因此每个分区的未压缩Float值应小于4Gb; 我希望这应该意味着当前执行程序内存远远不够(我不能提高执行程序内存设置).

任何想法为什么会遇到OOM错误以及如何解决它?据我所知,镶木地板读卡器没有理由存放任何东西.

推荐阅读
女女的家_747
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有