CoordinatorXCommand 是对 Coordinator 组件操作命令的封装类。针对 coordinator 的命令,我们整理如下:
CoordJobXCommand
/**
 * Loads the coordinator job identified by {@code id}; when {@code getActionInfo} is set, also
 * attaches a subset of its actions and the action count to the returned bean.
 *
 * @return the loaded coordinator job, or {@code null} when no JPAService is registered
 * @throws CommandException wrapping any {@code XException} raised while querying
 * @see org.apache.oozie.command.XCommand#execute()
 */
@Override
protected CoordinatorJobBean execute() throws CommandException {
    try {
        JPAService jpa = Services.get().get(JPAService.class);
        if (jpa == null) {
            // Without a JPA service nothing can be loaded; report and hand back null.
            LOG.error(ErrorCode.E0610);
            return null;
        }

        CoordinatorJobBean job = jpa.execute(new CoordJobGetJPAExecutor(id));
        if (!getActionInfo) {
            return job;
        }

        int actionCount = jpa.execute(new CoordActionsCountForJobIdJPAExecutor(id));
        List<CoordinatorActionBean> actions;
        if (len != 0) {
            actions = jpa.execute(new CoordJobGetActionsSubsetJPAExecutor(id, filterMap, offset,
                    len, desc));
        }
        else {
            // A requested length of zero means "no actions": skip the query entirely.
            actions = new ArrayList<CoordinatorActionBean>();
        }
        job.setActions(actions);
        job.setNumActions(actionCount);
        return job;
    }
    catch (XException ex) {
        throw new CommandException(ex);
    }
}
获取 coordinator任务信息;
CoordJobsXCommand
/**
 * Queries coordinator job info using the command's {@code filter}, {@code start} and {@code len}.
 *
 * @return the query result, or {@code null} when no JPAService is registered
 * @throws CommandException wrapping any {@code XException} raised while querying
 * @see org.apache.oozie.command.XCommand#execute()
 */
@Override
protected CoordinatorJobInfo execute() throws CommandException {
    try {
        JPAService jpa = Services.get().get(JPAService.class);
        if (jpa == null) {
            LOG.error(ErrorCode.E0610);
            return null;
        }
        return jpa.execute(new CoordJobInfoGetJPAExecutor(filter, start, len));
    }
    catch (XException ex) {
        throw new CommandException(ex);
    }
}
获取 coordinator 任务列表 信息;
CoordChangeXCommand
/**
 * Applies the requested changes (end time, concurrency, pause time, status) to the coordinator
 * job and persists the job through a single batch update.
 *
 * <p>In the {@code finally} block, when the job has a bundle id, a
 * {@code BundleStatusUpdateXCommand} is invoked synchronously.
 *
 * @return always {@code null}
 * @throws CommandException wrapping any {@code XException} raised while applying the change
 */
