var a = 0.obs
我们看下会发生什么?
a就被RxInt包裹,
extension IntExtension on int {
/// Returns a `RxInt` with [this] `int` as initial value.
RxInt get obs => RxInt(this);
}
Rx<T>继承_RxImpl
class Rx<T> extends _RxImpl<T> {
}
接着看
abstract class _RxImpl<T> extends RxNotifier<T> with RxObjectMixin<T> {
//这里触发赋值操作
_RxImpl(T initial) {
_value = initial;
}
void update(void Function(T? val) fn) {
fn(_value);
subject.add(_value);
}
void trigger(T v) {
var firstRebuild = this.firstRebuild;
value = v;
if (!firstRebuild && !sentToStream) {
subject.add(v);
}
}
}
mixin RxObjectMixin<T> on NotifyManager<T> {
late T _value;
bool firstRebuild = true;
bool sentToStream = false;
String get string => value.toString();
set value(T val) {
if (subject.isClosed) return;
sentToStream = false;
if (_value == val && !firstRebuild) return;
firstRebuild = false;
_value = val;
sentToStream = true;
subject.add(_value);
}
T get value {
//这里的subject是GetStream; RxInterface.proxy == observer(build里面传入的observer)
RxInterface.proxy?.addListener(subject);
return _value;
}
}
class GetStream<T> {
List<LightSubscription<T>>? _onData = <LightSubscription<T>>[];
FutureOr<void> addSubscription(LightSubscription<T> subs) async {
if (!_isBusy!) {
return _onData!.add(subs);
} else {
await Future.delayed(Duration.zero);
return _onData!.add(subs);
}
}
void _notifyData(T data) {
_isBusy = true;
for (final item in _onData!) {
if (!item.isPaused) {
item._data?.call(data);
}
}
_isBusy = false;
}
void add(T event) {
assert(!isClosed, 'You cannot add event to closed Stream');
_value = event;
_notifyData(event);
}
LightSubscription<T> listen(void Function(T event) onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
final subs = LightSubscription<T>(
removeSubscription,
onPause: onPause,
onResume: onResume,
onCancel: onCancel,
)
..onData(onData)
..onError(onError)
..onDone(onDone)
..cancelOnError = cancelOnError;
addSubscription(subs);
onListen?.call();
return subs;
}
}
总结:
当我们给观察者对象添加.obs后,被封装成了Rx<T>,RX继承_RxImpl,_RxImpl 继承RxNotifier with RxObjectMixin,在Obx中使用(a.value)会调用RxObjectMixin的get方法,get方法内部 观察者监听stream的变化(RxInterface.proxy?.addListen(subject)),当赋值的时候(a.value=xx)会调用set方法,如果该被订阅对象已经失效或者被订阅对象的值没有发生变化并且非首次构建则不会刷新,否则会赋值新值,并且调用subject(getStream)的add方法,然后调用GetStream类的_notifyData方法,遍历onData(onData是我们在Obx的时候订阅的集合,Obx继承ObxWidget继承StatefulWidget,initState方法里面绑定观察者订阅关系(final _observer = RxNotifier();)订阅者(late StreamSubscription subs;),subs = _observer.listen(_updateTree,false);RXNotifier混入了NotifyManager,NotifyManager里的(GetStream<T> subject = GetStream<T>();)里面有listen方法,然后再调用subject.listen(),会调用addSubscription(subs),然后 _onData!.add(subs);添加到_onData集合),调用item._data.call()方法。当RxNotifier接受到通知后会通过listen回调调用_updateTree,这里面调用的是setState({})更新UI;
注释:Obx原理
,obx继承自ObxWidget,ObxWidget在initstate方法中绑定了观察者订阅关系。build方法中调用RxInterface.notifyChildren 把_observer作为RxInterface.proxy 的临时属性,调用builder的后恢复原有的属性, 注意builder(controller)函数里一定要包含obs.value,否则在if (!observer.canUpdate) 检测时,由于没有观察对象,会抛出提示异常。
class Obx extends ObxWidget {
final WidgetCallback builder;
const Obx(this.builder, {Key? key}) : super(key: key);
@override
Widget build() => builder();
}
abstract class ObxWidget extends StatefulWidget {
const ObxWidget({Key? key}) : super(key: key);
@override
ObxState createState() => ObxState();
@protected
Widget build();
}
class ObxState extends State<ObxWidget> {
final _observer = RxNotifier();
late StreamSubscription subs;
@override
void initState() {
super.initState();
subs = _observer.listen(_updateTree, cancelOnError: false);
}
void _updateTree(_) {
if (mounted) {
setState(() {});
}
}
@override
Widget build(BuildContext context) =>
RxInterface.notifyChildren(_observer, widget.build);
}
class RxNotifier<T> = RxInterface<T> with NotifyManager<T>;
mixin NotifyManager<T> {
GetStream<T> subject = GetStream<T>();
final _subscriptions = <GetStream, List<StreamSubscription>>{};
bool get canUpdate => _subscriptions.isNotEmpty;
/// This is an internal method.
/// Subscribe to changes on the inner stream.
void addListener(GetStream<T> rxGetx) {
if (!_subscriptions.containsKey(rxGetx)) {
final subs = rxGetx.listen((data) {
if (!subject.isClosed) subject.add(data);
});
final listSubscriptions =
_subscriptions[rxGetx] ??= <StreamSubscription>[];
listSubscriptions.add(subs);
}
}
StreamSubscription<T> listen(
void Function(T) onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError ?? false,
);
}
}
abstract class RxInterface<T> {
static RxInterface? proxy;
bool get canUpdate;
static T notifyChildren<T>(RxNotifier observer, ValueGetter<T> builder) {
final oldObserver = RxInterface.proxy;
RxInterface.proxy = observer;
final result = builder();
if (!observer.canUpdate) {
RxInterface.proxy = oldObserver;
}
RxInterface.proxy = oldObserver;
return result;
}
}
为什么要有这一步 RxInterface.proxy = observer;
T get value {
RxInterface.proxy?.addListener(subject);
return _value;
}
Rx 最终是混入了 RxObjectMixin 类,即在 Rx 的获取数据中调用了 RxInterface.proxy?.addListener ,那什么时候获取 Rx 的数据呢?
Obx(() {
return Text("${a.value}");
});
就是在 Obx 的 builder 方法里,这就清楚了为什么在 RxInterface.notifyChildren 方法里是先将传入的 observer 赋值给 proxy 然后再调用 builder 方法了,因为这样在调用 builder 方法时调用了 Rx.value ,而在 get value 中调用了 RxInterface.proxy?.addListener ,且 addListener 传入的 subject 是 Rx 的 GetStream, 而 proxy 是 _ObxState 里创建的 RxNotifier。
文字理解
obx 刷新原理
var a = 1.obs
将变量a包装成RxInt 继承自Rx<T> 继承自抽象类_RxImpl 在这里初始化赋值, 继承RxNotifier并且混入了RxObjectMiXin
obx 继承自obxWidget->statefulWidget
obxState中
final _observer = RxNotifier()
var streamSubScription _subs
initState 中绑定观察者和订阅关系
_subs = _oberver.listen(_updateTree,cancleError:false)
接着会调用
getStream中的listen
listen(){
创建subs绑定传入的updateTree函数,然后调用addSubscription(subs),添加到_onData集中(这是个订阅者集合)
}
接下来看builder
builder中调用了
RxInterface.notifyChildren(_observer,builder)
实现关键代码: 临时变量保存 const oldObserver = Rxinterface.proxy , Rxinterface.proxy = _observer,调用bulder方法,将原来的oldObserve赋值给Rxinterface.proxy = oldObserver
Obx的bulder方法中使用了a.value
a.value 就调用了 rxObjectMixin中的get方法
RxInterface.proxy.addListener(subject) //subject是getStream对象 RxInterface.proxy = build中传入的observer(RxNotifier)
rxNotifier实现了addListener() 是因为Rxnotifier混入了NotifyManager
addListener(subject){
判断subject对象是否有订阅对象 如果没有就调用getStream的listen方法将订阅者加入到订阅者集合
}
当我们给a.value= 2赋值的时候
会调用RxObjectMixin 的set方法
set(val){
首先判断订阅是否关闭
subject.close return
_value == val && !firsrBuld return
_value = val
subject.add(_value) //调用GetStream中的add方法
add(event){
_value = event
_notifyData(event)
}
_notifyData(){
for(item in _onData){
item._data.call() //这里就会调用updateTree 接着调用setState({})进行刷新
}
}
}