当前位置:  开发笔记 > 编程语言 > 正文

交错并行文件读取比顺序读取慢?

如何解决《交错并行文件读取比顺序读取慢?》经验,为你挑选了0个好方法。

我已经实现了一个小的IO类,它可以读取不同磁盘上的多个相同文件(例如,两个包含相同文件的硬盘).在连续的情况下,两个磁盘平均读取文件的速度为60MB/s,但是当我进行交错(例如4k磁盘1,4k磁盘2然后组合)时,有效读取速度降低到40MB/s而不是增加?

上下文:Win 7 + JDK 7b70,2GB RAM,2.2GB测试文件.基本上,我尝试以穷人的方式模仿Win7的ReadyBoost和RAID x.

在心脏中,当向类发出read()时,它会创建两个runnables,其中包含从特定位置和长度读取预打开的RandomAccessFile的指令.使用执行程序服务和Future.get()调用,当两者都完成时,数据读取将被复制到公共缓冲区并返回给调用者.

我的方法中是否存在概念错误?(例如,OS缓存机制将始终抵消?)

protected  List waitForAll(List> futures) 
throws MultiIOException {
    MultiIOException mex = null;
    int i = 0;
    List result = new ArrayList(futures.size());
    for (Future f : futures) {
        try {
            result.add(f.get());
        } catch (InterruptedException ex) {
            if (mex == null) {
                mex = new MultiIOException();
            }
            mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
        } catch (ExecutionException ex) {
            if (mex == null) {
                mex = new MultiIOException();
            }
            mex.exceptions.add(new ExceptionPair(metrics[i].file, ex));
        }
        i++;
    }
    if (mex != null) {
        throw mex;
    }
    return result;
}

public int read(long position, byte[] output, int start, int length) 
throws IOException {
    if (start < 0 || start + length > output.length) {
        throw new IndexOutOfBoundsException(
        String.format("start=%d, length=%d, output=%d", 
        start, length, output.length));
    }
    // compute the fragment sizes and positions
    int result = 0;
    final long[] positions = new long[metrics.length];
    final int[] lengths = new int[metrics.length];
    double speedSum = 0.0;
    double maxValue = 0.0;
    int maxIndex = 0;
    for (int i = 0; i < metrics.length; i++) {
        speedSum += metrics[i].readSpeed;
        if (metrics[i].readSpeed > maxValue) {
            maxValue = metrics[i].readSpeed;
            maxIndex = i;
        }
    }
    // adjust read lengths
    int lengthSum = length;
    for (int i = 0; i < metrics.length; i++) {
        int len = (int)Math.ceil(length * metrics[i].readSpeed / speedSum);
        lengths[i] = (len > lengthSum) ? lengthSum : len;
        lengthSum -= lengths[i];
    }
    if (lengthSum > 0) {
        lengths[maxIndex] += lengthSum;
    }
    // adjust read positions
    long positionDelta = position;
    for (int i = 0; i < metrics.length; i++) {
        positions[i] = positionDelta;
        positionDelta += (long)lengths[i]; 
    }        
    List> futures = new LinkedList>();
    // read in parallel
    for (int i = 0; i < metrics.length; i++) {
        final int j = i;
        futures.add(exec.submit(new Callable() {
            @Override
            public byte[] call() throws Exception {
                byte[] buffer = new byte[lengths[j]];
                long t = System.nanoTime();
                long t0 = t;

                long currPos = metrics[j].handle.getFilePointer();
                metrics[j].handle.seek(positions[j]);
                t = System.nanoTime() - t;
                metrics[j].seekTime = t * 1024.0 * 1024.0 / 
                    Math.abs(currPos - positions[j]) / 1E9 ;

                int c = metrics[j].handle.read(buffer);
                t0 = System.nanoTime() - t0;
                // adjust the read speed if we read something
                if (c > 0) {
                    metrics[j].readSpeed = (alpha * c * 1E9 / t0 / 1024 / 1024
                    + (1 - alpha) * metrics[j].readSpeed) ;
                }
                if (c < 0) {
                    return null;
                } else
                if (c == 0) {
                    return EMPTY_BYTE_ARRAY;
                } else
                if (c < buffer.length) {
                    return Arrays.copyOf(buffer, c);
                }
                return buffer;
            }
        }));
    }
    List data = waitForAll(futures);
    boolean eof = true;
    for (byte[] b : data) {
        if (b != null && b.length > 0) {
            System.arraycopy(b, 0, output, start + result, b.length);
            result += b.length;
            eof = false;
        } else {
            break; // the rest probably reached EOF
        }
    }
    // if there was no data at all, we reached the end of file
    if (eof) {
        return -1;
    }
    sequentialPosition = position + (long)result;

    // evaluate the fastest file to read
    double maxSpeed = 0;
    maxIndex = 0;
    for (int i = 0; i < metrics.length; i++) {
        if (metrics[i].readSpeed > maxSpeed) {
            maxSpeed = metrics[i].readSpeed;
            maxIndex = i;
        }
    }
    fastest = metrics[maxIndex];
    return result;
}

(度量数组中的FileMetrics包含读取速度的度量,以自适应地确定各种输入通道的缓冲区大小 - 在我的测试中,alpha = 0和readSpeed = 1结果相等分布)

编辑 我运行了一个非纠缠测试(例如,在不同的线程中独立读取两个文件.)并且我的组合有效速度为110MB/s.

Edit2 我想我知道为什么会这样.

当我并行并按顺序读取时,它不是磁盘的顺序读取,而是由于交错而导致的读取 - 跳过 - 读取 - 跳过模式(并且可能充满了分配表查找).这基本上将每个磁盘的有效读取速度降低了一半或更差.

推荐阅读
Life一切安好
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有