我正在Rust中创建一个小的ncurses应用程序,需要与子进程通信.我已经有了一个用Common Lisp编写的原型; 这里的gif 将有希望展示我想做的事情.我正在尝试重写它,因为CL为这么小的工具使用了大量的内存.
我之前没有使用过Rust(或其他低级语言),而且我在弄清楚如何与子进程交互时遇到了一些麻烦.
我目前正在做的大致是这样的:
创建流程:
let mut program = match Command::new(command) .args(arguments) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(child) => child, Err(_) => { println!("Cannot run program '{}'.", command); return; } };
将它传递给无限(直到用户退出)循环,该循环读取并处理输入并像这样监听输出(并将其写入屏幕):
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) { match program.stdout { Some(ref mut out) => { let mut buf_string = String::new(); match out.read_to_string(&mut buf_string) { Ok(_) => output_viewer.append_string(buf_string), Err(_) => return, }; } None => return, }; }
read_to_string
然而,调用阻止程序直到进程退出.从我所看到的read_to_end
,read
似乎也阻止.如果我尝试运行类似于ls
哪个退出的东西,它可以工作,但是有些东西不能退出,python
或者sbcl
只有在我手动杀死子进程后才会继续.
编辑:
基于这个答案,我改变了使用的代码BufReader
:
fn listen_for_output(program: &mut Child, output_viewer: &TextViewer) { match program.stdout.as_mut() { Some(out) => { let buf_reader = BufReader::new(out); for line in buf_reader.lines() { match line { Ok(l) => { output_viewer.append_string(l); } Err(_) => return, }; } } None => return, } }
但问题仍然存在.它将读取所有可用的行,然后阻止.由于该工具应该可以与任何程序一起使用,因此在尝试读取之前无法猜出输出何时结束.似乎没有办法为BufReader
两者设置超时.
默认情况下,流是阻止的.TCP/IP流,文件系统流,管道流,它们都是阻塞的.当你告诉一个流给你一大块字节时,它会停止并等到它有给定的字节数或者直到发生其他事情(中断,流结束,错误).
操作系统急于将数据返回到读取过程,因此如果你想要的只是等待下一行并在它进入时立即处理它,那么Shepmaster建议的方法无法管理到生成的子项过程不止一次起作用.(理论上它没有必要,因为允许操作系统BufReader
等待更多数据read
,但实际上操作系统更喜欢早期的"短读取"等待).
这个简单的BufReader
基于方法停止工作,当您需要处理多个数据流(如stdout
和stderr
的子进程)或多个进程.例如,BufReader
当子进程等待您排空其stderr
管道而您的进程被阻塞等待它为空时,基于-Based的方法可能会死锁stdout
.
同样,BufReader
当您不希望程序无限期地等待子进程时,您无法使用.也许您想要在孩子仍在工作时显示进度条或计时器并且没有输出.
BufReader
如果操作系统不急于将数据返回到进程(更喜欢"完全读取"到"短读取"),则不能使用基于方法,因为在这种情况下,子进程打印的最后几行可能最终处于灰色区域:操作系统获得了它们,但它们不够大,无法填充BufReader
缓冲区.
BufReader
仅限于Read
接口允许它对流做什么,它的阻塞程度不低于底层流.为了提高效率,它将以块的形式读取输入,告诉操作系统尽可能多地填充缓冲区.
您可能想知道为什么在块中读取数据如此重要,为什么不能BufReader
逐字节地读取数据.问题是要从流中读取数据,我们需要操作系统的帮助.另一方面,我们不是操作系统,我们与它隔离工作,以免在我们的流程出现问题时弄乱它.因此,为了调用操作系统,需要转换到"内核模式",这也可能导致"上下文切换".这就是调用操作系统读取每个字节的原因很昂贵.我们希望尽可能少的OS调用,因此我们批量获取流数据.
要在没有阻止的情况下等待流,您需要一个非阻塞流.MIO 承诺为管道提供所需的非阻塞流支持,很可能是使用PipeReader,但到目前为止我还没有检查过它.
流的非阻塞性质应该能够以块的形式读取数据,而不管操作系统是否更喜欢"短读取".因为非阻塞流永远不会阻塞.如果流中没有数据,它只会告诉您.
在没有阻塞流的情况下,您将不得不求助于生成线程,以便阻塞读取将在单独的线程中执行,因此不会阻止您的主线程.您可能还希望逐字节读取流,以便在操作系统不喜欢"短读取"时立即对行分隔符做出反应.这是一个有效的例子:https://gist.github.com/ArtemGr/db40ae04b431a95f2b78.
这是使用tokio和tokio-process的示例。
use std::{ io::BufReader, process::{Command, Stdio}, }; use tokio::{io, prelude::*, runtime::Runtime}; // 0.1.18 use tokio_process::CommandExt; // 0.2.3 fn main() { let mut cmd = Command::new("/tmp/slow.bash") .stdout(Stdio::piped()) .spawn_async() .expect("cannot spawn"); let stdout = cmd.stdout().take().expect("no stdout"); let mut runtime = Runtime::new().expect("Unable to start the runtime"); let result = runtime.block_on({ io::lines(BufReader::new(stdout)) .inspect(|s| println!("> {}", s)) .collect() }); println!("All the lines: {:?}", result); }东京线程池
这是使用tokio和tokio-threadpool的示例。我们使用blocking
函数在线程中启动进程。我们将其转换为流stream::poll_fn
use std::process::{Command, Stdio}; use tokio::{prelude::*, runtime::Runtime}; // 0.1.18 use tokio_threadpool; // 0.1.13 fn stream_command_output( mut command: Command, ) -> impl Stream- , Error = tokio_threadpool::BlockingError> { // Ensure that the output is available to read from and start the process let mut child = command .stdout(Stdio::piped()) .spawn() .expect("cannot spawn"); let mut stdout = child.stdout.take().expect("no stdout"); // Create a stream of data stream::poll_fn(move || { // Perform blocking IO tokio_threadpool::blocking(|| { // Allocate some space to store anything read let mut data = vec![0; 128]; // Read 1-128 bytes of data let n_bytes_read = stdout.read(&mut data).expect("cannot read"); if n_bytes_read == 0 { // Stdout is done None } else { // Only return as many bytes as we read data.truncate(n_bytes_read); Some(data) } }) }) } fn main() { let output_stream = stream_command_output(Command::new("/tmp/slow.bash")); let mut runtime = Runtime::new().expect("Unable to start the runtime"); let result = runtime.block_on({ output_stream .map(|d| String::from_utf8(d).expect("Not UTF-8")) .fold(Vec::new(), |mut v, s| { print!("> {}", s); v.push(s); Ok(v) }) }); println!("All the lines: {:?}", result); }
在这里可以进行很多可能的权衡。例如,总是分配128个字节并不理想,但是实现起来很简单。
支持供参考,这里是slow.bash:
use std::{ io::BufReader, process::{Command, Stdio}, }; use tokio::{io, prelude::*, runtime::Runtime}; // 0.1.18 use tokio_process::CommandExt; // 0.2.3 fn main() { let mut cmd = Command::new("/tmp/slow.bash") .stdout(Stdio::piped()) .spawn_async() .expect("cannot spawn"); let stdout = cmd.stdout().take().expect("no stdout"); let mut runtime = Runtime::new().expect("Unable to start the runtime"); let result = runtime.block_on({ io::lines(BufReader::new(stdout)) .inspect(|s| println!("> {}", s)) .collect() }); println!("All the lines: {:?}", result); }
也可以看看:
如何在稳定的Rust中同步返回在异步Future中计算的值?