Stream的生成
1、从零开始创建Stream
创建一个Stream可以通过异步生成器(async*)函数。当异步生成器函数被调用时会创建一个 Stream,而函数体则会在该 Stream 被监听时开始运行。当函数返回时,Stream 关闭。在函数返回前,你可以使用 yield 或 yield 语句向该 Stream 提交事件。
下面是一个周期性发送整数的函数例子:
void main() {
var duration = Duration(seconds: 3);
var stream = timedCounter(duration, 10);
stream.listen((event) {
print(event);
});
}
Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
var i = 0;
while(true) {
await Future.delayed(interval);
yield i++;
if(i == maxCount) break;
}
}
2、转换现有的Stream
我们在创建 Stream 时常见的情形是根据现有 Stream 的事件创建一个新的 Stream。比如你已经有了一个可以提供字节事件的 Stream,然后你想将该 Stream 变为一个可以提供字符串的 Stream,并且该 Stream 中的字符串还经过 UTF-8 编码。对于这种情况,常用的办法是创建一个新的 Stream 去等待获取原 Stream 的事件,然后再将新 Stream 中的事件输出。例如:
/// 将连续的字符串 Stream 拆分为行。
///
/// 输入的字符串来自于"源" Stream 并以较小的 chunk 块提供。
Stream<String> lines(Stream<String> source) async* {
// 存储从上一个数据块中分离出的字符串行。
var partial = '';
// 等到新的数据块可用时开始处理。
await for (var chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0]; // 追加拼接行。
partial = lines.removeLast(); // 删除剩余不完整的行。
for (var line in lines) {
yield line; // 将分离的每个字符串行添加至输出 Stream。
}
}
// 最后如果最终的字符串行不为空则将其添加至输出流。
if (partial.isNotEmpty) yield partial;
}
3、使用 StreamController
如果你 Stream 的事件不仅来自于异步函数可以遍历的 Stream 和 Future,还来自于你程序的不同部分,这种情况使用上述两种方式生成 Stream 就显得比较困难。面对这种情况,我们可以使用一个 StreamController 来创建和填充 Stream。
StreamController 可以为你生成一个 Stream,并提供在任何时候、任何地方将事件添加到该 Stream 的方法。该 Stream 具有处理监听器和暂停所需的所有逻辑。控制器对象你可以自行处理而只需返回调用者所需的 Stream 即可。
void main() {
var duration = Duration(seconds: 3);
var stream = timedCounter(duration, 10);
stream.listen((event) {
print(event);
});
}
Stream<int> timedCounter(Duration interval, [int maxCount]) {
var controller = StreamController<int>();
var counter = 0;
Timer timer;
void tick(Timer timer) {
counter++;
controller.add(counter); // 请求 Stream 将计数器值作为事件发送。
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close(); // 请求 Stream 关闭并告知监听器。
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
if(timer != null) {
timer.cancel();
timer = null;
}
}
controller = StreamController<int> (
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer,
);
return controller.stream;
}
不通过async*函数创建Stream时,请务必牢记以下几点:
使用同步控制器时要小心。例如,使用
StreamController(sync: true)
构造方法创建控制器。当你发送一个事件到一个未暂停的同步控制器(例如:使用 EventSink 中定义的add()
、addError()
或close()
方法),事件立即发送给所有 Stream 的监听器。在添加监听器的代码返回之前,决不能调用Stream
监听器,而在错误的事件使用同步控制器会破坏该规则并导致其它正常代码执行失败。因此,你应该避免使用同步控制器。如果你使用 StreamController, onListen 回调会在 listen 方法调用返回 StreamSubscription 前返回。不要让 onListen 回调依赖于已经存在的订阅。例如,在下面的代码中,onListen 回调有可能会在 subscription 变量被初始化为一个有效值之前被触发(同时 处理器 被调用)
当 Stream 的监听器状态改变时,由 StreamController 定义的 onListen、onPause、onResume 和 onCancel 回调会被调用,该调用绝不会发生在事件生成时或在某个状态变化处理回调的调用期间。在这些情况出现时,状态变化的回调会被延迟,直到上一个回调执行完成。
不要尝试自己去实现 Stream 接口。否则很容易在事件、回调以及添加和移除监听器这些操作交互时出现一些难以察觉的错误。你应该总是使用一个现有的 Stream(比如由 StreamController 生成的)去实现新 Stream 中 listen 方法的调用。
尽管你可以通过扩展 Stream 类并实现 listen 方法来实现更多额外的功能,但一般不建议这么做,因为这样会引入一个调用者必须考虑的新类型。相反,你可以创建一个(或多个)具有 Stream 的类而不是一个(或多个)Stream。