@Override
protected Void execute() throws CommandException {
    LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
    try {
        if (newEndTime != null) {
            // during coord materialization, nextMaterializedTime is set to
            // startTime + n(actions materialized) * frequency and this can be AFTER endTime,
            // while doneMaterialization is true. Hence the following checks
            // for newEndTime being in the middle of endTime and nextMatdTime.
            // Since job is already done materialization so no need to change
            boolean dontChange = coordJob.getEndTime().before(newEndTime)
                    && coordJob.getNextMaterializedTime() != null
                    && coordJob.getNextMaterializedTime().after(newEndTime);
            if (!dontChange) {
                coordJob.setEndTime(newEndTime);
                // OOZIE-1703, we should SUCCEEDED the coord, if it's in PREP and new endtime is before start time
                if (coordJob.getStartTime().compareTo(newEndTime) >= 0) {
                    if (coordJob.getStatus() != CoordinatorJob.Status.PREP) {
                        processLookaheadActions(coordJob, newEndTime);
                    }
                    if (coordJob.getStatus() == CoordinatorJob.Status.PREP
                            || coordJob.getStatus() == CoordinatorJob.Status.RUNNING) {
                        LOG.info("Changing coord status to SUCCEEDED, because it's in " + coordJob.getStatus()
                                + " and new end time is before start time. Startime is " + coordJob.getStartTime()
                                + " and new end time is " + newEndTime);
                        coordJob.setStatus(CoordinatorJob.Status.SUCCEEDED);
                        coordJob.resetPending();
                    }
                    coordJob.setDoneMaterialization();
                }
                else {
                    // move it to running iff new end time is after starttime.
                    if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) {
                        coordJob.setStatus(CoordinatorJob.Status.RUNNING);
                    }
                    if (coordJob.getStatus() == CoordinatorJob.Status.DONEWITHERROR
                            || coordJob.getStatus() == CoordinatorJob.Status.FAILED) {
                        // Check for backward compatibility for Oozie versions (3.2 and before)
                        // when RUNNINGWITHERROR, SUSPENDEDWITHERROR and
                        // PAUSEDWITHERROR is not supported
                        coordJob.setStatus(StatusUtils
                                .getStatusIfBackwardSupportTrue(CoordinatorJob.Status.RUNNINGWITHERROR));
                    }
                    coordJob.setPending();
                    coordJob.resetDoneMaterialization();
                    processLookaheadActions(coordJob, newEndTime);
                }
            }
            else {
                LOG.info("Didn't change endtime. Endtime is in between coord end time and next materialization time."
                        + "Coord endTime = " + DateUtils.formatDateOozieTZ(newEndTime)
                        + " next materialization time ="
                        + DateUtils.formatDateOozieTZ(coordJob.getNextMaterializedTime()));
            }
        }

        if (newConcurrency != null) {
            this.coordJob.setConcurrency(newConcurrency);
        }

        if (newPauseTime != null || resetPauseTime == true) {
            this.coordJob.setPauseTime(newPauseTime);
            if (oldPauseTime != null && newPauseTime != null) {
                // Pause time pushed later: a paused job goes back to its running counterpart.
                if (oldPauseTime.before(newPauseTime)) {
                    if (this.coordJob.getStatus() == Job.Status.PAUSED) {
                        this.coordJob.setStatus(Job.Status.RUNNING);
                    }
                    else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
                        this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
                    }
                }
            }
            else if (oldPauseTime != null && newPauseTime == null) {
                // Pause time removed: a paused job goes back to its running counterpart.
                if (this.coordJob.getStatus() == Job.Status.PAUSED) {
                    this.coordJob.setStatus(Job.Status.RUNNING);
                }
                else if (this.coordJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
                    this.coordJob.setStatus(Job.Status.RUNNINGWITHERROR);
                }
            }
            if (!resetPauseTime) {
                processLookaheadActions(coordJob, newPauseTime);
            }
        }

        if (jobStatus != null) {
            coordJob.setStatus(jobStatus);
            LOG.info("Coord status is changed to " + jobStatus + " from " + prevStatus);
            if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
                coordJob.setPending();
                if (coordJob.getNextMaterializedTime() != null
                        && coordJob.getEndTime().after(coordJob.getNextMaterializedTime())) {
                    coordJob.resetDoneMaterialization();
                }
            }
            else if (jobStatus.equals(CoordinatorJob.Status.IGNORED)) {
                coordJob.resetPending();
                coordJob.setDoneMaterialization();
            }
        }

        if (coordJob.getNextMaterializedTime() != null
                && coordJob.getEndTime().compareTo(coordJob.getNextMaterializedTime()) <= 0) {
            LOG.info("[" + coordJob.getId() + "]: all actions have been materialized, job status = " + coordJob.getStatus()
                    + ", set pending to true");
            // set doneMaterialization to true when materialization is done
            // (this call had been folded into the comment line above and was never executed)
            coordJob.setDoneMaterialization();
        }

        coordJob.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);

        return null;
    }
    catch (XException ex) {
        throw new CommandException(ex);
    }
    finally {
        LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
        // update bundle action
        if (coordJob.getBundleId() != null) {
            // ignore pending as it's a sync command
            BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus, true);
            bundleStatusUpdate.call();
        }
    }
}
更新 coordinator 任务命令;
CoordPushDependencyCheckXCommand
/**
 * Checks the push-based missing dependencies of the coordinator action.
 *
 * <p>When nothing is missing this only logs. Otherwise the dependencies are checked for
 * availability; depending on the outcome the action is advanced, a timeout command is queued, or
 * this check is re-queued. Notification registration and un-registration follow the
 * {@code registerForNotification} and {@code removeAvailDependencies} flags.
 *
 * @return always {@code null}
 * @throws CommandException with {@code E1021} wrapping any exception raised during the check
 */
