当前位置:  开发笔记 > 编程语言 > 正文

是否可以从具有超时的InputStream读取?

如何解决《是否可以从具有超时的InputStream读取?》经验,为你挑选了6个好方法。

具体来说,问题是写一个这样的方法:

int maybeRead(InputStream in, long timeout)

如果数据在'timeout'毫秒内可用,则返回值与in.read()相同,否则为-2.在方法返回之前,任何生成的线程都必须退出.

为了避免参数,这里的主题是java.io.InputStream,由Sun(任何Java版本)记录.请注意,这并不像看起来那么简单.以下是Sun的文档直接支持的一些事实.

    in.read()方法可能是不可中断的.

    将InputStream包装在Reader或InterruptibleChannel中没有帮助,因为所有这些类都可以调用InputStream的方法.如果可以使用这些类,则可以编写一个直接在InputStream上执行相同逻辑的解决方案.

    in.available()返回0总是可以接受的.

    in.close()方法可能会阻塞或不执行任何操作.

    没有通用的方法来杀死另一个线程.

Glen Best.. 80

使用inputStream.available()

System.in.available()始终可以返回0.

我发现了相反的情况 - 它总是返回可用字节数的最佳值.Javadoc InputStream.available():

Returns an estimate of the number of bytes that can be read (or skipped over) 
from this input stream without blocking by the next invocation of a method for 
this input stream.

由于时间/陈旧性,估计是不可避免的.这个数字可能是一次性的低估,因为新的数据不断到来.然而,它总是在接下来的呼叫中"赶上" - 它应该考虑所有到达的数据,即在新呼叫时刻到达的数据栏.当有数据时,永久返回0会导致上述条件失败.

First Caveat:InputStream的具体子类负责available()

InputStream是一个抽象类.它没有数据源.拥有可用数据对它毫无意义.因此,javadoc available()也表示:

The available method for class InputStream always returns 0.

This method should be overridden by subclasses.

实际上,具体的输入流类会覆盖available(),提供有意义的值,而不是常量0.

第二个注意事项:确保在Windows中键入输入时使用回车符.

如果使用System.in,程序只会在命令shell交给它时接收输入.如果您正在使用文件重定向/管道(例如somefile> java myJavaApp或somecommand | java myJavaApp),则输入数据通常会立即移交.但是,如果您手动键入输入,则可能会延迟数据切换.例如,使用windows cmd.exe shell,数据将缓存在cmd.exe shell中.数据仅在回车(control-m或)之后传递给正在执行的java程序.这是执行环境的限制.当然,只要shell缓冲数据,InputStream.available()就会返回0 - 这是正确的行为; 那时没有可用的数据.一旦数据从shell获得,该方法返回的值> 0.注意:Cygwin也使用cmd.exe.

最简单的解决方案(无阻塞,因此无需超时)

只要用这个:

    byte[] inputData = new byte[1024];
    int result = is.read(inputData, 0, is.available());  
    // result will indicate number of bytes read; -1 for EOF with no data read.

或者等价地,

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
    // ...
         // inside some iteration / processing logic:
         if (br.ready()) {
             int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
         }

更丰富的解决方案(在超时期限内最大限度地填充缓冲区)

声明:

public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
     throws IOException  {
     int bufferOffset = 0;
     long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
     while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
         int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
         // can alternatively use bufferedReader, guarded by isReady():
         int readResult = is.read(b, bufferOffset, readLength);
         if (readResult == -1) break;
         bufferOffset += readResult;
     }
     return bufferOffset;
 }

然后用这个:

    byte[] inputData = new byte[1024];
    int readCount = readInputStreamWithTimeout(System.in, inputData, 6000);  // 6 second timeout
    // readCount will indicate number of bytes read; -1 for EOF with no data read.


Ian Jones.. 65

假设您的流没有套接字支持(因此您无法使用Socket.setSoTimeout()),我认为解决此类问题的标准方法是使用Future.

