最近看Elastic-Job源码,看到它里面实现的任务运行轨迹的持久化,使用的是Guava的AsyncEventBus,一个内存级别的异步事件总线服务,实现了简单的生产-消费者模式,从而在不影响任务执行效率的基础上,将任务执行和任务轨迹记录解耦,大大提高了EJ的性能。
EventBus在Elastic-Job中的使用
EventBus的使用方法不难,具体可以参考EJ里面几个相关的类:JobEventListener、JobEventBus和LiteJobFacade。主要的流程如下:
- JobEventListener主要是消费者。定义需要监听的方法,目前主要定义了两个listen方法,注意想监听到的话,需要在方法前加上注解:@Subscribe和@AllowConcurrentEvents。看字面意思就是订阅和允许并发事件。如果不加上后面那个注解,则会导致效率问题,这个咱们后续分析。目前这个接口只有一个实现类JobEventRdbListener,实现了日志写入DB的操作。
- JobEventBus参考的EventBus源码,提供了register和post方法,去掉了unregister方法。主要的功能就是注册监听器和生产消息。他的构造方法中,默认使用的是Guava的AsyncEventBus,初始化中同时包含了注册动作。
- LiteJobFacade主要是JobEventBus的使用者。主要调用的是JobEventBus的post方法。
@Override
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
jobEventBus.post(jobExecutionEvent);
}
@Override
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
TaskContext taskContext = TaskContext.from(taskId);
jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
if (!Strings.isNullOrEmpty(message)) {
log.trace(message);
}
}
EventBus源码分析
言归正传,我们来看看EventBus到底是如何实现观察者模式的。他的主要实现类都在com.google.common.eventbus这个包下面。
主要类概念分析
我们首先来看一下里面比较重要的几个类,同时理解一些概念。
- EventBus:这个类的作用有两个,一个是作为一个总线通道,另一个作用是消息的广播。
- AsyncEventBus:异步的EventBus,功能与EventBus类似,只不过实现方式有所差异。
- Subscriber:可以按照字面理解是订阅者,也可以说是监听器。
- SubscriberRegistry:订阅注册表。主要存储的是Subcriber和Event之间的关系,用于消息分发时可以迅速根据Event的类型找到Subscriber。
- Dispatcher:事件分发器,定义了一些分发的策略,里面包含三种分发器。
- 两个重要的注解@Subscribe和@AllowConcurrentEvents。第一个是标识监听器的方法,第二个与第一个配合使用,标识允许多线程执行。
- DeadEvent:死信对象,标识没有订阅者关注的事件。
- SubscribeExceptionHandler:订阅者抛出异常的处理器。SubscribeExceptionContext:订阅者抛出异常的上下文对象。
EventBus
这个类有几个属性:
private final String identifier;//唯一标识,默认为default
private final Executor executor;//多线程处理器,默认MoreExecutors.directExecutor()
private final SubscriberExceptionHandler exceptionHandler;//异常处理器
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//订阅注册表
private final Dispatcher dispatcher;//消息分发器,默认为Dispatcher.perThreadDispatchQueue(),单线程消息分发队列
其中,identifier表示,同一个应用中,可以根据identifier来区分不同的事件总线,只不过默认为default而已。
EventBus主要定义了几个方法:
注册
public void register(Object object) {
subscribers.register(object);
}
注册的是自己定义的监听器,也就是listener。
取消注册
public void unregister(Object object) {
subscribers.unregister(object);
}
类似于注册。
消息广播
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
这块主要是根据event事件类型,来获取事件的订阅者,然后进行事件消息的分发。当然,如果没有订阅者,也就是event的类型是DeadEvent,也会进行对应的处理。
AsyncEventBus
继承自EventBus,主要区别在于分发器,使用的是Dispatcher.legacyAsync()。这个后续咱们再分析。
Subscriber
乍看这个类,就是订阅者,其实我们看源码就能理解,当一个订阅类的多个方法用@Subscribe注解时,每个被注解的方法对应的是一个订阅者。
构造
这个类只是package内可见,没有定义为public,可以通过静态方法create来创建它。
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
这里传入的method就是使用了@Subscribe注解的方法,这块会先判断这个方法是否线程安全,即是否使用@AllowConcurrentEvent来进行注解,来创建不同的Subscriber。唯一的差别是SynchronizedSubscriber中一个方法使用了synchronized来修饰。
dispatchEvent
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
调用多线程来处理event。
invokeSubscriberMethod
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
调用订阅者的方法。
SubscriberRegistry
我们之前在讲到EventBus时,里面有两个方法register和unregister,调用的就是这个类的方法。这个类的作用也讲到,是存储event和对应的订阅者的关系的。我们来看一下这个类的设计。
属性
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
@Weak private final EventBus bus;
这个类有两个属性。
- 第一个是ConcurrentMap,他的键是Class类,也就是Event的类型,值是CopyOnWriteArraySet<Subscriber>,也就是订阅者。这个ConcurrentMap是Guava定义的并发Map,这个后续咱们有机会再分析。
- 第二个属性就是EventBus。
register
注册监听器。
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
主要的逻辑是:
- 获取这个类中所有用@Subscribe注解的方法,存储到Multimap中。
- 遍历Multimap,键为eventType,然后根据这个键,从缓存中获取这个事件对应的订阅者集合。
- 获取到之后,判断集合是否为空,如果为空,新建一个集合来存储。
unregister
实现与register类似,先根据listener找到subscriber,找到需要监听的方法,然后根据事件类型去移除subscriber。
findAllSubscribers
获取监听器中所有的监听方法。
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
findAllSubscribers用于查找事件类型以及事件处理器的对应关系。查找注解需要涉及到反射,通过反射来获取标注在方法上的注解。因为Guava针对EventBus的注册采取的是“隐式契约”而非接口这种“显式契约”。而类与接口是存在继承关系的,所有很有可能某个订阅者其父类(或者父类实现的某个接口)也订阅了某个事件。因此这里的查找需要顺着继承链向上查找父类的方法是否也被注解标注。
getSubscribes
获取event的订阅者。
Iterator<Subscriber> getSubscribers(Object event) {
ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
List<Iterator<Subscriber>> subscriberIterators =
Lists.newArrayListWithCapacity(eventTypes.size());
for (Class<?> eventType : eventTypes) {
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers != null) {
// eager no-copy snapshot
subscriberIterators.add(eventSubscribers.iterator());
}
}
return Iterators.concat(subscriberIterators.iterator());
}
Dispatcher
分发器,用于将event分发给subscriber。它内部实现了三种不同类型的分发器,用于不同的情况下事件的顺序性。它的核心方法是:
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
它的三种实现:
PerThreadQueuedDispatcher
EventBus默认使用的分发器。它的实现是通过ThreadLocal来实现一个事件队列,每个线程包含一个这样的内部队列。
它的分发代码如下:
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
嵌套两层循环,第一层事件不为空,第二层该事件下的订阅者不为空,则分发事件下去。
LegacyAsyncDispatcher
AsyncEventBus使用的分发器。它在内部通过一个ConcurrentLinkedQueue<EventWithSubscriber>的全局队列来存储事件。他和PerThreadQueuedDispatcher的主要区别在于分发循环这块。
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
是一前一后两个循环。前面一个是遍历事件订阅处理器,并构建一个事件实体对象存入队列。后一个循环是遍历该事件实体对象队列,取出事件实体对象中的事件进行分发。
ImmediateDispatcher
同步分发器。
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
总结
Elastic-Job使用的EventBus,可以说很好的对任务的运行和轨迹记录进行了解耦,借鉴了Guava的思想,将代码优雅发挥到了新的境界。当然,Guava对EventBus的设计思想是我们需要进行学习和使用的。