我正在使用子进程模块启动子进程并连接到它的输出流(stdout).我希望能够在其标准输出上执行非阻塞读取.有没有办法让.readline非阻塞或在我调用之前检查流上是否有数据.readline
?我希望这是可移植的,或至少在Windows和Linux下工作.
这是我现在如何做到的(.readline
如果没有数据可用,则阻止它):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE) output_str = p.stdout.readline()
jfs.. 393
fcntl
,select
,asyncproc
不会在这种情况下帮助.
无论操作系统如何,无阻塞地读取流的可靠方法是使用Queue.get_nowait()
:
import sys from subprocess import PIPE, Popen from threading import Thread try: from queue import Queue, Empty except ImportError: from Queue import Queue, Empty # python 2.x ON_POSIX = 'posix' in sys.builtin_module_names def enqueue_output(out, queue): for line in iter(out.readline, b''): queue.put(line) out.close() p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX) q = Queue() t = Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True # thread dies with the program t.start() # ... do other things here # read line without blocking try: line = q.get_nowait() # or q.get(timeout=.1) except Empty: print('no output yet') else: # got line # ... do something with line
是的,这对我有用,我删除了很多.它包括良好实践,但并非总是必要的.Python 3.x 2.X compat和close_fds可能会被省略,它仍然有效.但只要知道一切都做了什么,不要盲目地复制它,即使它只是有效!(实际上最简单的解决方案是使用一个线程并像Seb那样做一个readline,Qeues只是获取数据的简单方法,还有其他的,线程就是答案!) (6认同)
如果我无法关闭子进程,例如.由于例外?stdout-reader线程不会死,python会挂起,即使主线程退出,不是吗?怎么可以解决这个问题呢?python 2.x不支持查杀线程,更糟糕的是,不支持中断它们.:((显然应该处理异常以确保子进程被关闭,但万一它不会,你能做什么?) (4认同)
@Justin:'out.readline'不会阻止它在另一个线程中执行的主线程. (3认同)
我已经在`shelljob`http://psypi.python.org/pypi/shelljob包中创建了一些友好的包装器 (3认同)
在线程内部,对`out.readline`的调用会阻塞线程和主线程,我必须等到readline返回,然后其他所有内容才会继续.有什么简单的方法吗?(我正在从我的进程中读取多行,这也是另一个正在执行数据库和事物的.py文件) (2认同)
close_fds绝对不是你想盲目复制到你的应用程序中的东西...... (2认同)
@naxa:注意`daemon = True`:如果退出主线程,python进程将不会挂起. (2认同)
Jesse.. 76
我经常遇到类似的问题; 我经常编写的Python程序需要能够执行一些主要功能,同时从命令行(stdin)接受用户输入.简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()
阻塞并且没有超时.如果主要功能已完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能因为readline()
在另一个等待一行的线程中仍然阻塞.我发现这个问题的解决方案是使用fcntl模块使stdin成为非阻塞文件:
import fcntl import os import sys # make stdin a non-blocking file fd = sys.stdin.fileno() fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # user input handling thread while mainThreadIsRunning: try: input = sys.stdin.readline() except: continue handleInput(input)
在我看来,这比使用选择或信号模块解决这个问题要清晰一点,但是它再次只适用于UNIX ...
fcntl
,select
,asyncproc
不会在这种情况下帮助.
无论操作系统如何,无阻塞地读取流的可靠方法是使用Queue.get_nowait()
:
import sys from subprocess import PIPE, Popen from threading import Thread try: from queue import Queue, Empty except ImportError: from Queue import Queue, Empty # python 2.x ON_POSIX = 'posix' in sys.builtin_module_names def enqueue_output(out, queue): for line in iter(out.readline, b''): queue.put(line) out.close() p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX) q = Queue() t = Thread(target=enqueue_output, args=(p.stdout, q)) t.daemon = True # thread dies with the program t.start() # ... do other things here # read line without blocking try: line = q.get_nowait() # or q.get(timeout=.1) except Empty: print('no output yet') else: # got line # ... do something with line
我经常遇到类似的问题; 我经常编写的Python程序需要能够执行一些主要功能,同时从命令行(stdin)接受用户输入.简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()
阻塞并且没有超时.如果主要功能已完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能因为readline()
在另一个等待一行的线程中仍然阻塞.我发现这个问题的解决方案是使用fcntl模块使stdin成为非阻塞文件:
import fcntl import os import sys # make stdin a non-blocking file fd = sys.stdin.fileno() fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # user input handling thread while mainThreadIsRunning: try: input = sys.stdin.readline() except: continue handleInput(input)
在我看来,这比使用选择或信号模块解决这个问题要清晰一点,但是它再次只适用于UNIX ...
Python 3.4 为异步IO 模块引入了新的临时API.asyncio
这种方法类似于twisted
@Bryan Ward的基于答案的答案 - 定义一个协议,一旦数据准备就调用它的方法:
#!/usr/bin/env python3 import asyncio import os class SubprocessProtocol(asyncio.SubprocessProtocol): def pipe_data_received(self, fd, data): if fd == 1: # got stdout data (bytes) print(data) def connection_lost(self, exc): loop.stop() # end loop.run_forever() if os.name == 'nt': loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() try: loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, "myprogram.exe", "arg1", "arg2")) loop.run_forever() finally: loop.close()
请参阅文档中的"子流程".
有一个高级接口asyncio.create_subprocess_exec()
返回允许使用coroutine异步读取行的Process
对象
(使用/ Python 3.5+语法):StreamReader.readline()
async
await
#!/usr/bin/env python3.5 import asyncio import locale import sys from asyncio.subprocess import PIPE from contextlib import closing async def readline_and_kill(*args): # start child process process = await asyncio.create_subprocess_exec(*args, stdout=PIPE) # read line (sequence of bytes ending with b'\n') asynchronously async for line in process.stdout: print("got line:", line.decode(locale.getpreferredencoding(False))) break process.kill() return await process.wait() # wait for the child process to exit if sys.platform == "win32": loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() with closing(loop): sys.exit(loop.run_until_complete(readline_and_kill( "myprogram.exe", "arg1", "arg2")))
readline_and_kill()
执行以下任务:
启动子进程,将其stdout重定向到管道
从子进程'stdout异步读取一行
杀死子进程
等它退出
如有必要,每个步骤都可以通过超时秒限制.
尝试使用asyncproc模块.例如:
import os from asyncproc import Process myProc = Process("myprogram.app") while True: # check to see if process has ended poll = myProc.wait(os.WNOHANG) if poll != None: break # print any new output out = myProc.read() if out != "": print out
该模块负责S.Lott建议的所有线程.
您可以在Twisted中轻松完成此操作.根据您现有的代码库,这可能不是那么容易使用,但如果您正在构建一个扭曲的应用程序,那么这样的事情几乎变得微不足道.您创建一个ProcessProtocol
类,并覆盖该outReceived()
方法.Twisted(取决于所使用的反应器)通常只是一个select()
安装了回调的大循环来处理来自不同文件描述符(通常是网络套接字)的数据.因此,该outReceived()
方法只是安装一个回调来处理来自的数据STDOUT
.演示此行为的简单示例如下:
from twisted.internet import protocol, reactor class MyProcessProtocol(protocol.ProcessProtocol): def outReceived(self, data): print data proc = MyProcessProtocol() reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3']) reactor.run()
在扭曲的文档对此有一些有用的信息.
如果你围绕Twisted构建整个应用程序,它会与本地或远程的其他进程进行异步通信,就像这样非常优雅.另一方面,如果你的程序不是建立在Twisted之上,那么这实际上并没有那么有用.希望这对其他读者有帮助,即使它不适用于您的特定应用程序.
使用select&read(1).
import subprocess #no new requirements def readAllSoFar(proc, retVal=''): while (select.select([proc.stdout],[],[],0)[0]!=[]): retVal+=proc.stdout.read(1) return retVal p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE) while not p.poll(): print (readAllSoFar(p))
对于readline() - 如:
lines = [''] while not p.poll(): lines = readAllSoFar(p, lines[-1]).split('\n') for a in range(len(lines)-1): print a lines = readAllSoFar(p, lines[-1]).split('\n') for a in range(len(lines)-1): print a
一种解决方案是使另一个进程执行您对进程的读取,或者使进程的线程超时.
这是超时函数的线程版本:
http://code.activestate.com/recipes/473878/
但是,你需要阅读stdout,因为它正在进入?另一种解决方案可能是将输出转储到文件并等待进程使用p.wait()完成.
f = open('myprogram_output.txt','w') p = subprocess.Popen('myprogram.exe', stdout=f) p.wait() f.close() str = open('myprogram_output.txt','r').read()
免责声明:这仅适用于龙卷风
您可以通过将fd设置为非阻塞来执行此操作,然后使用ioloop注册回调.我把它打包成一个名为tornado_subprocess的蛋,你可以通过PyPI安装它:
easy_install tornado_subprocess
现在你可以这样做:
import tornado_subprocess import tornado.ioloop def print_res( status, stdout, stderr ) : print status, stdout, stderr if status == 0: print "OK:" print stdout else: print "ERROR:" print stderr t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] ) t.start() tornado.ioloop.IOLoop.instance().start()
您也可以将它与RequestHandler一起使用
class MyHandler(tornado.web.RequestHandler): def on_done(self, status, stdout, stderr): self.write( stdout ) self.finish() @tornado.web.asynchronous def get(self): t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] ) t.start()
现有的解决方案对我不起作用(详情如下).最终工作的是使用read(1)实现readline(基于这个答案).后者不会阻止:
from subprocess import Popen, PIPE from threading import Thread def process_output(myprocess): #output-consuming thread nextline = None buf = '' while True: #--- extract line using read(1) out = myprocess.stdout.read(1) if out == '' and myprocess.poll() != None: break if out != '': buf += out if out == '\n': nextline = buf buf = '' if not nextline: continue line = nextline nextline = None #--- do whatever you want with line here print 'Line is:', line myprocess.stdout.close() myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread p1.daemon = True p1.start() #--- do whatever here and then kill process and thread if needed if myprocess.poll() == None: #kill process; will automatically stop thread myprocess.kill() myprocess.wait() if p1 and p1.is_alive(): #wait for thread to finish p1.join()
为什么现有解决方案不起作用:
需要readline的解决方案(包括基于Queue的解决方案)始终会阻止.杀死执行readline的线程很困难(不可能?)它只会在创建它的进程完成时被杀死,但不会在生成输出的进程被终止时被杀死.
正如aonnn所指出的,将低级别fcntl与高级别readline调用混合可能无法正常工作.
使用select.poll()很整洁,但根据python docs在Windows上不起作用.
使用第三方库似乎对此任务有些过分,并添加了其他依赖项.
我添加此问题以读取一些subprocess.Popen stdout。这是我的非阻塞读取解决方案:
import fcntl def non_block_read(output): fd = output.fileno() fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) try: return output.read() except: return "" # Use example from subprocess import * sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE) sb.kill() # sb.stdout.read() # <-- This will block non_block_read(sb.stdout) 'test\n'
这是我的代码,用于捕获子流程ASAP的每个输出,包括部分行。它同时抽水,并且以几乎正确的顺序抽出stdout和stderr。
经过测试并在Python 2.7 linux&Windows上正确工作。
#!/usr/bin/python # # Runner with stdout/stderr catcher # from sys import argv from subprocess import Popen, PIPE import os, io from threading import Thread import Queue def __main__(): if (len(argv) > 1) and (argv[-1] == "-sub-"): import time, sys print "Application runned!" time.sleep(2) print "Slept 2 second" time.sleep(1) print "Slept 1 additional second", time.sleep(2) sys.stderr.write("Stderr output after 5 seconds") print "Eol on stdin" sys.stderr.write("Eol on stderr\n") time.sleep(1) print "Wow, we have end of work!", else: os.environ["PYTHONUNBUFFERED"]="1" try: p = Popen( argv + ["-sub-"], bufsize=0, # line-buffered stdin=PIPE, stdout=PIPE, stderr=PIPE ) except WindowsError, W: if W.winerror==193: p = Popen( argv + ["-sub-"], shell=True, # Try to run via shell bufsize=0, # line-buffered stdin=PIPE, stdout=PIPE, stderr=PIPE ) else: raise inp = Queue.Queue() sout = io.open(p.stdout.fileno(), 'rb', closefd=False) serr = io.open(p.stderr.fileno(), 'rb', closefd=False) def Pump(stream, category): queue = Queue.Queue() def rdr(): while True: buf = stream.read1(8192) if len(buf)>0: queue.put( buf ) else: queue.put( None ) return def clct(): active = True while active: r = queue.get() try: while True: r1 = queue.get(timeout=0.005) if r1 is None: active = False break else: r += r1 except Queue.Empty: pass inp.put( (category, r) ) for tgt in [rdr, clct]: th = Thread(target=tgt) th.setDaemon(True) th.start() Pump(sout, 'stdout') Pump(serr, 'stderr') while p.poll() is None: # App still working try: chan,line = inp.get(timeout = 1.0) if chan=='stdout': print "STDOUT>>", line, "<" elif chan=='stderr': print " ERROR==", line, "=?=" except Queue.Empty: pass print "Finish" if __name__ == '__main__': __main__()
这无阻塞读的版本并不需要特殊的模块,将工作外的开箱上大多数Linux发行版的.
import os import sys import time import fcntl import subprocess def async_read(fd): # set non-blocking flag while preserving old flags fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # read char until EOF hit while True: try: ch = os.read(fd.fileno(), 1) # EOF if not ch: break sys.stdout.write(ch) except OSError: # waiting for data be available on fd pass def shell(args, async=True): # merge stderr and stdout proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if async: async_read(proc.stdout) sout, serr = proc.communicate() return (sout, serr) if __name__ == '__main__': cmd = 'ping 8.8.8.8' sout, serr = shell(cmd.split())