XXL-JOB(V2.0.2)源码解读:一篇带你理解执行器源码

接着上篇文章,上篇文章详细分析了XXL-JOB调度中心的源码,这篇文章我们一起看一看执行器端是如何工作的
实际在执行器应用中,内嵌了一个jetty服务器, 服务在xxlJobExecutor 初始化的时候启动。当执行器端启动时会定时向注册中心进行自动注册,并且当调度中心有任务触发的时候也会发起RPC请求,请求执行器执行具体的任务
首先我们看一下执行器端集成XXL-JOB所做过的配置信息XxlJobConfig,同样是一个基于JavaConfig的配置信息

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.executor.appname}")
    private String appName;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        //创建执行器对象
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        //设置一些属性,属性值是从配置文件中得到的
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
  }

    /**
     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
     *
     *      1、引入依赖:
     *          <dependency>
     *             <groupId>org.springframework.cloud</groupId>
     *             <artifactId>spring-cloud-commons</artifactId>
     *             <version>${version}</version>
     *         </dependency>
     *
     *      2、配置文件,或者容器启动变量
     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     *      3、获取IP
     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */
}

在容器初始化好xxlJobExecutor后会执行其initMethod,也就是start方法,我们进去看看这个方法做了些什么

public void start() throws Exception {
    this.initJobHandlerRepository(applicationContext);
    GlueFactory.refreshInstance(1);
    super.start();
}

这里首先会调用initJobHandlerRepository(applicationContext)方法,进去这个方法:

private void initJobHandlerRepository(ApplicationContext applicationContext) {
    if (applicationContext != null) {
        //从上下文中获取到所有的带有@JobHandler注解的Bean
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
        if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
            Iterator i$ = serviceBeanMap.values().iterator();

            while(i$.hasNext()) {
                Object serviceBean = i$.next();
                if (serviceBean instanceof IJobHandler) { //如果实现了IJobHandler接口
                    //取出@JobHandler的value值
                    String name = ((JobHandler)serviceBean.getClass().getAnnotation(JobHandler.class)).value();
                    IJobHandler handler = (IJobHandler)serviceBean;
                    if (loadJobHandler(name) != null) {
                        throw new RuntimeException("xxl-job jobhandler naming conflicts.");
                    }
                    //注册JobHandler
                    registJobHandler(name, handler);
                }
            }
        }
    }
}

跟进registJobHandler(name, handler):

public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    //将jobHandler放入jobHandlerRepository(本质是一个ConcurrentHashMap)
    return (IJobHandler)jobHandlerRepository.put(name, jobHandler);
}

接着我们再看下super.start()方法做了什么:

public void start() throws Exception {
    //设置日志目录
    XxlJobFileAppender.initLogPath(this.logPath);
    //⚠️⚠️⚠️这里很重要,这里initAdminBizList方法会初始化AdminBizList,并且会创建AdminBiz的动态代理XxlRpcReferenceBean,而最后会用到这个类进行自动注册(标记Tag,下文会用到这里)
    this.initAdminBizList(this.adminAddresses, this.accessToken);
    //开启日志清理线程
    JobLogFileCleanThread.getInstance().start((long)this.logRetentionDays);
    //初始化触发器回调线程(用RPC回调调度中心接口)
    TriggerCallbackThread.getInstance().start();
    this.port = this.port > 0 ? this.port : NetUtil.findAvailablePort(9999);
    this.ip = this.ip != null && this.ip.trim().length() > 0 ? this.ip : IpUtil.getIp();
    //初始化Rpc服务
    this.initRpcProvider(this.ip, this.port, this.appName, this.accessToken);
}

跟进 this.initRpcProvider(this.ip, this.port, this.appName, this.accessToken),代码片段如下:

private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
    this.xxlRpcInvokerFactory = new XxlRpcInvokerFactory();
    String address = IpUtil.getIpPort(ip, port);
    Map<String, String> serviceRegistryParam = new HashMap();
    serviceRegistryParam.put("appName", appName);
    serviceRegistryParam.put("address", address);
    this.xxlRpcProviderFactory = new XxlRpcProviderFactory();
    this.xxlRpcProviderFactory.initConfig(NetEnum.JETTY, SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, XxlJobExecutor.ExecutorServiceRegistry.class, serviceRegistryParam);
    this.xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), (String)null, new ExecutorBizImpl());
    this.xxlRpcProviderFactory.start();
}

