有关EventBus的发送异步消息时的线程切换

概述: EventBus用于不同的Activity之间或者Activity与Service之间进行通信,非常的方便,即使是不同线程之间的数据发送,我们定义的数据接收方法也能收到。

简单使用

public class MainActivity extend Activity{

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        EventBus.getDefault().register(this)
    }


    @Override
    protected void onDestroy() {
        super.onDestroy();
        EventBus.getDefault().unregister(this)
    }

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onEvent(Event event){//自定义Event实体类
        Log.i("onEvent", "接收到消息");
    }

}

//在其他地方直接发送Event对象
EventBus.getDefault().post(new Event());

只要我们页面内注册了好了EventBus,并且定义好了接收方法,不管我们从哪个线程发送消息都可以接收到。
onEvent()方法添加了@Subscribe注解;

Subscribe注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;

    /**
     * If true, delivers the most recent sticky event (posted with
     * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
     */
    boolean sticky() default false;

    /** Subscriber priority to influence the order of event delivery.
     * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
     * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
     * delivery among subscribers with different {@link ThreadMode}s! */
    int priority() default 0;
}
  • threadMode属性定义方法在哪个线程接收消息;
  • sticky属性定义方法是否接收粘性消息,用于接收页面创建之前发送的粘性消息;
  • priority属性定义方法的优先级,优先级高的先获取到消息回调;

ThreadMode

public enum ThreadMode {
    
    //哪个线程发送的消息,就在哪个线程回调接收方法;
    //接收方法不能过于耗时,会阻塞发送线程(可能为主线程)
    POSTING,

    //不管哪个线程发送,都在主线程回调接收方法;
    MAIN,

    //在主线程回调接收方法,消息被发送直接回放入队列,不一定会马上回调接收方法;
    MAIN_ORDERED,

    //在子线程回调接收方法,如果发送消息是在子线程,直接回调接收方法;
    BACKGROUND,

   //在一个独立的线程回调接收方法,不在主线程,也不在发送消息的线程;
    ASYNC
}

接下来查看post方法是如何执行的:

1、post(Object event)
public class EventBus {

