EventBus(v3.2.0)源码解析

EventBus解析(v3.2.0)

首先理清几个关系:

  1. 一个订阅类内可以订阅多个事件,所以订阅类和事件的关系是1->N
  2. 同一个事件,订阅类的继承结构上可能出现在父类和子类订阅同一个事件
  3. 一个事件在一个应用内可以被多个类所订阅,所以事件和订阅类的关系是1->N

1. 注册订阅

EventBus#register

  1. 查找订阅对象内的事件消费方法,包装成SubscriberMethod(包括方法对象Method,ThreadMode,eventType的Class,优先级,是否粘性)
  2. 将消费方法SubscriberMethod实体注册到总线上
public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();// 获取订阅类型
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);// 查找订阅实体内的消费方法
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

SubscriberMethod成员属性设计

public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
    
    /** Used for efficient comparison */
    String methodString;
    
    /**
     * equals方法后续会使用到作为判等条件
     */
    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        } else if (other instanceof SubscriberMethod) {
            checkMethodString();// 获取方法的字符串描述,类似MainActivity#OnNotify(NotifyMessage
            SubscriberMethod otherSubscriberMethod = (SubscriberMethod)other;
            otherSubscriberMethod.checkMethodString();
            // Don't use method.equals because of http://code.google.com/p/android/issues/detail?id=7811#c6
            return methodString.equals(otherSubscriberMethod.methodString);
        } else {
            return false;
        }
    }
}

1.1 查找消费方法

SubscriberMethodFinder#findSubscriberMethods负责查找SubscriberMethod,缓存是以订阅类型作为key,其内所有事件的消费方法SubscriberMethod列表为value的键值对,具体查找过程如下:

  1. 从缓存中查找订阅类型所映射的List<SubscriberMethod>,查找成功的话,直接返回
  2. 缓存查找无果,执行查找逻辑
  3. 查找结果存入缓存并返回查找结果List<SubscriberMethod>

主要代码如下:

// 缓存的结构
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        // 1. 从METHOD_CACHE缓存中查找
        // 2. 反射订阅类查找
        if (ignoreGeneratedIndex) {
            // 反射订阅类,查找消费方法的相关订阅参数
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        // 3. 放入缓存并返回
    }

第一种方式:findUsingReflection

反射订阅类的查找方式:

  1. 获取订阅类内所有声明的方法
  2. 挨个遍历方法列表
  3. 判断修饰符是否public,且非staticabstract
  4. 判断方法的参数个数是否仅一个
  5. 方法上是否声明@Subscribe注解
  6. 参数类型是否已经添加到收集的集合中
  7. 读取注解的参数,构建消费方法SubscriberMethod实例,并收集存储到findState.subscriberMethods
  8. 返回findState.subscriberMethods
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // 具体反射的逻辑在这个findUsingReflectionInSingleClass方法里
            findUsingReflectionInSingleClass(findState);
            // 查询超类,如果超类是java、javax或android、androidx系统的类,findState.clazz会被置空,中止循环,跳出循环块
            findState.moveToSuperclass();
        }
    
        // @8 返回订阅类的消费方法列表,释放查找过程中FindState使用的临时数据,以便复用FindState
        return getMethodsAndRelease(findState);
    }


private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            // 仅订阅类声明的方法列表,不包括父类上的 @1
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149  这个bug描述详见改链接地址
            try {
                methods = findState.clazz.getMethods();
            } catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
                ...
                throw new EventBusException(msg, error);
            }
            
            // 跳过超类内的方法查找
            findState.skipSuperClasses = true;
        }
    
        // 遍历声明的方法,@2
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            // 方法修饰符校验,只允许pubilc的方法,抽象,静态等不行,@3
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                // 方法只允许一个参数, @4
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    // 方法上有Subscribe注解,说明消费方法查找成功 @5
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0]; 
                        // checkAdd控制是否存储以事件类型作为key, Method作为key的键值对 @6
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            // 将所有注解信息等组组合到SubscriberMethod,并存储到列表中 @7
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } 
                // 参数个数校验
                ...
            }
            // 修饰符校验
            ...
        }
    }