初始化了一个XxlRpcInvokerFactory和XxlRpcProviderFactory,上面代码中最重要的是

this.xxlRpcProviderFactory.initConfig(NetEnum.JETTY, SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, XxlJobExecutor.ExecutorServiceRegistry.class, serviceRegistryParam);

this.xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), (String)null, new ExecutorBizImpl());

以上两步操作:
1.指定了使用jetty服务,序列化工具,IP,端口等信息,并且指定serviceRegistryClass为XxlJobExecutor.ExecutorServiceRegistry.class,执行器的自动注册是在这实现的,稍后会分析
2.设置使用ExecutorBizImpl作为服务的处理类,供给调用中心调用
接下来我们看一下初始化完毕后,调用this.xxlRpcProviderFactory.start()方法做了些什么?

public void start() throws Exception {
    //设置内嵌的jetty服务器
    this.server = (Server)this.netType.serverClass.newInstance();
    //设置内嵌jetty服务器启动时的执行任务
    this.server.setStartedCallback(new BaseCallback() {
        public void run() throws Exception {
            //如果serviceRegistryClass不为null,在上面的initConfig中我们已经初始化了,所以程序正常可以进入
            if (XxlRpcProviderFactory.this.serviceRegistryClass != null) {
                //反射获取注册服务类
                XxlRpcProviderFactory.this.serviceRegistry = (ServiceRegistry)XxlRpcProviderFactory.this.serviceRegistryClass.newInstance();
                //调用start执行自动注册,因为我们在上面的配置中设置了serviceRegistryClass为XxlJobExecutor.ExecutorServiceRegistry.class,所以会执行ExecutorServiceRegistry中的start方法
                XxlRpcProviderFactory.this.serviceRegistry.start(XxlRpcProviderFactory.this.serviceRegistryParam);
                if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
                    String ipPort = IpUtil.getIpPort(XxlRpcProviderFactory.this.ip, XxlRpcProviderFactory.this.port);
                    Iterator i$ = XxlRpcProviderFactory.this.serviceData.keySet().iterator();

                    while(i$.hasNext()) {
                        String serviceKey = (String)i$.next();
                        //因为我们在上面的配置中设置了serviceRegistryClass为XxlJobExecutor.ExecutorServiceRegistry.class,所以执行registry函数内部直接返回false,可以点进去看看
                        XxlRpcProviderFactory.this.serviceRegistry.registry(serviceKey, ipPort);
                    }
                }
            }

        }
    });
    //设置内嵌jetty服务器关闭时的执行任务
    this.server.setStopedCallback(new BaseCallback() {
        public void run() {
            if (XxlRpcProviderFactory.this.serviceRegistry != null) {
                if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
                    String ipPort = IpUtil.getIpPort(XxlRpcProviderFactory.this.ip, XxlRpcProviderFactory.this.port);
                    Iterator i$ = XxlRpcProviderFactory.this.serviceData.keySet().iterator();

                    while(i$.hasNext()) {
                        String serviceKey = (String)i$.next();
                        XxlRpcProviderFactory.this.serviceRegistry.remove(serviceKey, ipPort);
                    }
                }

                XxlRpcProviderFactory.this.serviceRegistry.stop();
                XxlRpcProviderFactory.this.serviceRegistry = null;
            }

        }
    });
    //启动服务器(做个标识,下面会用到,引用标识:JETTY)
    this.server.start(this);
}

当上端代码执行this.server.start(this)时,内嵌的jetty服务器就会启动,点进this.server.start(this)可以看到:



服务器启动时会触发onStart方法,进而会执行上面的starCallback逻辑,开始进行自动注册,让再回顾头来我们看一下XxlRpcProviderFactory.this.serviceRegistry.start(XxlRpcProviderFactory.this.serviceRegistryParam);内部逻辑:

