源码 livedata 2.0.0
Jetpack里LiveData的相关类不多,类图见下
[站外图片上传中...(image-ec369-1555299590976)]
Observer:作为interface,观察者,数据发生改变,通过onChanged()响应改变;
LiveData:抽象出来的统一被观察者对象,与Observer建立观察联系的方法是observe()方法。
ComputableLiveData:可计算的LiveData,内部持有LiveData,使原数据失效并出刷新数据。
MutableLiveData:这个方法只是将LiveData的方法权限修改,并未实现其他业务。
通常在使用的时候,LiveData有可能是由其他类或库创建出来的实例(如Room提供了toLiveData()方法),也可能是自己 new 出来的对象。订阅见下
mLiveData.observe(Activity.this,Observer(){
Log.d(TAG, it.toString());
})
以正常一次数据更新来看其内部实现,看数据改变的入口;
异步更新:mLiveData.postValue(data);
同步更新:mLiveData.setValue(data);
同步setValue()
同步更新方法相对简单,先看看它怎么实现的。setValue()方法里首先对线程进行判定,它必须在主线程里面运行,之后保存发更改的数据对象,并调用分发函数,dispatchingValue(null);
void dispatchingValue(@Nullable ObserverWrapper initiator) {
if (mDispatchingValue) {
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
considerNotify(initiator);
initiator = null;
} else {
for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}
内部核心considerNotify(),由于这里传入的是null,因此从 mObservers里面获取数据,而LiveData.mObservers对象则是在observe的时候添加了数据。
private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
return;
}
// Check latest state b4 dispatch. Maybe it changed state but we didn't get the event yet.
//
// we still first check observer.active to keep it as the entrance for events. So even if
// the observer moved to an active state, if we've not received that event, we better not
// notify for a more predictable notification order.
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//noinspection unchecked
observer.mObserver.onChanged((T) mData);
}
最后调用了onChanged(),完成响应回调。其中有检测了观察者是否是正在观察等。同步的代码流程就完成了。
异步postValue()
异步方法核心是线程切换,悬挂数据处理。
因此,postValue()内部必须先对数据进行加锁,判定是不是可以发送的数据。代码也给了说明备注,若在主线程Runnable数据时再连续postValue()多个数据,最后一个数据才会被发送出去。见下,会直接reutrn .
protected void postValue(T value) {
boolean postTask;
synchronized (mDataLock) {
postTask = mPendingData == NOT_SET;
mPendingData = value;
}
if (!postTask) {
return;
}
ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}
在主线程上将悬挂数据发送出去。看看它是怎么确定在主线程的?
postToMainThread()是虚方法, ArchTaskExecutor.getInstance()得到的实例mDefaultTaskExecutor,即DefaultTaskExecutor类,它的postToMainThread()如下:
public void postToMainThread(Runnable runnable) {
if (mMainHandler == null) {
synchronized (mLock) {
if (mMainHandler == null) {
mMainHandler = new Handler(Looper.getMainLooper());
}
}
}
//noinspection ConstantConditions
mMainHandler.post(runnable);
}
获取到主线程的Looper,创建Handdler,并post,因此会在主线程发送。
再看mPostValueRunnable内部是怎么处理的?
private final Runnable mPostValueRunnable = new Runnable() {
@Override
public void run() {
Object newValue;
synchronized (mDataLock) {
newValue = mPendingData;
mPendingData = NOT_SET;
}
//noinspection unchecked
setValue((T) newValue);
}
};
调用setValue(),这里已经在主线程上了,因此它是一个同步发送事件了,之后的流程就是前面分析的流程。
至此,LiveData的核心代码就分析完成了。
再看一看具有计算功能的LiveData;
可计算LiveData (ComputableLiveData)
从功能上来说,使原数据失效并出刷新数据。
内部实现核心:mRefreshRunnable、mInvalidationRunnable
先看刷新mInvalidationRunnable
final Runnable mInvalidationRunnable = new Runnable() {
@MainThread
@Override
public void run() {
boolean isActive = mLiveData.hasActiveObservers();
if (mInvalid.compareAndSet(false, true)) {
if (isActive) {
mExecutor.execute(mRefreshRunnable);
}
}
}
};
显而易见的,它去执行的还是mRefreshRunnable,只是将mInvalid标志为true, 那看看绝对核心mRefreshRunnable,
@VisibleForTesting
final Runnable mRefreshRunnable = new Runnable() {
@WorkerThread
@Override
public void run() {
boolean computed;
do {
computed = false;
// compute can happen only in 1 thread but no reason to lock others.
if (mComputing.compareAndSet(false, true)) {
// as long as it is invalid, keep computing.
try {
T value = null;
while (mInvalid.compareAndSet(true, false)) {
computed = true;
value = compute();
}
if (computed) {
mLiveData.postValue(value);
}
} finally {
// release compute lock
mComputing.set(false);
}
}
// check invalid after releasing compute lock to avoid the following scenario.
// Thread A runs compute()
// Thread A checks invalid, it is false
// Main thread sets invalid to true
// Thread B runs, fails to acquire compute lock and skips
// Thread A releases compute lock
// We've left invalid in set state. The check below recovers.
} while (computed && mInvalid.get());
}
};
其逻辑线,如果使用失效,需要调用compute()去重新计算新的数据源回来,并且在这种情况下会调用LiveData的异步更新。
原数据失效对外的方法是invalidate()。它内部将mInvalidationRunnable对象post到主线程里面。