elastic-Job 源码解析之事件追踪EventBus

​ 在elastic-job中,有一块很重要的功能,与作业的执行密切相关,但又不影响作业的执行,那就是作业的执行状态和运行轨迹记录,脑子里很容易想到这几个词,观察者模式,发布订阅模式。

​ 在elastic-Job中,是使用guava的EventBus事件总线工具,简单的使用观察者模式来实现。
​ 先看一个简单的demo:

​ 新建一个消息总线的发送者


public class EventBusPoster {

  private EventBus eventBus = new EventBus();

  public void post(String message) {
    eventBus.post(message);
  }

  public void addListener(EventBusListener eventBusListener) {
    eventBus.register(eventBusListener);
  }

新建一个监听消息总线的listener


public class EventBusListener {

  @Subscribe
  public void listener(String message) {
    System.out.println("receive eventbus message:" + message);
  }
}
public class Main {
  public static void main(String[] args) {
    EventBusPoster eventBusPoster = new EventBusPoster();
    EventBusListener eventBusListener = new EventBusListener();
    eventBusPoster.addListener(eventBusListener);
    eventBusPoster.post("hello world!");
    eventBusPoster.post("你好,世界!");
  }
}
receive eventbus message:hello world!
receive eventbus message:你好,世界!

一个很简单的观察者模式就这样实现了。那么,elastic-Job是怎样实现的?先看一下类图:


event.png

​ 在elastic-Job启动的过程中,初始化JobScheduler时,就已经将JobEventBus初始化进去了,看代码new JobEventBus():

public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final ElasticJobListener... elasticJobListeners) {
  this(regCenter, liteJobConfig, new JobEventBus(), elasticJobListeners);
}

public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig, 
                    final ElasticJobListener... elasticJobListeners) {
  this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
}

​ 而在作业执行过程中,多次调用jobFacade.postJobStatusTraceEvent(..)和postJobExecutionEvent去推送Event,看代码;

//AbstractElasticJobExecutor 抽象执行器
public final void execute() {
  try {
    jobFacade.checkJobExecutionEnvironment();
  } catch (final JobExecutionEnvironmentException cause) {
    jobExceptionHandler.handleException(jobName, cause);
  }
  ShardingContexts shardingContexts = jobFacade.getShardingContexts();
  if (shardingContexts.isAllowSendJobEvent()) {
    //这里推送Event
    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
  }
  
  jobFacade.failoverIfNecessary();
  //此处省略很多代码
}

private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
  if (shardingContexts.isAllowSendJobEvent()) {
     //推送执行情况Event
    jobFacade.postJobExecutionEvent(startEvent);
  }
  log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
  JobExecutionEvent completeEvent;
  try {
    process(new ShardingContext(shardingContexts, item));
    completeEvent = startEvent.executionSuccess();
    log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
    if (shardingContexts.isAllowSendJobEvent()) {
       //推送执行情况Event
      jobFacade.postJobExecutionEvent(completeEvent);
    }
    // CHECKSTYLE:OFF
  } catch (final Throwable cause) {
    // CHECKSTYLE:ON
    completeEvent = startEvent.executionFailure(cause);
    //推送执行情况Event
    jobFacade.postJobExecutionEvent(completeEvent);
    itemErrorMessages.put(item, ExceptionUtil.transform(cause));
    jobExceptionHandler.handleException(jobName, cause);
  }
}

而postJobExecutionEvent和postJobStatusTraceEvent主要是调用jobEventBus去post数据,看代码:

@Override
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
  jobEventBus.post(jobExecutionEvent);
}

@Override
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
  TaskContext taskContext = TaskContext.from(taskId);
  jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),  taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
  if (!Strings.isNullOrEmpty(message)) {
    log.trace(message);
  }
}

在JobEventBus初始化过程中,主要是通过构造线程池初始化一个AsynEventBus,通过register注册监听类,若没有配置类,则不注册监听Listener,且不postEvent。

public final class JobEventBus {

  private final JobEventConfiguration jobEventConfig;

  private final ExecutorServiceObject executorServiceObject;

  private final EventBus eventBus;

  private boolean isRegistered;

  public JobEventBus() {
    jobEventConfig = null;
    executorServiceObject = null;
    eventBus = null;
  }

  public JobEventBus(final JobEventConfiguration jobEventConfig) {
    this.jobEventConfig = jobEventConfig;
    //线程池线程数量为cpu核数的两倍
    executorServiceObject = new ExecutorServiceObject("job-event", Runtime.getRuntime().availableProcessors() * 2);
    //异步总线
    eventBus = new AsyncEventBus(executorServiceObject.createExecutorService());
    register();
  }

  private void register() {
    try {
      eventBus.register(jobEventConfig.createJobEventListener());
      isRegistered = true;
    } catch (final JobEventListenerConfigurationException ex) {
      log.error("Elastic job: create JobEventListener failure, error is: ", ex);
    }
  }

  /**
     * 发布事件.
     * 若没有注册则不发布
     * @param event 作业事件
     */
  public void post(final JobEvent event) {
    if (isRegistered && !executorServiceObject.isShutdown()) {
      eventBus.post(event);
    }
  }
}

而在注册的过程中,主要是注册了一个监听类JobEventRdbListener,看代码:

@Override
public JobEventListener createJobEventListener() throws JobEventListenerConfigurationException {
  try {
    return new JobEventRdbListener(dataSource);
  } catch (final SQLException ex) {
    throw new JobEventListenerConfigurationException(ex);
  }
}

而JobEventRdbListener类,主要是订阅Event消息。


public final class JobEventRdbListener extends JobEventRdbIdentity implements JobEventListener {
    
    private final JobEventRdbStorage repository;
    
    public JobEventRdbListener(final DataSource dataSource) throws SQLException {
        repository = new JobEventRdbStorage(dataSource);
    }
    
    @Override
    public void listen(final JobExecutionEvent executionEvent) {
        repository.addJobExecutionEvent(executionEvent);
    }
    
    @Override
    public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
        repository.addJobStatusTraceEvent(jobStatusTraceEvent);
    }
}

那post的Event事件,Listener是如何感知到的,看接口定义,有两个标签,@Subscribe,代表订阅消息,只要是该方法参数类型的Event,就能够被监听,如果没有合适的类型的Event,则会是DeadEvent,而@AllowConcurrentEvents字面意思就是允许并发,实际原因在于如果是线程安全的,使用该标签会减少同步开销,具体原因可以看AllowConcurrentEvents分析

public interface JobEventListener extends JobEventIdentity {
    
    /**
     * 作业执行事件监听执行.
     *
     * @param jobExecutionEvent 作业执行事件
     */
    @Subscribe
    @AllowConcurrentEvents
    void listen(JobExecutionEvent jobExecutionEvent);
    
    /**
     * 作业状态痕迹事件监听执行.
     *
     * @param jobStatusTraceEvent 作业状态痕迹事件
     */
    @Subscribe
    @AllowConcurrentEvents
    void listen(JobStatusTraceEvent jobStatusTraceEvent);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容

  • 最近看Elastic-Job源码,看到它里面实现的任务运行轨迹的持久化,使用的是Guava的AsyncEventB...
    端木轩阅读 2,046评论 2 6
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,601评论 18 139
  • 版权声明:本文为原创文章,未经允许不得转载。 Spark程序程序job的运行是通过actions算子触发的,每一个...
    lehi阅读 1,094评论 0 0
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,748评论 6 342
  • EventBus源码分析(一) EventBus官方介绍为一个为Android系统优化的事件订阅总线,它不仅可以很...
    蕉下孤客阅读 3,972评论 4 42