public static class ExecutorServiceRegistry extends ServiceRegistry {
    public ExecutorServiceRegistry() {
    }

    public void start(Map<String, String> param) {
        //执行器注册线程开始自动注册
        ExecutorRegistryThread.getInstance().start((String)param.get("appName"), (String)param.get("address"));
    }

    public void stop() {
        ExecutorRegistryThread.getInstance().toStop();
    }

    public boolean registry(String key, String value) {
        return false;
    }

    public boolean remove(String key, String value) {
        return false;
    }

    public TreeSet<String> discovery(String key) {
        return null;
    }
}

跟进上面的ExecutorRegistryThread.getInstance().start((String)param.get("appName"), (String)param.get("address"))内部:

public void start(final String appName, final String address) {
    if (appName != null && appName.trim().length() != 0) {
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        } else {
            this.registryThread = new Thread(new Runnable() {
                public void run() {
                    RegistryParam registryParam;
                    Iterator i$;
                    AdminBiz adminBiz;
                    ReturnT registryResult;
                    while(!ExecutorRegistryThread.this.toStop) { // private volatile boolean toStop = false;会一直循环此段代码(停顿30s)直到设置toStop为true
                        try {
                            registryParam = new RegistryParam(RegistType.EXECUTOR.name(), appName, address);
                            //⚠️⚠️⚠️还记得上面那个标记Tag吗,在那里我们初始化了AdminBizList
                            i$ = XxlJobExecutor.getAdminBizList().iterator();

                            while(i$.hasNext()) {
                                adminBiz = (AdminBiz)i$.next();

                                try {
                                    //这里不会真正执行 adminBiz.registry(registryParam),只是调用一下方法,触发动态代理
                                    registryResult = adminBiz.registry(registryParam);
                                    if (registryResult != null && 200 == registryResult.getCode()) {
                                        registryResult = ReturnT.SUCCESS;
                                        ExecutorRegistryThread.logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                        break;
                                    }

                                    ExecutorRegistryThread.logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                } catch (Exception var6) {
                                    ExecutorRegistryThread.logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, var6);
                                }
                            }
                        } catch (Exception var7) {
                            ExecutorRegistryThread.logger.error(var7.getMessage(), var7);
                        }

                        try {
                            TimeUnit.SECONDS.sleep(30L);
                        } catch (InterruptedException var5) {
                            ExecutorRegistryThread.logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", var5.getMessage());
                        }
                    }

                    try {
                        registryParam = new RegistryParam(RegistType.EXECUTOR.name(), appName, address);
                        i$ = XxlJobExecutor.getAdminBizList().iterator();

                        while(i$.hasNext()) {
                            adminBiz = (AdminBiz)i$.next();

                            try {
                                registryResult = adminBiz.registryRemove(registryParam);
                                if (registryResult != null && 200 == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    ExecutorRegistryThread.logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                }

                                ExecutorRegistryThread.logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            } catch (Exception var8) {
                                ExecutorRegistryThread.logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, var8);
                            }
                        }
                    } catch (Exception var9) {
                        ExecutorRegistryThread.logger.error(var9.getMessage(), var9);
                    }

                    ExecutorRegistryThread.logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
                }
            });
            this.registryThread.setDaemon(true);
            this.registryThread.start();
        }
    } else {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
    }
}

在上面的代码中 registryResult = adminBiz.registry(registryParam)这个其实不会真正去执行registry方法,只是为了触发动态代理,请看官们返回我在上文中标记的Tag部分(ctrl+f),我们找到当时标记Tag的代码:this.initAdminBizList(this.adminAddresses, this.accessToken),跟进这个函数:

private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses != null && adminAddresses.trim().length() > 0) {
        String[] arr$ = adminAddresses.trim().split(",");
        int len$ = arr$.length;

        for(int i$ = 0; i$ < len$; ++i$) {
            String address = arr$[i$];
            if (address != null && address.trim().length() > 0) {
                String addressUrl = address.concat("/api");
                //这里的getObject会创建动态代理
                AdminBiz adminBiz = (AdminBiz)(new XxlRpcReferenceBean(NetEnum.JETTY, SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC, AdminBiz.class, (String)null, 10000L, addressUrl, accessToken, (XxlRpcInvokeCallback)null)).getObject();
                if (adminBizList == null) {
                    adminBizList = new ArrayList();
                }

                adminBizList.add(adminBiz);
            }
        }
    }
}

