stream和Rxjava,Kotlin Flow一样,产生数据流并处理
首先来看看如何使用
基本api使用
StreamController<int> _controller = StreamController();
Stream<int> stream = _controller.stream;
//数据订阅
StreamSubscription<int> subscription = stream.listen((event) {
print(event);
});
//数据发送
_controller.add(1);
_controller.add(2);
_controller.add(3);
StreamController 是个泛型,泛型表示对应的数据类型,声明什么类型,后续发送也是什么类型数据
拿到StreamController的stream对象,调用listen就可以监听数据。listen返回一个StreamSubscription对象。可以调用其cancel方法取消订阅
StreamSubscription的cancel可以取消监听数据,后续即使有数据,也不会回调,整个和EventBus类似,订阅数据,处理数据,取消订阅
输出
1
2
3
Process finished with exit code 0
把上面例子改改
Future.delayed(Duration(seconds: 1),(){
subscription.cancel();
_controller.add(3);
});
由于取消了,后续也只打印了1和2
StreamController发送数据
Stream订阅,处理数据
StreamSubscription取消订阅
再看StreamController构造方法,里面还有些回调参数,表示订阅了,暂停,恢复,取消的回调,还有个sync表示是否同步还是异步,默认为false,
如果为false的话,表示add数据后,稍后再调用处理数据的回调,
如果为true的话,表示立马回调数据。
StreamController(
{void onListen()?,
void onPause()?,
void onResume()?,
FutureOr<void> onCancel()?,
bool sync = false})
分别添加打印
StreamController(onListen: () {
print('onListen');
}, onCancel: () {
print('onCancel');
},
onResume: (){
print('onResume');
},
onPause: (){
print('onPause');
}
)
_controller.add(1);
_controller.add(2);
Future.delayed(Duration(seconds: 1), () {
_controller.add(3);
subscription.pause(Future.value());
// subscription.cancel();
_controller.add(4);
});
Future.delayed(Duration(seconds: 2),(){
_controller.add(5);
subscription.resume();
_controller.add(6);
});
输出如下,StreamSubscription的操作都能监听到回调,虽然add(3)在pause前调用,但是,在后面输出
onListen
1
2
onPause
3
4
onResume
5
6
其他不变,参数sync为true
输出如下,可以看出3在onPause前调用,也就是说调用add方法就立马触发了数据回调
onListen
1
2
3
onPause
4
onResume
5
6
除了StreamController产生数据流,
还要其他构造方法也能产生流
Stream.value,从单个数据产生流
Stream.value(1).listen(print);
fromIterable 从集合的产生数据流
Stream<String> stream =
Stream<String>.fromIterable(['1', '2', '3', '4']);
stream.listen((event) {
print(event);
});
还可以结合await这么写
Stream<String> stream = Stream<String>.fromIterable(list);
await for (String s in stream) {
print(s);
}
yield 返回多个数据流。方法要用async*修饰
Stream<int> createStream({int count = 5}) async* {
for (int i = 0; i < count; i++) {
await Future.delayed(const Duration(seconds: 1));
yield i;
}
}
createStream().listen((event) {
print(event);
})
输出
1
2
3
4
periodic每隔一段时间产生一个数据流
Stream.periodic(Duration(seconds: 1),(counter){
return "s"*counter;
}).take(5).listen((event) {
print(event);
})
counter每次回调都会增1,take是操作符,表示只取前面5个
s
ss
sss
ssss
fromFuture,从一个Future中创建一个流,Stream.fromFutures(Iterable<Future<T>> futures)从多个Future创建多个流
Stream.fromFuture(futureTask()).listen((event) {
print(event);
});
Future<String> futureTask() async {
await Future.delayed(const Duration(seconds: 1));
return 'Future complete';
}
输出
Future complete
Process finished with exit code 0
多个监听者,StreamController.broadcast
正常我们的流只能有一个监听者,我想多个监听,就会抛一个异常。
代码如下:
StreamController<int> _streamController =
StreamController();
_streamController.stream.listen((event) {
print('listen1:' + event.toString());
});
_streamController.stream.listen((event) {
print('listen2:' + event.toString());
});
_streamController.add(1);
输出
Unhandled exception:
Bad state: Stream has already been listened to.
如果想多个监听,就用StreamController.broadcast构造方法生成StreamController
StreamController<int> _streamController =
StreamController.broadcast(sync: false);
_streamController.stream.listen((event) {
print('listen1:' + event.toString());
});
_streamController.stream.listen((event) {
print('listen2:' + event.toString());
});
_streamController.add(1);
输出如下,两个监听器都监听到数据
listen1:1
listen2:1
haha
Process finished with exit code 0
操作符
和Rxjava,Kotlin Flow一样,对于数据流也有各种各样的操作符方便我们处理数据,名字也差不多,就不一一列举了