什么是EventBus
EventBus是全局事件总线,底层通过Stream
来实现;它可以实现不同页面的跨层访问,通过Stream
的机制来实现不同widget之间的状态共享.
这里我们使用的是官方提供了一个Event_Bus
库.
在pubsec.yaml文件导入:
event_bus: ^1.1.0
在使用的文件内:
import 'package:event_bus/event_bus.dart';
使用方式
这里举个例子:有三个界面A,B,C, 从A进入B, B再进入C. 在C中点击按钮,然后A的背景色会变成红色.这里实际上就形成了跨层的访问C->A.
首先我们需要定义一个EventBus对象来订阅事件流.这里我们通过一个工厂方法来创建一个唯一的EventBus对象.
class GlobalEventBus{
EventBus event;
factory GlobalEventBus() => _getInstance();
static GlobalEventBus get instance => _getInstance();
static GlobalEventBus _instance;
GlobalEventBus._internal() {
// 创建对象
event = EventBus();
}
static GlobalEventBus _getInstance() {
if (_instance == null) {
_instance = GlobalEventBus._internal();
}
return _instance;
}
}
用一个全局对象的好处就是不需要将eventbus当做参数进行传递了,而且更符合全局总线的概念.
1.首先,我们需要先创建一个事件用来监听,我们将这个事件命名为BackgroundColorChangeEvent
,这个事件其实是一个类,用来当做共享数据的载体,我们可以在这个类中加入不同的属性.例如:
class BackgroundColorChangeEvent{
Color color;
BackgroundColorChangeEvent(this.color);
}
这里我们就可以把color
当做参数通过BackgroundColorChangeEvent
对象传递到A中,然后改变A的背景色.
2.然后在A订阅事件:
GlobalEventBus().event.on<BackgroundColorChangeEvent>().listen((event) {
Color color = event.color;
setState(){
_backgroundColor = color;
}
});
以上就完成了事件的创建和订阅.
3.接着我们就需要在C中发布这个事件,Event_Bus提供了一个fire
方法来分发事件.
FlatButton(){
onPress:(){
GlobalEventBus().event.fire(BackgroundColorChangeEvent(Colors.red)) ;
}
}
这里我们通过全局event
对象,分发了BackgroundColorChangeEvent
事件,并且携带了一个color
参数过去.此时A已经订阅了这个事件,于是背景色就改变了.
需要注意到的是在使用之后需要关闭event事件流,不然会造成内存泄漏,调用如下代码即可:
GlobalEventBus().event.destroy();
EventBus的底层实现
以上就是EventBus
的使用方式,非常的简单.但它底层是怎么实现的呢?我们一步步来分析.
1.创建EventBus
首先我们从EventBus
的初始化开始,进入event_bus.dart
源文件中:
EventBus({bool sync = false})
: _streamController = StreamController.broadcast(sync: sync);
我们可以看出,EventBus
对象初始化实际上初始化了一个_streamController
对象,而这个对象是通过StreamController
的broadcast(sync: sync)
方法初始化的,这里大致可以看出来,EventBus
的底层实际上就是通过Stream
来实现的.这里默认带了sync
参数,表示是否同步,默认为async
.
而进入broadcast(sync: sync)
方法中:
factory StreamController.broadcast(
{void onListen(), void onCancel(), bool sync: false}) {
return sync
? new _SyncBroadcastStreamController<T>(onListen, onCancel) // 同步广播
: new _AsyncBroadcastStreamController<T>(onListen, onCancel); // 异步广播
}
可以看到sync
参数作用就是返回一个是同步或异步广播流控制器.
这里同步和异步的区别是:
如果
sync
为true,则事件将通过fire
方法直接传递给流的监听者;如果为false,在创建EventBus
之后,才将事件延后传递给监听者。
那么事件是怎么被监听到的呢?
2. EventBus进行订阅监听
事件我们是通过event.on().listen()
方法来监听的,进入on()
的dart源码中,
Stream<T> on<T>() {
if (T == dynamic) {
return streamController.stream;
} else {
return streamController.stream.where((event) => event is T).cast<T>();
}
}
我们可以看到on()
返回了一个Stream
,并且用泛型约定了事件类型.如果不传泛型约定,那么就默认监听所有的事件; 如果添加了泛型T
,那么就只会监听T
这个事件.
然后我们在看listen()
方法:
StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError});
在方法参数中,我们可以发现onData(T event)
这个方法回调,正是这个方法参数回调了我们需要监听订阅的事件T
.至于怎么接收到这个onData()
回调的,我们后面再看,先了解下fire()
方法的实现.
3. EventBus发布事件
我们进入fire()
方法中,
void fire(event) {
streamController.add(event);
}
streamController
添加了这个事件,平淡无奇,继续看下去.
void add(T value) {
if (!_mayAddEvent) throw _badEventState();
_add(value);
}
_mayAddEvent
表示流关闭之后或者正在add stream期间,可能无法添加新事件.
在看看_add(value)
:
void _add(T value) {
if (hasListener) {
_sendData(value);
} else if (_isInitialState) {
_ensurePendingEvents().add(new _DelayedData<T>(value));
}
}
如果hasListener
已经有被订阅了,那么就发送这个事件,如果是_isInitialState
初始化状态,就挂起这个事件,再add()
一个_DelayedData
;
简单说下
_DelayedData
,这是一个继承自_DelayedEvent
的对象,它其实保存了我们这个事件value
,然后利用perform
方法在某个时机进行事件发放.它的实现如下,
class _DelayedData<T> extends _DelayedEvent<T> {
final T value;
// 保存这个事件
_DelayedData(this.value);
// perform方法用于某个时机发放这个事件
void perform(_EventDispatch<T> dispatch) {
dispatch._sendData(value);
}
}
再进入_sendData()
方法
void _sendData(T data) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
要看懂这段代码,我们首先要明白几个变量和方法.
(1). _zone
: 是一个Zone对象,通过源码注释我们可以大概了解一下它的作用:
A zone represents an environment that remains stable across asynchronous calls.
Code is always executed in the context of a zone, available as [Zone.current].
The initial main function runs in the context of the default zone ([Zone.root]).
Code can be run in a different zone using either [runZoned],
to create a new zone, or [Zone.run] to run code in the context of
an existing zone likely created using [Zone.fork].
Developers can create a new zone that overrides some of the functionality of an existing zone.
For example, custom zones can replace of modify the behavior of print, timers,
microtasks or how uncaught errors are handled.
zone
表示在异步过程保持稳定的一个环境,类似于iOS沙箱机制.Dart代码通常都是在这个环境的上下文中执行,在这个环境中你可以对里面的代码做很多操作,例如异常捕捉等.而此时这个zone
取值是Zone.current
,就是当前的环境.
(2). runUnaryGuarded(_onData, data)
:这个方法的作用就是在此环境中用参数执行给定的方法,并捕获同步错误.有点类似于iOS中performSelector:withObject:
方法.指定的方法就是_onData
,参数就是data
,也就是我们传递过来的事件.
(3). _onData
: 这个方法就是我们回调事件的方法,它是通过上述zone
的_zone.registerUnaryCallback<dynamic, T>(handleData)
方法拿到的.
void onData(void handleData(T event)) {
handleData ??= _nullDataHandler;
// TODO(floitsch): the return type should be 'void', and the type
// should be inferred.
_onData = _zone.registerUnaryCallback<dynamic, T>(handleData);
}
从上面的事件订阅监听我们知道是在listen()
方法中的onData()
回调中监听到事件,那很明显这个_onData
和listen
中的onData
肯定有某种关联,知道了这个关联,那整个监听流程就通了.
实际上底层有对listen()
中的onData()
进行一层转换,
// onData() ->>> onData(void handleData(T event))
/**
* Replaces the data event handler of this subscription.
*
* The [handleData] function is called for each element of the stream
* after this function is called.
* If [handleData] is `null`, further elements are ignored.
*
* This method replaces the current handler set by the invocation of
* [Stream.listen] or by a previous call to [onData].
*/
void onData(void handleData(T data));
从源码的注释可以看出这里的onData()
确实是被替换成了onData(void handleData(T event))
.
那么_zone.registerUnaryCallback<dynamic, T>(handleData)
拿到的_onData
就是就是我们在listen
中传入的onData()
回调了.
到这里,订阅和发布事件就形成了一个闭环.这也就是EventBus实现的原理. 其实Flutter很多工具底层都是基于Stream
,后面会专门分析一下Stream
的实现原理.