EventBus解析(v3.2.0)
首先理清几个关系:
- 一个订阅类内可以订阅多个事件,所以订阅类和事件的关系是1->N
- 同一个事件,订阅类的继承结构上可能出现在父类和子类订阅同一个事件
- 一个事件在一个应用内可以被多个类所订阅,所以事件和订阅类的关系是1->N
1. 注册订阅
EventBus#register
- 查找订阅对象内的事件消费方法,包装成SubscriberMethod(包括方法对象Method,ThreadMode,eventType的Class,优先级,是否粘性)
- 将消费方法
SubscriberMethod
实体注册到总线上
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();// 获取订阅类型
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);// 查找订阅实体内的消费方法
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
SubscriberMethod成员属性设计
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;
/**
* equals方法后续会使用到作为判等条件
*/
@Override
public boolean equals(Object other) {
if (other == this) {
return true;
} else if (other instanceof SubscriberMethod) {
checkMethodString();// 获取方法的字符串描述,类似MainActivity#OnNotify(NotifyMessage
SubscriberMethod otherSubscriberMethod = (SubscriberMethod)other;
otherSubscriberMethod.checkMethodString();
// Don't use method.equals because of http://code.google.com/p/android/issues/detail?id=7811#c6
return methodString.equals(otherSubscriberMethod.methodString);
} else {
return false;
}
}
}
1.1 查找消费方法
SubscriberMethodFinder#findSubscriberMethods负责查找SubscriberMethod,缓存是以订阅类型作为key,其内所有事件的消费方法SubscriberMethod列表为value的键值对,具体查找过程如下:
- 从缓存中查找订阅类型所映射的
List<SubscriberMethod>
,查找成功的话,直接返回 - 缓存查找无果,执行查找逻辑
- 查找结果存入缓存并返回查找结果
List<SubscriberMethod>
主要代码如下:
// 缓存的结构
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 1. 从METHOD_CACHE缓存中查找
// 2. 反射订阅类查找
if (ignoreGeneratedIndex) {
// 反射订阅类,查找消费方法的相关订阅参数
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
// 3. 放入缓存并返回
}
第一种方式:findUsingReflection
反射订阅类的查找方式:
- 获取订阅类内所有声明的方法
- 挨个遍历方法列表
- 判断修饰符是否
public
,且非static
,abstract
- 判断方法的参数个数是否仅一个
- 方法上是否声明
@Subscribe
注解 - 参数类型是否已经添加到收集的集合中
- 读取注解的参数,构建消费方法
SubscriberMethod
实例,并收集存储到findState.subscriberMethods
中 - 返回
findState.subscriberMethods
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 具体反射的逻辑在这个findUsingReflectionInSingleClass方法里
findUsingReflectionInSingleClass(findState);
// 查询超类,如果超类是java、javax或android、androidx系统的类,findState.clazz会被置空,中止循环,跳出循环块
findState.moveToSuperclass();
}
// @8 返回订阅类的消费方法列表,释放查找过程中FindState使用的临时数据,以便复用FindState
return getMethodsAndRelease(findState);
}
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
// 仅订阅类声明的方法列表,不包括父类上的 @1
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 这个bug描述详见改链接地址
try {
methods = findState.clazz.getMethods();
} catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
...
throw new EventBusException(msg, error);
}
// 跳过超类内的方法查找
findState.skipSuperClasses = true;
}
// 遍历声明的方法,@2
for (Method method : methods) {
int modifiers = method.getModifiers();
// 方法修饰符校验,只允许pubilc的方法,抽象,静态等不行,@3
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 方法只允许一个参数, @4
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// 方法上有Subscribe注解,说明消费方法查找成功 @5
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// checkAdd控制是否存储以事件类型作为key, Method作为key的键值对 @6
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 将所有注解信息等组组合到SubscriberMethod,并存储到列表中 @7
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
}
// 参数个数校验
...
}
// 修饰符校验
...
}
}
FindState校验逻辑
// 以事件类型作为key,方法对象Method作为value存储
final Map<Class, Object> anyMethodByEventType = new HashMap<>();
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
Object existing = anyMethodByEventType.put(eventType, method);
// 判断订阅类中同一个事件类型是否存在多个消费方法,仅一个的话返回添加成功
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
// 生成的methodkey作为key,存储声明这个method的类
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
// 方法的key形如:onMessageEvent>MessageTest
methodKeyBuilder.append('>').append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
// MainActivity.class
Class<?> methodClass = method.getDeclaringClass();
// onMessageEvent>MessageTest = MainActivity.class
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
// 该方法生成的key已经有对应类了,唯一一种情况就是子类覆盖了父类的订阅方法
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
第二种方式:findUsingInfo
这种需要编译时注解处理器的配合才能完成,速度更快,编译处理器处理完@Subcribe
注解,将注解参数用SubscriberMethodInfo
类型包装,再和订阅类型一起包装成SimpleSubscriberInfo
,以订阅类型作为key,将SimpleSubscriberInfo
存入到生成的MyEventBusIndex
的SUBSCRIBER_INDEX
中。
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// @1 获取SubscriberInfo
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
// @2 从SubscriberInfo中获取消费方法数组
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
SimpleSubscriberInfo设计
public class SimpleSubscriberInfo extends AbstractSubscriberInfo {
// 注意到这个消费方法数组在生成的索引类会塞入数据
private final SubscriberMethodInfo[] methodInfos;
public SimpleSubscriberInfo(Class subscriberClass, boolean shouldCheckSuperclass, SubscriberMethodInfo[] methodInfos) {
super(subscriberClass, null, shouldCheckSuperclass);
this.methodInfos = methodInfos;
}
/**
* 将SubscriberMethodInfo转换成SubscriberMethod
*/
@Override
public synchronized SubscriberMethod[] getSubscriberMethods() {
int length = methodInfos.length;
SubscriberMethod[] methods = new SubscriberMethod[length];
for (int i = 0; i < length; i++) {
SubscriberMethodInfo info = methodInfos[i];
methods[i] = createSubscriberMethod(info.methodName, info.eventType, info.threadMode,
info.priority, info.sticky);
}
return methods;
}
}
第一步
增加注解处理器,配置生成类的名称
defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
arguments = [eventBusIndex: 'com.liminghuang.demo.MyEventBusIndex']
}
}
}
def eventbus_version = '3.2.0'
implementation "org.greenrobot:eventbus:$eventbus_version"
// 主要是添加注解处理器
annotationProcessor "org.greenrobot:eventbus-annotation-processor:$eventbus_version"
第二步
注解处理器生成的类的内容
public class MyEventBusIndex implements SubscriberInfoIndex {
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
putIndex(new SimpleSubscriberInfo(MainActivity.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onMessage", TestMessage.class, ThreadMode.POSTING, 0, true),
new SubscriberMethodInfo("onNotify", Object.class),
}));
}
private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}
@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}
第三步
添加索引加速,替换默认EventBus实例
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
SubscriberInfoIndex接口
public interface SubscriberInfoIndex {
SubscriberInfo getSubscriberInfo(Class<?> subscriberClass);
}
SubscriberInfo接口
public interface SubscriberInfo {
Class<?> getSubscriberClass();
SubscriberMethod[] getSubscriberMethods();
SubscriberInfo getSuperSubscriberInfo();
boolean shouldCheckSuperclass();
}
1.2 订阅Subscribe
将事件消费方法注册到总线上
-
subscriptionsByEventType
使用eventType
(事件类型)作为key,来管理该事件所有订阅关系,将订阅者subscriber
及其消费方法SubscriberMethod
包装成该事件的一条订阅关系Subscription
- 将某一事件在某个订阅类中的一条订阅关系注册到总线上,通过判断是否已注册来防止二次注册
- 根据消费方法上标注的优先级,插入到数组的相应位置上
-
typesBySubscriber
以订阅类实例作为key,存储其内部订阅的所有事件的类型 - 针对粘性事件,在订阅时需要立即分发一次
// 通过使用eventType作为key,来管理所有订阅关系,即事件总线自身
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 以订阅类subscriber为key,管理其内部所有的订阅事件类型
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 粘性事件存储于此,以事件类型作为key,事件实例作为value存储
private final Map<Class<?>, Object> stickyEvents;
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 订阅的事件类型
Class<?> eventType = subscriberMethod.eventType;
// 将订阅类实例和消费方法组装一起,构建一条订阅关系
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// subscriptionsByEventType以事件类型作为key,存储所有订阅类实例对该事件的订阅关系
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
// 该事件尚无任何订阅者,先创建存储订阅关系的容器
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 该事件已创建订阅关系存储容器,判断订阅关系是否已注册
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
// 根据优先级在数组中寻找合适位置插入,或者一直寻找到数组末端,只能插入在最末端
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 以订阅类实例为key,存储其内部订阅的所有事件的类型
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 粘性事件处理
if (subscriberMethod.sticky) {
if (eventInheritance) {// 默认为true
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
// 根据事件类型获取粘性事件实例,分发给订阅关系,执行回调
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
Subscription订阅关系
-
active
表示订阅关系的注册状态,unregister
之后失效,成为false
,active会在事件发布后被消费时被检查,若是false,此时unregister的功能就实现了。 - 订阅关系
equals
比较通过SubscriberMethod#equals
来实现
final class Subscription {
final Object subscriber;// 订阅类
final SubscriberMethod subscriberMethod;// 消费方法
/**
* Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery
* {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.
*/
volatile boolean active;
Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
this.subscriber = subscriber;
this.subscriberMethod = subscriberMethod;
active = true;
}
@Override
public boolean equals(Object other) {
if (other instanceof Subscription) {
Subscription otherSubscription = (Subscription) other;
// 同一个订阅实例,同样的消费方法
return subscriber == otherSubscription.subscriber
&& subscriberMethod.equals(otherSubscription.subscriberMethod);
} else {
return false;
}
}
@Override
public int hashCode() {
return subscriber.hashCode() + subscriberMethod.methodString.hashCode();
}
}
2. 事件发布
2.1 postSticky示例
先熟悉一个后续即将会被使用到的类的设计,PostingThreadState
设计被用来记录管理一个事件一次发布的流程。
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
PostingThreadState成员属性列表:
- eventQueue:事件实例缓存队列
- isPosting:标识是否正在发布中(一个事件发布到总线,到各个订阅部分执行完成需要一定的时间,做状态标识)
- isMainThread:标识发布是否在主线程触发
- subscription:标识当前正在消费的一条订阅关系是哪一个
- event:标识正在发布的事件
- canceled:表示发布是否取消,取消的话整个发布过程会中止,尚未开始被消费的订阅关系失效
2.1.1 postSticky粘性事件发布流程:
- 存储粘性事件实例,以事件类型作为key,事件实例作为value存储
- 发布事件
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
3. 事件**先提交到事件队列**,触发发布动作后,从队头中依序取出进行发布
// 采用ThreadLocal为每个线程中保存对应的发布状态
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
public void post(Object event) {
// (线程隔离的)发布流程记录
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);// 事件入队先缓存着,因为不一定能触发发布动作
// 整个发布需要一个过程,发布过程还未结束时不可触发新的发布过程
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
// 从队头按序取出发布事件开始发布
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
4. 根据事件类型,从总线中取出所有订阅关系,按个遍历调用。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
// 从总线上获取该事件所有的订阅关系
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 遍历订阅关系,以便对他们按个进行回调
for (Subscription subscription : subscriptions) {
// 标记当前处理的事件和订阅关系
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
// 发布过程中标记了canceled取消状态,则中止发布过程
aborted = postingState.canceled;
} finally {
// 状态清理复原
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
// 事件中止分发,停止遍历
if (aborted) {
break;
}
}
return true;
}
return false;
}
- 根据发布所在的线程环境,切换到对应的线程中执行
- POSTING: 在发布的线程中直接执行
- MAIN:在主线程执行,若发布线程是主线程的话则直接执行;否则通过
HandlerPoster
发送到主线程处理 - BACKGROUND:在工作线程中执行,若发布线程已经是子线程的话则直接执行;否则通过
BackgroundPoster
发送到子线程执行 - ASYNC:无论如何创建一个子线程异步执行
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 判断消费方法需要执行的线程环境
switch (subscription.subscriberMethod.threadMode) {
// 在发布线程中执行
case POSTING:
invokeSubscriber(subscription, event);
break;
// 发送线程是主线程,则直接执行;否则通过HandlerPoster发送到主线程处理
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
// 在工作线程中执行,本身已经是子线程则直接执行;否则通过`BackgroundPoster`发送到子线程执行
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
// 无论如何创建一个子线程异步执行
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
- 反射调用消费体
// 反射调用SubscriberMethod.method
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
2.1.2 注意
backgroundPoster
将事件入队后,有机会触发到开启执行开关executorRunning
(如果还没开启的话),会将此刻事件队列中所有的background事件都开始在线程池调度线程去执行回调。如果回调时刻,订阅类已经unregister
了,则事件不会真正执行回调逻辑。其余AsyncPoster
和HandlerPoster
也是类似会判断active
状态。
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {// 订阅类取消注册时会改为false
invokeSubscriber(subscription, event);
}
}
2.2 线程池
使用线程池的两个分发器AsyncPoster
和BackgroundPoster
,是EventBus
的两个final
的成员常量属性,EventBus是单例,即可认为这两个也是单例存在。主线程的发布器可能是一个HandlerPoster
,也可能没有
1.1 AsyncPoster
- 内部使用的消息队列是
PendingPostQueue
,每一个类型的分发器包含一个独立的队列 -
Poster
接口定义了一个总线事件入队的方式enqueue
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();// 独立的消息队列
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
1.2 PendingPostQueue
- 自定义的一个线程安全的消息队列,通过链表实现,包含
head
和tail
两个指针 -
poll
提供一种超时获取的机制
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost; // 入队
tail = pendingPost;
} else if (head == null) {// 队列当前为空,第一个新元素入列
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();// 唤醒获取的线程
}
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {// 空消息队列,head和tail指向null
tail = null;
}
}
return pendingPost;
}
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {// 空队列等待
wait(maxMillisToWait);
}
return poll();
}
}
1.3 PendingPost
队列中的元素定义,至少包含next
指针,指向下一个元素,形成单项链表。可以说这个结构模仿了Message
的设计,同时也包含了消息池复用
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
3. 取消注册
- 从总线上根据事件类型,从
subscriptionsByEventType
移除以该订阅类形成的订阅关系 - 一个订阅类内包含多个订阅事件类型,所以从
typesBySubscriber
移除订阅类订阅的所有事件类型
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
// 1. 从总线上移除该订阅类和该事件的订阅关系
unsubscribeByEventType(subscriber, eventType);
}
// 2. 移除订阅类和其订阅事件的联系
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {// 匹配订阅者
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
4. 总结
事件的注册和发布关系归结如下
event -> subscription(subscriber + subscribermethod) ->subscribermethod ->method