跟进getObject:

public Object getObject() {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{this.iface}, new InvocationHandler() {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String className = method.getDeclaringClass().getName();
            if (Object.class.getName().equals(className)) {
                XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
                throw new XxlRpcException("xxl-rpc proxy class-method not support");
            } else {
                String address = XxlRpcReferenceBean.this.routeAddress();
                if (address != null && address.trim().length() != 0) {
                    //构建XxlRpcRequest
                    XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                    xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                    xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                    xxlRpcRequest.setAccessToken(XxlRpcReferenceBean.this.accessToken);
                    xxlRpcRequest.setClassName(className);
                    xxlRpcRequest.setMethodName(method.getName());
                    xxlRpcRequest.setParameterTypes(method.getParameterTypes());
                    xxlRpcRequest.setParameters(args);
                    XxlRpcFutureResponse invokeFuture;
                    if (CallType.SYNC != XxlRpcReferenceBean.this.callType) {
                        if (CallType.FUTURE == XxlRpcReferenceBean.this.callType) {
                            invokeFuture = null;

                            try {
                                XxlRpcInvokeFuture invokeFuturex = new XxlRpcInvokeFuture(new XxlRpcFutureResponse(xxlRpcRequest, (XxlRpcInvokeCallback)null));
                                XxlRpcInvokeFuture.setFuture(invokeFuturex);
                                XxlRpcReferenceBean.this.client.asyncSend(address, xxlRpcRequest);
                                return null;
                            } catch (Exception var16) {
                                XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
                                invokeFuture.stop();
                                throw (Throwable)(var16 instanceof XxlRpcException ? var16 : new XxlRpcException(var16));
                            }
                        } else if (CallType.CALLBACK == XxlRpcReferenceBean.this.callType) {
                            XxlRpcInvokeCallback finalInvokeCallback = XxlRpcReferenceBean.this.invokeCallback;
                            XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                            if (threadInvokeCallback != null) {
                                finalInvokeCallback = threadInvokeCallback;
                            }

                            if (finalInvokeCallback == null) {
                                throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null.");
                            } else {
                                try {
                                    new XxlRpcFutureResponse(xxlRpcRequest, finalInvokeCallback);
                                    XxlRpcReferenceBean.this.client.asyncSend(address, xxlRpcRequest);
                                    return null;
                                } catch (Exception var15) {
                                    XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
                                    XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
                                    throw (Throwable)(var15 instanceof XxlRpcException ? var15 : new XxlRpcException(var15));
                                }
                            }
                        } else if (CallType.ONEWAY == XxlRpcReferenceBean.this.callType) {
                            XxlRpcReferenceBean.this.client.asyncSend(address, xxlRpcRequest);
                            return null;
                        } else {
                            throw new XxlRpcException("xxl-rpc callType[" + XxlRpcReferenceBean.this.callType + "] invalid");
                        }
                    } else {
                        //因为我们指定了CallType为CallType.SYNC,进入这里
                        Object var9;
                        try {
                            invokeFuture = new XxlRpcFutureResponse(xxlRpcRequest, (XxlRpcInvokeCallback)null);
                            //发送RPC请求,进行自动注册
                            XxlRpcReferenceBean.this.client.asyncSend(address, xxlRpcRequest);
                            XxlRpcResponse xxlRpcResponse = invokeFuture.get(XxlRpcReferenceBean.this.timeout, TimeUnit.MILLISECONDS);
                            if (xxlRpcResponse.getErrorMsg() != null) {
                                throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                            }

                            var9 = xxlRpcResponse.getResult();
                        } catch (Exception var17) {
                            XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);
                            throw (Throwable)(var17 instanceof XxlRpcException ? var17 : new XxlRpcException(var17));
                        } finally {
                            XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
                        }

                        return var9;
                    }
                } else {
                    throw new XxlRpcException("xxl-rpc reference bean[" + className + "] address empty");
                }
            }
        }
    });
}