FindState校验逻辑

    // 以事件类型作为key,方法对象Method作为value存储
    final Map<Class, Object> anyMethodByEventType = new HashMap<>();

    boolean checkAdd(Method method, Class<?> eventType) {
            // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
            // Usually a subscriber doesn't have methods listening to the same event type.
            Object existing = anyMethodByEventType.put(eventType, method);
            // 判断订阅类中同一个事件类型是否存在多个消费方法,仅一个的话返回添加成功 
            if (existing == null) {
                return true;
            } else {
                if (existing instanceof Method) {
                    if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                        // Paranoia check
                        throw new IllegalStateException();
                    }
                    // Put any non-Method object to "consume" the existing Method
                    anyMethodByEventType.put(eventType, this);
                }
                return checkAddWithMethodSignature(method, eventType);
            }
        }

    // 生成的methodkey作为key,存储声明这个method的类
    final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();

    private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
            methodKeyBuilder.setLength(0);
            methodKeyBuilder.append(method.getName());
            // 方法的key形如:onMessageEvent>MessageTest
            methodKeyBuilder.append('>').append(eventType.getName());
            

            String methodKey = methodKeyBuilder.toString();
            // MainActivity.class 
            Class<?> methodClass = method.getDeclaringClass();
            // onMessageEvent>MessageTest = MainActivity.class
            Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
            // 该方法生成的key已经有对应类了,唯一一种情况就是子类覆盖了父类的订阅方法
            if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                // Only add if not already found in a sub class
                return true;
            } else {
                // Revert the put, old class is further down the class hierarchy
                subscriberClassByMethodKey.put(methodKey, methodClassOld);
                return false;
            }
        }

第二种方式:findUsingInfo

这种需要编译时注解处理器的配合才能完成,速度更快,编译处理器处理完@Subcribe注解,将注解参数用SubscriberMethodInfo类型包装,再和订阅类型一起包装成SimpleSubscriberInfo,以订阅类型作为key,将SimpleSubscriberInfo存入到生成的MyEventBusIndexSUBSCRIBER_INDEX中。

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        // @1 获取SubscriberInfo
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            // @2 从SubscriberInfo中获取消费方法数组
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            findUsingReflectionInSingleClass(findState);
        }
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

SimpleSubscriberInfo设计

public class SimpleSubscriberInfo extends AbstractSubscriberInfo {
    // 注意到这个消费方法数组在生成的索引类会塞入数据
    private final SubscriberMethodInfo[] methodInfos;

    public SimpleSubscriberInfo(Class subscriberClass, boolean shouldCheckSuperclass, SubscriberMethodInfo[] methodInfos) {
        super(subscriberClass, null, shouldCheckSuperclass);
        this.methodInfos = methodInfos;
    }

    /**
     * 将SubscriberMethodInfo转换成SubscriberMethod
     */
    @Override
    public synchronized SubscriberMethod[] getSubscriberMethods() {
        int length = methodInfos.length;
        SubscriberMethod[] methods = new SubscriberMethod[length];
        for (int i = 0; i < length; i++) {
            SubscriberMethodInfo info = methodInfos[i];
            methods[i] = createSubscriberMethod(info.methodName, info.eventType, info.threadMode,
                    info.priority, info.sticky);
        }
        return methods;
    }
}
第一步

增加注解处理器,配置生成类的名称

defaultConfig {
      javaCompileOptions {
            annotationProcessorOptions {
                arguments = [eventBusIndex: 'com.liminghuang.demo.MyEventBusIndex']
            }
        }
}

    def eventbus_version = '3.2.0'
    implementation "org.greenrobot:eventbus:$eventbus_version"
    // 主要是添加注解处理器
    annotationProcessor "org.greenrobot:eventbus-annotation-processor:$eventbus_version"
第二步

注解处理器生成的类的内容

public class MyEventBusIndex implements SubscriberInfoIndex {
    private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

    static {
        SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();

        putIndex(new SimpleSubscriberInfo(MainActivity.class, true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onMessage", TestMessage.class, ThreadMode.POSTING, 0, true),
            new SubscriberMethodInfo("onNotify", Object.class),
        }));

    }

    private static void putIndex(SubscriberInfo info) {
        SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
    }

    @Override
    public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
        SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
        if (info != null) {
            return info;
        } else {
            return null;
        }
    }
}
第三步

添加索引加速,替换默认EventBus实例

EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();

SubscriberInfoIndex接口

public interface SubscriberInfoIndex {
    SubscriberInfo getSubscriberInfo(Class<?> subscriberClass);
}

SubscriberInfo接口

public interface SubscriberInfo {
    Class<?> getSubscriberClass();

    SubscriberMethod[] getSubscriberMethods();

