前言
本文会通过阅读 EventBus 源码的方式分析订阅者注册、反注册、事件发送、粘性事件发送的过程。
依赖
implementation 'org.greenrobot:eventbus:3.1.1'
基本使用
1. 定义事件
public class MessageEvent {
private String msg;
public MessageEvent(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
2. 订阅者注册和反注册
public class EventBusActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_event_bus);
EventBus.getDefault().register(this);
...
}
@Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(this);
}
...
}
3. 订阅者定义订阅方法
public class EventBusActivity extends AppCompatActivity {
...
// 接收普通事件
@Subscribe(threadMode = ThreadMode.MAIN)
public void getMsg(MessageEvent event) {
}
// 接收普通事件以及粘性事件
@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
public void getStickyMsg(MessageEvent event) {
}
}
4. 发送事件
EventBus.getDefault().post(new MessageEvent("你好我是Msg"));
EventBus.getDefault().postSticky(new MessageEvent("你好我是粘性Msg"));
粘性事件指的是在事件发送之后注册的订阅者依然能够接收到的事件类型。
首先了解 EventBus.getDefault():
// EventBus,java
static volatile EventBus defaultInstance;
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
里面使用了 DCL 双检查锁单例模式,返回 EventBus 对象。
接着看构造方法
// EventBus,java
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
// Key 为事件类型 Value 为与事件类型关联的订阅者和订阅方法集合的 HashMap
subscriptionsByEventType = new HashMap<>();
// Key 为订阅者 Value 为与订阅者关联的事件类型集合的 HashMap
typesBySubscriber = new HashMap<>();
// 存放所有粘性事件的 HashMap
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
// 支持主线程的事件发射器
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
// 支持后台发送的事件发射器
backgroundPoster = new BackgroundPoster(this);
// 支持异步发送的事件发射器
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
EventBus 的构造使用了 Builder 建造者模式,由 EventBusBuilder 组建,然后把 Builder 传到 EventBus 构造方法中初始化 EventBus 。
1.注册
register()
// EventBus,java
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 查找订阅者的订阅方法集合
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 遍历订阅方法集合注册到 EventBus 中
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
注册流程分为两步:
根据订阅者的 Class 获取订阅者所有带有 @Subscribe 注解的方法的集合 subscriberMethods 。
遍历订阅方法集合调用 subscribe() 执行真正的订阅流程。
1.1 查找订阅者订阅方法集合
findSubscriberMethods()
// SubscriberMethodFinder.java
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
先尝试从缓存中获取方法集合,若缓存中不存在,就判断标志位 ignoreGeneratedIndex ,ignoreGeneratedIndex 与 EventBus 3.0 新特性的 Subscriber Index 有关,本文暂时不做介绍,可能会在后续文章中介绍。ignoreGeneratedIndex 默认为 false ,所以会调用 findUsingInfo() 获取方法集合,然后存到缓存中并返回。
findUsingInfo()
// SubscriberMethodFinder.java
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 默认情况下 getSubscriberInfo(findState) 返回 null
// 需要设置 EventBusIndex 才能获取到订阅者信息 subscriberInfo
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
while 循环是为了遍历订阅者及其父类获取订阅方法集合,这里说的父类并不包括 Java 包以及 Java 扩展包和 Android 包下的类,具体可看 moveToSuperclass() 代码:
// SubscriberMethodFinder.java void moveToSuperclass() { if (skipSuperClasses) { clazz = null; } else { clazz = clazz.getSuperclass(); String clazzName = clazz.getName(); /** Skip system classes, this just degrades performance. */ if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) { clazz = null; } } }
默认情况下 findUsingInfo() 最终会调用 findUsingReflectionInSingleClass()
findUsingReflectionInSingleClass()
// SubscriberMethodFinder.java
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 获取订阅者所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
// 遍历方法集合
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 由于 EventBus 规定订阅方法只有一个参数,所以这里只看 parameterTypes.length == 1
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
// 获取事件的 Class
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
// 获取 ThreadMode
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 添加到 findState.subscriberMethods 中保存
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
首先获取订阅者所有方法,然后遍历这些方法并找到有 @Subscribe 注解的方法,并获取该方法接收的事件类型 eventType 和 ThreadMode ,然后保存在 findState.subscriberMethod 中。
现在回到 findUsingInfo 看最后一行代码 return getMethodsAndRelease(findState);
getMethodsAndRelease()
// SubscriberMethodFinder.java
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
getMethodsAndRelease() 就是从 FindState 中把订阅方法集合变成一个 ArrayList ,然后回收这个 findState 放回 findState 缓存池中,并返回这个 ArrayList 。
getMethodsAndRelease() 是 findSubscriberMethods() 查找订阅者订阅方法集合的方法调用链最后一个方法,所以它的作用是返回订阅者所有订阅方法的集合,以及回收查找方法时用到的 findState 对象。
1.2 订阅者订阅过程
subscribe()
// EventBus.java
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 获取事件类型
Class<?> eventType = subscriberMethod.eventType;
// 根据订阅者和订阅方法创建新的 Subscription
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 根据事件类型获取 subscriptions 的集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
// 若 subscriptions 不存在就创建并保存到 subscriptionsByEventType
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 如果新的 Subscription 已经存在即重复注册,抛出异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
// 遍历 subscriptions 集合根据优先级把新订阅的新的 Subscription 添加到 subscriptions
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 根据订阅者获取与之关联的事件类型 subscribedEvents 集合
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
// 若 subscribedEvents 不存在就创建并保存到 typesBySubscriber
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
// 把事件类型添加到 subscribedEvents
subscribedEvents.add(eventType);
// 当订阅方法被设置为可以处理粘性事件时
if (subscriberMethod.sticky) {
if (eventInheritance) {
// 遍历保存所有粘性事件的 Set
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// 判断粘性事件的 Class 与订阅方法的事件类型 Class 是否相同
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
根据上面的注解我们可以知道 subscribe() 主要做三件事情:
- 首先把订阅者 subscriber 和订阅方法 subscriberMethod 封装为 Subscription ,然后把 Subscription 根据优先级排序添加到具有相同 EventType 的 Subscription 集合 subscriptions 中,并把 subscriptions 保存到 subscriptionsByEventType<Class<事件类型>,Subscription集合> 中。
- 把新的事件类型添加到保存以订阅者为 Key ,与订阅者关联的事件类型集合为 Value 的 Map typesBySubscriber 中。
- 如果该订阅方法支持处理粘性事件,就会从粘性事件集合中获取他能处理的粘性事件调用 checkPostStickyEventToSubscription() 发送给该订阅者的该订阅方法。
subscribe() 是真正执行注册的方法。
checkPostStickyEventToSubscription()
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
checkPostStickyEventToSubscription() 内部会调用 postToSubscription() 且 postToSubscription() 会在下文事件发送时分析。
2. 事件发送
post()
// EventBus.java
public void post(Object event) {
// 从 ThreadLocal 中获取 PostingThreadState
// PostingThreadState 持有事件队列和线程状态
PostingThreadState postingState = currentPostingThreadState.get();
// 获取事件队列
List<Object> eventQueue = postingState.eventQueue;
// 把事件添加到事件队列中
eventQueue.add(event);
// 判断当先线程是都正在发送事件
// 确保并发调用 post 时只有一个在遍历事件队列并发送事件
if (!postingState.isPosting) {
// 设置标志位
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 遍历事件队列
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 恢复标志位
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
首先获取当前线程的事件队列,然后把需要发送的事件添加到队列中。当当前线程没有在发送事件的时候会遍历事件队列并调用 postSingleEvent() 发送事件直到事件队列中没有事件为止。
值得一提的是 isPosting 标志位的目的是为了在 post() 并发调用时,确保只有一个调用在遍历事件队列并发送所有事件,其余调用只能把事件添加到队列中就返回,这样可以保证事件不会被多次发送。
postSingleEvent()
// EventBus.java
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
// 根据事件类型获取该事件类型及其父类的类型集合
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
...
}
postSingleEvent() 首先判断标志位 eventInheritance 确定是否需要兼容事件类型的父类,若是就把事件类型以及其父类都放在集合中然后遍历 调用 postSingleEventForEventType(),若否就直接调用 postSingleEventForEventType()。
postSingleEventForEventType()
// EventBus.java
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 根据事件类型获取 Subscription 集合
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
首先利用 subscriptionsByEventType 根据事件类型获取 Subscription 集合,即获取所有接收该事件类型的订阅方法和对应的订阅者,关于 Subscription 集合保存的过程在上面介绍 subscribe() 中,获取到 Subscription 集合后就遍历集合并调用 postToSubscription()。
postToSubscription()
// EventBus.java
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED: //
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
postToSubscription 根据订阅方法的 threadMode 进行区分处理:
- POSTING : 直接在当前线程调用 invokeSubscriber()
- MAIN :
- 如果当前线程是主线程就直接调用 invokeSubscriber()
- 如果当前线程不是主线程就调用 mainThreadPoster.enqueue(subscription, event),内部调用主线程的 Handler 在主线程中调用 invokeSubscriber()
- MAIN_ORDERED :
- 如果 mainThreadPoster 不为空调用 mainThreadPoster.enqueue(subscription, event)
- 如果 mainThreadPoster 为空直接调用 invokeSubscriber()
- BACKGROUND :
- 如果当前线程是主线程,就调用 backgroundPoster.enqueue(subscription, event),内部使用了线程池,在子线程中调用 invokeSubscriber()
- 如果当前线程不是主线程,就直接调用 invokeSubscriber()
- ASYNC : 调用 asyncPoster.enqueue(subscription, event),内部使用了线程池在子线程中调用 invokeSubscriber()
总体来说就是针对不同的 threadMode 在主线程或者子线程中调用 invokeSubscriber()。
invokeSubscriber()
// EventBus.java
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
其实就是调用 Method.invoke() 回调了订阅者的订阅方法。
3. 发送粘性事件
// EventBus.java
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
粘性事件很简单,就是把事件添加到 stickyEvents 中,然后像常规事件发送一样调用 post(),后面注册的订阅者接收该粘性事件的过程在上文分析 subscribe() 中。
4. 反注册订阅者
unregister()
// EventBus.java
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
还记得注册过程核心流程在 subscribe() 中,且主要针对两个 Map 做了保存操作,即:
typesBySubscriber :订阅者为 Key,事件类型集合为 Value
subscriptionsByEventType :事件类型为 Key,订阅者集合为 Value
反注册也很简单,只需要在这两个 Map 中删除与订阅者相关的元素即可。
总结
EventBus 注册流程: 先找到订阅者所有有 @Subscribe 注解的方法,然后把订阅者和订阅方法保存在 Map 中。
EventBus 使用了 Handler 在主线程中调用订阅方法。
EventBus 使用了线程池在子线程中调用订阅方法。
EventBus 调用订阅方法原理是 Method.invoke()。
EventBus 粘性事件原理:就是用一个 Map 保存粘性事件,每当新的订阅者注册时候会检查其所有订阅方法能否处理粘性事件并从集合中找到对应的粘性事件发送给新的订阅者。