这不是源码分析篇只是想聊一聊这些哪些地方可以用到
1just
<pre>
Observable.just("Cricket", "Football")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver())
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
</pre>
大家想想这个能用到哪种情况? just(参数最多10个)
我说一种情况(控件中获取的值,然后我们会对这个值去判定,比如请假两个时间是比对,登录判断是不是为null)
2map
<pre>
.map(new Function<List<你有的数据类型r>, List<你希望的数据类型>>() {
@Override
public List<你希望的数据类型> apply(List<你有的数据类型> apiUsers) throws Exception {
return (转化的类型);
}
})
</pre>
想一想什么情况下会用? 当你想转的时候(举个例子bitmap换流)
好吧 我放弃 这篇连源码一起分析了 要不不知道怎么去开始
第一篇我会尽量详细点
<pre>
public static <T1, T2, R> Observable<R> zip(
ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
}
</pre>
这是Observable的静态方法,
ObservableSource和Observable的关系:Observable实现了ObservableSource接口的抽象类
<pre>
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> f) {
ObjectHelper.requireNonNull(f, "f is null");
return new Function<Object[], R>() {
@Override
public R apply(Object[] a) throws Exception {
if (a.length != 2) {
throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
}
return ((BiFunction<Object, Object, R>)f).apply(a[0], a[1]);
}
};
}
</pre>
发现了吗
public interface BiFunction<T1, T2, R> {
R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}
返回值都是R
这样BiFunction就和Function联系起来了
<pre>
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
if (sources.length == 0) {
return empty();
}
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
}
</pre>
最重要就是最后一句话
new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError)
去构造了一个ObservableZip 可以说道这里就结束了。你会说着怎么可能?
好吧 让我们看一个方法,下面的这个方法是ObservableZip的方法
<pre>
public void subscribeActual(Observer<? super R> s) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
} else {
count = sources.length;
}
if (count == 0) {
EmptyDisposable.complete(s);
return;
}
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(s, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
</pre>
注意到这个方法了吗这是在你订阅的时候开始在Observable.subscribe
<pre>
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
</pre>
这时候会调用subscribeActual方法。
突然打到这里被卡住了。
<pre>
@Override
@SuppressWarnings("unchecked")
public void subscribeActual(Observer<? super R> s) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
} else {
count = sources.length;
}
if (count == 0) {
EmptyDisposable.complete(s);
return;
}
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(s, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
</pre>
zc.subscribe(sources, bufferSize);卡再这里 分析不下去了。 让我们静下心来
先让大家看一下我疑惑的地方
sources[i].subscribe(s[i]);这里 实际上也是很让人迷惑的地方 这是什么?
这里是把你分别获取的Observable 发射出去。 你们会有疑问 怎么发射的吗?
首先 它会先走Observable的subscribe的方法。 下面的也是最关键的一步。 也是让我迷惑的一部。也就是它去执行谁的subscribeActual方法。 我要揭露谜底了、 它走的是 你那两个Observable参数 创建的时候的subscribeActual 方法。
我举个例子
<pre>
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesCricket());
e.onComplete();
}
}
});
</pre>
通过这个创建 ,那你你执行的就是ObservableCreate的 subscribeActual方法。 懂了吧。 快下班了 晚上继续。 一会我们探讨Function的作用。喜欢的或者有不同意见的欢迎留言。