响应式编程RxJava(二)

PublishSubject 源码分析

/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.subjects;

import java.util.*;
import java.util.concurrent.atomic.*;

import rx.*;
import rx.Observer;
import rx.exceptions.*;
import rx.internal.operators.BackpressureUtils;

/**
 * Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the
 * subscriber.
 * <p>
 * <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt="">
 * <p>
 * Example usage:
 * <p>
 * <pre> {@code

  PublishSubject<Object> subject = PublishSubject.create();
  // observer1 will receive all onNext and onCompleted events
  subject.subscribe(observer1);
  subject.onNext("one");
  subject.onNext("two");
  // observer2 will only receive "three" and onCompleted
  subject.subscribe(observer2);
  subject.onNext("three");
  subject.onCompleted();

  } </pre>
 *
 * @param <T>
 *          the type of items observed and emitted by the Subject
 */
public final class PublishSubject<T> extends Subject<T, T> {

    /** Holds the current subscriber array and the terminal error; the same object is handed to the superclass constructor. */
    final PublishSubjectState<T> state;

    /**
     * Creates and returns a new {@code PublishSubject}.
     *
     * @param <T> the value type
     * @return the new {@code PublishSubject}
     */
    public static <T> PublishSubject<T> create() {
        return new PublishSubject<T>(new PublishSubjectState<T>());
    }

    /**
     * Constructs a subject around the given state object.
     *
     * @param state the state that is passed to the superclass constructor and that
     *              all incoming events are delegated to
     */
    protected PublishSubject(PublishSubjectState<T> state) {
        super(state);
        this.state = state;
    }

    /** Forwards the item to the shared state, which hands it to every current subscriber. */
    @Override
    public void onNext(T v) {
        state.onNext(v);
    }

    /** Forwards the error to the shared state, which terminates the subject. */
    @Override
    public void onError(Throwable e) {
        state.onError(e);
    }

    /** Forwards the completion signal to the shared state, which terminates the subject. */
    @Override
    public void onCompleted() {
        state.onCompleted();
    }


    /**
     * Checks whether the current subscriber array is non-empty.
     * @return true if at least one subscriber is currently registered
     */
    @Override
    public boolean hasObservers() {
        return state.get().length != 0;
    }

    /**
     * Check if the Subject has terminated with an exception.
     * @return true if the subject has received a throwable through {@code onError}.
     * @since 1.2
     */
    public boolean hasThrowable() {
        return state.get() == PublishSubjectState.TERMINATED && state.error != null;
    }
    /**
     * Check if the Subject has terminated normally.
     * @return true if the subject completed normally via {@code onCompleted}
     * @since 1.2
     */
    public boolean hasCompleted() {
        return state.get() == PublishSubjectState.TERMINATED && state.error == null;
    }
    /**
     * Returns the Throwable that terminated the Subject.
     * @return the Throwable that terminated the Subject or {@code null} if the
     * subject hasn't terminated yet or it terminated normally.
     * @since 1.2
     */
    public Throwable getThrowable() {
        if (state.get() == PublishSubjectState.TERMINATED) {
            return state.error;
        }
        return null;
    }

    /**
     * Shared state of a {@code PublishSubject}.
     * <p>
     * The atomic reference value is the array of currently registered producers
     * (one per subscriber). The two zero-length sentinel arrays {@link #EMPTY} and
     * {@link #TERMINATED} are told apart by reference identity, not by content.
     *
     * @param <T> the value type
     */
    static final class PublishSubjectState<T>
    extends AtomicReference<PublishSubjectProducer<T>[]>
    implements OnSubscribe<T>, Observer<T> {

        /** Serialization version id. */
        private static final long serialVersionUID = -7568940796666027140L;

        /** Sentinel array: no subscribers, subject not terminated. */
        @SuppressWarnings("rawtypes")
        static final PublishSubjectProducer[] EMPTY = new PublishSubjectProducer[0];
        /** Sentinel array: the subject has received {@code onError} or {@code onCompleted}. */
        @SuppressWarnings("rawtypes")
        static final PublishSubjectProducer[] TERMINATED = new PublishSubjectProducer[0];

        /** The error passed to {@link #onError(Throwable)}; written before the array is swapped to TERMINATED. */
        Throwable error;

        /** Starts out with the {@link #EMPTY} subscriber array. */
        @SuppressWarnings("unchecked")
        public PublishSubjectState() {
            lazySet(EMPTY);
        }

        /**
         * Invoked for each subscribing {@link Subscriber}: wraps it in a producer,
         * registers that producer with the subscriber and then tries to add it to
         * the subscriber array.
         *
         * @param t the subscriber to register
         */
        @Override
        public void call(Subscriber<? super T> t) {
            PublishSubjectProducer<T> pp = new PublishSubjectProducer<T>(this, t);
            // The producer serves both as the subscriber's Subscription and as its Producer.
            t.add(pp);
            t.setProducer(pp);

            if (add(pp)) {
                // Covers a subscriber that was already unsubscribed by the time the add took effect.
                if (pp.isUnsubscribed()) {
                    remove(pp);
                }
            } else {
                // add() failed, i.e. the subject is already terminated: replay the terminal event.
                Throwable ex = error;
                if (ex != null) {
                    t.onError(ex);
                } else {
                    t.onCompleted();
                }
            }
        }


        /**
         * Appends the producer to the subscriber array, retrying until the
         * compare-and-set succeeds.
         *
         * @param inner the producer to add
         * @return true if added, false if the subject is already terminated
         */
        boolean add(PublishSubjectProducer<T> inner) {
            for (;;) {
                PublishSubjectProducer<T>[] curr = get();
                if (curr == TERMINATED) {
                    return false;
                }

                int n = curr.length;

                // Build a new array one element longer; the current array itself is never modified.
                @SuppressWarnings("unchecked")
                PublishSubjectProducer<T>[] next = new PublishSubjectProducer[n + 1];
                System.arraycopy(curr, 0, next, 0, n);

                next[n] = inner;
                if (compareAndSet(curr, next)) {
                    return true;
                }
            }
        }

        /**
         * Removes the producer from the subscriber array (matched by reference),
         * retrying until the compare-and-set succeeds. Does nothing if the array
         * is one of the sentinels or does not contain the producer.
         *
         * @param inner the producer to remove
         */
        @SuppressWarnings("unchecked")
        void remove(PublishSubjectProducer<T> inner) {
            for (;;) {
                PublishSubjectProducer<T>[] curr = get();
                if (curr == TERMINATED || curr == EMPTY) {
                    return;
                }

                int n = curr.length;
                // Index of the producer in the current array, or -1 if absent.
                int j = -1;
                for (int i = 0; i < n; i++) {
                    if (curr[i] == inner) {
                        j = i;
                        break;
                    }
                }

                if (j < 0) {
                    return;
                }

                PublishSubjectProducer<T>[] next;
                if (n == 1) {
                    // Removing the last subscriber goes back to the shared EMPTY sentinel.
                    next = EMPTY;
                } else {
                    // Copy everything except index j into a new, shorter array.
                    next = new PublishSubjectProducer[n - 1];
                    System.arraycopy(curr, 0, next, 0, j);
                    System.arraycopy(curr, j + 1, next, j, n - j - 1);
                }

                if (compareAndSet(curr, next)) {
                    return;
                }
            }
        }

        /** Delivers the item to every producer in the array read at the time of the call. */
        @Override
        public void onNext(T t) {
            for (PublishSubjectProducer<T> pp : get()) {
                pp.onNext(t);
            }
        }

        /**
         * Stores the error, swaps the array to {@link #TERMINATED} and signals the
         * error to every producer that was registered before the swap.
         * Exceptions thrown by individual producers are collected so the remaining
         * producers are still notified.
         *
         * @param e the terminal error
         */
        @SuppressWarnings("unchecked")
        @Override
        public void onError(Throwable e) {
            error = e;
            List<Throwable> errors = null;
            for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
                try {
                    pp.onError(e);
                } catch (Throwable ex) {
                    if (errors == null) {
                        errors = new ArrayList<Throwable>(1);
                    }
                    errors.add(ex);
                }
            }

            // NOTE(review): presumably rethrows the collected exceptions and is a no-op for null - confirm in Exceptions.
            Exceptions.throwIfAny(errors);
        }

        /**
         * Swaps the array to {@link #TERMINATED} and signals completion to every
         * producer that was registered before the swap.
         */
        @SuppressWarnings("unchecked")
        @Override
        public void onCompleted() {
            for (PublishSubjectProducer<T> pp : getAndSet(TERMINATED)) {
                pp.onCompleted();
            }
        }

    }

    /**
     * Per-subscriber bridge between the shared state and the actual subscriber.
     * <p>
     * The atomic long value is the total amount requested so far;
     * {@code Long.MIN_VALUE} marks the producer as unsubscribed.
     *
     * @param <T> the value type
     */
    static final class PublishSubjectProducer<T>
    extends AtomicLong
    implements Producer, Subscription, Observer<T> {
        /** Serialization version id. */
        private static final long serialVersionUID = 6451806817170721536L;

        /** The state this producer removes itself from on unsubscribe. */
        final PublishSubjectState<T> parent;

        /** The downstream subscriber that receives the events. */
        final Subscriber<? super T> actual;

        /** Number of items delivered to {@link #actual} so far. */
        long produced;

        /**
         * @param parent the owning state
         * @param actual the downstream subscriber
         */
        public PublishSubjectProducer(PublishSubjectState<T> parent, Subscriber<? super T> actual) {
            this.parent = parent;
            this.actual = actual;
        }

        /**
         * Adds {@code n} to the requested amount unless this producer is unsubscribed.
         *
         * @param n the additional amount requested
         */
        @Override
        public void request(long n) {
            // NOTE(review): validate() presumably rejects non-positive n - confirm in BackpressureUtils.
            if (BackpressureUtils.validate(n)) {
                for (;;) {
                    long r = get();
                    if (r == Long.MIN_VALUE) {
                        return;
                    }
                    // NOTE(review): addCap presumably saturates instead of overflowing - confirm in BackpressureUtils.
                    long u = BackpressureUtils.addCap(r, n);
                    if (compareAndSet(r, u)) {
                        return;
                    }
                }
            }
        }

        /** @return true once the value has been set to {@code Long.MIN_VALUE} */
        @Override
        public boolean isUnsubscribed() {
            return get() == Long.MIN_VALUE;
        }

        /** Marks this producer as unsubscribed and, on the first call only, removes it from the parent. */
        @Override
        public void unsubscribe() {
            if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
                parent.remove(this);
            }
        }

        /**
         * Delivers the item downstream if the requested amount differs from the
         * produced count; otherwise unsubscribes and signals a
         * {@code MissingBackpressureException}. Does nothing when unsubscribed.
         *
         * @param t the item to deliver
         */
        @Override
        public void onNext(T t) {
            long r = get();
            if (r != Long.MIN_VALUE) {
                long p = produced;
                if (r != p) {
                    produced = p + 1;
                    actual.onNext(t);
                } else {
                    // Requested amount equals produced count: nothing outstanding to fulfil.
                    unsubscribe();
                    actual.onError(new MissingBackpressureException("PublishSubject: could not emit value due to lack of requests"));
                }
            }
        }

        /** Forwards the error downstream unless this producer is unsubscribed. */
        @Override
        public void onError(Throwable e) {
            if (get() != Long.MIN_VALUE) {
                actual.onError(e);
            }
        }

        /** Forwards the completion signal downstream unless this producer is unsubscribed. */
        @Override
        public void onCompleted() {
            if (get() != Long.MIN_VALUE) {
                actual.onCompleted();
            }
        }
    }
}

第一步:创建Observable:PublishSubject.create();

第二步:在PublishSubject类的create()方法中,创建了观察者管理器PublishSubjectState(它内部以数组的形式维护所有的PublishSubjectProducer)

第三步:在PublishSubject的create()方法中创建了new PublishSubject<T>(new PublishSubjectState<T>())对象,并将管理器传入(构造方法中通过super(state)把它交给父类)

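第四步:当有观察者订阅时,会回调PublishSubjectState的call()方法,在其中创建了new PublishSubjectProducer<T>(this, t)对象,并通过t.add(pp)、t.setProducer(pp)把它绑定到订阅者上
第五步:在call()方法中调用add(pp)把该PublishSubjectProducer加入观察者数组;如果Subject已经终止(add返回false),则直接向该订阅者回调onError或onCompleted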

BehaviorSubject 使用

  // Subject used by the handlers below; assigned in onCreate.
  private BehaviorSubject<String> behaviorSubject;

    /**
     * Sets the content view and creates the subject with a default item.
     *
     * @param savedInstanceState passed through to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple2);
        // Seed the subject with one default message; items sent afterwards are handled as usual.
        behaviorSubject = BehaviorSubject.create("发送默认消息");
    }

    /**
     * Subscribes a new logging observer to {@code behaviorSubject}
     * (presumably wired up as a view click handler).
     * <p>
     * Each invocation registers one more observer; the subscription returned by
     * {@code subscribe} is not kept.
     *
     * @param v the view that triggered the call (unused)
     */
    public void click(View v) {
        behaviorSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i("main", "完成......");
            }

            @Override
            public void onError(Throwable e) {
                // Hand the throwable to the logger so the cause and stack trace are not lost.
                Log.i("main", "异常......", e);
            }

            @Override
            public void onNext(String message) {
                Log.i("main", "消息:" + message);
            }
        });

    }

    /**
     * Pushes a fixed message into {@code behaviorSubject}.
     *
     * @param v the view that triggered the call (unused)
     */
    public void send(View v) {
        behaviorSubject.onNext("你好,要坚持住.....");
    }

结果输出:
08-03 10:26:07.037 18544-18544/com.haocai.architect.rxjava I/main: 消息:发送默认消息

BehaviorSubject 源码

public final class BehaviorSubject<T> extends Subject<T, T> {
    /** An empty array to trigger getValues() to return a new array. */
    private static final Object[] EMPTY_ARRAY = new Object[0];
    private final SubjectSubscriptionManager<T> state;

    /**
     * Creates a {@link BehaviorSubject} without a default item.
     *
     * @param <T>
     *            the type of item the Subject will emit
     * @return the constructed {@link BehaviorSubject}
     */
    public static <T> BehaviorSubject<T> create() {
        return create(null, false);
    }
    /**
     * Creates a {@link BehaviorSubject} that emits the last item it observed and all subsequent items to each
     * {@link Observer} that subscribes to it.
     *
     * @param <T>
     *            the type of item the Subject will emit
     * @param defaultValue
     *            the item that will be emitted first to any {@link Observer} as long as the
     *            {@link BehaviorSubject} has not yet observed any items from its source {@code Observable}
     * @return the constructed {@link BehaviorSubject}
     */
    public static <T> BehaviorSubject<T> create(T defaultValue) {
        return create(defaultValue, true);
    }
    private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
        final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
        if (hasDefault) {
            state.setLatest(NotificationLite.next(defaultValue));
        }
        state.onAdded = new Action1<SubjectObserver<T>>() {

            @Override
            public void call(SubjectObserver<T> o) {
                o.emitFirst(state.getLatest());
            }

        };
        state.onTerminated = state.onAdded;
        return new BehaviorSubject<T>(state, state);
    }

    protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
        super(onSubscribe);
        this.state = state;
    }

    @Override
    public void onCompleted() {
        Object last = state.getLatest();
        if (last == null || state.active) {
            Object n = NotificationLite.completed();
            for (SubjectObserver<T> bo : state.terminate(n)) {
                bo.emitNext(n);
            }
        }
    }

    @Override
    public void onError(Throwable e) {
        Object last = state.getLatest();
        if (last == null || state.active) {
            Object n = NotificationLite.error(e);
            List<Throwable> errors = null;
            for (SubjectObserver<T> bo : state.terminate(n)) {
                try {
                    bo.emitNext(n);
                } catch (Throwable e2) {
                    if (errors == null) {
                        errors = new ArrayList<Throwable>();
                    }
                    errors.add(e2);
                }
            }

            Exceptions.throwIfAny(errors);
        }
    }

    @Override
    public void onNext(T v) {
        Object last = state.getLatest();
        if (last == null || state.active) {
            Object n = NotificationLite.next(v);
            for (SubjectObserver<T> bo : state.next(n)) {
                bo.emitNext(n);
            }
        }
    }

    /* test support */ int subscriberCount() {
        return state.observers().length;
    }

    @Override
    public boolean hasObservers() {
        return state.observers().length > 0;
    }
    /**
     * Check if the Subject has a value.
     * <p>Use the {@link #getValue()} method to retrieve such a value.
     * <p>Note that unless {@link #hasCompleted()} or {@link #hasThrowable()} returns true, the value
     * retrieved by {@code getValue()} may get outdated.
     * @return true if and only if the subject has some value and hasn't terminated yet.
     * @since 1.2
     */
    public boolean hasValue() {
        Object o = state.getLatest();
        return NotificationLite.isNext(o);
    }
    /**
     * Check if the Subject has terminated with an exception.
     * @return true if the subject has received a throwable through {@code onError}.
     * @since 1.2
     */
    public boolean hasThrowable() {
        Object o = state.getLatest();
        return NotificationLite.isError(o);
    }
    /**
     * Check if the Subject has terminated normally.
     * @return true if the subject completed normally via {@code onCompleted()}
     * @since 1.2
     */
    public boolean hasCompleted() {
        Object o = state.getLatest();
        return NotificationLite.isCompleted(o);
    }
    /**
     * Returns the current value of the Subject if there is such a value and
     * the subject hasn't terminated yet.
     * <p>The method can return {@code null} for various reasons. Use {@link #hasValue()}, {@link #hasThrowable()}
     * and {@link #hasCompleted()} to determine if such {@code null} is a valid value, there was an
     * exception or the Subject terminated (with or without receiving any value).
     * @return the current value or {@code null} if the Subject doesn't have a value,
     * has terminated or has an actual {@code null} as a valid value.
     * @since 1.2
     */
    public T getValue() {
        Object o = state.getLatest();
        if (NotificationLite.isNext(o)) {
            return NotificationLite.getValue(o);
        }
        return null;
    }
    /**
     * Returns the Throwable that terminated the Subject.
     * @return the Throwable that terminated the Subject or {@code null} if the
     * subject hasn't terminated yet or it terminated normally.
     * @since 1.2
     */
    public Throwable getThrowable() {
        Object o = state.getLatest();
        if (NotificationLite.isError(o)) {
            return NotificationLite.getError(o);
        }
        return null;
    }
    /**
     * Returns a snapshot of the currently buffered non-terminal events into
     * the provided {@code a} array or creates a new array if it has not enough capacity.
     * @param a the array to fill in
     * @return the array {@code a} if it had enough capacity or a new array containing the available values
     * @since 1.2
     */
    @SuppressWarnings("unchecked")
    public T[] getValues(T[] a) {
        Object o = state.getLatest();
        if (NotificationLite.isNext(o)) {
            if (a.length == 0) {
                a = (T[])Array.newInstance(a.getClass().getComponentType(), 1);
            }
            a[0] = NotificationLite.getValue(o);
            if (a.length > 1) {
                a[1] = null;
            }
        } else
        if (a.length > 0) {
            a[0] = null;
        }
        return a;
    }

    /**
     * Returns a snapshot of the currently buffered non-terminal events.
     * <p>The operation is thread-safe.
     *
     * @return a snapshot of the currently buffered non-terminal events.
     * @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
     * @since 1.2
     */
    @SuppressWarnings("unchecked")
    public Object[] getValues() {
        T[] r = getValues((T[])EMPTY_ARRAY);
        if (r == EMPTY_ARRAY) {
            return new Object[0]; // don't leak the default empty array.
        }
        return r;
    }
}

ReplaySubject 基本使用

    // Subject used by the handlers below; assigned in onCreate.
    private ReplaySubject<String> replaySubject;
    // Text input whose content is sent; looked up on first use in send().
    private EditText editText;
    /**
     * Sets the content view and creates the {@code ReplaySubject}.
     *
     * @param savedInstanceState passed through to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple2);
        // Caches every message; as soon as a new observer subscribes, all cached messages are sent to it.
        replaySubject = ReplaySubject.create();
    }

    /**
     * Subscribes a new logging observer to {@code replaySubject}
     * (presumably wired up as a view click handler).
     * <p>
     * Each invocation registers one more observer; the subscription returned by
     * {@code subscribe} is not kept.
     *
     * @param v the view that triggered the call (unused)
     */
    public void click(View v) {
        replaySubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i("main", "完成......");
            }

            @Override
            public void onError(Throwable e) {
                // Hand the throwable to the logger so the cause and stack trace are not lost.
                Log.i("main", "异常......", e);
            }

            @Override
            public void onNext(String message) {
                Log.i("main", "消息:" + message);
            }
        });

    }

    /**
     * Sends the current content of the text input into {@code replaySubject}.
     * The input view is looked up on the first call and cached in {@code editText}.
     *
     * @param v the view that triggered the call (unused)
     */
    public void send(View v) {
        EditText input = editText;
        if (input == null) {
            input = (EditText) findViewById(R.id.et_text);
            editText = input;
        }

        String text = input.getText().toString();
        replaySubject.onNext(text);
    }

AsyncSubject 基本使用

 // Subject used by the handlers below; assigned in onCreate.
 private AsyncSubject<String> asyncSubject;
    // Only referenced from commented-out code in send() within this snippet.
    private EditText editText;
    /**
     * Sets the content view and creates the {@code AsyncSubject}.
     *
     * @param savedInstanceState passed through to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple2);
        // Summary
        // Point 1: after sending messages with onNext, onCompleted() must also be called to commit them;
        // only then is the message delivered, otherwise observers receive nothing.
        // Point 2: observers subscribed to an AsyncSubject receive only the single most recent message.

        asyncSubject = AsyncSubject.create();
    }

    /**
     * Subscribes a new logging observer to {@code asyncSubject}
     * (presumably wired up as a view click handler).
     * <p>
     * Each invocation registers one more observer; the subscription returned by
     * {@code subscribe} is not kept.
     *
     * @param v the view that triggered the call (unused)
     */
    public void click(View v) {
        asyncSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i("main", "完成......");
            }

            @Override
            public void onError(Throwable e) {
                // Hand the throwable to the logger so the cause and stack trace are not lost.
                Log.i("main", "异常......", e);
            }

            @Override
            public void onNext(String message) {
                Log.i("main", "消息:" + message);
            }
        });

    }

    /**
     * Pushes four fixed messages into {@code asyncSubject}, in order, and then
     * completes it.
     *
     * @param v the view that triggered the call (unused)
     */
    public void send(View v) {
        String[] messages = { "hello world", "qwert", "asdfg", "zxcvb" };
        for (String message : messages) {
            asyncSubject.onNext(message);
        }
        asyncSubject.onCompleted();
    }

结果输出:

08-03 14:33:56.457 15968-15968/com.haocai.architect.rxjava I/main: 消息:zxcvb
08-03 14:33:56.457 15968-15968/com.haocai.architect.rxjava I/main: 完成......

AsyncSubject 源码分析

public final class AsyncSubject<T> extends Subject<T, T> {
    final SubjectSubscriptionManager<T> state;
    volatile Object lastValue;

    /**
     * Creates and returns a new {@code AsyncSubject}.
     * @param <T> the result value type
     * @return the new {@code AsyncSubject}
     */
    public static <T> AsyncSubject<T> create() {
        final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
        state.onTerminated = new Action1<SubjectObserver<T>>() {
            @Override
            public void call(SubjectObserver<T> o) {
                Object v = state.getLatest();
                if (v == null || NotificationLite.isCompleted(v)) {
                    o.onCompleted();
                } else
                if (NotificationLite.isError(v)) {
                    o.onError(NotificationLite.getError(v));
                } else {
                    o.actual.setProducer(new SingleProducer<T>(o.actual, NotificationLite.<T>getValue(v)));
                }
            }
        };
        return new AsyncSubject<T>(state, state);
    }

    protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
        super(onSubscribe);
        this.state = state;
    }

    @Override
    public void onCompleted() {
        if (state.active) {
            Object last = lastValue;
            if (last == null) {
                last = NotificationLite.completed();
            }
            for (SubjectObserver<T> bo : state.terminate(last)) {
                if (last == NotificationLite.completed()) {
                    bo.onCompleted();
                } else {
                    bo.actual.setProducer(new SingleProducer<T>(bo.actual, NotificationLite.<T>getValue(last)));
                }
            }
        }
    }

    @Override
    public void onError(final Throwable e) {
        if (state.active) {
            Object n = NotificationLite.error(e);
            List<Throwable> errors = null;
            for (SubjectObserver<T> bo : state.terminate(n)) {
                try {
                    bo.onError(e);
                } catch (Throwable e2) {
                    if (errors == null) {
                        errors = new ArrayList<Throwable>();
                    }
                    errors.add(e2);
                }
            }

            Exceptions.throwIfAny(errors);
        }
    }

    @Override
    public void onNext(T v) {
        lastValue = NotificationLite.next(v);
    }

    @Override
    public boolean hasObservers() {
        return state.observers().length > 0;
    }
    /**
     * Check if the Subject has a value.
     * <p>Use the {@link #getValue()} method to retrieve such a value.
     * <p>Note that unless {@link #hasCompleted()} or {@link #hasThrowable()} returns true, the value
     * retrieved by {@code getValue()} may get outdated.
     * @return true if and only if the subject has some value but not an error
     * @since 1.2
     */
    public boolean hasValue() {
        Object v = lastValue;
        Object o = state.getLatest();
        return !NotificationLite.isError(o) && NotificationLite.isNext(v);
    }
    /**
     * Check if the Subject has terminated with an exception.
     * @return true if the subject has received a throwable through {@code onError}.
     * @since 1.2
     */
    public boolean hasThrowable() {
        Object o = state.getLatest();
        return NotificationLite.isError(o);
    }
    /**
     * Check if the Subject has terminated normally.
     * @return true if the subject completed normally via {@code onCompleted()}
     * @since 1.2
     */
    public boolean hasCompleted() {
        Object o = state.getLatest();
        return o != null && !NotificationLite.isError(o);
    }
    /**
     * Returns the current value of the Subject if there is such a value and
     * the subject hasn't terminated with an exception.
     * <p>The method can return {@code null} for various reasons. Use {@link #hasValue()}, {@link #hasThrowable()}
     * and {@link #hasCompleted()} to determine if such {@code null} is a valid value, there was an
     * exception or the Subject terminated without receiving any value.
     * @return the current value or {@code null} if the Subject doesn't have a value,
     * has terminated with an exception or has an actual {@code null} as a value.
     * @since 1.2
     */
    public T getValue() {
        Object v = lastValue;
        Object o = state.getLatest();
        if (!NotificationLite.isError(o) && NotificationLite.isNext(v)) {
            return NotificationLite.getValue(v);
        }
        return null;
    }
    /**
     * Returns the Throwable that terminated the Subject.
     * @return the Throwable that terminated the Subject or {@code null} if the
     * subject hasn't terminated yet or it terminated normally.
     * @since 1.2
     */
    public Throwable getThrowable() {
        Object o = state.getLatest();
        if (NotificationLite.isError(o)) {
            return NotificationLite.getError(o);
        }
        return null;
    }
}

Observable中其他的一些api
第一个:just()方法

    // Three sample items (AppInfo is declared outside this snippet).
    AppInfo appInfo1 = new AppInfo("Michael同学", 0);
    AppInfo appInfo2 = new AppInfo("Michael同学", 0);
    AppInfo appInfo3 = new AppInfo("Michael同学", 0);
    // Builds an Observable over the three items; the result is not used in this snippet.
    Observable.just(appInfo1, appInfo2, appInfo3);

    //源码
    /**
     * Packs the three items into an array, in argument order, and delegates to
     * {@code from}.
     *
     * @param <T> the item type
     * @param t1 first item
     * @param t2 second item
     * @param t3 third item
     * @return the Observable returned by {@code from} for the three-element array
     */
    public static <T> Observable<T> just(T t1, T t2, T t3) {
        T[] items = (T[])new Object[] { t1, t2, t3 };
        return from(items);
    }
    即:just()把传入的多个对象打包成一个数组,再交给from()方法处理

第二个:repeat()方法

/**
 * {@code OnSubscribe} implementation shared by the {@code retry}, {@code repeat} and
 * {@code redo} factory methods below.
 * <p>
 * On subscription it subscribes to {@code source}, turns the source's terminal events
 * (onCompleted / onError) into {@link Notification} items, passes that stream through the
 * caller-supplied control handler and resubscribes to {@code source} whenever the
 * handler's Observable emits an item. Terminal events of the handler's Observable are
 * forwarded to the child subscriber.
 *
 * @param <T> the type of the items emitted by the source and relayed to the child
 */
public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
    /** The Observable that gets (re)subscribed to. */
    final Observable<T> source;
    /** Turns the stream of terminal notifications into the stream of restart signals. */
    private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> controlHandlerFunction;
    /** When true, an onCompleted from the source ends the sequence instead of reaching the handler. */
    final boolean stopOnComplete;
    /** When true, an onError from the source ends the sequence instead of reaching the handler. */
    final boolean stopOnError;
    /** Supplies the worker on which the (re)subscriptions are scheduled. */
    private final Scheduler scheduler;

    /**
     * Control handler that restarts without limit: every terminal notification is mapped
     * to an onNext notification carrying {@code null}, so each one acts as a restart signal.
     */
    static final Func1<Observable<? extends Notification<?>>, Observable<?>> REDO_INFINITE = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
        @Override
        public Observable<?> call(Observable<? extends Notification<?>> ts) {
            return ts.map(new Func1<Notification<?>, Notification<?>>() {
                @Override
                public Notification<?> call(Notification<?> terminal) {
                    return Notification.createOnNext(null);
                }
            });
        }
    };

    /**
     * Control handler that allows at most {@code count} restarts: the first {@code count}
     * terminal notifications become onNext notifications, any later one is passed through
     * unchanged and then dematerialized.
     */
    public static final class RedoFinite implements Func1<Observable<? extends Notification<?>>, Observable<?>> {
        /** Maximum number of restarts to allow. */
        final long count;

        /**
         * @param count the maximum number of restarts; 0 means every terminal notification
         *              is passed through unchanged
         */
        public RedoFinite(long count) {
            this.count = count;
        }

        @Override
        public Observable<?> call(Observable<? extends Notification<?>> ts) {
            return ts.map(new Func1<Notification<?>, Notification<?>>() {

                // Number of terminal notifications seen so far by this mapper instance.
                // NOTE(review): this is an int compared against the long 'count'; for counts
                // above Integer.MAX_VALUE the counter would wrap around before reaching it.
                int num;

                @Override
                public Notification<?> call(Notification<?> terminalNotification) {
                    if (count == 0) {
                        return terminalNotification;
                    }

                    num++;
                    if (num <= count) {
                        // still within the budget: emit a value, which serves as a restart signal
                        return Notification.createOnNext(num);
                    } else {
                        // budget exhausted: let the original terminal notification through
                        return terminalNotification;
                    }
                }

            }).dematerialize();
        }
    }

    /**
     * Control handler that consults a predicate with the attempt counter and the error of
     * each terminal notification to decide whether to go on.
     */
    public static final class RetryWithPredicate implements Func1<Observable<? extends Notification<?>>, Observable<? extends Notification<?>>> {
        /** Called with (counter value, throwable of the terminal notification). */
        final Func2<Integer, Throwable, Boolean> predicate;

        /**
         * @param predicate decides, from the current counter value and the throwable, whether
         *                  the counter is advanced (true) or the terminal notification is emitted (false)
         */
        public RetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
            this.predicate = predicate;
        }

        @Override
        public Observable<? extends Notification<?>> call(Observable<? extends Notification<?>> ts) {
            // scan starts from onNext(0) and carries the counter in the accumulated notification
            return ts.scan(Notification.createOnNext(0), new Func2<Notification<Integer>, Notification<?>, Notification<Integer>>() {
                @SuppressWarnings("unchecked")
                @Override
                public Notification<Integer> call(Notification<Integer> n, Notification<?> term) {
                    final int value = n.getValue();
                    if (predicate.call(value, term.getThrowable())) {
                        return Notification.createOnNext(value + 1);
                    } else {
                        // predicate said stop: the terminal notification becomes the accumulated value
                        return (Notification<Integer>) term;
                    }
                }
            });
        }
    }

    /**
     * Retries {@code source} without limit, using {@link #REDO_INFINITE} as the handler.
     *
     * @param <T> the value type
     * @param source the Observable to retry
     * @return the retrying Observable
     */
    public static <T> Observable<T> retry(Observable<T> source) {
       return retry(source, REDO_INFINITE);
    }

    /**
     * Retries {@code source} at most {@code count} times.
     *
     * @param <T> the value type
     * @param source the Observable to retry
     * @param count the maximum number of retries; 0 returns {@code source} itself
     * @return the retrying Observable
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public static <T> Observable<T> retry(Observable<T> source, final long count) {
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 expected");
        }
        if (count == 0) {
            return source;
        }
        return retry(source, new RedoFinite(count));
    }

    /**
     * Retries {@code source} as directed by {@code notificationHandler}; onCompleted from the
     * source ends the sequence (stopOnComplete = true), and the trampoline scheduler is used.
     *
     * @param <T> the value type
     * @param source the Observable to retry
     * @param notificationHandler maps the terminal notification stream to restart signals
     * @return the retrying Observable
     */
    public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
        return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
    }

    /**
     * Same as {@link #retry(Observable, Func1)} but with a caller-supplied scheduler.
     *
     * @param <T> the value type
     * @param source the Observable to retry
     * @param notificationHandler maps the terminal notification stream to restart signals
     * @param scheduler the scheduler whose worker performs the (re)subscriptions
     * @return the retrying Observable
     */
    public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
        return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
    }

    /**
     * Repeats {@code source} without limit on the trampoline scheduler.
     *
     * @param <T> the value type
     * @param source the Observable to repeat
     * @return the repeating Observable
     */
    public static <T> Observable<T> repeat(Observable<T> source) {
        return repeat(source, Schedulers.trampoline());
    }

    /**
     * Repeats {@code source} without limit on the given scheduler.
     *
     * @param <T> the value type
     * @param source the Observable to repeat
     * @param scheduler the scheduler whose worker performs the (re)subscriptions
     * @return the repeating Observable
     */
    public static <T> Observable<T> repeat(Observable<T> source, Scheduler scheduler) {
        return repeat(source, REDO_INFINITE, scheduler);
    }

    /**
     * Subscribes to {@code source} {@code count} times in total, on the trampoline scheduler.
     *
     * @param <T> the value type
     * @param source the Observable to repeat
     * @param count the total number of subscriptions; 0 yields an empty Observable
     * @return the repeating Observable
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public static <T> Observable<T> repeat(Observable<T> source, final long count) {
        return repeat(source, count, Schedulers.trampoline());
    }

    /**
     * Subscribes to {@code source} {@code count} times in total, on the given scheduler.
     *
     * @param <T> the value type
     * @param source the Observable to repeat
     * @param count the total number of subscriptions; 0 yields an empty Observable
     * @param scheduler the scheduler whose worker performs the (re)subscriptions
     * @return the repeating Observable
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
        if (count == 0) {
            return Observable.empty();
        }
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 expected");
        }
        // the first subscription is not a restart, hence count - 1 restarts
        return repeat(source, new RedoFinite(count - 1), scheduler);
    }

    /**
     * Repeats {@code source} as directed by {@code notificationHandler}; onError from the
     * source ends the sequence (stopOnError = true), and the trampoline scheduler is used.
     *
     * @param <T> the value type
     * @param source the Observable to repeat
     * @param notificationHandler maps the terminal notification stream to restart signals
     * @return the repeating Observable
     */
    public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
        return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
    }

    /**
     * Same as {@link #repeat(Observable, Func1)} but with a caller-supplied scheduler.
     *
     * @param <T> the value type
     * @param source the Observable to repeat
     * @param notificationHandler maps the terminal notification stream to restart signals
     * @param scheduler the scheduler whose worker performs the (re)subscriptions
     * @return the repeating Observable
     */
    public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
        return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
    }

    /**
     * Restarts {@code source} as directed by {@code notificationHandler}; both onCompleted
     * and onError notifications are handed to the handler (neither stop flag is set).
     *
     * @param <T> the value type
     * @param source the Observable to restart
     * @param notificationHandler maps the terminal notification stream to restart signals
     * @param scheduler the scheduler whose worker performs the (re)subscriptions
     * @return the restarting Observable
     */
    public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
        return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
    }

    /**
     * Stores the configuration; instances are created only through the static factories above.
     *
     * @param source the Observable to (re)subscribe to
     * @param f the control handler function
     * @param stopOnComplete whether a source onCompleted ends the sequence
     * @param stopOnError whether a source onError ends the sequence
     * @param scheduler the scheduler whose worker performs the (re)subscriptions
     */
    private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
            Scheduler scheduler) {
        this.source = source;
        this.controlHandlerFunction = f;
        this.stopOnComplete = stopOnComplete;
        this.stopOnError = stopOnError;
        this.scheduler = scheduler;
    }

    /**
     * Wires the child subscriber to the source through the restart machinery: sets up the
     * terminal-notification subject, the resubscription action, the filtered handler input,
     * the subscription to the handler output and finally the child's producer.
     *
     * @param child the downstream subscriber
     */
    @Override
    public void call(final Subscriber<? super T> child) {

        // when true is a marker to say we are ready to resubscribe to source
        final AtomicBoolean resumeBoundary = new AtomicBoolean(true);

        // incremented when requests are made, decremented when requests are fulfilled
        final AtomicLong consumerCapacity = new AtomicLong();

        // registering the worker with the child ties it to the child's unsubscription
        final Scheduler.Worker worker = scheduler.createWorker();
        child.add(worker);

        // holds the subscriber of the current source subscription; replaced on every restart
        final SerialSubscription sourceSubscriptions = new SerialSubscription();
        child.add(sourceSubscriptions);

        // use a subject to receive terminals (onCompleted and onError signals) from
        // the source observable. We use a BehaviorSubject because subscribeToSource
        // may emit a terminal before the restarts observable (transformed terminals)
        // is subscribed
        final Subject<Notification<?>, Notification<?>> terminals = BehaviorSubject.<Notification<?>>create().toSerialized();
        final Subscriber<Notification<?>> dummySubscriber = Subscribers.empty();
        // subscribe immediately so the last emission will be replayed to the next
        // subscriber (which is the one we care about)
        terminals.subscribe(dummySubscriber);

        // receives the producer of each new source subscription and the child's requests
        final ProducerArbiter arbiter = new ProducerArbiter();

        // action that performs one (re)subscription to the source
        final Action0 subscribeToSource = new Action0() {
            @Override
            public void call() {
                if (child.isUnsubscribed()) {
                    return;
                }

                Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
                    // set once a terminal event has been seen; later events are ignored
                    boolean done;

                    @Override
                    public void onCompleted() {
                        if (!done) {
                            done = true;
                            unsubscribe();
                            // completion is not forwarded to the child; it is published as a notification
                            terminals.onNext(Notification.createOnCompleted());
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        if (!done) {
                            done = true;
                            unsubscribe();
                            // the error is not forwarded to the child; it is published as a notification
                            terminals.onNext(Notification.createOnError(e));
                        }
                    }

                    @Override
                    public void onNext(T v) {
                        if (!done) {
                            child.onNext(v);
                            decrementConsumerCapacity();
                            arbiter.produced(1);
                        }
                    }

                    private void decrementConsumerCapacity() {
                        // use a CAS loop because we don't want to decrement the
                        // value if it is Long.MAX_VALUE
                        while (true) {
                            long cc = consumerCapacity.get();
                            if (cc != Long.MAX_VALUE) {
                                if (consumerCapacity.compareAndSet(cc, cc - 1)) {
                                    break;
                                }
                            } else {
                                break;
                            }
                        }
                    }

                    @Override
                    public void setProducer(Producer producer) {
                        arbiter.setProducer(producer);
                    }
                };
                // new subscription each time so if it unsubscribes itself it does not prevent retries
                // by unsubscribing the child subscription
                sourceSubscriptions.set(terminalDelegatingSubscriber);
                source.unsafeSubscribe(terminalDelegatingSubscriber);
            }
        };

        // the observable received by the control handler function will receive notifications of onCompleted in the case of 'repeat'
        // type operators or notifications of onError for 'retry' this is done by lifting in a custom operator to selectively divert
        // the retry/repeat relevant values to the control handler
        final Observable<?> restarts = controlHandlerFunction.call(
                terminals.lift(new Operator<Notification<?>, Notification<?>>() {
                    @Override
                    public Subscriber<? super Notification<?>> call(final Subscriber<? super Notification<?>> filteredTerminals) {
                        return new Subscriber<Notification<?>>(filteredTerminals) {
                            @Override
                            public void onCompleted() {
                                filteredTerminals.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                filteredTerminals.onError(e);
                            }

                            @Override
                            public void onNext(Notification<?> t) {
                                // notifications matching a stop flag terminate the handler input;
                                // everything else is handed to the handler as an item
                                if (t.isOnCompleted() && stopOnComplete) {
                                    filteredTerminals.onCompleted();
                                } else if (t.isOnError() && stopOnError) {
                                    filteredTerminals.onError(t.getThrowable());
                                } else {
                                    filteredTerminals.onNext(t);
                                }
                            }

                            @Override
                            public void setProducer(Producer producer) {
                                // request everything from the terminals subject
                                producer.request(Long.MAX_VALUE);
                            }
                        };
                    }
                }));

        // subscribe to the restarts observable to know when to schedule the next redo.
        worker.schedule(new Action0() {
            @Override
            public void call() {
                restarts.unsafeSubscribe(new Subscriber<Object>(child) {
                    @Override
                    public void onCompleted() {
                        child.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        child.onError(e);
                    }

                    @Override
                    public void onNext(Object t) {
                        if (!child.isUnsubscribed()) {
                            // perform a best endeavours check on consumerCapacity
                            // with the intent of only resubscribing immediately
                            // if there is outstanding capacity
                            if (consumerCapacity.get() > 0) {
                                worker.schedule(subscribeToSource);
                            } else {
                                // set this to true so that on next request
                                // subscribeToSource will be scheduled
                                resumeBoundary.compareAndSet(false, true);
                            }
                        }
                    }

                    @Override
                    public void setProducer(Producer producer) {
                        // request everything from the restarts observable
                        producer.request(Long.MAX_VALUE);
                    }
                });
            }
        });

        child.setProducer(new Producer() {

            @Override
            public void request(final long n) {
                if (n > 0) {
                    BackpressureUtils.getAndAddRequest(consumerCapacity, n);
                    arbiter.request(n);
                    // only the request that flips the boundary from true to false schedules a subscription
                    if (resumeBoundary.compareAndSet(true, false)) {
                        worker.schedule(subscribeToSource);
                    }
                }
            }
        });

    }
}

数据重复操作

第三个:defer()方法
只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer在订阅时调用Func0创建Observable。(懒加载)

    // Observable of AppInfo items; assigned in onCreate() from getApps().
    private Observable<AppInfo> observable;
    // Adapter set on the ListView in initView().
    private AppInfoAdapter appInfoAdapter;

    /**
     * Sets the content view, stores the Observable returned by getApps() and
     * initialises the list view.
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple9);
        observable = getApps();
        initView();
        // registerObserver();
    }

    /**
     * Creates the adapter and attaches it to the list view of this screen.
     */
    private void initView() {
        appInfoAdapter = new AppInfoAdapter(this);
        final ListView appList = (ListView) findViewById(R.id.lv_app_name);
        appList.setAdapter(appInfoAdapter);
    }

    /**
     * Builds the Observable used by this screen.
     * <p>
     * {@code defer} postpones creation: the factory's {@code call()} is invoked when an
     * observer subscribes, and it builds the actual Observable at that moment (lazy creation).
     *
     * @return a deferred Observable emitting three {@code AppInfo} items, with
     *         {@code repeat(3)} applied
     */
    private Observable<AppInfo> getApps() {
        final Func0<Observable<AppInfo>> factory = new Func0<Observable<AppInfo>>() {
            @Override
            public Observable<AppInfo> call() {
                Log.i("main", "创建了Observable......");
                // An Observable.create(...) whose OnSubscribe calls onNext/onCompleted itself
                // could be returned here instead of just(...).
                final Observable<AppInfo> threeApps = Observable.just(
                        new AppInfo("Michael同学", 0),
                        new AppInfo("Michael同学", 0),
                        new AppInfo("Michael同学", 0));
                return threeApps.repeat(3);
            }
        };
        return Observable.defer(factory);
    }

    /**
     * Click handler: subscribes to the Observable, appends every emitted item to the
     * adapter and refreshes the list once the sequence completes.
     *
     * @param v the clicked view (unused)
     */
    public void click(View v) {
        observable.subscribe(new Observer<AppInfo>() {

            @Override
            public void onCompleted() {
                // Refresh the UI once all items have been delivered
                appInfoAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable e) {
                // Do not swallow failures silently; at least make them visible in the log
                Log.e("main", "Observable terminated with an error", e);
            }

            @Override
            public void onNext(AppInfo t) {
                // Append the received item to the adapter's data
                appInfoAdapter.addAppInfo(t);
            }
        });
    }
每次点击按钮(触发 click 方法)都会重新订阅,并加载 9 条数据(3 个对象 × repeat(3))

第四个:range()方法

    // Observable of integers; assigned in onCreate() from getApps().
    private Observable<Integer> observable;

    /**
     * Sets the content view and stores the Observable returned by getApps().
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple9);
        observable = getApps();
    }

    /**
     * Builds the Observable used by this screen.
     * <p>
     * {@code range(start, count)} takes the first value to emit and the number of
     * consecutive integers to emit; with (5, 5) the emitted values are 5, 6, 7, 8 and 9.
     *
     * @return an Observable emitting five consecutive integers starting at 5
     */
    private Observable<Integer> getApps() {
        // first argument: the starting value
        final int start = 5;
        // second argument: how many integers to emit
        final int count = 5;
        return Observable.range(start, count);
    }

    /**
     * Click handler: subscribes to the Observable and logs every received integer.
     *
     * @param v the clicked view (unused)
     */
    public void click(View v) {
        observable.subscribe(new Observer<Integer>() {

            @Override
            public void onCompleted() {
                // Nothing to refresh in this demo
            }

            @Override
            public void onError(Throwable e) {
                // Do not swallow failures silently; at least make them visible in the log
                Log.e("main", "Observable terminated with an error", e);
            }

            @Override
            public void onNext(Integer t) {
                // Log the received value
                Log.i("main", "接收到的数据: " + t);
            }
        });
    }

结果输出:
08-04 05:49:20.818 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 5
08-04 05:49:20.819 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 6
08-04 05:49:20.819 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 7
08-04 05:49:20.819 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 8
08-04 05:49:20.819 13051-13051/com.haocai.architect.rxjava I/main: 接收到的数据: 9

第五个:interval()方法
interval():创建一个按固定时间间隔发射整数序列的Observable
这个比较好理解,interval()也是用来创建Observable的,并且也可以延迟发送。但interval()是按周期执行的,所以可以这么认为:interval()是一个可以指定线程的TimerTask(威力加强版……)

第六个:timer()方法

    // Observable of Long values; assigned in onCreate() from getApps().
    private Observable<Long> observable;

    /**
     * Sets the content view and stores the Observable returned by getApps().
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple9);
        observable = getApps();
    }

    /**
     * Builds the Observable used by this screen.
     * <p>
     * {@code timer(delay, unit)} waits for the given delay and then emits a single
     * {@code 0L}; it is a one-shot delay, not a periodic interval.
     *
     * @return an Observable that emits one item after a 3 second delay
     */
    private Observable<Long> getApps() {
        // first argument: the delay before the single item is emitted
        final long delay = 3;
        // second argument: the time unit of the delay (milliseconds, seconds, minutes, ...)
        final TimeUnit unit = TimeUnit.SECONDS;
        return Observable.timer(delay, unit);
    }

    /**
     * Click handler: subscribes to the timer Observable and logs the value it emits.
     *
     * @param v the clicked view (unused)
     */
    public void click(View v) {
        // The observer is notified once the timer's delay has elapsed
        observable.subscribe(new Observer<Long>() {

            @Override
            public void onCompleted() {
                // Nothing to refresh in this demo
            }

            @Override
            public void onError(Throwable e) {
                // Do not swallow failures silently; at least make them visible in the log
                Log.e("main", "Observable terminated with an error", e);
            }

            @Override
            public void onNext(Long t) {
                // Log the received value
                Log.i("main", "接收到的数据: " + t);
            }
        });
    }

输出结果
08-04 08:25:14.754 24259-24357/com.haocai.architect.rxjava I/main: 接收到的数据: 0

总结:
使用Observable步骤:
准备观察者、准备Observable、注册观察者(Subscription subscription = observable.subscribe(new Observer()))、取消订阅(subscription.unsubscribe()),以及中间的一些逻辑处理

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容

  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,195评论 2 8
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,672评论 18 139
  • 响应式编程简介 响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者...
    说码解字阅读 3,070评论 0 5
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,473评论 7 62
  • 前言 欢迎继续收看《我所理解的RxJava--上手其实很简单(二)》,上周出了第一篇,各位程序猿大大的反应还不错,...
    Weavey阅读 29,644评论 35 133