@Override
protected Void execute() throws CommandException {
    String pushMissingDeps = coordAction.getPushMissingDependencies();
    if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
        LOG.info("Nothing to check. Empty push missing dependency");
        return null;
    }

    String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
    LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
    LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
    if (registerForNotification) {
        LOG.debug("Register for notifications is true");
    }

    try {
        Configuration actionConf = null;
        try {
            actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
        }
        catch (IOException e) {
            throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
        }

        // Check all dependencies during materialization to avoid registering in the cache.
        // But check only first missing one afterwards similar to
        // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
        ActionDependency availability = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
                !registerForNotification);

        boolean dependencyChanged = true;
        boolean timedOut = false;
        int stillMissingCount = availability.getMissingDependencies().size();
        if (stillMissingCount != 0) {
            if (stillMissingCount == missingDepsArray.length) {
                // Same number missing as before: nothing became available.
                dependencyChanged = false;
            }
            else {
                String stillMissingDeps = DependencyChecker.dependenciesAsString(
                        availability.getMissingDependencies());
                coordAction.setPushMissingDependencies(stillMissingDeps);
            }
            // Checking for timeout
            timedOut = isTimeout();
            if (timedOut) {
                queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
            }
            else {
                queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
                        getCoordPushCheckRequeueInterval());
            }
        }
        else {
            // All push-based dependencies are available
            onAllPushDependenciesAvailable();
        }

        updateCoordAction(coordAction, dependencyChanged);
        if (registerForNotification) {
            registerForNotification(availability.getMissingDependencies(), actionConf);
        }
        if (removeAvailDependencies) {
            unregisterAvailableDependencies(availability.getAvailableDependencies());
        }
        if (timedOut) {
            unregisterMissingDependencies(availability.getMissingDependencies(), actionId);
        }
    }
    catch (Exception e) {
        final CallableQueueService queueService = Services.get().get(CallableQueueService.class);
        if (isTimeout()) {
            LOG.debug("Queueing timeout command");
            // XCommand.queue() will not work when there is a Exception
            queueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
            unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
        }
        else if (coordAction.getMissingDependencies() != null
                && coordAction.getMissingDependencies().length() > 0) {
            // Queue again on exception as RecoveryService will not queue this again with
            // the action being updated regularly by CoordActionInputCheckXCommand
            queueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
                    registerForNotification, removeAvailDependencies),
                    Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
        }
        throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
    }
    return null;
}
周期性检查 coordinator action 缺失的依赖【主动检查】,并订阅或取消订阅分区数据的通知【被动通知】。
CoordActionStartXCommand
/**
 * Starts a SUBMITTED coordinator action by submitting (or re-running) its workflow through the
 * DAG engine, then persists the action as RUNNING together with the workflow's parent id.
 *
 * <p>If anything in the submission path throws, the {@code finally} block marks the action
 * FAILED, records the error code and message, writes the SLA failure event and queues a
 * {@code CoordActionReadyXCommand} for the job. Actions in any other status are left untouched.
 *
 * @return always {@code null}
 * @throws CommandException if persisting the FAILED state itself fails
 */