    SubscriberInfo getSuperSubscriberInfo();

    boolean shouldCheckSuperclass();
}

1.2 订阅Subscribe

将事件消费方法注册到总线上

  1. subscriptionsByEventType使用eventType(事件类型)作为key,来管理该事件所有订阅关系,将订阅者subscriber及其消费方法SubscriberMethod包装成该事件的一条订阅关系Subscription
  2. 将某一事件在某个订阅类中的一条订阅关系注册到总线上,通过判断是否已注册来防止二次注册
  3. 根据消费方法上标注的优先级,插入到数组的相应位置上
  4. typesBySubscriber订阅类实例作为key,存储其内部订阅的所有事件的类型
  5. 针对粘性事件,在订阅时需要立即分发一次
// 通过使用eventType作为key,来管理所有订阅关系,即事件总线自身
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

// 以订阅类subscriber为key,管理其内部所有的订阅事件类型
private final Map<Object, List<Class<?>>> typesBySubscriber;

// 粘性事件存储于此,以事件类型作为key,事件实例作为value存储
private final Map<Class<?>, Object> stickyEvents;

    // Must be called in synchronized block
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        // 订阅的事件类型
        Class<?> eventType = subscriberMethod.eventType;
        // 将订阅类实例和消费方法组装一起,构建一条订阅关系
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        
        // subscriptionsByEventType以事件类型作为key,存储所有订阅类实例对该事件的订阅关系
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        
        // 该事件尚无任何订阅者,先创建存储订阅关系的容器
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            // 该事件已创建订阅关系存储容器,判断订阅关系是否已注册
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            // 根据优先级在数组中寻找合适位置插入,或者一直寻找到数组末端,只能插入在最末端
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        // 以订阅类实例为key,存储其内部订阅的所有事件的类型
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        // 粘性事件处理
        if (subscriberMethod.sticky) {
            if (eventInheritance) {// 默认为true
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                // 根据事件类型获取粘性事件实例,分发给订阅关系,执行回调
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }


private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {
            // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
            // --> Strange corner case, which we don't take care of here.
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }

Subscription订阅关系

  1. active表示订阅关系的注册状态,unregister之后失效,成为false,active会在事件发布后被消费时被检查,若是false,此时unregister的功能就实现了。
  2. 订阅关系equals比较通过SubscriberMethod#equals来实现
final class Subscription {
    final Object subscriber;// 订阅类
    final SubscriberMethod subscriberMethod;// 消费方法
    /**
     * Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery
     * {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.
     */
    volatile boolean active;

    Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
        this.subscriber = subscriber;
        this.subscriberMethod = subscriberMethod;
        active = true;
    }

    @Override
    public boolean equals(Object other) {
        if (other instanceof Subscription) {
            Subscription otherSubscription = (Subscription) other;
            // 同一个订阅实例,同样的消费方法
            return subscriber == otherSubscription.subscriber
                    && subscriberMethod.equals(otherSubscription.subscriberMethod);
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return subscriber.hashCode() + subscriberMethod.methodString.hashCode();
    }
}

2. 事件发布

2.1 postSticky示例

先熟悉一个后续即将会被使用到的类的设计,PostingThreadState设计被用来记录管理一个事件一次发布的流程

final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread; 
        Subscription subscription; 
        Object event;
        boolean canceled;
    }

PostingThreadState成员属性列表:

  • eventQueue:事件实例缓存队列
  • isPosting:标识是否正在发布中(一个事件发布到总线,到各个订阅部分执行完成需要一定的时间,做状态标识)
  • isMainThread:标识发布是否在主线程触发
  • subscription:标识当前正在消费的一条订阅关系是哪一个
  • event:标识正在发布的事件
  • canceled:表示发布是否取消,取消的话整个发布过程会中止,尚未开始被消费的订阅关系失效

2.1.1 postSticky粘性事件发布流程:

  1. 存储粘性事件实例,以事件类型作为key,事件实例作为value存储
  2. 发布事件
    public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        post(event);
    }
3. 事件**先提交到事件队列**,触发发布动作后,从队头中依序取出进行发布
    // 采用ThreadLocal为每个线程中保存对应的发布状态
    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };


    public void post(Object event) {
        // (线程隔离的)发布流程记录
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);// 事件入队先缓存着,因为不一定能触发发布动作

        // 整个发布需要一个过程,发布过程还未结束时不可触发新的发布过程
        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            
            // 从队头按序取出发布事件开始发布
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

