转载请以链接形式标明出处:
本文出自:103style的博客
转换相关的操作符 以及 官方介绍
RxJava
之 转换操作符 官方介绍 :Transforming Observables
buffer
cast
concatMap
concatMapXXX
flatMap
flatMapXXX
flattenAsFlowable
flattenAsObservable
groupBy
map
scan
switchMap
window
以下介绍我们就直接具体实现,中间流程请参考 RxJava之create操作符源码解析。
buffer
- 官方示例:
输出:Observable.range(0, 10) .buffer(4) .subscribe((List<Integer> buffer) -> System.out.println(buffer));
[0, 1, 2, 3] [4, 5, 6, 7] [8, 9]
- 返回对象的
ObservableBuffer
的subscribeActual
方法:
单参数buffer
的skip
和count
是相等的。protected void subscribeActual(Observer<? super U> t) { if (skip == count) { BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier); if (bes.createBuffer()) {//1.0 source.subscribe(bes); } } else { source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier)); } }
-
(1.0)
createBuffer
即新创建了一个ArrayList
对象buffer
。
-
-
onNext(T t)
和onComplete()
方法:public void onNext(T t) { U b = buffer; if (b != null) { b.add(t); if (++size >= count) {//1.0 downstream.onNext(b); size = 0; createBuffer(); } } } public void onComplete() { U b = buffer; if (b != null) {//2.0 buffer = null; if (!b.isEmpty()) { downstream.onNext(b); } downstream.onComplete(); } }
-
(1.0)
每次调用onNext
就检查缓存的事件数是否 不小于buffer
操作符设置的 值,成立则将缓存的buffer
数组 传给观察者的onNext
。 -
(2.0)
onComplete
是检查缓存的事件数是否不为空,成立则将缓存的buffer
数组 传给观察者的onNext
,再调用观察者的onComplete
。
-
cast
- 官方示例:
输出:Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5); numbers.filter((Number x) -> x instanceof Integer) .cast(Integer.class) .subscribe((Integer x) -> System.out.println(x));
1 7 12 5
-
cast
是通过map
操作符来实现的,我们直接看map
。public final <U> Observable<U> cast(final Class<U> clazz) { ObjectHelper.requireNonNull(clazz, "clazz is null"); return map(Functions.castFunction(clazz)); }
-
apply
方法:public U apply(T t) throws Exception { return clazz.cast(t); }
-
map
- 官方示例:
输出:Observable.just(1, 2, 3) .map(x -> x * x) .subscribe(System.out::println);
1 4 9
- 返回对象的
ObservableMap
的subscribeActual
方法:public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
- 继续看
MapObserver
的onNext(T t)
:public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); }
- 即将
map
操作符 传入Function
对象的 返回值 传递给 链式调用上一步的返回对象的onNext(T t)
。
- 即将
concatMap
flatMap
flattenAsFlowable
- 官方示例:
输出:Single<Double> source = Single.just(2.0); Flowable<Double> flowable = source.flattenAsFlowable(x -> { return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3)); }); flowable.subscribe(x -> System.out.println("onNext: " + x));
onNext: 2.0 onNext: 4.0 onNext: 8.0
- 我们先看
Single.just(2.0)
:public static <T> Single<T> just(final T item) { ObjectHelper.requireNonNull(item, "value is null"); return RxJavaPlugins.onAssembly(new SingleJust<T>(item)); }
-
SingleJust
的subscribeActual
:protected void subscribeActual(SingleObserver<? super T> observer) { observer.onSubscribe(Disposables.disposed()); observer.onSuccess(value); }
-
flattenAsFlowable
返回对象的SingleFlatMapIterableFlowable
的subscribeActual
方法:protected void subscribeActual(Subscriber<? super R> s) { source.subscribe(new FlatMapIterableObserver<T, R>(s, mapper)); }
- 继续看
FlatMapIterableObserver
的onSubscribe(Disposable d)
和onSuccess(T value)
:public void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; downstream.onSubscribe(this);//1.0 } } public void onSuccess(T value) { Iterator<? extends R> iterator; boolean has; try { iterator = mapper.apply(value).iterator();//2.0 调用apply返回的Iterable对象的 iterator()方法。 has = iterator.hasNext();//3.0 } catch (Throwable ex) { Exceptions.throwIfFatal(ex); downstream.onError(ex); return; } if (!has) { downstream.onComplete();//3.1 return; } this.it = iterator;//3.2 drain(); }
-
(1.0)
通过Flowable subscribe流程介绍 我们知道downstream.onSubscribe(this)
即调用FlowableInternalHelper.RequestMax.INSTANCE
的accept
方法:
即:public enum RequestMax implements Consumer<Subscription> { INSTANCE; @Override public void accept(Subscription t) throws Exception { t.request(Long.MAX_VALUE); } }
FlatMapIterableObserver.request(Long.MAX_VALUE)
: 即为设置变量requested
的value
为Long.MAX_VALUE
。drain()
因为it
变量还是null
,所以没做什么操作。public void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(requested, n); drain(); } }
-
(2.0)
调用flattenAsFlowable
传入的Function
的apply
返回的Iterable
对象的iterator()
方法。 -
(3.0)
检查Iterable
时候为空,(3.1)
为空直接onComplete()
,(3.2)
不为空则将iterator()
返回值赋值给当前的it
变量,继续执行drain()
。
-
-
drain()
:void drain() { ... Subscriber<? super R> a = downstream; Iterator<? extends R> iterator = this.it; ... int missed = 1; for (; ; ) { if (iterator != null) { long r = requested.get(); long e = 0L; if (r == Long.MAX_VALUE) {//1.0 slowPath(a, iterator); return; } ... } ... } }
- 因为上一步
downstream.onSubscribe(this)
调用了request(Long.MAX_VALUE)
, 所以(1.0)
这里条件成立,执行slowPath(downstream iterator)
。
- 因为上一步
-
slowPath(downstream iterator)
:void slowPath(Subscriber<? super R> a, Iterator<? extends R> iterator) { for (;;) { if (cancelled) { return; } R v; try { v = iterator.next();//1.0 } catch (Throwable ex) { Exceptions.throwIfFatal(ex); a.onError(ex); return; } a.onNext(v);//1.1 if (cancelled) { return; } boolean b; try { b = iterator.hasNext();//1.2 } catch (Throwable ex) { Exceptions.throwIfFatal(ex); a.onError(ex); return; } if (!b) { a.onComplete();//1.3 return; } } }
- 将
(1.0)
获取到的元素,(1.1)
传递给downstream
的onNext
,(1.2)
然后判断是否还有其他元素,如果有则循环继续,没有的话即调用downstream
的onComplete
结束。
- 将
flattenAsObservable
- 官方示例:
输出:Single<Double> source = Single.just(2.0); Observable<Double> observable = source.flattenAsObservable(x -> { return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3)); }); observable.subscribe(x -> System.out.println("onNext: " + x));
onNext: 2.0 onNext: 4.0 onNext: 8.0
subscribeActual
实现的逻辑和 flattenAsFlowable
类似,只是返回的对象为 SingleFlatMapIterableObservable
,就不再赘述了。
groupBy
-
官方示例:
Observable<String> animals = Observable.just( "Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo"); animals.groupBy(animal -> animal.charAt(0), String::toUpperCase) .concatMapSingle(Observable::toList) .subscribe(System.out::println);
输出:
[TIGER, TURTLE] [ELEPHANT] [CAT, CHAMELEON] [FROG, FISH, FLAMINGO]
-
我们来看看返回对象的
ObservableGroupBy
:public GroupByObserver(Observer<? super GroupedObservable<K, V>> actual, Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) { this.downstream = actual; this.keySelector = keySelector; this.valueSelector = valueSelector; this.bufferSize = bufferSize; this.delayError = delayError; this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>(); this.lazySet(1); } public void subscribeActual(Observer<? super GroupedObservable<K, V>> t) { source.subscribe(new GroupByObserver<T, K, V>(t, keySelector, valueSelector, bufferSize, delayError)); }
-
继续看
GroupByObserver
的onNext(T t)
:public void onNext(T t) { K key; try { key = keySelector.apply(t);//1.0 } catch (Throwable e) { ... return; } Object mapKey = key != null ? key : NULL_KEY; GroupedUnicast<K, V> group = groups.get(mapKey); if (group == null) { if (cancelled.get()) { return; } group = GroupedUnicast.createWith(key, bufferSize, this, delayError); groups.put(mapKey, group);//2.0 getAndIncrement(); downstream.onNext(group);//3.0 } V v; try { v = ObjectHelper.requireNonNull(valueSelector.apply(t), "The value supplied is null");//4.0 } catch (Throwable e) { ... return; } group.onNext(v);//4.1 }
-
(1.0)
通过keySelector.apply(t)
即官方示例中的animal.charAt(0)
获取分组的key
。 -
(2.0)
如果GroupedUnicast
不存再这个key
,则保存进去。 -
(3.0)
然后继续调用上一步操作符的onNext
方法,即官方示例中的just
。 -
(4.0)
通过valueSelector.apply(t)
即官方示例中的String::toUpperCase)
获取值,(4.1)
添加到ToListObserver
的collection
中。
-
-
最后通过
onComplete()
输出:public void onComplete() { List<ObservableGroupBy.GroupedUnicast<K, V>> list = new ArrayList<ObservableGroupBy.GroupedUnicast<K, V>>(groups.values()); groups.clear(); for (ObservableGroupBy.GroupedUnicast<K, V> e : list) { e.onComplete(); } downstream.onComplete(); }
ToListObserver
的onComplete()
:public void onComplete() { U c = collection; collection = null; downstream.onNext(c); downstream.onComplete(); }
scan
- 官方示例:
输出:Observable.just(5, 3, 8, 1, 7) .scan(0, (partialSum, x) -> partialSum + x) .subscribe(System.out::println);
0 5 8 16 17 24
- 我们来看看返回对象的
ObservableScanSeed
:public ObservableScanSeed(ObservableSource<T> source, Callable<R> seedSupplier, BiFunction<R, ? super T, R> accumulator) { super(source); this.accumulator = accumulator; this.seedSupplier = seedSupplier; } @Override public void subscribeActual(Observer<? super R> t) { R r; try { r = ObjectHelper.requireNonNull(seedSupplier.call(), "The seed supplied is null");//1.0 } catch (Throwable e) { Exceptions.throwIfFatal(e); EmptyDisposable.error(e, t); return; } source.subscribe(new ScanSeedObserver<T, R>(t, accumulator, r)); }
-
(1.0)
seedSupplier.call()
即官方示例中的0
,即设置r
的值为0
.
-
- 继续看
ScanSeedObserver
的onNext(T t)
:public void onNext(T t) { if (done) { return; } R v = value;//1.0 R u; try { u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value");//2.0 } catch (Throwable e) { Exceptions.throwIfFatal(e); upstream.dispose(); onError(e); return; } value = u;//3.0 downstream.onNext(u);//4.0 }
-
(1.0)
value
即为上一步设置的r
值0
. -
(2.0)
accumulator.apply(v, t)
即为官方示例中的partialSum + x
-
(3.0)
更新value
的值 -
(4.0)
将accumulator.apply(v, t)
传递给观察者的onNext
-
switchMap
- 官方示例:
输出:Observable.interval(0, 1, TimeUnit.SECONDS) .switchMap(x -> { return Observable.interval(0, 750, TimeUnit.MILLISECONDS) .map(y -> x); }) .takeWhile(x -> x < 3) .blockingSubscribe(System.out::print);
001122
- 我们来看看返回对象的
ObservableSwitchMap
:public ObservableSwitchMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize, boolean delayErrors) { super(source); this.mapper = mapper; this.bufferSize = bufferSize; this.delayErrors = delayErrors; } @Override public void subscribeActual(Observer<? super R> t) { if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) { return; } source.subscribe(new SwitchMapObserver<T, R>(t, mapper, bufferSize, delayErrors)); }
- 继续看
SwitchMapObserver
的onNext
:public void onNext(T t) { long c = unique + 1; unique = c; SwitchMapInnerObserver<T, R> inner = active.get(); if (inner != null) { inner.cancel(); } ObservableSource<? extends R> p; try { p = ObjectHelper.requireNonNull(mapper.apply(t), "The ObservableSource returned is null");//1.0 } catch (Throwable e) { ... return; } SwitchMapInnerObserver<T, R> nextInner = new SwitchMapInnerObserver<T, R>(this, c, bufferSize);//2.0 for (;;) { inner = active.get(); if (inner == CANCELLED) { break; } if (active.compareAndSet(inner, nextInner)) { p.subscribe(nextInner);//3.0 break; } } }
-
(1.0)
通过mapper.apply(t)
即官方示例中的Observable.interval(0, 750, TimeUnit.MILLISECONDS).map(y -> x)
返回的ObservableMap
对象。 -
(2.0)
构建SwitchMapInnerObserver
对象 -
(3.0)
用返回的ObservableMap
订阅SwitchMapInnerObserver
对象
-
window
- 官方示例:
输出:Observable.range(1, 10) // Create windows containing at most 2 items, and skip 3 items before starting a new window. .window(2, 3) .flatMapSingle(window -> { return window.map(String::valueOf) .reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add); }) .subscribe(System.out::println);
[1, 2] [4, 5] [7, 8] [10]
- 我们来看看返回对象的
ObservableWindow
:public ObservableWindow(ObservableSource<T> source, long count, long skip, int capacityHint) { super(source); this.count = count; this.skip = skip; this.capacityHint = capacityHint; } @Override public void subscribeActual(Observer<? super Observable<T>> t) { if (count == skip) { source.subscribe(new WindowExactObserver<T>(t, count, capacityHint)); } else { source.subscribe(new WindowSkipObserver<T>(t, count, skip, capacityHint)); } }
- 继续看
WindowSkipObserver
的onNext
:public void onNext(T t) { final ArrayDeque<UnicastSubject<T>> ws = windows; long i = index; long s = skip; if (i % s == 0 && !cancelled) {//3.0 wip.getAndIncrement(); UnicastSubject<T> w = UnicastSubject.create(capacityHint, this); ws.offer(w); downstream.onNext(w); } long c = firstEmission + 1; for (UnicastSubject<T> w : ws) { w.onNext(t);//1.0 } if (c >= count) { ws.poll().onComplete();//2.0 if (ws.isEmpty() && cancelled) { this.upstream.dispose(); return; } firstEmission = c - s; } else { firstEmission = c; } index = i + 1; }
-
(1.0)
将元素存入queue
-
(2.0)
当元素个数到达count
时,就之前的元素全部输出 -
(3.0)
当元素个数到达skip
时,就重新创建一个UnicastSubject
来存储元素
-
以上