protected Void execute() throws CommandException {
    boolean makeFail = true;
    String errCode = "";
    String errMsg = "";
    ParamChecker.notEmpty(user, "user");

    log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus());
    if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) {
        // create merged runConf to pass to WF Engine
        Configuration runConf = mergeConfig(coordAction);
        coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString());
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
        try {
            Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
            SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
                    SlaAppType.COORDINATOR_ACTION, log);
            if (slaEvent != null) {
                insertList.add(slaEvent);
            }
            if (OozieJobInfo.isJobInfoEnabled()) {
                conf.set(OozieJobInfo.COORD_NAME, appName);
                conf.set(OozieJobInfo.COORD_NOMINAL_TIME, coordAction.getNominalTimestamp().toString());
            }
            // Normalize workflow appPath here;
            JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
            if (coordAction.getExternalId() != null) {
                // An external id already exists, so this is a rerun of the existing workflow.
                conf.setBoolean(OozieClient.RERUN_FAIL_NODES, true);
                dagEngine.reRun(coordAction.getExternalId(), conf);
            }
            else {
                String wfId = dagEngine.submitJobFromCoordinator(conf, actionId);
                coordAction.setExternalId(wfId);
            }
            coordAction.setStatus(CoordinatorAction.Status.RUNNING);
            coordAction.incrementAndGetPending();

            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                log.debug("Updating WF record for WFID :" + coordAction.getExternalId() + " with parent id: " + actionId);
                WorkflowJobBean wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STARTTIME, coordAction.getExternalId());
                wfJob.setParentId(actionId);
                wfJob.setLastModifiedTime(new Date());
                BatchQueryExecutor executor = BatchQueryExecutor.getInstance();
                updateList.add(new UpdateEntry<WorkflowJobQuery>(
                        WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, wfJob));
                updateList.add(new UpdateEntry<CoordActionQuery>(
                        CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction));
                try {
                    executor.executeBatchInsertUpdateDelete(insertList, updateList, null);
                    queue(new CoordActionNotificationXCommand(coordAction), 100);
                    if (EventHandlerService.isEnabled()) {
                        generateEvent(coordAction, user, appName, wfJob.getStartTime());
                    }
                }
                catch (JPAExecutorException je) {
                    throw new CommandException(je);
                }
            }
            else {
                log.error(ErrorCode.E0610);
            }
            // Reached only when nothing above threw: the action must not be failed.
            makeFail = false;
        }
        catch (DagEngineException dee) {
            errMsg = dee.getMessage();
            errCode = dee.getErrorCode().toString();
            log.warn("can not create DagEngine for submitting jobs", dee);
        }
        catch (CommandException ce) {
            errMsg = ce.getMessage();
            errCode = ce.getErrorCode().toString();
            log.warn("command exception occured ", ce);
        }
        catch (java.io.IOException ioe) {
            errMsg = ioe.getMessage();
            errCode = "E1005";
            log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe);
        }
        catch (Exception ex) {
            errMsg = ex.getMessage();
            errCode = "E1005";
            log.warn("can not create DagEngine for submitting jobs", ex);
        }
        finally {
            if (makeFail) { // No DB exception occurs
                log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg);
                coordAction.setStatus(CoordinatorAction.Status.FAILED);
                // getMessage() may legitimately return null; guard so the failure bookkeeping
                // below is not aborted by a NullPointerException.
                if (errMsg != null && errMsg.length() > 254) { // Because table column size is 255
                    errMsg = errMsg.substring(0, 255);
                }
                coordAction.setErrorMessage(errMsg);
                coordAction.setErrorCode(errCode);

                updateList = new ArrayList<UpdateEntry>();
                updateList.add(new UpdateEntry<CoordActionQuery>(
                        CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction));
                insertList = new ArrayList<JsonBean>();

                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
                        SlaAppType.COORDINATOR_ACTION, log);
                if (slaEvent != null) {
                    insertList.add(slaEvent); // Update SLA events
                }
                try {
                    // call JPAExecutor to do the bulk writes
                    BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
                    if (EventHandlerService.isEnabled()) {
                        generateEvent(coordAction, user, appName, null);
                    }
                }
                catch (JPAExecutorException je) {
                    throw new CommandException(je);
                }
                queue(new CoordActionReadyXCommand(coordAction.getJobId()));
            }
        }
    }
    return null;
}
coordinator action开始命令,实际是执行启动了action关联的wf。
CoordActionCheckXCommand
/**
 * Mirrors a terminal workflow status (SUCCEEDED, FAILED, KILLED) onto the coordinator action,
 * clears its pending count, and persists the action plus an SLA status event in one batch.
 *
 * <p>For any other workflow status only the action's last-modified time is refreshed. An event
 * is generated when the action status actually changed and event handling is enabled.
 *
 * @return always {@code null}
 * @throws CommandException wrapping any {@code XException} raised while updating
 */
