【RxJava】- 连接操作符源码分析

目录

【RxJava】- 创建操作符源码分析
【RxJava】- 变换操作符源码分析
【RxJava】- 过滤操作符源码分析
【RxJava】- 结合操作符源码分析

Connect

让一个可连接的Observable开始发射数据给订阅者,可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。

RxJava中connect是ConnectableObservable接口的一个方法,使用publish操作符可以将一个普通的Observable转换为一个ConnectableObservable。

Observable.create(null).replay().connect();

实现类

ConnectConsumer

自己查看,很简单,就几行代码。

调用 connect(cc)方法,返回disposable实例。

Replay

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅.

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

实现类

ObservableReplay

Replay还有其它实现类,请自己查看。这里讲解replay()没有传入参数的实现。看一下创建过程,最终创建ObservableReplay实例传入的参数如下:

  • source
    被观察者
  • bufferFactory
    UnBoundedFactory实例
  • curr
    final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<>();
  • onSubscribe
    ObservableSource<T> onSubscribe = new ReplaySource<>(curr, bufferFactory);
    

首先调用connect(Consumer<? super Disposable> connection)方法

返回初始化容量为16的UnboundedReplayBuffer数组

ReplayBuffer<T> buf = bufferFactory.call();

保存新创建的ReplayObserver对象

if (!current.compareAndSet(ps, u)){continue;}

调用accept方法

connection.accept(ps);

这里调用的是上面Connect中的ConnectConsumer类中的accept方法。

public void accept(Disposable t) {
    this.disposable = t;
}

调用被观察者

if (doConnect) {
    source.subscribe(ps);
}

调用onNext(T t)

public void onNext(T t) {
    if (!done) {
        // 保存到数据
        buffer.next(t);
        replay();
    }
}

核心逻辑就在replay中,自己查看。

Publish

将普通的Observable转换为可连接的Observable

ConnectableObservable observable = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
    ObservableEmitter<Integer> observableEmitter = emitter.serialize();
    try {
        // 序列化
        if (!emitter.isDisposed()) {
            for (int i = 1; i < 3; i++) {
                System.out.println("create operate--->emitter: "+i);
                if (1==i){
                    // ExceptionHelper.TERMINATED
                   //   observableEmitter.onError(new Throwable("error"));
                }else {
                    observableEmitter.onNext(i);
                }
            }
            observableEmitter.onComplete();
        }
    } catch (Exception e) {
        observableEmitter.onError(e);
    }
}).publish();
observable.subscribe(new Observer<Integer>() {...});
observable.connect();

observable.connect()不调用,是不会发射数据的。

实现类

ObservablePublish

看subscribeActual方法

protected void subscribeActual(Observer<? super T> observer) {
    ...
    InnerDisposable<T> inner = new InnerDisposable<>(observer, co
    observer.onSubscribe(inner);
    if (conn.add(inner)) {
        if (inner.isDisposed()) {
            conn.remove(inner);
        }
        return;
    }
    ...
}

conn.add(inner)为每一个订阅者创建InnerDisposable对象,然后保存在PublishConnection中的一个数组中。

看connect方法

public void connect(Consumer<? super Disposable> connection) {
    ...
    if (doConnect) {
        source.subscribe(conn);
    }
}

调用被观察者subscribe对象,然后可以在里面发射数据。

PublishConnection中的 onNext方法

public void onNext(T t) {
    for (InnerDisposable<T> inner : get()) {
        inner.downstream.onNext(t);
    }

调用每一个订阅者的onNext方法,将数据发射给订阅者。

RefCount

让一个可连接的Observable行为像普通的Observable。可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

Observable observable = Observable.create((ObservableOnSubscrib
    ObservableEmitter<Integer> observableEmitter = emitter.seri
    try {
        // 序列化
        if (!emitter.isDisposed()) {
            for (int i = 0; i < 4; i++) {
                System.out.println("create operate--->emitter: 
                if (1==i){
                    // ExceptionHelper.TERMINATED
                      observableEmitter.onError(new Throwable("
                }else {
                    observableEmitter.onNext(i);
                }
            }
            observableEmitter.onComplete();
        }
    } catch (Exception e) {
        observableEmitter.onError(e);
    }
}).publish().refCount(2);
observable.subscribe(...);
observable.subscribe(...);

实现类

ObservableRefCount

看一下subscribeActual方法

protected void subscribeActual(Observer<? super T> observer) {
    RefConnection conn;
    boolean connect = false;
    synchronized (this) {
        conn = connection;
        if (conn == null) {
            conn = new RefConnection(this);
            connection = conn;
        }
        long c = conn.subscriberCount;
        if (c == 0L && conn.timer != null) {
            conn.timer.dispose();
        }
        conn.subscriberCount = c + 1;
        if (!conn.connected && c + 1 == n) {
            connect = true;
            conn.connected = true;
        }
    }
    source.subscribe(new RefCountObserver<>(observer, this, conn));
    if (connect) {
        source.connect(conn);
    }
}

if (!conn.connected && c + 1 == n)当订阅者的数量等于refCount方法传入的数量时,表示可以连接,调用ObservablePublish中的connect方法。就与上面将Publish操作符调用connect方法作用相同。

当然你可以随时调用connect方法来强制发射数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,324评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,356评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,328评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,147评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,160评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,115评论 1 296
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,025评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,867评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,307评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,528评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,688评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,409评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,001评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,657评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,811评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,685评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,573评论 2 353

推荐阅读更多精彩内容