以上就是执行器端自动注册的流程
那么,执行器又是如何接收调度中心的调度请求并触发jobHandler执行的呢?
这里我们需要看一下内置jetty启动做了什么,我们找到上文埋下的的引用标识JETTY那部分描述:


启动内置jetty

跟进这个start方法:

public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {
    this.thread = new Thread(new Runnable() {
        public void run() {
            JettyServer.this.server = new org.eclipse.jetty.server.Server(new QueuedThreadPool());
            ServerConnector connector = new ServerConnector(JettyServer.this.server);
            connector.setPort(xxlRpcProviderFactory.getPort());
            JettyServer.this.server.setConnectors(new Connector[]{connector});
            HandlerCollection handlerc = new HandlerCollection();
            //设置一个处理器JettyServerHandler(重点)
            handlerc.setHandlers(new Handler[]{new JettyServerHandler(xxlRpcProviderFactory)});
            JettyServer.this.server.setHandler(handlerc);

            try {
                JettyServer.this.server.start();
                JettyServer.logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", JettyServer.class.getName(), xxlRpcProviderFactory.getPort());
                JettyServer.this.onStarted();
                JettyServer.this.server.join();
            } catch (Exception var12) {
                JettyServer.logger.error(">>>>>>>>>>> xxl-rpc remoting server start error.", var12);
            } finally {
                try {
                    JettyServer.this.stop();
                } catch (Exception var11) {
                    JettyServer.logger.error(var11.getMessage(), var11);
                }

            }

        }
    });
    //设置为守护线程
    this.thread.setDaemon(true);
    this.thread.start();
}

通过handlerc.setHandlers(new Handler[]{new JettyServerHandler(xxlRpcProviderFactory)})我们可以直到jetty服务器接收到请求后会使用JettyServerHandler的hadle方法处理:

public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
    StringBuffer stringBuffer;
    if (!"/services".equals(target)) {
        stringBuffer = null;

        XxlRpcRequest xxlRpcRequest;
        try {
            // 获取远程请求信息xxlRpcRequest
            xxlRpcRequest = this.parseRequest(request);
        } catch (Exception var8) {
            this.writeResponse(baseRequest, response, ThrowableUtil.toString(var8).getBytes());
            return;
        }
        //处理远程请求信息xxlRpcRequest
        XxlRpcResponse xxlRpcResponse = this.xxlRpcProviderFactory.invokeService(xxlRpcRequest);
        byte[] responseBytes = this.xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
        this.writeResponse(baseRequest, response, responseBytes);
    } else {
        stringBuffer = new StringBuffer("<ui>");
        Iterator i$ = this.xxlRpcProviderFactory.getServiceData().keySet().iterator();

        while(i$.hasNext()) {
            String serviceKey = (String)i$.next();
            stringBuffer.append("<li>").append(serviceKey).append(": ").append(this.xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
        }

        stringBuffer.append("</ui>");
        this.writeResponse(baseRequest, response, stringBuffer.toString().getBytes());
    }
}

跟进 this.xxlRpcProviderFactory.invokeService(xxlRpcRequest):

public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
    XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
    xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
    String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
    //获取serviceBean
    Object serviceBean = this.serviceData.get(serviceKey);
    if (serviceBean == null) {
        xxlRpcResponse.setErrorMsg("The serviceKey[" + serviceKey + "] not found.");
        return xxlRpcResponse;
    } else if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 180000L) {
        xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
        return xxlRpcResponse;
    } else if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
        xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
        return xxlRpcResponse;
    } else {
        try {
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();
            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);
            xxlRpcResponse.setResult(result);
        } catch (Throwable var11) {
            logger.error("xxl-rpc provider invokeService error.", var11);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(var11));
        }

        return xxlRpcResponse;
    }
}

上面代码中Object serviceBean = this.serviceData.get(serviceKey);实际获取的bean就是前文我们看到的initRpcProvider方法中
this.xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), (String)null, new ExecutorBizImpl());中设置的值,其实也就是ExecutorBizImpl这个类,回顾一下,看下下面的2个图片,还有印象吗


