我有以下情况:我在socketio服务器上收到请求.我回答它(socket.emit(..))然后在另一个线程中启动一些计算负载很重的东西.
如果繁重的计算是由subprocess.Popen
(使用subprocess.PIPE
)引起的,它会完全阻止每个传入的请求,只要它正在被执行,尽管它发生在一个单独的线程中.
没问题 - 在这个线程中,建议异步读取子进程的结果,缓冲区大小为1,以便在这些读取之间,其他线程有机会做某事.不幸的是,这对我没有帮助.
我也已经monkeypatched eventlet和工作正常-只要我不使用subprocess.Popen
与subprocess.PIPE
在线程.
在此代码示例中,您可以看到它只在使用subprocess.Popen
时发生subprocess.PIPE
.当取消注释#functionWithSimulatedHeavyLoad()
并反而评论functionWithHeavyLoad()
一切都像魅力一样.
from flask import Flask from flask.ext.socketio import SocketIO, emit import eventlet eventlet.monkey_patch() app = Flask(__name__) socketio = SocketIO(app) import time from threading import Thread @socketio.on('client command') def response(data, type = None, nonce = None): socketio.emit('client response', ['foo']) thread = Thread(target = testThreadFunction) thread.daemon = True thread.start() def testThreadFunction(): #functionWithSimulatedHeavyLoad() functionWithHeavyLoad() def functionWithSimulatedHeavyLoad(): time.sleep(5) def functionWithHeavyLoad(): from datetime import datetime import subprocess import sys from queue import Queue, Empty ON_POSIX = 'posix' in sys.builtin_module_names def enqueueOutput(out, queue): for line in iter(out.readline, b''): if line == '': break queue.put(line) out.close() # just anything that takes long to be computed shellCommand = 'find / test' p = subprocess.Popen(shellCommand, universal_newlines=True, shell=True, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX) q = Queue() t = Thread(target = enqueueOutput, args = (p.stdout, q)) t.daemon = True t.start() t.join() text = '' while True: try: line = q.get_nowait() text += line print(line) except Empty: break socketio.emit('client response', {'text': text}) socketio.run(app)
在完成functionWithHeavyLoad()函数中的阻塞工作后,客户端收到消息'foo'.但它应该早些收到消息.
可以将此示例复制并粘贴到.py文件中,并且可以立即重现该行为.
我使用的是Python 3.4.3,Flask 0.10.1,flask-socketio1.2,eventlet 0.17.4
更新
如果我将它放入functionWithHeavyLoad函数中它实际上是有效的,一切都很好:
import shlex shellCommand = shlex.split('find / test') popen = subprocess.Popen(shellCommand, stdout=subprocess.PIPE) lines_iterator = iter(popen.stdout.readline, b"") for line in lines_iterator: print(line) eventlet.sleep()
问题是:我用于find
重负荷,以使您的样品更容易重现.但是,在我的代码中,我实际上使用tesseract "{0}" stdout -l deu
了sell命令.这(不像find
)仍然阻止一切.这是一个tesseract
比eventlet 更重要的问题吗?但是仍然:如果它发生在一个单独的线程中,它会如何阻止它在find
不阻塞的情况下逐行读取上下文切换?
感谢这个问题,我今天学到了一些新知识.Eventlet提供了一个greenlet友好版本的子进程及其功能,但由于一些奇怪的原因,它不会在标准库中修补此模块.
链接到子进程的eventlet实现:https://github.com/eventlet/eventlet/blob/master/eventlet/green/subprocess.py
查看eventlet 修补程序,修补的模块是os,select,socket,thread,time,MySQLdb,builtins和psycopg2.在修补程序中绝对没有对子进程的引用.
好消息是Popen()
,在我更换后,我能够在与您非常相似的应用程序中工作:
import subprocess
有:
from eventlet.green import subprocess
但请注意,当前发布的eventlet版本(0.17.4)不支持该universal_newlines
选项Popen
,如果使用它将会出错.支持此选项的是master(这是添加选项的提交).您将不得不从调用中删除该选项,或者直接从github安装eventlet的主分支.