我有一个项目清单通过期货::派Sink
:
let mut list = VecDeque::new(); /* add a bunch of Packet items to list */ let (sink, stream) = tcp_stream.framed(PacketCodec).split();
我可以使用发送一个数据包
if let Some(first) = list.pop_front() { sink.send(first); }
如何发送整个列表?
当使用新的软件时,我觉得在深入潜水之前阅读文档非常有用.一般来说,Rust社区提供了非常好的资源.
例如,除了一系列工作示例之外,Tokio项目还有一个关于s和s的整个页面Sink
send
.该用于生成API文档也是非常宝贵的.Send
我建议所有新程序员学习的另一件事是创建问题的MCVE.这使他们能够专注于问题的核心,同时也消除了问题背后的瑕疵和磨练.这是一个案例:
use futures; // 0.1.26 fn thing(sink: S) where S: futures::Sink, { let mut all_the_things = vec![1, 2, 3, 4, 5]; while let Some(v) = all_the_things.pop() { sink.send(v); } }
我们收到错误消息
error[E0382]: use of moved value: `sink`
--> src/lib.rs:10:9
|
3 | fn thing(sink: S)
| - ---- move occurs because `sink` has type `S`, which does not implement the `Copy` trait
| |
| consider adding a `Copy` constraint to this type argument
...
10 | sink.send(v);
| ^^^^ value moved here, in previous iteration of loop
让我们回到API文档......
fn send(self, item: Self::SinkItem) -> Sendwhere Self: Sized,
从这里,我们可以看到Send
消耗接收器并返回一个Future
对象.那很奇怪,不是吗!实际上并没有那么奇怪,一旦你看到Future
实现Future
- 将某些内容推送到流中可能会阻塞流是否已满; 这是背压的概念.结果是a Send
是如何知道项目何时实际添加到流中.实施Item = S
了Sink
有wait
,具体类型的Sink::send_all
.因此,一种解决方案是将未来推向完成以获得回收,其中一种方法是调用Stream
:
use futures::Future; // 0.1.26 use std::fmt; fn thing(mut sink: S) where S: futures::Sink, S::SinkError: fmt::Debug, { let mut all_the_things = vec![1, 2, 3, 4, 5]; while let Some(v) = all_the_things.pop() { let f = sink.send(v); sink = f.wait().expect("Something bad happened while sending"); } }
这编译,但不是非常漂亮.回到文档,我们也可以看到Stream
,这需要一个stream::iter_ok
.我们可以使用以下命令wait
从迭代器创建wait
:
use futures::Future; // 0.1.26 use std::fmt; fn thing(sink: S) where S: futures::Sink, S::SinkError: fmt::Debug, { let all_the_things = vec![1, 2, 3, 4, 5]; sink.send_all(futures::stream::iter_ok(all_the_things)) .wait() .expect("Something bad happened while sending"); }
我们再次用它Future
来推动未来的完成.如果我们忘记这是一个很好的错误:"除非进行调查,否则期货什么都不做".但是,Sink
这不是处理这个问题的正确方法.通常,您希望将其send
返回给调用者,因为他们想知道何时将所有内容都塞进了接收器.