当前位置:  开发笔记 > 编程语言 > 正文

python中subprocess.PIPE上的非阻塞读取

如何解决《python中subprocess.PIPE上的非阻塞读取》经验,为你挑选了12个好方法。

我正在使用子进程模块启动子进程并连接到它的输出流(stdout).我希望能够在其标准输出上执行非阻塞读取.有没有办法让.readline非阻塞或在我调用之前检查流上是否有数据.readline?我希望这是可移植的,或至少在Windows和Linux下工作.

这是我现在如何做到的(.readline如果没有数据可用,则阻止它):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

jfs.. 393

fcntl,select,asyncproc不会在这种情况下帮助.

无论操作系统如何,无阻塞地读取流的可靠方法是使用Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

是的,这对我有用,我删除了很多.它包括良好实践,但并非总是必要的.Python 3.x 2.X compat和close_fds可能会被省略,它仍然有效.但只要知道一切都做了什么,不要盲目地复制它,即使它只是有效!(实际上最简单的解决方案是使用一个线程并像Seb那样做一个readline,Qeues只是获取数据的简单方法,还有其他的,线程就是答案!) (6认同)

如果我无法关闭子进程,例如.由于例外?stdout-reader线程不会死,python会挂起,即使主线程退出,不是吗?怎么可以解决这个问题呢?python 2.x不支持查杀线程,更糟糕的是,不支持中断它们.:((显然应该处理异常以确保子进程被关闭,但万一它不会,你能做什么?) (4认同)

@Justin:'out.readline'不会阻止它在另一个线程中执行的主线程. (3认同)

我已经在`shelljob`http://psypi.python.org/pypi/shelljob包中创建了一些友好的包装器 (3认同)

在线程内部,对`out.readline`的调用会阻塞线程和主线程,我必须等到readline返回,然后其他所有内容才会继续.有什么简单的方法吗?(我正在从我的进程中读取多行,这也是另一个正在执行数据库和事物的.py文件) (2认同)

close_fds绝对不是你想盲目复制到你的应用程序中的东西...... (2认同)

@naxa:注意`daemon = True`:如果退出主线程,python进程将不会挂起. (2认同)


Jesse.. 76

我经常遇到类似的问题; 我经常编写的Python程序需要能够执行一些主要功能,同时从命令行(stdin)接受用户输入.简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()阻塞并且没有超时.如果主要功能已完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能因为readline()在另一个等待一行的线程中仍然阻塞.我发现这个问题的解决方案是使用fcntl模块使stdin成为非阻塞文件:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

在我看来,这比使用选择或信号模块解决这个问题要清晰一点,但是它再次只适用于UNIX ...



1> jfs..:

fcntl,select,asyncproc不会在这种情况下帮助.

无论操作系统如何,无阻塞地读取流的可靠方法是使用Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line


是的,这对我有用,我删除了很多.它包括良好实践,但并非总是必要的.Python 3.x 2.X compat和close_fds可能会被省略,它仍然有效.但只要知道一切都做了什么,不要盲目地复制它,即使它只是有效!(实际上最简单的解决方案是使用一个线程并像Seb那样做一个readline,Qeues只是获取数据的简单方法,还有其他的,线程就是答案!)
如果我无法关闭子进程,例如.由于例外?stdout-reader线程不会死,python会挂起,即使主线程退出,不是吗?怎么可以解决这个问题呢?python 2.x不支持查杀线程,更糟糕的是,不支持中断它们.:((显然应该处理异常以确保子进程被关闭,但万一它不会,你能做什么?)
@Justin:'out.readline'不会阻止它在另一个线程中执行的主线程.
我已经在`shelljob`http://psypi.python.org/pypi/shelljob包中创建了一些友好的包装器
在线程内部,对`out.readline`的调用会阻塞线程和主线程,我必须等到readline返回,然后其他所有内容才会继续.有什么简单的方法吗?(我正在从我的进程中读取多行,这也是另一个正在执行数据库和事物的.py文件)
close_fds绝对不是你想盲目复制到你的应用程序中的东西......
@naxa:注意`daemon = True`:如果退出主线程,python进程将不会挂起.

2> Jesse..:

我经常遇到类似的问题; 我经常编写的Python程序需要能够执行一些主要功能,同时从命令行(stdin)接受用户输入.简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()阻塞并且没有超时.如果主要功能已完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能因为readline()在另一个等待一行的线程中仍然阻塞.我发现这个问题的解决方案是使用fcntl模块使stdin成为非阻塞文件:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

在我看来,这比使用选择或信号模块解决这个问题要清晰一点,但是它再次只适用于UNIX ...


请不要使用繁忙的循环.使用poll()和超时来等待数据.
[Jesse的回答](http://stackoverflow.com/questions/375427/non-blocking-read-on-a-stream-in-python/1810703#1810703)不正确.根据Guido的说法,readline在非阻塞模式下无法正常工作,并且在Python 3000之前不会正常工作.http://bugs.python.org/issue1175#msg56041如果要使用fcntl将文件设置为非阻塞模式,你必须使用较低级别的os.read()并自己分开行.将fcntl与执行行缓冲的高级调用混合会产生麻烦.
在Python 2中使用readline似乎不正确.请参阅anonnn的答案http://stackoverflow.com/questions/375427/non-blocking-read-on-a-stream-in-python/4025909#4025909

3> jfs..:

Python 3.4 为异步IO 模块引入了新的临时API.asyncio

这种方法类似于twisted@Bryan Ward的基于答案的答案 - 定义一个协议,一旦数据准备就调用它的方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

请参阅文档中的"子流程".

有一个高级接口asyncio.create_subprocess_exec()返回允许使用coroutine异步读取行的Process对象 (使用/ Python 3.5+语法):StreamReader.readline()asyncawait

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() 执行以下任务:

启动子进程,将其stdout重定向到管道

从子进程'stdout异步读取一行

杀死子进程

等它退出

如有必要,每个步骤都可以通过超时秒限制.



4> Noah..:

尝试使用asyncproc模块.例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

该模块负责S.Lott建议的所有线程.


asyncproc是GPL,进一步限制了它的使用:-(
asyncproc在Windows上不起作用,而windows不支持os.WNOHANG :-(

5> Bryan Ward..:

您可以在Twisted中轻松完成此操作.根据您现有的代码库,这可能不是那么容易使用,但如果您正在构建一个扭曲的应用程序,那么这样的事情几乎变得微不足道.您创建一个ProcessProtocol类,并覆盖该outReceived()方法.Twisted(取决于所使用的反应器)通常只是一个select()安装了回调的大循环来处理来自不同文件描述符(通常是网络套接字)的数据.因此,该outReceived()方法只是安装一个回调来处理来自的数据STDOUT.演示此行为的简单示例如下:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

在扭曲的文档对此有一些有用的信息.

如果你围绕Twisted构建整个应用程序,它会与本地或远程的其他进程进行异步通信,就像这样非常优雅.另一方面,如果你的程序不是建立在Twisted之上,那么这实际上并没有那么有用.希望这对其他读者有帮助,即使它不适用于您的特定应用程序.


@naxa我不认为他所指的`select()`与你的相同.我假设这是因为`Twisted`适用于Windows ...

6> 小智..:

使用select&read(1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

对于readline() - 如:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a


不好.根据[docs](http://docs.python.org/2/library/select.html),`select`不适用于带文件描述符的窗口

7> monkut..:

一种解决方案是使另一个进程执行您对进程的读取,或者使进程的线程超时.

这是超时函数的线程版本:

http://code.activestate.com/recipes/473878/

但是,你需要阅读stdout,因为它正在进入?另一种解决方案可能是将输出转储到文件并等待进程使用p.wait()完成.

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()



8> Vukasin Toro..:

免责声明:这仅适用于龙卷风

您可以通过将fd设置为非阻塞来执行此操作,然后使用ioloop注册回调.我把它打包成一个名为tornado_subprocess的蛋,你可以通过PyPI安装它:

easy_install tornado_subprocess

现在你可以这样做:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

您也可以将它与RequestHandler一起使用

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()



9> 小智..:

现有的解决方案对我不起作用(详情如下).最终工作的是使用read(1)实现readline(基于这个答案).后者不会阻止:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

为什么现有解决方案不起作用:

    需要readline的解决方案(包括基于Queue的解决方案)始终会阻止.杀死执行readline的线程很困难(不可能?)它只会在创建它的进程完成时被杀死,但不会在生成输出的进程被终止时被杀死.

    正如aonnn所指出的,将低级别fcntl与高级别readline调用混合可能无法正常工作.

    使用select.poll()很整洁,但根据python docs在Windows上不起作用.

    使用第三方库似乎对此任务有些过分,并添加了其他依赖项.



10> 小智..:

我添加此问题以读取一些subprocess.Popen stdout。这是我的非阻塞读取解决方案:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'


根据[docs](http://docs.python.org/2/library/fcntl.html),fcntl在Windows上不起作用。

11> datacompboy..:

这是我的代码,用于捕获子流程ASAP的每个输出,包括部分行。它同时抽水,并且以几乎正确的顺序抽出stdout和stderr。

经过测试并在Python 2.7 linux&Windows上正确工作。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "



12> Tom Lime..:

这无阻塞读的版本并不需要特殊的模块,将工作外的开箱上大多数Linux发行版的.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

推荐阅读
U友50081205_653
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有