protected Void execute() throws CommandException {
    try {
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());

        CoordinatorAction.Status statusBefore = coordAction.getStatus();
        WorkflowJob.Status wfStatus = workflowJob.getStatus();
        Status slaStatus = null;

        if (wfStatus == WorkflowJob.Status.SUCCEEDED) {
            coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
            // set pending to false as the status is SUCCEEDED
            coordAction.setPending(0);
            slaStatus = Status.SUCCEEDED;
        }
        else if (wfStatus == WorkflowJob.Status.FAILED) {
            coordAction.setStatus(CoordinatorAction.Status.FAILED);
            slaStatus = Status.FAILED;
            // set pending to false as the status is FAILED
            coordAction.setPending(0);
        }
        else if (wfStatus == WorkflowJob.Status.KILLED) {
            coordAction.setStatus(CoordinatorAction.Status.KILLED);
            slaStatus = Status.KILLED;
            // set pending to false as the status is KILLED
            coordAction.setPending(0);
        }
        else {
            // Non-terminal workflow: just touch the modified time and stop here.
            LOG.warn("Unexpected workflow " + workflowJob.getId() + " STATUS " + workflowJob.getStatus());
            coordAction.setLastModifiedTime(new Date());
            CoordActionQueryExecutor.getInstance().executeUpdate(
                    CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
                    coordAction);
            return null;
        }

        LOG.debug("Updating Coordinator actionId :" + coordAction.getId() + "status to ="
                + coordAction.getStatus());
        coordAction.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
                coordAction));

        if (slaStatus != null) {
            SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
                    SlaAppType.COORDINATOR_ACTION, LOG);
            if (slaEvent != null) {
                insertList.add(slaEvent);
            }
        }

        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);

        CoordinatorAction.Status statusAfter = coordAction.getStatus();
        if (statusAfter != statusBefore && EventHandlerService.isEnabled()) {
            generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime());
        }
    }
    catch (XException ex) {
        LOG.warn("CoordActionCheckCommand Failed ", ex);
        throw new CommandException(ex);
    }
    return null;
}
主动检查 coordinator action 关联wf的运行状态,来设置coordinator action的状态;[主动检查的方式,还有一种是wf完成后的通知]
CoordActionInfoXCommand
/**
 * Loads the coordinator action identified by {@code id} for informational purposes.
 *
 * @return the action bean, or {@code null} when no JPAService is registered
 * @throws CommandException wrapping a {@code JPAExecutorException} raised by the query
 */
@Override
protected CoordinatorActionBean execute() throws CommandException {
    JPAService jpa = Services.get().get(JPAService.class);
    if (jpa == null) {
        LOG.error(ErrorCode.E0610);
        return null;
    }
    try {
        return jpa.execute(new CoordActionGetForInfoJPAExecutor(this.id));
    }
    catch (JPAExecutorException e) {
        throw new CommandException(e);
    }
}
查看 coordinator action 信息的命令;
CoordActionReadyXCommand
/**
 * Moves READY actions of the job to SUBMITTED and starts each one through a
 * {@code CoordActionStartXCommand}.
 *
 * <p>How many actions are started is bounded by the job's concurrency minus the number of
 * actions reported as running; a negative concurrency means no limit. The execution setting
 * (FIFO, LIFO, LAST_ONLY, NONE) is handed to the query that selects the READY actions.
 *
 * @return always {@code null}
 * @throws CommandException wrapping any {@code JPAExecutorException} raised by the queries
 */
