在前面我们简单的介绍了一些 Futures 的基本知识的例子中,我们出现了 combinator 的概念,也就是 and_then
这些。Combinator 能将多个 Future 串联组合起来,依次执行调动,最终得到结果。
在 futures 库里面,已经提供了一些基本的 Future,后面,我们会结合这些 concrete 的 Future,来具体说明一下 combinator。
Leaf Future
在 futures 里面,有一种 leaf future ,也就是 FutureResult,它只要调用 poll
就能够及时的返回值,不需要额外的等待。后续很多的例子我们都会通过这种 leaf future 来完成。
使用 leaf future 也是非常简单的,我们可以通过 ok
, err
以及 result
来完成:
let f = ok::<u32, u32>(1);
assert_eq!(f.wait().unwrap(), 1);
let f = err::<u32, u32>(2);
assert_eq!(f.wait().unwrap_err(), 2);
let f = result::<u32, u32>(Ok(3));
assert_eq!(f.wait().unwrap(), 3);
let f = result::<u32, u32>(Err(4));
assert_eq!(f.wait().unwrap_err(), 4);
Empty Future
不同于上面的 leaf future,我们还可以创建一种 empty future,它在任何 poll
的时候,都会返回 NotReady
。
let mut f1 = empty::<u32, u32>();
assert_eq!(f1.poll(), Ok(Async::NotReady));
Map / MapErr Future
我们可以使用 map
或者 map_err
将一个 Future 转为另一个 Future,譬如:
let f = ok::<u32, u32>(1);
let f2 = f.map(|x| x + 1);
assert_eq!(f2.wait().unwrap(), 2);
let f = err::<u32, u32>(2);
let f2 = f.map_err(|x| x + 2);
assert_eq!(f2.wait().unwrap_err(), 4);
Then / AndThen / OrElse Future
我们也可以使用 then
/ and_then
/ or_else
来将当前 Future 完成的值传递给下一个 Future 处理:
let f = ok::<u32, u32>(1);
let f2 = f.then(|x| x.map(|y| y + 1));
assert_eq!(f2.wait().unwrap(), 2);
let f = err::<u32, u32>(2);
let f2 = f.then(|x| x.map_err(|y| y + 2));
assert_eq!(f2.wait().unwrap_err(), 4);
let f = ok::<u32, u32>(1);
let f2 = f.and_then(|x| Ok(x + 1));
assert_eq!(f2.wait().unwrap(), 2);
let f = err::<u32, u32>(2);
let f2 = f.or_else(|x| Err(x + 2));
assert_eq!(f2.wait().unwrap_err(), 4);
Select / Join Future
我们可以使用 select
/ join
来等待一个或者所有 Future 完成:
let f1 = ok::<u32, u32>(1);
let f2 = ok::<u32, u32>(2);
let _ = f1.select(f2).map(|(x, y)| {
assert_eq!(x, 1);
assert_eq!(y.wait().unwrap(), 2);
});
let f1 = ok::<u32, u32>(1);
let f2 = ok::<u32, u32>(2);
let _ = f1.join(f2).map(|(x, y)| {
assert_eq!(x, 1);
assert_eq!(y, 2);
});
Flatten Future
也可以使用 flatten
来得到最后一个 Future 的值:
let f1 = ok::<u32, u32>(1);
let f2 = ok::<_, u32>(f1);
assert_eq!(f2.flatten().wait().unwrap(), 1);
Fuse Future
通常一个 Future 完成之后,再次去 poll
会 panic 出错,为了防止这样的情况出现,我们可以使用 fuse
,这样当 Future 已经完成之后,后面任何 poll
都会返回 NotReady
:
let mut f1 = ok::<u32, u32>(1);
assert_eq!(f1.poll(), Ok(Async::Ready(1)));
let mut f1 = ok::<u32, u32>(1).fuse();
assert_eq!(f1.poll(), Ok(Async::Ready(1)));
assert_eq!(f1.poll(), Ok(Async::NotReady));
Channel
futures 现在提供两种 channel,one-shot 以及 MPSC。one-shot 主要用于两个线程之间交互的情况,而 MPSC 则是可以多个线程发送数据,一个线程接受处理。
一个 one-shot 的简单例子:
let (tx, rx) = oneshot::channel::<u32>();
thread::spawn(move || {
tx.complete(1);
});
rx.map(|x| {
assert_eq!(x, 1);
})
.wait()
.unwrap();
一个 MPSC 的简单例子:
let (tx, rx) = mpsc::channel::<u32>(1);
let tx1 = tx.clone();
thread::spawn(move || {
tx1.send(1).wait().unwrap();
});
let tx2 = tx.clone();
thread::spawn(move || {
tx2.send(2).wait().unwrap();
});
drop(tx);
let mut w = rx.wait();
assert_eq!(w.next().unwrap(), Ok(1));
assert_eq!(w.next().unwrap(), Ok(2));
assert!(w.next().is_none());
小结
这里,我们简单介绍了 futures 库提供的一些基本 future,combinator,以及他们的使用方法。 可以看到,使用都是非常简单的。这里,我们并没有详细的说明 Stream,但也没有特别大的差别。另外,futures 库还有一个关键的 task 概念,这个我们后续在说明。