假设我有以下执行器和流:

    ExecutorService executor = Executors.newFixedThreadPool(2);
    final PipedOutputStream outputStream = new PipedOutputStream();
    final PipedInputStream inputStream = new PipedInputStream(outputStream);

我有编写器写入一些数据,然后等待5秒钟,然后写入最后一段数据并关闭流:

    Runnable writeTask = new Runnable() {
        @Override
        public void run() {
            try {
                outputStream.write(1);
                outputStream.write(2);
                Thread.sleep(5000);
                outputStream.write(3);
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    executor.submit(writeTask);

阅读的正常方法如下.读取将无限期地阻塞数据,因此在5s内完成:

    long start = currentTimeMillis();
    int readByte = 1;
    // Read data without timeout
    while (readByte >= 0) {
        readByte = inputStream.read();
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }
    System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");

哪个输出:

Read: 1
Read: 2
Read: 3
Complete in 5001ms

如果有一个更基本的问题,比如作者没有回应,读者就会永远阻止.如果我将来包装读取,我可以控制超时,如下所示:

    int readByte = 1;
    // Read data with timeout
    Callable readTask = new Callable() {
        @Override
        public Integer call() throws Exception {
            return inputStream.read();
        }
    };
    while (readByte >= 0) {
        Future future = executor.submit(readTask);
        readByte = future.get(1000, TimeUnit.MILLISECONDS);
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }

哪个输出:

Read: 1
Read: 2
Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)

我可以捕获TimeoutException并做任何我想要的清理工作.



1> Glen Best..:

使用inputStream.available()

System.in.available()始终可以返回0.

我发现了相反的情况 - 它总是返回可用字节数的最佳值.Javadoc InputStream.available():

Returns an estimate of the number of bytes that can be read (or skipped over) 
from this input stream without blocking by the next invocation of a method for 
this input stream.

由于时间/陈旧性,估计是不可避免的.这个数字可能是一次性的低估,因为新的数据不断到来.然而,它总是在接下来的呼叫中"赶上" - 它应该考虑所有到达的数据,即在新呼叫时刻到达的数据栏.当有数据时,永久返回0会导致上述条件失败.

First Caveat:InputStream的具体子类负责available()

InputStream是一个抽象类.它没有数据源.拥有可用数据对它毫无意义.因此,javadoc available()也表示:

The available method for class InputStream always returns 0.

This method should be overridden by subclasses.

实际上,具体的输入流类会覆盖available(),提供有意义的值,而不是常量0.

第二个注意事项:确保在Windows中键入输入时使用回车符.

如果使用System.in,程序只会在命令shell交给它时接收输入.如果您正在使用文件重定向/管道(例如somefile> java myJavaApp或somecommand | java myJavaApp),则输入数据通常会立即移交.但是,如果您手动键入输入,则可能会延迟数据切换.例如,使用windows cmd.exe shell,数据将缓存在cmd.exe shell中.数据仅在回车(control-m或)之后传递给正在执行的java程序.这是执行环境的限制.当然,只要shell缓冲数据,InputStream.available()就会返回0 - 这是正确的行为; 那时没有可用的数据.一旦数据从shell获得,该方法返回的值> 0.注意:Cygwin也使用cmd.exe.

最简单的解决方案(无阻塞,因此无需超时)

只要用这个:

    byte[] inputData = new byte[1024];
    int result = is.read(inputData, 0, is.available());  
    // result will indicate number of bytes read; -1 for EOF with no data read.

或者等价地,

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
    // ...
         // inside some iteration / processing logic:
         if (br.ready()) {
             int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
         }

更丰富的解决方案(在超时期限内最大限度地填充缓冲区)

声明:

public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
     throws IOException  {
     int bufferOffset = 0;
     long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
     while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
         int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
         // can alternatively use bufferedReader, guarded by isReady():
         int readResult = is.read(b, bufferOffset, readLength);
         if (readResult == -1) break;
         bufferOffset += readResult;
     }
     return bufferOffset;
 }

然后用这个:

    byte[] inputData = new byte[1024];
    int readCount = readInputStreamWithTimeout(System.in, inputData, 6000);  // 6 second timeout
    // readCount will indicate number of bytes read; -1 for EOF with no data read.



2> Ian Jones..:

假设您的流没有套接字支持(因此您无法使用Socket.setSoTimeout()),我认为解决此类问题的标准方法是使用Future.

假设我有以下执行器和流:

    ExecutorService executor = Executors.newFixedThreadPool(2);
    final PipedOutputStream outputStream = new PipedOutputStream();
    final PipedInputStream inputStream = new PipedInputStream(outputStream);

我有编写器写入一些数据,然后等待5秒钟,然后写入最后一段数据并关闭流:

    Runnable writeTask = new Runnable() {
        @Override
        public void run() {
            try {
                outputStream.write(1);
                outputStream.write(2);
                Thread.sleep(5000);
                outputStream.write(3);
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    executor.submit(writeTask);

阅读的正常方法如下.读取将无限期地阻塞数据,因此在5s内完成:

    long start = currentTimeMillis();
    int readByte = 1;
    // Read data without timeout
    while (readByte >= 0) {
        readByte = inputStream.read();
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }
    System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");

哪个输出:

Read: 1
Read: 2
Read: 3
Complete in 5001ms

如果有一个更基本的问题,比如作者没有回应,读者就会永远阻止.如果我将来包装读取,我可以控制超时,如下所示:

    int readByte = 1;
    // Read data with timeout
    Callable readTask = new Callable() {
        @Override
        public Integer call() throws Exception {
            return inputStream.read();
        }
    };
    while (readByte >= 0) {
        Future future = executor.submit(readTask);
        readByte = future.get(1000, TimeUnit.MILLISECONDS);
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }

哪个输出:

Read: 1
Read: 2
Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)

我可以捕获TimeoutException并做任何我想要的清理工作.


但是阻塞线程怎么样?!它会留在内存中,直到应用程序终止?如果我是正确的,这可能会产生无休止的线程,应用程序负载很重,甚至更多,阻止进一步的线程使用你的池占用和阻塞它的线程.如果我错了,请纠正我.谢谢.
@ortang这就是我所说的"抓住TimeoutException并做任何清理......"例如我可能想杀死读取线程:... catch(TimeoutException e){executor.shutdownNow(); }
`executer.shutdownNow`不会杀死线程.它会试图中断它,没有任何效果.没有可能的清理,这是一个严重的问题.
-1 as the threads reading stay blocked until the application terminates.
Muhammad Gelbana,你是对的:阻塞的read()线程保持运行,这是不行的.我找到了一种防止这种情况的方法:当超时命中时,从调用线程关闭输入流(在我的情况下,我关闭输入流来自的android蓝牙套接字).当你这样做时,read()调用将立即返回..在我的情况下,我使用int read(byte [])重载,并立即返回.也许int read()重载会抛出IOException,因为我不知道它会返回什么...在我看来这是正确的解决方案.

3> Templar..:

如果您的InputStream由Socket支持,您可以使用setSoTimeout设置Socket超时(以毫秒为单位).如果read()调用在指定的超时内没有解除阻塞,它将抛出SocketTimeoutException.

只需确保在进行read()调用之前在Socket上调用setSoTimeout.



4> user207421..:

我会质疑问题陈述而不是盲目地接受它.您只需要从控制台或通过网络超时.如果你拥有后者Socket.setSoTimeout(),HttpURLConnection.setReadTimeout()并且两者都完全符合要求,只要你在构造/获取它们时正确设置它们.当你拥有所有东西时,将它留在应用程序的后面的任意点是糟糕的设计导致非常尴尬的实现.


在其他情况下,读取可能会在很长时间内阻塞; 例如,从磁带驱动器读取时,从远程安装的网络驱动器读取,或从后端使用磁带机器人的HFS读取.(但你答案的主旨是正确的.)
InputStream在流上工作并且会阻塞,但它不提供超时机制。因此,InputStream抽象不是经过适当设计的抽象。因此,要求一种在流上超时的方法并不需要太多。因此,问题是要解决一个非常实际的问题。大多数基础实现将被阻止。那就是流的本质。如果流的另一端尚未准备好新数据,则套接字,文件,管道将阻塞。
@EJP。我不知道你是怎么得到的。我不同意你的看法。问题语句“如何在InputStream上超时”有效。由于框架没有提供超时的方法,因此提出这样的问题是适当的。

5> jt...:

我没有使用Java NIO包中的类,但似乎它们可能在这里有所帮助.具体来说,java.nio.channels.Channels和java.nio.channels.InterruptibleChannel.


+1:我不相信有一种可靠的方法可以单独使用InputStream执行OP要求的操作.然而,nio是为此目的而创建的.
OP已经基本上排除了这一点.InputStream本身就是阻塞的,可能是不可中断的.

6> 小智..:

这是一种从System.in获取NIO FileChannel并使用超时检查数据可用性的方法,这是问题中描述的问题的特例.在控制台运行它,不要输入任何输入,然后等待结果.它在Windows和Linux上的Java 6下成功测试.

import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;

public class Main {

    static final ByteBuffer buf = ByteBuffer.allocate(4096);

    public static void main(String[] args) {

        long timeout = 1000 * 5;

        try {
            InputStream in = extract(System.in);
            if (! (in instanceof FileInputStream))
                throw new RuntimeException(
                        "Could not extract a FileInputStream from STDIN.");

            try {
                int ret = maybeAvailable((FileInputStream)in, timeout);
                System.out.println(
                        Integer.toString(ret) + " bytes were read.");

            } finally {
                in.close();
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    /* unravels all layers of FilterInputStream wrappers to get to the
     * core InputStream
     */
    public static InputStream extract(InputStream in)
            throws NoSuchFieldException, IllegalAccessException {

        Field f = FilterInputStream.class.getDeclaredField("in");
        f.setAccessible(true);

        while( in instanceof FilterInputStream )
            in = (InputStream)f.get((FilterInputStream)in);

        return in;
    }

    /* Returns the number of bytes which could be read from the stream,
     * timing out after the specified number of milliseconds.
     * Returns 0 on timeout (because no bytes could be read)
     * and -1 for end of stream.
     */
    public static int maybeAvailable(final FileInputStream in, long timeout)
            throws IOException, InterruptedException {

        final int[] dataReady = {0};
        final IOException[] maybeException = {null};
        final Thread reader = new Thread() {
            public void run() {                
                try {
                    dataReady[0] = in.getChannel().read(buf);
                } catch (ClosedByInterruptException e) {
                    System.err.println("Reader interrupted.");
                } catch (IOException e) {
                    maybeException[0] = e;
                }
            }
        };

        Thread interruptor = new Thread() {
            public void run() {
                reader.interrupt();
            }
        };

        reader.start();
        for(;;) {

            reader.join(timeout);
            if (!reader.isAlive())
                break;

            interruptor.start();
            interruptor.join(1000);
            reader.join(1000);
            if (!reader.isAlive())
                break;

            System.err.println("We're hung");
            System.exit(1);
        }

        if ( maybeException[0] != null )
            throw maybeException[0];

        return dataReady[0];
    }
}

有趣的是,当在NetBeans 6.5中而不是在控制台中运行程序时,超时根本不起作用,并且实际上需要调用System.exit()来杀死僵尸线程.会发生什么事情是中断线程在调用reader.interrupt()时阻塞(!).另一个测试程序(此处未显示)另外尝试关闭通道,但这也不起作用.

推荐阅读
依然-狠幸福
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有