​ 4. 根据事件类型,从总线中取出所有订阅关系,按个遍历调用。

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        // 从总线上获取该事件所有的订阅关系
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
    
        if (subscriptions != null && !subscriptions.isEmpty()) {
            // 遍历订阅关系,以便对他们按个进行回调 
            for (Subscription subscription : subscriptions) {
                // 标记当前处理的事件和订阅关系
                postingState.event = event;
                postingState.subscription = subscription;
                
                boolean aborted;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    // 发布过程中标记了canceled取消状态,则中止发布过程
                    aborted = postingState.canceled;
                } finally {
                    // 状态清理复原
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                
                // 事件中止分发,停止遍历
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
  1. 根据发布所在的线程环境,切换到对应的线程中执行
  • POSTING: 在发布的线程中直接执行
  • MAIN:在主线程执行,若发布线程是主线程的话则直接执行;否则通过HandlerPoster发送到主线程处理
  • BACKGROUND:在工作线程中执行,若发布线程已经是子线程的话则直接执行;否则通过BackgroundPoster发送到子线程执行
  • ASYNC:无论如何创建一个子线程异步执行
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        // 判断消费方法需要执行的线程环境
        switch (subscription.subscriberMethod.threadMode) {
            // 在发布线程中执行
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            // 发送线程是主线程,则直接执行;否则通过HandlerPoster发送到主线程处理
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            // 在工作线程中执行,本身已经是子线程则直接执行;否则通过`BackgroundPoster`发送到子线程执行
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            // 无论如何创建一个子线程异步执行
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
  1. 反射调用消费体
// 反射调用SubscriberMethod.method
void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

2.1.2 注意

backgroundPoster将事件入队后,有机会触发到开启执行开关executorRunning(如果还没开启的话),会将此刻事件队列中所有的background事件都开始在线程池调度线程去执行回调。如果回调时刻,订阅类已经unregister了,则事件不会真正执行回调逻辑。其余AsyncPosterHandlerPoster也是类似会判断active状态。

void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {// 订阅类取消注册时会改为false
            invokeSubscriber(subscription, event);
        }
    }

2.2 线程池

使用线程池的两个分发器AsyncPosterBackgroundPoster,是EventBus的两个final的成员常量属性,EventBus是单例,即可认为这两个也是单例存在。主线程的发布器可能是一个HandlerPoster,也可能没有

1.1 AsyncPoster

  1. 内部使用的消息队列是PendingPostQueue,每一个类型的分发器包含一个独立的队列
  2. Poster接口定义了一个总线事件入队的方式enqueue
class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();// 独立的消息队列
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

1.2 PendingPostQueue

  1. 自定义的一个线程安全的消息队列,通过链表实现,包含headtail两个指针
  2. poll提供一种超时获取的机制
final class PendingPostQueue {
    private PendingPost head;
    private PendingPost tail;

    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
            tail.next = pendingPost; // 入队
            tail = pendingPost;
        } else if (head == null) {// 队列当前为空,第一个新元素入列
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();// 唤醒获取的线程
    }

    synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {// 空消息队列,head和tail指向null
                tail = null;
            }
        }
        return pendingPost;
    }

    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {// 空队列等待
            wait(maxMillisToWait);
        }
        return poll();
    }

}

1.3 PendingPost

队列中的元素定义,至少包含next指针,指向下一个元素,形成单项链表。可以说这个结构模仿了Message的设计,同时也包含了消息池复用

final class PendingPost {
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;
    Subscription subscription;
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }

}

3. 取消注册

  1. 从总线上根据事件类型,从subscriptionsByEventType移除以该订阅类形成的订阅关系
  2. 一个订阅类内包含多个订阅事件类型,所以从typesBySubscriber移除订阅类订阅的所有事件类型
public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            // 1. 从总线上移除该订阅类和该事件的订阅关系
            unsubscribeByEventType(subscriber, eventType);
        }
        // 2. 移除订阅类和其订阅事件的联系
        typesBySubscriber.remove(subscriber);
    } else {
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        int size = subscriptions.size();
        for (int i = 0; i < size; i++) {
            Subscription subscription = subscriptions.get(i);
            if (subscription.subscriber == subscriber) {// 匹配订阅者
                subscription.active = false;
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

4. 总结

事件的注册和发布关系归结如下

event -> subscription(subscriber + subscribermethod) ->subscribermethod ->method

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容