Guava源码解析之EventBus

最近看Elastic-Job源码,看到它里面实现的任务运行轨迹的持久化,使用的是Guava的AsyncEventBus,一个内存级别的异步事件总线服务,实现了简单的生产-消费者模式,从而在不影响任务执行效率的基础上,将任务执行和任务轨迹记录解耦,大大提高了EJ的性能。

EventBus在Elastic-Job中的使用

EventBus的使用方法不难,具体可以参考EJ里面几个相关的类:JobEventListener、JobEventBus和LiteJobFacade。主要的流程如下:

  • JobEventListener主要是消费者。定义需要监听的方法,目前主要定义了两个listen方法,注意想监听到的话,需要在方法前加上注解:@Subscribe和@AllowConcurrentEvents。看字面意思就是订阅和允许并发事件。如果不加上后面那个注解,则会导致效率问题,这个咱们后续分析。目前这个接口只有一个实现类JobEventRdbListener,实现了日志写入DB的操作。
  • JobEventBus参考的EventBus源码,提供了register和post方法,去掉了unregister方法。主要的功能就是注册监听器和生产消息。他的构造方法中,默认使用的是Guava的AsyncEventBus,初始化中同时包含了注册动作。
  • LiteJobFacade主要是JobEventBus的使用者。主要调用的是JobEventBus的post方法。
    @Override
    public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
        jobEventBus.post(jobExecutionEvent);
    }
    
    @Override
    public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
        TaskContext taskContext = TaskContext.from(taskId);
        jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
                taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
        if (!Strings.isNullOrEmpty(message)) {
            log.trace(message);
        }
    }

EventBus源码分析

言归正传,我们来看看EventBus到底是如何实现观察者模式的。他的主要实现类都在com.google.common.eventbus这个包下面。

主要类概念分析

我们首先来看一下里面比较重要的几个类,同时理解一些概念。

  • EventBus:这个类的作用有两个,一个是作为一个总线通道,另一个作用是消息的广播。
  • AsyncEventBus:异步的EventBus,功能与EventBus类似,只不过实现方式有所差异。
  • Subscriber:可以按照字面理解是订阅者,也可以说是监听器。
  • SubscriberRegistry:订阅注册表。主要存储的是Subcriber和Event之间的关系,用于消息分发时可以迅速根据Event的类型找到Subscriber。
  • Dispatcher:事件分发器,定义了一些分发的策略,里面包含三种分发器。
  • 两个重要的注解@Subscribe和@AllowConcurrentEvents。第一个是标识监听器的方法,第二个与第一个配合使用,标识允许多线程执行。
  • DeadEvent:死信对象,标识没有订阅者关注的事件。
  • SubscribeExceptionHandler:订阅者抛出异常的处理器。SubscribeExceptionContext:订阅者抛出异常的上下文对象。

EventBus

这个类有几个属性:

  private final String identifier;//唯一标识,默认为default
  private final Executor executor;//多线程处理器,默认MoreExecutors.directExecutor()
  private final SubscriberExceptionHandler exceptionHandler;//异常处理器

  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//订阅注册表
  private final Dispatcher dispatcher;//消息分发器,默认为Dispatcher.perThreadDispatchQueue(),单线程消息分发队列

其中,identifier表示,同一个应用中,可以根据identifier来区分不同的事件总线,只不过默认为default而已。

EventBus主要定义了几个方法:

注册

public void register(Object object) {
    subscribers.register(object);
}

注册的是自己定义的监听器,也就是listener。

取消注册

public void unregister(Object object) {
    subscribers.unregister(object);
}

类似于注册。

消息广播

public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
}

这块主要是根据event事件类型,来获取事件的订阅者,然后进行事件消息的分发。当然,如果没有订阅者,也就是event的类型是DeadEvent,也会进行对应的处理。

AsyncEventBus

继承自EventBus,主要区别在于分发器,使用的是Dispatcher.legacyAsync()。这个后续咱们再分析。

Subscriber

乍看这个类,就是订阅者,其实我们看源码就能理解,当一个订阅类的多个方法用@Subscribe注解时,每个被注解的方法对应的是一个订阅者。

构造

这个类只是package内可见,没有定义为public,可以通过静态方法create来创建它。

static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
}

这里传入的method就是使用了@Subscribe注解的方法,这块会先判断这个方法是否线程安全,即是否使用@AllowConcurrentEvent来进行注解,来创建不同的Subscriber。唯一的差别是SynchronizedSubscriber中一个方法使用了synchronized来修饰。

dispatchEvent

  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }

调用多线程来处理event。

invokeSubscriberMethod

@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
}

调用订阅者的方法。

