我试图乘以以镶木地板格式存储的大矩阵,所以小心不要将RDD存储在内存中,但是从镶木地板阅读器中得到OOM错误:
15/12/06 05:23:36 WARN TaskSetManager: Lost task 950.0 in stage 4.0 (TID 28398, 172.31.34.233): java.lang.OutOfMemoryError: Java heap space at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:755) at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:494) at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:127) at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208) at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) ...
具体来说,矩阵是一个46752×54843120的32位浮点密集矩阵,以镶木地板格式存储(每行约为1.7GB未压缩).
下面的代码将此矩阵作为Spark IndexedRowMatrix加载并将其乘以随机向量(行与关联的字符串标签一起存储,并且浮点数必须转换为双精度,因为IndexedRows只能使用双精度数):
val rows = { sqlContext.read.parquet(datafname).rdd.map { case SQLRow(rowname: String, values: WrappedArray[Float]) => // DenseVectors have to be doubles val vector = new DenseVector(values.toArray.map(v => v.toDouble)) new IndexedRow(indexLUT(rowname), vector) } } val nrows : Long = 46752 val ncols = 54843120 val A = new IndexedRowMatrix(rows, nrows, ncols) A.rows.unpersist() // doesn't help avoid OOM val x = new DenseMatrix(ncols, 1, BDV.rand(ncols).data) A.multiply(x).rows.collect
我在运行时使用以下选项
--driver-memory 220G --num-executors 203 --executor-cores 4 --executor-memory 25G --conf spark.storage.memoryFraction=0
镶木地板文件有25573个分区,因此每个分区的未压缩Float值应小于4Gb; 我希望这应该意味着当前执行程序内存远远不够(我不能提高执行程序内存设置).
任何想法为什么会遇到OOM错误以及如何解决它?据我所知,镶木地板读卡器没有理由存放任何东西.