protected Void execute() throws CommandException {
    // execution setting for this job (FIFO, LIFO, LAST_ONLY)
    String jobExecution = coordJob.getExecution();
    // concurrency setting for this job; less than 0 means UNLIMITED
    int jobConcurrency = coordJob.getConcurrency();

    // number of actions to start (-1 means start ALL)
    int numActionsToStart = -1;
    if (jobConcurrency >= 0) {
        // Actions already RUNNING or SUBMITTED eat into the concurrency budget.
        int numRunningJobs;
        try {
            numRunningJobs = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        numActionsToStart = Math.max(0, jobConcurrency - numRunningJobs);
        log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
                + numRunningJobs + ", numLeftover=" + numActionsToStart);
        if (numActionsToStart == 0) {
            // no actions to start
            log.warn("No actions to start for jobId=" + jobId + " as max concurrency reached!");
            return null;
        }
    }

    // READY actions that fit in the concurrency and execution order
    List<CoordinatorActionBean> readyActions;
    try {
        readyActions = jpaService.execute(new CoordJobGetReadyActionsJPAExecutor(jobId, numActionsToStart, jobExecution));
    }
    catch (JPAExecutorException je) {
        throw new CommandException(je);
    }
    log.debug("Number of READY actions = " + readyActions.size());

    int started = 0;
    for (CoordinatorActionBean ready : readyActions) {
        // Stop once the budget is used up; a negative budget means no limit.
        if (numActionsToStart >= 0 && started >= numActionsToStart) {
            break;
        }
        log.debug("Set status to SUBMITTED for id: " + ready.getId());
        ready.setStatus(CoordinatorAction.Status.SUBMITTED);
        try {
            CoordActionQueryExecutor.getInstance().executeUpdate(
                    CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, ready);
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        // start action
        new CoordActionStartXCommand(ready.getId(), coordJob.getUser(), coordJob.getAppName(),
                ready.getJobId()).call(getEntityKey());
        started++;
    }
    return null;
}
根据并发量和出队方式(执行顺序),将 coordinator action 的状态从 READY 设置为 SUBMITTED。
CoordActionSkipXCommand
/**
 * Marks the action SKIPPED when it is currently WAITING or READY, queues a notification,
 * persists the new status and, if event handling is enabled, generates an event.
 * Actions in any other status are left untouched.
 *
 * @return always {@code null}
 * @throws CommandException wrapping a {@code JPAExecutorException} raised by the update
 */
protected Void execute() throws CommandException {
    CoordinatorAction.Status current = actionBean.getStatus();
    boolean skippable = current == CoordinatorAction.Status.WAITING
            || current == CoordinatorAction.Status.READY;
    if (!skippable) {
        return null;
    }

    LOG.info("Setting action [{0}] status to SKIPPED", actionBean.getId());
    actionBean.setStatus(CoordinatorAction.Status.SKIPPED);
    try {
        queue(new CoordActionNotificationXCommand(actionBean), 100);
        actionBean.setLastModifiedTime(new Date());
        CoordActionQueryExecutor.getInstance().executeUpdate(
                CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, actionBean);
        if (EventHandlerService.isEnabled()) {
            generateEvent(actionBean, user, appName, null);
        }
    }
    catch (JPAExecutorException e) {
        throw new CommandException(e);
    }
    return null;
}
设置Coordinator action 状态为 SKIPPED;
CoordActionTimeOutXCommand
/**
 * Marks the action TIMEDOUT when it is still WAITING, queues a notification, persists the new
 * status and, if event handling is enabled, generates an event. Actions in any other status are
 * left untouched.
 *
 * @return always {@code null}
 * @throws CommandException wrapping a {@code JPAExecutorException} raised by the update
 */
protected Void execute() throws CommandException {
    if (actionBean.getStatus() != CoordinatorAction.Status.WAITING) {
        return null;
    }

    actionBean.setStatus(CoordinatorAction.Status.TIMEDOUT);
    try {
        queue(new CoordActionNotificationXCommand(actionBean), 100);
        actionBean.setLastModifiedTime(new Date());
        CoordActionQueryExecutor.getInstance().executeUpdate(
                CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, actionBean);
        if (EventHandlerService.isEnabled()) {
            generateEvent(actionBean, user, appName, null);
        }
    }
    catch (JPAExecutorException e) {
        throw new CommandException(e);
    }
    return null;
}
设置Coordinator action 状态为 超时状态;
CoordActionUpdateXCommand
/**
 * Updates the coordinator action from the current status of its workflow.
 *
 * <p>SUCCEEDED, FAILED and KILLED are copied to the action with the pending count cleared;
 * SUSPENDED, RUNNING and PREP set the action to SUSPENDED or RUNNING and decrement pending. Any
 * other workflow status only refreshes the action's last-modified time. The action and any SLA
 * event are persisted in one batch; unless the workflow is SUSPENDED or RUNNING, a
 * {@code CoordActionReadyXCommand} is queued for the job.
 *
 * @return always {@code null}
 * @throws CommandException wrapping any {@code XException} raised while updating
 */
protected Void execute() throws CommandException {
    try {
        LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
        Status slaStatus = null;
        if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
            coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
            coordAction.setPending(0);
            slaStatus = Status.SUCCEEDED;
        }
        else if (workflow.getStatus() == WorkflowJob.Status.FAILED) {
            coordAction.setStatus(CoordinatorAction.Status.FAILED);
            coordAction.setPending(0);
            slaStatus = Status.FAILED;
        }
        else if (workflow.getStatus() == WorkflowJob.Status.KILLED) {
            coordAction.setStatus(CoordinatorAction.Status.KILLED);
            coordAction.setPending(0);
            slaStatus = Status.KILLED;
        }
        else if (workflow.getStatus() == WorkflowJob.Status.SUSPENDED) {
            coordAction.setStatus(CoordinatorAction.Status.SUSPENDED);
            coordAction.decrementAndGetPending();
        }
        else if (workflow.getStatus() == WorkflowJob.Status.RUNNING
                || workflow.getStatus() == WorkflowJob.Status.PREP) {
            // resume workflow job and update coord action accordingly
            coordAction.setStatus(CoordinatorAction.Status.RUNNING);
            coordAction.decrementAndGetPending();
        }
        else {
            LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
            // update lastModifiedTime
            coordAction.setLastModifiedTime(new Date());
            CoordActionQueryExecutor.getInstance().executeUpdate(
                    CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
            // TODO - Uncomment this when bottom up rerun can change terminal state
            /* CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
            if (!coordJob.isPending()) {
                coordJob.setPending();
                jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
            }*/
            return null;
        }

        LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status "
                + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());

        coordAction.setLastModifiedTime(new Date());
        updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
                coordAction));
        // TODO - Uncomment this when bottom up rerun can change terminal state
        /*CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(coordAction.getJobId()));
        if (!coordJob.isPending()) {
            coordJob.setPending();
            jpaService.execute(new CoordJobUpdateJPAExecutor(coordJob));
            LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true");
        }*/
        if (slaStatus != null) {
            SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
                    SlaAppType.COORDINATOR_ACTION, LOG);
            if (slaEvent != null) {
                insertList.add(slaEvent);
            }
        }
        if (workflow.getStatus() != WorkflowJob.Status.SUSPENDED
                && workflow.getStatus() != WorkflowJob.Status.RUNNING) {
            queue(new CoordActionReadyXCommand(coordAction.getJobId()));
        }

        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
        if (EventHandlerService.isEnabled()) {
            generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime());
        }
        LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
    }
    catch (XException ex) {
        // Pass the exception itself: the template has no placeholder, so passing only
        // ex.getMessage() as a parameter would drop the failure reason from the log.
        LOG.warn("CoordActionUpdate Failed ", ex);
        throw new CommandException(ex);
    }
    return null;
}
更新Coordinator action 状态
CoordActionInputCheckXCommand
/**
 * Checks whether the input dependencies of a coordinator action are satisfied.
 *
 * <p>If the nominal time is still in the future the check is re-queued. For LAST_ONLY and NONE
 * execution a skip command may be queued instead of checking. Otherwise the missing dependencies
 * are re-evaluated; the action then becomes READY (and {@code CoordActionReadyXCommand} is
 * called), the check is re-queued, or timeout handling is queued once the action has timed out.
 *
 * @return always {@code null}
 * @throws CommandException with {@code E1021} wrapping any unexpected exception from the check
 */