    /** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();//往下看分析1和分析2
        List<Object> eventQueue = postingState.eventQueue;//获取消息队列
        eventQueue.add(event);//把事件添加到队列中

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();//记录是否在主线程
            postingState.isPosting = true;//设置为正在发送中的状态
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //循环发送消息队里中的消息
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

    //分析1
    //ThreadLocal 是一个线程内部的数据存储类,存储线程私有数据;
    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };


    //分析2
    //记录发送线程的信息,如发送的消息队里、是否主线程、是否正在发送,是否取消发送等
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

}

分析:

  1. post方法中,先从线程中获取PostingThreadState数据,PostingThreadState用于保存消息队列,发送状态,是否为主线程;
  2. 把消息添加到线程所在的消息队列中;
  3. 如果现在还没开始发送消息,循环发送消息队列中的消息,设置发送状态为true;
  4. 调用postSingleEvent(eventQueue.remove(0), postingState)发送单个消息;
2、postSingleEvent(eventQueue.remove(0), postingState)
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();//获取消息实体类对应的Class对象
        boolean subscriptionFound = false;
        if (eventInheritance) {//默认为true, 判断是否要查找消息实体类的父类或实现的接口类
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);//分析3,获取消息实体类的父类或者接口类
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {//遍历所有的event类对象
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

    //分析3
    /** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
    private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (eventTypesCache) {
            List<Class<?>> eventTypes = eventTypesCache.get(eventClass);//获取缓存
            if (eventTypes == null) {//如果缓存为空
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                while (clazz != null) {
                    eventTypes.add(clazz);
                    addInterfaces(eventTypes, clazz.getInterfaces());//添加class的接口类
                    clazz = clazz.getSuperclass();//获取class的父类
                }
                eventTypesCache.put(eventClass, eventTypes);//添加进集合中
            }
            return eventTypes;
        }
    }

分析:

  1. 获取消息实体类的类对象,判断是否需要获取这个类对象的父类和接口类的类对象;
  2. 获取这个类对象的所有父类和接口类的类对象, 然后遍历这些类对象,调用postSingleEventForEventType(event, postingState, clazz);
  3. 不需要获取消息类对象的父类和接口,直接调用postSingleEventForEventType(event, postingState, eventClass);
3、postSingleEventForEventType(event, postingState, clazz)

    //subscriptionsByEventType记录了消息实体类的类对象,与消息订阅类和消息订阅方法的映射关系
    //subscriptionsByEventType在EventBus初始化时被赋值为HashMap;
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;


    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;//Subscription记录了消息订阅类和订阅方法
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);//获取消息的所有订阅者信息
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {//遍历所有的消息订阅者
                postingState.event = event;//配置发送线程配置的消息时间
                postingState.subscription = subscription;//配置发送线程配置的消息订阅者
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

//消息订阅者
final class Subscription {
    final Object subscriber;//消息订阅类
    final SubscriberMethod subscriberMethod;//消息订阅方法
}

分析:
1.首先获取消息实体类对应的消息订阅者集合;这个集合是在我们调用EventBus的注册方法时,遍历注册方法所在类的所有方法,获取添加了@Subscribe的方法所有方法,再获取这些方法的参数,然后以参数的类对象为key,Subscription的集合为value,保存消息参数和消息订阅者的映射关系,消息和订阅类以及订阅方法的映射关系为1:N:N;有兴趣的童鞋可以自己查看register()方法的相关代码;

  1. 获取到消息订阅者的集合后,遍历这个集合,调用postToSubscription(subscription, event, postingState.isMainThread);
3、postToSubscription(subscription, event, postingState.isMainThread)
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {//判断消息订阅方法定义了要在回调线程模式
            case POSTING:
                invokeSubscriber(subscription, event);//直接在发送消息的线程回调
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);//发送消息是主线程,直接回调
                } else {
                    mainThreadPoster.enqueue(subscription, event);//保存到主线程handler的消息队列
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    //保存到主线程handler的消息队列,然后发送handler消息
                    mainThreadPoster.enqueue(subscription, event);//分析4
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    //backgroundPoster是一个Runnable, 把消息保存到子线程的消息队列,然后调用线程池执行runnable
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                 //asyncPoster是一个Runnable, 把消息保存到子线程的消息队列,然后调用线程池执行runnable
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

    //分析4, mainThreadPoster变量的初始化
    private final Poster mainThreadPoster;

     //消息发送接口,
    interface Poster {
        void enqueue(Subscription subscription, Object event);//消息入队方法
    }

    EventBus(EventBusBuilder builder) {
        mainThreadSupport = builder.getMainThreadSupport();
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    }

   
//mainThreadPoster靠MainThreadSupport来初始化
public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {

        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            //返回HandlerPoster,赋值给mainThreadPoster, looper为主线程的looper
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}

//mainThreadPoster实际上就是一个HandlerPoster,是一个Handler
public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);//消息入队
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {//然后发送空消息通知handler处理队列中的消息
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }


    //主线程处理消息
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);//分析5
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

   //分析5
  void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);通过反射,调用订阅方法
        }
    }

分析:

  • 这里主要分析一下发送消息所在线程不是主线程时,调用mainThreadPoster.enqueue(subscription, event)是如何切换到主线程回调订阅方法的;
  • 从上面的代码可以知道mainThreadPoster是一个handler,而looper是获取了主线程的looper后传递进来的;
  • 调用完enqueue方法,让消息进入队列,然后调用sendMessage方法,让handleMessage来处理消息,也就切换到主线程了,然后直接通过反射,在主线程调用订阅方法;
  • 如果是通过backgroundPoster.enqueue(subscription, event)或者asyncPoster.enqueue(subscription, event),会通过线程池来执行runnable,也就切换不同的线程来调用订阅方法。

总结:
1、调用EventBus发送消息后,会判断发送消息所在的线程是否为主线程;
2、获取消息的类对象,然后查找这个消息对应的所有订阅者;
3、调用消息的订阅方法时,会判断订阅方法设置的线程模式;如果需要在主线程回调,发送消息时不在主线程,那么通过主线程的handler来回调订阅方法;如果需要在子线程回调订阅方法,发送消息时是在主线程,那么通过继承了Runnable的类来调用订阅方法,通过线程池启动这个Runnable;这样就能实现子线程发送,切换成主线程回调订阅方法,以及主线程发送,子线程回调订阅方法。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容

  • 项目到了一定阶段会出现一种甜蜜的负担:业务的不断发展与人员的流动性越来越大,代码维护与测试回归流程越来越繁琐。这个...
    fdacc6a1e764阅读 3,147评论 0 6
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,068评论 1 32
  • EventBus基本使用 EventBus基于观察者模式的Android事件分发总线。 从这个图可以看出,Even...
    顾氏名清明阅读 613评论 0 1
  • 前言 在上一篇文章:EventBus 3.0初探: 入门使用及其使用 完全解析中,笔者为大家介绍了EventBus...
    丶蓝天白云梦阅读 15,786评论 21 128
  • 先吐槽一下博客园的MarkDown编辑器,推出的时候还很高兴博客园支持MarkDown了,试用了下发现支持不完善就...
    Ten_Minutes阅读 555评论 0 2