SubscriberRegistry

我们之前在讲到EventBus时,里面有两个方法register和unregister,调用的就是这个类的方法。这个类的作用也讲到,是存储event和对应的订阅者的关系的。我们来看一下这个类的设计。

属性

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();

@Weak private final EventBus bus;

这个类有两个属性。

  • 第一个是ConcurrentMap,他的键是Class类,也就是Event的类型,值是CopyOnWriteArraySet<Subscriber>,也就是订阅者。这个ConcurrentMap是Guava定义的并发Map,这个后续咱们有机会再分析。
  • 第二个属性就是EventBus。

register

注册监听器。

void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
}

主要的逻辑是:

  • 获取这个类中所有用@Subscribe注解的方法,存储到Multimap中。
  • 遍历Multimap,键为eventType,然后根据这个键,从缓存中获取这个事件对应的订阅者集合。
  • 获取到之后,判断集合是否为空,如果为空,新建一个集合来存储。

unregister

实现与register类似,先根据listener找到subscriber,找到需要监听的方法,然后根据事件类型去移除subscriber。

findAllSubscribers

获取监听器中所有的监听方法。

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
}

findAllSubscribers用于查找事件类型以及事件处理器的对应关系。查找注解需要涉及到反射,通过反射来获取标注在方法上的注解。因为Guava针对EventBus的注册采取的是“隐式契约”而非接口这种“显式契约”。而类与接口是存在继承关系的,所有很有可能某个订阅者其父类(或者父类实现的某个接口)也订阅了某个事件。因此这里的查找需要顺着继承链向上查找父类的方法是否也被注解标注。

getSubscribes

获取event的订阅者。

Iterator<Subscriber> getSubscribers(Object event) {
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

Dispatcher

分发器,用于将event分发给subscriber。它内部实现了三种不同类型的分发器,用于不同的情况下事件的顺序性。它的核心方法是:

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

它的三种实现:

PerThreadQueuedDispatcher

EventBus默认使用的分发器。它的实现是通过ThreadLocal来实现一个事件队列,每个线程包含一个这样的内部队列。

它的分发代码如下:

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  checkNotNull(subscribers);
  Queue<Event> queueForThread = queue.get();
  queueForThread.offer(new Event(event, subscribers));

  if (!dispatching.get()) {
    dispatching.set(true);
    try {
      Event nextEvent;
      while ((nextEvent = queueForThread.poll()) != null) {
        while (nextEvent.subscribers.hasNext()) {
          nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
        }
      }
    } finally {
      dispatching.remove();
      queue.remove();
    }
 }
}

嵌套两层循环,第一层事件不为空,第二层该事件下的订阅者不为空,则分发事件下去。

LegacyAsyncDispatcher

AsyncEventBus使用的分发器。它在内部通过一个ConcurrentLinkedQueue<EventWithSubscriber>的全局队列来存储事件。他和PerThreadQueuedDispatcher的主要区别在于分发循环这块。

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    queue.add(new EventWithSubscriber(event, subscribers.next()));
  }

  EventWithSubscriber e;
  while ((e = queue.poll()) != null) {
    e.subscriber.dispatchEvent(e.event);
  }
}

是一前一后两个循环。前面一个是遍历事件订阅处理器,并构建一个事件实体对象存入队列。后一个循环是遍历该事件实体对象队列,取出事件实体对象中的事件进行分发。

ImmediateDispatcher

同步分发器。

void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  while (subscribers.hasNext()) {
    subscribers.next().dispatchEvent(event);
  }
}

总结

Elastic-Job使用的EventBus,可以说很好的对任务的运行和轨迹记录进行了解耦,借鉴了Guava的思想,将代码优雅发挥到了新的境界。当然,Guava对EventBus的设计思想是我们需要进行学习和使用的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容

  • EventBus源码分析(一) EventBus官方介绍为一个为Android系统优化的事件订阅总线,它不仅可以很...
    蕉下孤客阅读 3,972评论 4 42
  • 对于Android开发老司机来说肯定不会陌生,它是一个基于观察者模式的事件发布/订阅框架,开发者可以通过极少的代码...
    飞扬小米阅读 1,468评论 0 50
  • 简单的使用 EventBus是greenrobot在Android平台发布的一款以订阅——发布模式为核心的开源库。...
    最最最最醉人阅读 756评论 0 13
  • EventBus源码分析(二) 在之前的一篇文章EventBus源码分析(一)分析了EventBus关于注册注销以...
    蕉下孤客阅读 1,649评论 0 10
  • 原文链接:http://blog.csdn.net/u012810020/article/details/7005...
    tinyjoy阅读 541评论 1 5