protected Void execute() throws CommandException {
    LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");

    // this action should only get processed if current time > nominal time;
    // otherwise, requeue this action for delay execution;
    Date nominalTime = coordAction.getNominalTime();
    Date currentTime = new Date();
    if (nominalTime.compareTo(currentTime) > 0) {
        queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                Math.max((nominalTime.getTime() - currentTime.getTime()), getCoordInputCheckRequeueInterval()));
        updateCoordAction(coordAction, false);
        LOG.info("[" + actionId
                + "]::ActionInputCheck:: nominal Time is newer than current time, so requeue and wait. Current="
                + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
        return null;
    }

    StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
    boolean isChangeInDependency = false;
    try {
        Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
        Date now = new Date();
        if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
            Date nextNominalTime = computeNextNominalTime();
            if (nextNominalTime != null) {
                // If the current time is after the next action's nominal time, then we've passed the window where this action
                // should be started; so set it to SKIPPED
                if (now.after(nextNominalTime)) {
                    LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
                            + "the nominal time [{2}] of the next action]", coordAction.getId(),
                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                    queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                    return null;
                }
                else {
                    LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
                            + "the nominal time [{2}] of the next action]", coordAction.getId(),
                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                }
            }
        }
        else if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
            // If the current time is after the nominal time of this action plus some tolerance,
            // then we've passed the window where this action
            // should be started; so set it to SKIPPED
            Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
            cal.setTime(nominalTime);
            cal.add(Calendar.MINUTE, ConfigurationService.getInt(COORD_EXECUTION_NONE_TOLERANCE));
            nominalTime = cal.getTime();
            if (now.after(nominalTime)) {
                LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
                        + "the nominal time [{2}] of the current action]", coordAction.getId(),
                        DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nominalTime));
                queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                return null;
            }
            else {
                LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
                        + "the nominal time [{2}] of the current action]", coordAction.getId(),
                        DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(coordAction.getNominalTime()));
            }
        }

        StringBuilder existList = new StringBuilder();
        StringBuilder nonExistList = new StringBuilder();
        StringBuilder nonResolvedList = new StringBuilder();
        String firstMissingDependency = "";
        String missingDeps = coordAction.getMissingDependencies();
        CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);

        // For clarity regarding which is the missing dependency in synchronous order
        // instead of printing entire list, some of which, may be available
        if (nonExistList.length() > 0) {
            firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
        }
        LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
                + nonResolvedList.toString());
        // Updating the list of data dependencies that are available and those that are yet not
        boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
        String pushDeps = coordAction.getPushMissingDependencies();
        // Resolve latest/future only when all current missingDependencies and
        // pushMissingDependencies are met
        if (status && nonResolvedList.length() > 0) {
            status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf)
                    : false;
        }
        coordAction.setLastModifiedTime(currentTime);
        coordAction.setActionXml(actionXml.toString());
        if (nonResolvedList.length() > 0 && status == false) {
            nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
        }
        String nonExistListStr = nonExistList.toString();
        if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
            // missingDeps null or empty means action should become READY
            isChangeInDependency = true;
            coordAction.setMissingDependencies(nonExistListStr);
        }
        if (status && (pushDeps == null || pushDeps.length() == 0)) {
            String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
            actionXml.replace(0, actionXml.length(), newActionXml);
            coordAction.setActionXml(actionXml.toString());
            coordAction.setStatus(CoordinatorAction.Status.READY);
            updateCoordAction(coordAction, true);
            new CoordActionReadyXCommand(coordAction.getJobId()).call(getEntityKey());
        }
        else if (!isTimeout(currentTime)) {
            if (status == false) {
                queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                        getCoordInputCheckRequeueInterval());
            }
            updateCoordAction(coordAction, isChangeInDependency);
        }
        else {
            // '&&' binds tighter than '||', so the unguarded length() call used to be reached
            // with a null pushDeps whenever nonExistListStr was empty. The explicit null check
            // removes that NullPointerException and keeps the result for all other inputs.
            boolean queueTimeout = (!nonExistListStr.isEmpty() && pushDeps == null)
                    || (pushDeps != null && pushDeps.length() == 0);
            if (queueTimeout) {
                queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
            }
            else {
                // Let CoordPushDependencyCheckXCommand queue the timeout
                queue(new CoordPushDependencyCheckXCommand(coordAction.getId()));
            }
            updateCoordAction(coordAction, isChangeInDependency);
        }
    }
    catch (AccessControlException e) {
        LOG.error("Permission error in ActionInputCheck", e);
        if (isTimeout(currentTime)) {
            LOG.debug("Queueing timeout command");
            Services.get().get(CallableQueueService.class)
                    .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
        }
        else {
            // Requeue InputCheckCommand for permission denied error with longer interval
            Services.get()
                    .get(CallableQueueService.class)
                    .queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                            2 * getCoordInputCheckRequeueInterval());
        }
        updateCoordAction(coordAction, isChangeInDependency);
    }
    catch (Exception e) {
        if (isTimeout(currentTime)) {
            LOG.debug("Queueing timeout command");
            // XCommand.queue() will not work when there is a Exception
            Services.get().get(CallableQueueService.class)
                    .queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
        }
        updateCoordAction(coordAction, isChangeInDependency);
        throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
    }
    return null;
}
检查 action 的输入依赖是否满足,以及时间条件是否满足。