初始化RpcProvider

初始化ServiceData

通过请求参数中的methodName:run,我们会通过反射执行ExecutorBizImpl中的run方法:

public ReturnT<String> run(TriggerParam triggerParam) {
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
    String removeOldReason = null;
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    IJobHandler originJobHandler;
    //取出任务对应的本地线程池中的线程信息
    if (GlueTypeEnum.BEAN == glueTypeEnum) { // Bean模式
        //取出要执行的JobHandler
        originJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        if (jobThread != null && jobHandler != originJobHandler) {
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }

        if (jobHandler == null) {
            jobHandler = originJobHandler;
            if (originJobHandler == null) {
                return new ReturnT(500, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        if (jobThread != null && (!(jobThread.getHandler() instanceof GlueJobHandler) || ((GlueJobHandler)jobThread.getHandler()).getGlueUpdatetime() != triggerParam.getGlueUpdatetime())) {
            removeOldReason = "change job source or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }

        if (jobHandler == null) {
            try {
                originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception var7) {
                logger.error(var7.getMessage(), var7);
                return new ReturnT(500, var7.getMessage());
            }
        }
    } else {
        if (glueTypeEnum == null || !glueTypeEnum.isScript()) {
            return new ReturnT(500, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        if (jobThread != null && (!(jobThread.getHandler() instanceof ScriptJobHandler) || ((ScriptJobHandler)jobThread.getHandler()).getGlueUpdatetime() != triggerParam.getGlueUpdatetime())) {
            removeOldReason = "change job source or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }

        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    }

    if (jobThread != null) {
        //取出任务配置的阻塞策略
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), (ExecutorBlockStrategyEnum)null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // 丢弃后续
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT(500, "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy && jobThread.isRunningOrHasQueue()) { //覆盖之前
            removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
            jobThread = null;
        }
    }

    if (jobThread == null) {
        //说明走的是覆盖之前的阻塞策略
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), (IJobHandler)jobHandler, removeOldReason);
    }
    //放进触发队列,触发任务执行
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

通过上面我们可以发现, 执行executorBiz的run 方法的时候, 首先会通过JOBID,从本地线程库里面获取该任务对应的线程,同时,如果任务的JobHandler有更新的话,那么会自动使用最新的jobHandler , 同时根据任务的阻塞策略。 执行不同的操作。 最终,如果是第一次执行任务的时候,系统会分配给改任务一个线程,同时启动该线程。
单独说一下阻塞策略:覆盖之前的实现方式

public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
    //给当前任务创建一个新的任务线程
    JobThread newJobThread = new JobThread(jobId, handler);
    //启动新的任务线程
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    //新任务覆盖老任务
    JobThread oldJobThread = (JobThread)jobThreadRepository.put(jobId, newJobThread);
    if (oldJobThread != null) {
        //中断老任务线程
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }
    return newJobThread;
}

接下来,可以在具体看一下JobThread 的run方法,看下最终的任务是如何执行的

public void run() {
    try {
        // 执行IJobHandler 中的init方法,以后如果有一些,在执行handler之前的初始化的工作,可以覆写这个方法
        this.handler.init();
    } catch (Throwable var26) {
        logger.error(var26.getMessage(), var26);
    }

    final TriggerParam triggerParam;
    ReturnT executeResult;
    while(!this.toStop) {
        this.running = false;
        //执行次数
        ++this.idleTimes;
        triggerParam = null;
        executeResult = null;
        boolean var16 = false;

        ReturnT stopResult;
        label302: {
            try {
                var16 = true;
                从linkBlockingQueue中获取数据,如果3秒获取不到,则返回null
                triggerParam = (TriggerParam)this.triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam != null) {
                    this.running = true;
                    // 将运行次数清空,保证运行90秒空闲之后会被移除
                    this.idleTimes = 0;
                    // 去掉这条数据
                    this.triggerLogIdSet.remove(triggerParam.getLogId());
                    // 记录日志
                    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
                    XxlJobFileAppender.contextHolder.set(logFileName);
                    
                // 写入分片信息, 将当前机器的分片标记和分片总数写入到ShardingUtil中,到时候,可以在handler中通过这个工具类获取
                    ShardingUtil.setShardingVo(new ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
                    XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams(), new Object[0]);
                    // 超时时间>0
                    if (triggerParam.getExecutorTimeout() > 0) {
                        Thread futureThread = null;

                        try {
                            //开启新线程执行
                            FutureTask<ReturnT<String>> futureTask = new FutureTask(new Callable<ReturnT<String>>() {
                                public ReturnT<String> call() throws Exception {
                                    return JobThread.this.handler.execute(triggerParam.getExecutorParams());
                                }
                            });
                            futureThread = new Thread(futureTask);
                            futureThread.start();
                            executeResult = (ReturnT)futureTask.get((long)triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException var24) {
                            XxlJobLogger.log("<br>----------- xxl-job job execute timeout", new Object[0]);
                            XxlJobLogger.log(var24);
                            executeResult = new ReturnT(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                        } finally {
                            futureThread.interrupt();
                        }
                    } else {
                        // 否则直接执行
                        executeResult = this.handler.execute(triggerParam.getExecutorParams());
                    }

                    if (executeResult == null) {
                        executeResult = IJobHandler.FAIL;
                    }

                    XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult, new Object[0]);
                    var16 = false;
                } else if (this.idleTimes > 30) {
                    // 每3秒获取一次数据,获取30次都没有获取到数据之后,则现场被清除
                    XxlJobExecutor.removeJobThread(this.jobId, "excutor idel times over limit.");
                    var16 = false;
                } else {
                    var16 = false;
                }
                break label302;
            } catch (Throwable var27) {
                if (this.toStop) {
                    XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + this.stopReason, new Object[0]);
                }

                StringWriter stringWriter = new StringWriter();
                var27.printStackTrace(new PrintWriter(stringWriter));
                String errorMsg = stringWriter.toString();
                executeResult = new ReturnT(500, errorMsg);
                XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------", new Object[0]);
                var16 = false;
            } finally {
                if (var16) {
                    if (triggerParam != null) {
                        if (!this.toStop) {
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
                        } else {
                            ReturnT<String> stopResult = new ReturnT(500, this.stopReason + " [job running,killed]");
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
                        }
                    }

                }
            }

            if (triggerParam != null) {
                if (!this.toStop) {
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
                } else {
                    stopResult = new ReturnT(500, this.stopReason + " [job running,killed]");
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
                }
            }
            continue;
        }

        if (triggerParam != null) {
            if (!this.toStop) {
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
            } else {
                stopResult = new ReturnT(500, this.stopReason + " [job running,killed]");
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
            }
        }
    }
    // 当现场被终止之后,队列里面剩余的未执行的任务,将被终止的这些任务放入队列,供日志监控线程来处理,回调给调度中心
    while(this.triggerQueue != null && this.triggerQueue.size() > 0) {
        triggerParam = (TriggerParam)this.triggerQueue.poll();
        if (triggerParam != null) {
            executeResult = new ReturnT(500, this.stopReason + " [job not executed, in the job queue, killed.]");
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
        }
    }

    try {
        this.handler.destroy();
    } catch (Throwable var23) {
        logger.error(var23.getMessage(), var23);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

至此全部结束 。。。。。
后续更新任务调度回调源码解析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,208评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,502评论 3 405
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,496评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,176评论 1 302
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,185评论 6 401
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,630评论 1 316
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,992评论 3 431
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,973评论 0 280
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,510评论 1 325
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,546评论 3 347
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,659评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,250评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,990评论 3 340
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,421评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,569评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,238评论 3 382
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,732评论 2 366

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,120评论 1 32
  • v2.0.2 Release Notes 1、底层通讯方案优化:升级较新版本xxl-rpc,由"JETTY"方案调...
    许雪里阅读 1,311评论 3 4
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 1,686评论 0 12
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 5,356评论 0 9
  • 一位得了帕金森症和抑郁症的老大爷说:“害死我得了”,他的老伴说:“能害死就好了”。
    There一Is一A一Door阅读 295评论 0 1