Oozie-CoordinatorXCommand

CoordinatorXCommand is the wrapper class for commands that operate on the Coordinator component. We group the coord commands as follows:


diagram3.png
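Every command below overrides execute(), and the XCommand base class decides when execute() runs. The sketch below models only the template-method ordering of XCommand's call() in simplified form (loadState(), then verifyPrecondition(), then execute()). Locking, instrumentation and queuing are left out, and SketchXCommand and SketchGetJobCommand are hypothetical names, not Oozie classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the XCommand template method.
// The real base class also acquires locks, records instrumentation and queues.
abstract class SketchXCommand<T> {
    // Records which hooks ran, in order, so the flow can be inspected.
    final List<String> trace = new ArrayList<>();

    // call() fixes the order of the hooks; subclasses only implement them.
    public final T call() {
        trace.add("loadState");
        loadState();
        trace.add("verifyPrecondition");
        verifyPrecondition();
        trace.add("execute");
        return execute();
    }

    protected abstract void loadState();

    protected abstract void verifyPrecondition();

    protected abstract T execute();
}

// Hypothetical read-only command in the spirit of CoordJobXCommand.
class SketchGetJobCommand extends SketchXCommand<String> {
    private final String jobId;
    private String jobName;

    SketchGetJobCommand(String jobId) {
        this.jobId = jobId;
    }

    @Override
    protected void loadState() {
        jobName = "job-for-" + jobId; // stands in for a database lookup
    }

    @Override
    protected void verifyPrecondition() {
        if (jobId.isEmpty()) {
            throw new IllegalArgumentException("empty jobId");
        }
    }

    @Override
    protected String execute() {
        return jobName;
    }

    public static void main(String[] args) {
        SketchGetJobCommand cmd = new SketchGetJobCommand("0000001-C");
        System.out.println(cmd.call() + " " + cmd.trace);
    }
}
```

Because call() is final, a subclass cannot skip the precondition check. It can only decide what each step does.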

CoordJobXCommand

/* (non-Javadoc)
 * @see org.apache.oozie.command.XCommand#execute()
 */
@Override
protected CoordinatorJobBean execute() throws CommandException {
    try {
        JPAService jpaService = Services.get().get(JPAService.class);
        CoordinatorJobBean coordJob = null;
        if (jpaService != null) {
            coordJob = jpaService.execute(new CoordJobGetJPAExecutor(id));
            if (getActionInfo) {
                int numAction = jpaService.execute(new CoordActionsCountForJobIdJPAExecutor(id));
                List<CoordinatorActionBean> coordActions = null;
                if (len == 0) {
                    coordActions = new ArrayList<CoordinatorActionBean>();
                }
                else {
                    coordActions = jpaService.execute(new CoordJobGetActionsSubsetJPAExecutor(id, filterMap, offset,
                            len, desc));
                }
                coordJob.setActions(coordActions);
                coordJob.setNumActions(numAction);
            }
        }
        else {
            LOG.error(ErrorCode.E0610);
        }
        return coordJob;
    }
    catch (XException ex) {
        throw new CommandException(ex);
    }
}

Retrieves coordinator job information.
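The paging in CoordJobXCommand is easy to miss. The total action count is always fetched, but when len is 0 the action list is left empty. The in-memory sketch below mimics that behaviour; in Oozie the work is done in SQL by the JPA executors. Treating offset as 1-based and desc as a simple reversal are assumptions of this sketch, and CoordJobPageSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory model of CoordJobXCommand's paging.
// ASSUMPTIONS: offset is 1-based, desc reverses the order, len >= 0.
class CoordJobPageSketch {
    final int numActions;       // total number of actions of the job
    final List<String> actions; // only the requested page

    CoordJobPageSketch(int numActions, List<String> actions) {
        this.numActions = numActions;
        this.actions = actions;
    }

    static CoordJobPageSketch load(List<String> allActions, int offset, int len, boolean desc) {
        int total = allActions.size(); // counted even when len == 0
        if (len == 0) {
            return new CoordJobPageSketch(total, new ArrayList<>());
        }
        List<String> ordered = new ArrayList<>(allActions);
        if (desc) {
            Collections.reverse(ordered);
        }
        // Clamp both ends so an out-of-range offset yields an empty page.
        int from = Math.min(Math.max(offset - 1, 0), total);
        int to = Math.min(from + len, total);
        return new CoordJobPageSketch(total, new ArrayList<>(ordered.subList(from, to)));
    }

    public static void main(String[] args) {
        CoordJobPageSketch page = load(List.of("a1", "a2", "a3", "a4", "a5"), 2, 2, false);
        System.out.println(page.numActions + " " + page.actions); // 5 [a2, a3]
    }
}
```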

CoordJobsXCommand

/* (non-Javadoc)
 * @see org.apache.oozie.command.XCommand#execute() */
@Override
protected CoordinatorJobInfo execute() throws CommandException {
    try {
        JPAService jpaService = Services.get().get(JPAService.class);
        CoordinatorJobInfo coordInfo = null;
        if (jpaService != null) {
            coordInfo = jpaService.execute(new CoordJobInfoGetJPAExecutor(filter, start, len));
        }
        else {
            LOG.error(ErrorCode.E0610);
        }
        return coordInfo;
    }
    catch (XException ex) {
        throw new CommandException(ex);
    }
}

Retrieves the list of coordinator jobs.
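CoordJobsXCommand takes a filter plus start and len. Assuming the filter is a Map<String, List<String>> (key to list of accepted values), the sketch below shows how such a map could be built from a semicolon-separated key=value string in the style of the Oozie CLI -filter option. CoordFilterSketch is hypothetical, and any validation of allowed keys that Oozie performs is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser for "key=value;key=value" filter strings.
// Repeated keys accumulate values, e.g. two accepted statuses.
class CoordFilterSketch {
    static Map<String, List<String>> parse(String filter) {
        Map<String, List<String>> map = new LinkedHashMap<>();
        if (filter == null || filter.isEmpty()) {
            return map;
        }
        for (String token : filter.split(";")) {
            int eq = token.indexOf('=');
            // Reject elements with no key or no value.
            if (eq <= 0 || eq == token.length() - 1) {
                throw new IllegalArgumentException("bad filter element: " + token);
            }
            String key = token.substring(0, eq).trim();
            String value = token.substring(eq + 1).trim();
            map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(parse("status=RUNNING;status=PREP;user=joe"));
        // {status=[RUNNING, PREP], user=[joe]}
    }
}
```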

CoordChangeXCommand

@Override
protected Void execute() throws CommandException {
    LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
    try {
        if (newEndTime != null) {
            // during coord materialization, nextMaterializedTime is set to
            // startTime + n(actions materialized) * frequency and this can be AFTER endTime,
            // while doneMaterialization is true. Hence the following checks
            // for newEndTime being in the middle of endTime and nextMatdTime.
            // Since job is already done materialization so no need to change
            boolean dontChange = coordJob.getEndTime().before(newEndTime)
                    && coordJob.getNextMaterializedTime() != null
                    && coordJob.getNextMaterializedTime().after(newEndTime);
            if (!dontChange) {
                coordJob.setEndTime(newEndTime);
                // OOZIE-1703, we should SUCCEEDED the coord, if it's in PREP and new endtime is before start time
                if (coordJob.getStartTime().compareTo(newEndTime) >= 0) {
                    if (coordJob.getStatus() != CoordinatorJob.Status.PREP) {
                        processLookaheadActions(coordJob, newEndTime);
                    }
                    if (coordJob.getStatus() == CoordinatorJob.Status.PREP
                            || coordJob.getStatus() == CoordinatorJob.Status.RUNNING) {
                        LOG.info("Changing coord status to SUCCEEDED, because it's in " + coordJob.getStatus()
                                + " and new end time is before start time. Startime is " + coordJob.getStartTime()
                                + " and new end time is " + newEndTime);
                        coordJob.setStatus(CoordinatorJob.Status.SUCCEEDED);
                        coordJob.resetPending();
                    }
                    coordJob.setDoneMaterialization();
                }
                else {
                    // move it to running iff new end time is after starttime.
                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
                    }
                    if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
                            || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
                        // Check for backward compatibility for Oozie versions (3.2 and before)
                        // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
                        // PAUSEDWITHERROR is not supported
                        coordJob.setStatus(StatusUtils
                                .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
                    }
                    coordJob.setPending();
                    coordJob.resetDoneMaterialization();
                    processLookaheadActions(coordJob, newEndTime);
                }
            }
            else {
                LOG.info("Didn't change endtime. Endtime is in between coord end time and next materialization time."
                        + "Coord endTime = " + DateUtils.formatDateOozieTZ(newEndTime)
                        + " next materialization time ="
                        + DateUtils.formatDateOozieTZ(coordJob.getNextMaterializedTime()));
            }
        }
        if (newConcurrency != null) {
            this.coordJob.setConcurrency(newConcurrency);
        }
        if (newPauseTime != null || resetPauseTime == true) {
            this.coordJob.setPauseTime(newPauseTime);
            if (oldPauseTime != null && newPauseTime != null) {
                if (oldPauseTime.before(newPauseTime)) {
                    if (this.coordJob.getStatus() == Job.Status.PAUSED) {
                        this.coordJob.setStatus(Job.Status.RUNNING);
                    }
                    else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
                        this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
                    }
                }
            }
            else if (oldPauseTime != null && newPauseTime == null) {
                if (this.coordJob.getStatus() == Job.Status.PAUSED) {
                    this.coordJob.setStatus(Job.Status.RUNNING);
                }
                else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
                    this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
                }
            }
            if (!resetPauseTime) {
                processLookaheadActions(coordJob, newPauseTime);
            }
        }
        if (jobStatus != null) {
            coordJob.setStatus(jobStatus);
            LOG.info("Coord status is changed to " + jobStatus + " from " + prevStatus);
            if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
                coordJob.setPending();
                if (coordJob.getNextMaterializedTime() != null
                        && coordJob.getEndTime().after(coordJob.getNextMaterializedTime())) {
                    coordJob.resetDoneMaterialization();
                }
            } else if (jobStatus.equals(CoordinatorJob.Status.IGNORED)) {
                coordJob.resetPending();
                coordJob.setDoneMaterialization();
            }
        }
        if (coordJob.getNextMaterializedTime() != null && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
            LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
                    + ", set pending to true");
            // set doneMaterialization to true when materialization is done
            coordJob.setDoneMaterialization();
        }
        coordJob.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
        return null;
    }
    catch (XException ex) {
        throw new CommandException(ex);
    }
    finally {
        LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
        // update bundle action
        if (coordJob.getBundleId() != null) {
            // ignore pending as it's a sync command
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus, true);
            bundleStatusUpdate.call();
        }
    }
}

Command that changes (updates) a coordinator job: end time, concurrency, pause time or status.
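The end-time branch of CoordChangeXCommand is the trickiest part. The sketch below isolates only its status transitions as a pure function so the cases can be checked one by one. It assumes backward-compatibility mode is off (so FAILED or DONEWITHERROR become RUNNINGWITHERROR instead of going through StatusUtils), and it leaves out pending flags, look-ahead action processing and the bundle update. CoordEndTimeChangeSketch is a hypothetical name.

```java
import java.util.Date;

// Hypothetical sketch of the status transitions CoordChangeXCommand applies
// when a new end time is requested. Assumes backward-compatibility mode is off.
class CoordEndTimeChangeSketch {
    enum Status { PREP, RUNNING, SUCCEEDED, DONEWITHERROR, FAILED, RUNNINGWITHERROR, KILLED }

    /** Returns the new status, or null when the end time must be left unchanged. */
    static Status apply(Status current, Date start, Date end, Date nextMaterialized, Date newEnd) {
        // newEnd falls between the old end time and the next materialization
        // time: materialization is already done, so nothing changes.
        boolean dontChange = end.before(newEnd)
                && nextMaterialized != null
                && nextMaterialized.after(newEnd);
        if (dontChange) {
            return null;
        }
        if (start.compareTo(newEnd) >= 0) {
            // New end time is not after the start time: no action can run.
            boolean active = current == Status.PREP || current == Status.RUNNING;
            return active ? Status.SUCCEEDED : current;
        }
        // New end time is after the start time: the job may run again.
        if (current == Status.SUCCEEDED) {
            return Status.RUNNING;
        }
        if (current == Status.DONEWITHERROR || current == Status.FAILED) {
            return Status.RUNNINGWITHERROR;
        }
        return current;
    }

    public static void main(String[] args) {
        Date start = new Date(1_000L), end = new Date(5_000L), next = new Date(8_000L);
        System.out.println(apply(Status.RUNNING, start, end, next, new Date(6_000L)));   // null
        System.out.println(apply(Status.RUNNING, start, end, next, new Date(500L)));     // SUCCEEDED
        System.out.println(apply(Status.SUCCEEDED, start, end, next, new Date(9_000L))); // RUNNING
    }
}
```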

CoordPushDependencyCheckXCommand

@Override
protected Void execute() throws CommandException {
    String pushMissingDeps = coordAction.getPushMissingDependencies();
    if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
        LOG.info("Nothing to check. Empty push missing dependency");
    }
    else {
        String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
        LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
        LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
        if (registerForNotification) {
            LOG.debug("Register for notifications is true");
        }
        try {
            Configuration actionConf = null;
            try {
                actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            }
            catch (IOException e) {
                throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
            }
            // Check all dependencies during materialization to avoid registering in the cache.
            // But check only first missing one afterwards similar to
            // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
            ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
                    !registerForNotification);
            boolean isChangeInDependency = true;
            boolean timeout = false;
            if (actionDep.getMissingDependencies().size() == 0) {
                // All push-based dependencies are available
                onAllPushDependenciesAvailable();
            }
            else {
                if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
                    isChangeInDependency = false;
                }
                else {
                    String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
                            .getMissingDependencies());
                    coordAction.setPushMissingDependencies(stillMissingDeps);
                }
                // Checking for timeout
                timeout = isTimeout();
                if (timeout) {
                    queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                }
                else {
                    queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
                            getCoordPushCheckRequeueInterval());
                }
            }
            updateCoordAction(coordAction, isChangeInDependency);
            if (registerForNotification) {
                registerForNotification(actionDep.getMissingDependencies(), actionConf);
            }
            if (removeAvailDependencies) {
                unregisterAvailableDependencies(actionDep.getAvailableDependencies());
            }
            if (timeout) {
                unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
            }
        }
        catch (Exception e) {
            final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
            if (isTimeout()) {
                LOG.debug("Queueing timeout command");
                // XCommand.queue() will not work when there is an Exception
                callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
            }
            else if (coordAction.getMissingDependencies() != null
                    && coordAction.getMissingDependencies().length() > 0) {
                // Queue again on exception as RecoveryService will not queue this again with
                // the action being updated regularly by CoordActionInputCheckXCommand
                callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
                        registerForNotification, removeAvailDependencies),
                        Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
            }
            throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
        }
    }
    return null;
}

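Periodically checks the coordinator action's missing push dependencies (active checking), and registers or unregisters for notifications about partition data (passive notification).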
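After one availability check, the command has three outcomes: all push dependencies are present, the action has timed out, or it is requeued; isChangeInDependency is false only when nothing became available. The sketch below captures that decision. Here "available" stands in for the result of DependencyChecker.checkForAvailability and every dependency is checked (the real command may stop at the first missing one); PushDepCheckSketch and the example URIs are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the decision CoordPushDependencyCheckXCommand makes
// after one availability check.
class PushDepCheckSketch {
    enum Next { ALL_AVAILABLE, TIMED_OUT, REQUEUE }

    static final class Result {
        final Next next;
        final List<String> stillMissing;
        final boolean changed; // mirrors isChangeInDependency

        Result(Next next, List<String> stillMissing, boolean changed) {
            this.next = next;
            this.stillMissing = stillMissing;
            this.changed = changed;
        }
    }

    static Result check(List<String> missing, Set<String> available, boolean timedOut) {
        List<String> stillMissing = new ArrayList<>();
        for (String dep : missing) {
            if (!available.contains(dep)) {
                stillMissing.add(dep);
            }
        }
        if (stillMissing.isEmpty()) {
            // All push dependencies are present: the action can move on.
            return new Result(Next.ALL_AVAILABLE, stillMissing, true);
        }
        // If nothing became available, the stored missing list need not change.
        boolean changed = stillMissing.size() != missing.size();
        // Timed out: hand over to a timeout command; otherwise check again later.
        return new Result(timedOut ? Next.TIMED_OUT : Next.REQUEUE, stillMissing, changed);
    }

    public static void main(String[] args) {
        Result r = check(List.of("hcat://db/t/dt=1", "hcat://db/t/dt=2"), Set.of("hcat://db/t/dt=1"), false);
        System.out.println(r.next + " " + r.stillMissing + " changed=" + r.changed);
    }
}
```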

CoordActionStartXCommand

protected Void execute() throws CommandException {
    boolean makeFail = true;
    String errCode = "";
    String errMsg = "";
    ParamChecker.notEmpty(user, "user");
    log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus());
    if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) {
        // log.debug("getting.. job id: " + coordAction.getJobId());
        // create merged runConf to pass to WF Engine
        Configuration runConf = mergeConfig(coordAction);
        coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString());
        // log.debug("%%% merged runconf=" +
        // XmlUtils.prettyPrint(runConf).toString());
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
        try {
            Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
                    SlaAppType.COORDINATOR_ACTION, log);
            if(slaEvent != null) {
                insertList.add(slaEvent);
            }
            if (OozieJobInfo.isJobInfoEnabled()) {
                conf.set(OozieJobInfo.COORD_NAME, appName);
                conf.set(OozieJobInfo.COORD_NOMINAL_TIME, coordAction.getNominalTimestamp().toString());
            }
            // Normalize workflow appPath here;
            JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
            if (coordAction.getExternalId() != null) {
                conf.setBoolean(OozieClient.RERUN_FAIL_NODES, true);
                dagEngine.reRun(coordAction.getExternalId(), conf);
            } else {
                String wfId = dagEngine.submitJobFromCoordinator(conf, actionId);
                coordAction.setExternalId(wfId);
            }
            coordAction.setStatus(CoordinatorAction.Status.RUNNING);
            coordAction.incrementAndGetPending();
            //store.updateCoordinatorAction(coordAction);
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                log.debug("Updating WF record for WFID :" + coordAction.getExternalId() + " with parent id: " + actionId);
                WorkflowJobBean wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STARTTIME, coordAction.getExternalId());
                wfJob.setParentId(actionId);
                wfJob.setLastModifiedTime(new Date());
                BatchQueryExecutor executor = BatchQueryExecutor.getInstance();
                updateList.add(new UpdateEntry<WorkflowJobQuery>(
                        WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, wfJob));
                updateList.add(new UpdateEntry<CoordActionQuery>(
                        CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction));
                try {
                    executor.executeBatchInsertUpdateDelete(insertList, updateList, null);
                    queue(new CoordActionNotificationXCommand(coordAction), 100);
                    if (EventHandlerService.isEnabled()) {
                        generateEvent(coordAction, user, appName, wfJob.getStartTime());
                    }
                }
                catch (JPAExecutorException je) {
                    throw new CommandException(je);
                }
            }
            else {
                log.error(ErrorCode.E0610);
            }
            makeFail = false;
        }
        catch (DagEngineException dee) {
            errMsg = dee.getMessage();
            errCode = dee.getErrorCode().toString();
            log.warn("can not create DagEngine for submitting jobs", dee);
        }
        catch (CommandException ce) {
            errMsg = ce.getMessage();
            errCode = ce.getErrorCode().toString();
            log.warn("command exception occured ", ce);
        }
        catch (java.io.IOException ioe) {
            errMsg = ioe.getMessage();
            errCode = "E1005";
            log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe);
        }
        catch (Exception ex) {
            errMsg = ex.getMessage();
            errCode = "E1005";
            log.warn("can not create DagEngine for submitting jobs", ex);
        }
        finally {
            if (makeFail == true) { // No DB exception occurs
                log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg);
                coordAction.setStatus(CoordinatorAction.Status.FAILED);
                if (errMsg.length() > 254) { // Because table column size is 255
                    errMsg = errMsg.substring(0, 255);
                }
                coordAction.setErrorMessage(errMsg);
                coordAction.setErrorCode(errCode);
                updateList = new ArrayList<UpdateEntry>();
                updateList.add(new UpdateEntry<CoordActionQuery>(
                                CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction));
                insertList = new ArrayList<JsonBean>();
                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
                        SlaAppType.COORDINATOR_ACTION, log);
                if(slaEvent != null) {
                    insertList.add(slaEvent); //Update SLA events
                }
                try {
                    // call JPAExecutor to do the bulk writes
                    BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
                    if (EventHandlerService.isEnabled()) {
                        generateEvent(coordAction, user, appName, null);
                    }
                }
                catch (JPAExecutorException je) {
                    throw new CommandException(je);
                }
                queue(new CoordActionReadyXCommand(coordAction.getJobId()));
            }
        }
    }
    return null;
}

Coordinator action start command. What it actually does is submit (or rerun) and start the workflow associated with the action.
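A reduced model of the start path: if the action already has an external (workflow) id, that workflow is rerun with failed-node rerun enabled; otherwise a new workflow is submitted. Any failure marks the action FAILED with its message cut to the 255-character column. WfEngine and FakeEngine are invented stand-ins for DagEngine, statuses are plain strings for brevity, the workflow id is made up, and the property name in RERUN_FAIL_NODES is assumed to equal OozieClient.RERUN_FAIL_NODES.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of CoordActionStartXCommand's core branch.
class ActionStartSketch {
    // Assumed to match the value of OozieClient.RERUN_FAIL_NODES.
    static final String RERUN_FAIL_NODES = "oozie.wf.rerun.failnodes";
    static final int MAX_ERROR_LEN = 255; // error message column size

    // Invented stand-in for DagEngine.
    interface WfEngine {
        void reRun(String wfId, Map<String, String> conf);

        String submit(Map<String, String> conf, String coordActionId);
    }

    String externalId;           // workflow id, set once a workflow exists
    String status = "SUBMITTED";
    String errorMessage;

    void start(WfEngine engine, String actionId, Map<String, String> conf) {
        try {
            if (externalId != null) {
                // A workflow already exists: rerun only its failed nodes.
                conf.put(RERUN_FAIL_NODES, "true");
                engine.reRun(externalId, conf);
            } else {
                externalId = engine.submit(conf, actionId);
            }
            status = "RUNNING";
        } catch (RuntimeException e) {
            status = "FAILED";
            String msg = String.valueOf(e.getMessage());
            errorMessage = msg.length() > MAX_ERROR_LEN ? msg.substring(0, MAX_ERROR_LEN) : msg;
        }
    }

    // Test double: records reruns and can be told to fail on submit.
    static class FakeEngine implements WfEngine {
        boolean failOnSubmit;
        String rerunId;

        public void reRun(String wfId, Map<String, String> conf) {
            rerunId = wfId;
        }

        public String submit(Map<String, String> conf, String coordActionId) {
            if (failOnSubmit) {
                throw new IllegalStateException("x".repeat(300));
            }
            return "0000001-W";
        }
    }

    public static void main(String[] args) {
        ActionStartSketch action = new ActionStartSketch();
        action.start(new FakeEngine(), "0000001-C@1", new HashMap<>());
        System.out.println(action.status + " " + action.externalId);
    }
}
```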

CoordActionCheckXCommand


protected Void execute() throws CommandException {
    try {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
        Status slaStatus = null;
        CoordinatorAction.Status initialStatus = coordAction.getStatus();
        if (workflowJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
            coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
            // set pending to false as the status is SUCCEEDED
            coordAction.setPending(0);
            slaStatus = Status.SUCCEEDED;
        }
        else {
            if (workflowJob.getStatus() == WorkflowJob.Status.FAILED) {
                coordAction.setStatus(CoordinatorAction.Status.FAILED);
                slaStatus = Status.FAILED;
                // set pending to false as the status is FAILED
                coordAction.setPending(0);
            }
            else {
                if (workflowJob.getStatus() == WorkflowJob.Status.KILLED) {
                    coordAction.setStatus(CoordinatorAction.Status.KILLED);
                    slaStatus = Status.KILLED;
                    // set pending to false as the status is KILLED
                    coordAction.setPending(0);
                }
                else {
                    LOG.warn("Unexpected workflow " + workflowJob.getId() + " STATUS " + workflowJob.getStatus());
                    coordAction.setLastModifiedTime(new Date());
                    CoordActionQueryExecutor.getInstance().executeUpdate(
                            CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
                            coordAction);
                    return null;
                }
            }
        }
        LOG.debug("Updating Coordinator actionId :" + coordAction.getId() + "status to ="
                        + coordAction.getStatus());
        coordAction.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
                coordAction));
        if (slaStatus != null) {
            SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
                    SlaAppType.COORDINATOR_ACTION, LOG);
            if(slaEvent != null) {
                insertList.add(slaEvent);
            }
        }
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
        CoordinatorAction.Status endStatus = coordAction.getStatus();
        if (endStatus != initialStatus && EventHandlerService.isEnabled()) {
            generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime());
        }
    }
    catch (XException ex) {
        LOG.warn("CoordActionCheckCommand Failed ", ex);
        throw new CommandException(ex);
    }
    return null;
}

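Actively checks the running status of the workflow associated with the coordinator action and sets the coordinator action's status accordingly. [This is the active-checking path; the other path is a notification sent after the workflow completes.]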
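The status mapping in CoordActionCheckXCommand is small enough to write as a table: only the terminal workflow states SUCCEEDED, FAILED and KILLED change the action (and reset pending to 0); any other state only touches lastModifiedTime. ActionCheckSketch is a hypothetical name, and the enums list only the states needed here.

```java
// Hypothetical sketch of the status mapping in CoordActionCheckXCommand.
class ActionCheckSketch {
    enum WfStatus { PREP, RUNNING, SUSPENDED, SUCCEEDED, FAILED, KILLED }

    enum ActionStatus { SUCCEEDED, FAILED, KILLED }

    /** Terminal workflow states map 1:1; anything else returns null (no status change). */
    static ActionStatus map(WfStatus wf) {
        switch (wf) {
            case SUCCEEDED:
                return ActionStatus.SUCCEEDED;
            case FAILED:
                return ActionStatus.FAILED;
            case KILLED:
                return ActionStatus.KILLED;
            default:
                return null;
        }
    }

    /** Pending is reset to 0 only when a terminal state was reached. */
    static int pendingAfter(WfStatus wf, int pending) {
        return map(wf) != null ? 0 : pending;
    }

    public static void main(String[] args) {
        for (WfStatus wf : WfStatus.values()) {
            System.out.println(wf + " -> " + map(wf) + ", pending " + pendingAfter(wf, 1));
        }
    }
}
```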

CoordActionInfoXCommand

@Override
protected CoordinatorActionBean execute() throws CommandException {
    JPAService jpaService = Services.get().get(JPAService.class);
    if (jpaService != null) {
        CoordinatorActionBean action;
        try {
            action = jpaService.execute(new CoordActionGetForInfoJPAExecutor(this.id));
        }
        catch (JPAExecutorException e) {
           throw new CommandException(e);
        }
        return action;
    }
    else {
        LOG.error(ErrorCode.E0610);
        return null;
    }
}

Command for viewing coordinator action information.

CoordActionReadyXCommand

/**
 * Check for READY actions and change state to SUBMITTED by a command to submit the job to WF engine.
 * This method checks all the actions associated with a jobId to figure out which actions
 * to start (based on concurrency and execution order [FIFO, LIFO, LAST_ONLY, NONE])
 */
protected Void execute() throws CommandException {
    // number of actions to start (-1 means start ALL)
    int numActionsToStart = -1;
    // get execution setting for this job (FIFO, LIFO, LAST_ONLY)
    String jobExecution = coordJob.getExecution();
    // get concurrency setting for this job
    int jobConcurrency = coordJob.getConcurrency();
    // if less than 0, then UNLIMITED concurrency
    if (jobConcurrency >= 0) {
        // count number of actions that are already RUNNING or SUBMITTED
        // subtract from CONCURRENCY to calculate number of actions to start
        // in WF engine
        int numRunningJobs;
        try {
            numRunningJobs = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        numActionsToStart = jobConcurrency - numRunningJobs;
        if (numActionsToStart < 0) {
            numActionsToStart = 0;
        }
        log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
                + numRunningJobs + ", numLeftover=" + numActionsToStart);
        // no actions to start
        if (numActionsToStart == 0) {
            log.warn("No actions to start for jobId=" + jobId + " as max concurrency reached!");
            return null;
        }
    }
    // get list of actions that are READY and fit in the concurrency and execution
    List<CoordinatorActionBean> actions;
    try {
        actions = jpaService.execute(new CoordJobGetReadyActionsJPAExecutor(jobId, numActionsToStart, jobExecution));
    }
    catch (JPAExecutorException je) {
        throw new CommandException(je);
    }
    log.debug("Number of READY actions = " + actions.size());
    // make sure auth token is not null
    // log.denug("user=" + user + ", token=" + authToken);
    int counter = 0;
    for (CoordinatorActionBean action : actions) {
        // continue if numActionsToStart is negative (no limit on number of
        // actions), or if the counter is less than numActionsToStart
        if ((numActionsToStart < 0) || (counter < numActionsToStart)) {
            log.debug("Set status to SUBMITTED for id: " + action.getId());
            // change state of action to SUBMITTED
            action.setStatus(CoordinatorAction.Status.SUBMITTED);
            try {
                CoordActionQueryExecutor.getInstance().executeUpdate(
                        CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
            // start action
            new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(),
                    action.getJobId()).call(getEntityKey());
        }
        else {
            break;
        }
        counter++;
    }
    return null;
}

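Based on the job's concurrency limit and its execution order (the order in which READY actions are taken), moves coordinator actions from READY to SUBMITTED and starts them.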
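The arithmetic in CoordActionReadyXCommand can be sketched as follows: a negative concurrency means no limit (-1), otherwise the number of actions to start is concurrency minus running actions, floored at 0. The ordering is a simplification: actions are represented by hypothetical integer sequence numbers, "LIFO" sorts them descending, and every other execution mode is treated as FIFO here, whereas Oozie orders the rows in its database query. ActionReadySketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of CoordActionReadyXCommand's selection logic.
class ActionReadySketch {
    /** -1 means "no limit", as with numActionsToStart in the command. */
    static int numToStart(int concurrency, int running) {
        if (concurrency < 0) {
            return -1;
        }
        return Math.max(0, concurrency - running);
    }

    /** ASSUMPTION: READY actions are integer sequence numbers; only "LIFO" reverses the order. */
    static List<Integer> pick(List<Integer> ready, int concurrency, int running, String execution) {
        int n = numToStart(concurrency, running);
        List<Integer> ordered = new ArrayList<>(ready);
        if ("LIFO".equals(execution)) {
            ordered.sort(Comparator.reverseOrder());
        } else {
            ordered.sort(Comparator.naturalOrder());
        }
        if (n < 0 || n >= ordered.size()) {
            return ordered;
        }
        return new ArrayList<>(ordered.subList(0, n));
    }

    public static void main(String[] args) {
        System.out.println(pick(List.of(3, 1, 2), 2, 0, "FIFO")); // [1, 2]
        System.out.println(pick(List.of(3, 1, 2), 2, 1, "LIFO")); // [3]
    }
}
```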

CoordActionSkipXCommand

protected Void execute() throws CommandException {
    if (actionBean.getStatus() == CoordinatorAction.Status.WAITING
            || actionBean.getStatus() == CoordinatorAction.Status.READY) {
        LOG.info("Setting action [{0}] status to SKIPPED", actionBean.getId());
        actionBean.setStatus(CoordinatorAction.Status.SKIPPED);
        try {
            queue(new CoordActionNotificationXCommand(actionBean), 100);
            actionBean.setLastModifiedTime(new Date());
            CoordActionQueryExecutor.getInstance().executeUpdate(
                    CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, actionBean);
            if (EventHandlerService.isEnabled()) {
                generateEvent(actionBean, user, appName, null);
            }
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }
    return null;
}

Sets the coordinator action status to SKIPPED (only if it is currently WAITING or READY).
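Skipping has two parts: the guard in CoordActionSkipXCommand (only WAITING or READY actions change), and the LAST_ONLY rule in CoordActionInputCheckXCommand, which queues a skip once the current time has passed the next action's nominal time. The sketch below expresses both as pure functions; ActionSkipSketch is a hypothetical name and the enum lists only the states needed here.

```java
import java.util.Date;

// Hypothetical sketch of the skip guard and the LAST_ONLY skip rule.
class ActionSkipSketch {
    enum Status { WAITING, READY, SUBMITTED, RUNNING, SKIPPED }

    /** CoordActionSkipXCommand only changes WAITING or READY actions. */
    static Status skip(Status current) {
        boolean skippable = current == Status.WAITING || current == Status.READY;
        return skippable ? Status.SKIPPED : current;
    }

    /** LAST_ONLY: skip once "now" is past the next action's nominal time. */
    static boolean shouldSkipLastOnly(Date now, Date nextNominalTime) {
        return nextNominalTime != null && now.after(nextNominalTime);
    }

    public static void main(String[] args) {
        System.out.println(skip(Status.READY));   // SKIPPED
        System.out.println(skip(Status.RUNNING)); // RUNNING
        System.out.println(shouldSkipLastOnly(new Date(2_000L), new Date(1_000L))); // true
    }
}
```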

CoordActionTimeOutXCommand

protected Void execute() throws CommandException {
    if (actionBean.getStatus() == CoordinatorAction.Status.WAITING) {
        actionBean.setStatus(CoordinatorAction.Status.TIMEDOUT);
        try {
            queue(new CoordActionNotificationXCommand(actionBean), 100);
            actionBean.setLastModifiedTime(new Date());
            CoordActionQueryExecutor.getInstance().executeUpdate(
                    CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, actionBean);
            if (EventHandlerService.isEnabled()) {
                generateEvent(actionBean, user, appName, null);
            }
        }
        catch (JPAExecutorException e) {
            throw new CommandException(e);
        }
    }
    return null;
}

Sets the coordinator action status to TIMEDOUT (only if it is currently WAITING).
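The command itself only flips WAITING to TIMEDOUT; deciding that the timeout has been reached happens in the dependency-check commands. A sketch of that decision follows, under loudly stated assumptions: the timeout is in minutes, a negative value means "never time out", and waiting time is measured from the later of the nominal time and the creation time. ActionTimeoutSketch is a hypothetical name.

```java
import java.util.Date;

// Hypothetical sketch of a timeout check plus the TIMEDOUT guard.
// ASSUMPTIONS: timeout in minutes, negative = never, waiting measured
// from max(nominal time, creation time).
class ActionTimeoutSketch {
    enum Status { WAITING, READY, TIMEDOUT }

    static boolean isTimedOut(Date now, Date nominal, Date created, int timeoutMinutes) {
        if (timeoutMinutes < 0) {
            return false;
        }
        long since = Math.max(nominal.getTime(), created.getTime());
        long waitingMinutes = (now.getTime() - since) / (60 * 1000);
        return waitingMinutes > timeoutMinutes;
    }

    /** Only WAITING actions become TIMEDOUT; anything else is left alone. */
    static Status timeOut(Status current) {
        return current == Status.WAITING ? Status.TIMEDOUT : current;
    }

    public static void main(String[] args) {
        Date zero = new Date(0L);
        Date tenMinutesLater = new Date(10 * 60 * 1000L);
        System.out.println(isTimedOut(tenMinutesLater, zero, zero, 5)); // true
        System.out.println(timeOut(Status.READY));                      // READY
    }
}
```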

CoordActionUpdateXCommand

protected Void execute() throws CommandException {
    try {
        LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
        Status slaStatus = null;
        if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
            coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
            coordAction.setPending(0);
            slaStatus = Status.SUCCEEDED;
        }
        else if (workflow.getStatus() == WorkflowJob.Status.FAILED) {
            coordAction.setStatus(CoordinatorAction.Status.FAILED);
            coordAction.setPending(0);
            slaStatus = Status.FAILED;
        }
        else if (workflow.getStatus() == WorkflowJob.Status.KILLED) {
            coordAction.setStatus(CoordinatorAction.Status.KILLED);
            coordAction.setPending(0);
            slaStatus = Status.KILLED;
        }
        else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
            coordAction.setStatus(CoordinatorAction.Status.SUSPENDED);
            coordAction.decrementAndGetPending();
        }
        else if (workflow.getStatus() == WorkflowJob.Status.RUNNING ||
                workflow.getStatus() == WorkflowJob.Status.PREP) {
            // resume workflow job and update coord action accordingly
            coordAction.setStatus(CoordinatorAction.Status.RUNNING);
            coordAction.decrementAndGetPending();
        }
        else {
            LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
            // update lastModifiedTime
            coordAction.setLastModifiedTime(new Date());
            CoordActionQueryExecutor.getInstance().executeUpdate(
                    CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
            // TODO - Uncomment this when bottom up rerun can change terminal state
            /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
            if (!coordJob.isPending()) {
                coordJob.setPending();
                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
            }*/
            return null;
        }
        LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status "
                + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
        coordAction.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
                coordAction));
        // TODO - Uncomment this when bottom up rerun can change terminal state
        /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
        if (!coordJob.isPending()) {
            coordJob.setPending();
            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
            LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true");
        }*/
        if (slaStatus != null) {
            SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
                    SlaAppType.COORDINATOR_ACTION, LOG);
            if(slaEvent != null) {
                insertList.add(slaEvent);
            }
        }
        if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED
                && workflow.getStatus() != WorkflowJob.Status.RUNNING) {
            queue(new CoordActionReadyXCommand(coordAction.getJobId()));
        }
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
        if (EventHandlerService.isEnabled()) {
            generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime());
        }
        LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
    }
    catch (XException ex) {
        LOG.warn("CoordActionUpdate Failed ", ex.getMessage());
        throw new CommandException(ex);
    }
    return null;
}

Updates the coordinator action status from the status of its workflow.
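Unlike the check command, CoordActionUpdateXCommand also handles non-terminal workflow states: SUSPENDED maps to SUSPENDED, RUNNING or PREP map to RUNNING, and in those cases pending is decremented instead of reset. A follow-up CoordActionReadyXCommand is queued unless the workflow is SUSPENDED or RUNNING. The sketch below models that; ActionUpdateSketch is a hypothetical name, the initial pending value is arbitrary, and flooring pending at 0 is an assumption about decrementAndGetPending().

```java
// Hypothetical sketch of the state handling in CoordActionUpdateXCommand.
class ActionUpdateSketch {
    enum WfStatus { PREP, RUNNING, SUSPENDED, SUCCEEDED, FAILED, KILLED }

    enum ActionStatus { RUNNING, SUSPENDED, SUCCEEDED, FAILED, KILLED }

    ActionStatus status = ActionStatus.RUNNING;
    int pending = 1;

    /** Applies the workflow status; returns true if a ready check would be queued. */
    boolean update(WfStatus wf) {
        switch (wf) {
            case SUCCEEDED: status = ActionStatus.SUCCEEDED; pending = 0; break;
            case FAILED:    status = ActionStatus.FAILED;    pending = 0; break;
            case KILLED:    status = ActionStatus.KILLED;    pending = 0; break;
            case SUSPENDED: status = ActionStatus.SUSPENDED; pending = Math.max(0, pending - 1); break;
            case RUNNING:
            case PREP:      status = ActionStatus.RUNNING;   pending = Math.max(0, pending - 1); break;
            default:        return false;
        }
        // As in the source: queue the ready check unless SUSPENDED or RUNNING.
        return wf != WfStatus.SUSPENDED && wf != WfStatus.RUNNING;
    }

    public static void main(String[] args) {
        ActionUpdateSketch action = new ActionUpdateSketch();
        boolean queued = action.update(WfStatus.KILLED);
        System.out.println(action.status + " pending=" + action.pending + " queued=" + queued);
    }
}
```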

CoordActionInputCheckXCommand


protected Void execute() throws CommandException {
    LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
    // This action should only be processed if current time > nominal time;
    // otherwise, requeue it for delayed execution.
    Date nominalTime = coordAction.getNominalTime();
    Date currentTime = new Date();
    if (nominalTime.compareTo(currentTime) > 0) {
        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), Math.max((nominalTime.getTime() - currentTime
                .getTime()), getCoordInputCheckRequeueInterval()));
        updateCoordAction(coordAction, false);
        LOG.info("[" + actionId
                + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
                + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
        return null;
    }
    StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
    boolean isChangeInDependency = false;
    try {
        Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
        Date now = new Date();
        if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
            Date nextNominalTime = computeNextNominalTime();
            if (nextNominalTime != null) {
                // If the current time is after the next action's nominal time, then we've passed the window where this action
                // should be started; so set it to SKIPPED
                if (now.after(nextNominalTime)) {
                    LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
                            + "the nominal time [{2}] of the next action", coordAction.getId(),
                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                    queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                    return null;
                } else {
                    LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
                            + "the nominal time [{2}] of the next action", coordAction.getId(),
                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                }
            }
        }
        else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
            // If the current time is after the nominal time of this action plus some tolerance,
            // then we've passed the window where this action
            // should be started; so set it to SKIPPED
            Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
            cal.setTime(nominalTime);
            cal.add(Calendar.MINUTE, ConfigurationService.getInt(COORD_EXECUTION_NONE_TOLERANCE));
            nominalTime = cal.getTime();
            if (now.after(nominalTime)) {
                LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
                        + "the nominal time [{2}] of the current action", coordAction.getId(),
                        DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nominalTime));
                queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                return null;
            } else {
                LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
                        + "the nominal time [{2}] of the current action", coordAction.getId(),
                        DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(coordAction.getNominalTime()));
            }
        }
        StringBuilder existList = new StringBuilder();
        StringBuilder nonExistList = new StringBuilder();
        StringBuilder nonResolvedList = new StringBuilder();
        String firstMissingDependency = "";
        String missingDeps = coordAction.getMissingDependencies();
        CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);
        // Log only the first missing dependency (inputs are checked in order)
        // instead of the entire list, some of which may already be available
        if(nonExistList.length() > 0) {
            firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
        }
        LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
                + nonResolvedList.toString());
        // Update the lists of data dependencies that are available and those that are not yet
        boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
        String pushDeps = coordAction.getPushMissingDependencies();
        // Resolve latest/future only when all current missingDependencies and
        // pushMissingDependencies are met
        if (status && nonResolvedList.length() > 0) {
            status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf)
                    : false;
        }
        coordAction.setLastModifiedTime(currentTime);
        coordAction.setActionXml(actionXml.toString());
        if (nonResolvedList.length() > 0 && status == false) {
            nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
        }
        String nonExistListStr = nonExistList.toString();
        if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
            // missingDeps null or empty means action should become READY
            isChangeInDependency = true;
            coordAction.setMissingDependencies(nonExistListStr);
        }
        if (status && (pushDeps == null || pushDeps.length() == 0)) {
            String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
            actionXml.replace(0, actionXml.length(), newActionXml);
            coordAction.setActionXml(actionXml.toString());
            coordAction.setStatus(CoordinatorAction.Status.READY);
            updateCoordAction(coordAction, true);
            new CoordActionReadyXCommand(coordAction.getJobId()).call(getEntityKey());
        }
        else if (!isTimeout(currentTime)) {
            if (status == false) {
                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                        getCoordInputCheckRequeueInterval());
            }
            updateCoordAction(coordAction, isChangeInDependency);
        }
        else {
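            // Parentheses make the intended grouping explicit: as originally written, && binds
            // tighter than ||, so a null pushDeps with an empty nonExistListStr reached pushDeps.length().
            if (!nonExistListStr.isEmpty() && (pushDeps == null || pushDeps.length() == 0)) {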
                queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
            }
            else {
                // Let CoordPushDependencyCheckXCommand queue the timeout
                queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
            }
            updateCoordAction(coordAction, isChangeInDependency);
        }
    }
    catch (AccessControlException e) {
        LOG.error("Permission error in ActionInputCheck", e);
        if (isTimeout(currentTime)) {
            LOG.debug("Queueing timeout command");
            Services.get().get(CallableQueueService.class)
                    .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
        }
        else {
            // Requeue InputCheckCommand for permission denied error with longer interval
            Services.get()
                    .get(CallableQueueService.class)
                    .queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                            2 * getCoordInputCheckRequeueInterval());
        }
        updateCoordAction(coordAction, isChangeInDependency);
    }
    catch (Exception e) {
        if (isTimeout(currentTime)) {
            LOG.debug("Queueing timeout command");
            // XCommand.queue() will not work when there is an Exception
            Services.get().get(CallableQueueService.class)
                    .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
        }
        updateCoordAction(coordAction, isChangeInDependency);
        throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
    }
    return null;
}

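Checks whether the action's time conditions (nominal time, the LAST_ONLY/NONE execution windows, the timeout) and its input data dependencies are satisfied; if they are, the action becomes READY, otherwise it is requeued, skipped or timed out.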
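The time-related branches at the start of `CoordActionInputCheckXCommand.execute()` can be reduced to a small pure function. The sketch below is an illustration under stated assumptions, not Oozie's implementation: `InputCheckTiming`, `Decision` and `Order` (a stand-in for `CoordinatorJobBean.Execution`) are hypothetical names, it uses `java.time` instead of `Date`/`Calendar`, and the `NONE` tolerance is passed in as a `Duration` rather than read from `ConfigurationService`. An action whose nominal time is still in the future is requeued after `max(nominal - now, requeueInterval)`; a `LAST_ONLY` action is skipped once the next action's nominal time has passed; a `NONE` action is skipped once the tolerance window after its own nominal time has passed; everything else goes on to the input check.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the timing checks in CoordActionInputCheckXCommand.execute().
// Order, Decision and all parameter names are illustrative, not Oozie APIs.
final class InputCheckTiming {

    // Stand-in for CoordinatorJobBean.Execution.
    enum Order { FIFO, LIFO, LAST_ONLY, NONE }

    enum Decision { REQUEUE, SKIP, CHECK_INPUTS }

    // nextNominal is only used for LAST_ONLY and may be null (e.g. for the last action).
    static Decision decide(Instant now, Instant nominal, Instant nextNominal,
                           Order order, Duration noneTolerance) {
        // Not due yet: requeue instead of checking inputs early.
        if (nominal.isAfter(now)) {
            return Decision.REQUEUE;
        }
        // LAST_ONLY: once the next action's nominal time has passed, this action is obsolete.
        if (order == Order.LAST_ONLY && nextNominal != null && now.isAfter(nextNominal)) {
            return Decision.SKIP;
        }
        // NONE: the action must start within the tolerance window after its own nominal time.
        if (order == Order.NONE && now.isAfter(nominal.plus(noneTolerance))) {
            return Decision.SKIP;
        }
        return Decision.CHECK_INPUTS;
    }

    // Delay before re-checking a not-yet-due action: never shorter than the requeue interval.
    static Duration requeueDelay(Instant now, Instant nominal, Duration requeueInterval) {
        Duration untilNominal = Duration.between(now, nominal);
        return untilNominal.compareTo(requeueInterval) > 0 ? untilNominal : requeueInterval;
    }

    public static void main(String[] args) {
        Instant nominal = Instant.parse("2024-01-01T10:00:00Z");
        Instant now = Instant.parse("2024-01-01T10:20:00Z");
        // NONE with a 15-minute tolerance: 10:20 is past 10:15, so the action is skipped.
        System.out.println(decide(now, nominal, null, Order.NONE, Duration.ofMinutes(15))); // SKIP
        // FIFO simply proceeds to the input check once the nominal time has passed.
        System.out.println(decide(now, nominal, null, Order.FIFO, Duration.ofMinutes(15))); // CHECK_INPUTS
    }
}
```

Keeping the timing rules free of side effects makes them easy to test on their own; in the listing above they are interleaved with `queue(...)` calls and logging.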

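The missing-dependency bookkeeping in `CoordActionInputCheckXCommand` works on a single string: concrete URIs that are still missing, then a separator, then the EL expressions (such as `latest`/`future`) that are not yet resolved. The sketch below re-implements only the splitting and the "first missing dependency" lookup. It assumes the separators are `!!` (`CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR`) and `#` (`CoordELFunctions.INSTANCE_SEPARATOR`); check them against your Oozie version. `MissingDeps` and its methods are hypothetical helpers, and the sample URIs are made up.

```java
// Hypothetical sketch of how the missing-dependency string is taken apart.
// ASSUMPTION: separators match Oozie's RESOLVED_UNRESOLVED_SEPARATOR ("!!")
// and INSTANCE_SEPARATOR ("#"); verify against the Oozie version in use.
final class MissingDeps {

    static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
    static final String INSTANCE_SEPARATOR = "#";

    // Splits "uri1#uri2!!<unresolved EL>" into the concrete URIs and the unresolved part.
    static void split(String missingDeps, StringBuilder resolved, StringBuilder unresolved) {
        if (missingDeps == null) {
            return;
        }
        int idx = missingDeps.indexOf(RESOLVED_UNRESOLVED_SEPARATOR);
        if (idx < 0) {
            resolved.append(missingDeps);
        } else {
            resolved.append(missingDeps, 0, idx);
            unresolved.append(missingDeps.substring(idx + RESOLVED_UNRESOLVED_SEPARATOR.length()));
        }
    }

    // Only the first entry is logged; later entries may already be available.
    static String firstMissing(CharSequence resolved) {
        if (resolved.length() == 0) {
            return "";
        }
        return resolved.toString().split(INSTANCE_SEPARATOR)[0];
    }

    public static void main(String[] args) {
        StringBuilder resolved = new StringBuilder();
        StringBuilder unresolved = new StringBuilder();
        split("hdfs://nn/data/2024/01/01#hdfs://nn/data/2024/01/02!!${coord:latest(0)}", resolved, unresolved);
        System.out.println(firstMissing(resolved)); // hdfs://nn/data/2024/01/01
        System.out.println(unresolved);             // ${coord:latest(0)}
    }
}
```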