Jobs
This topic describes the lifecycle, best practices, and various states of jobs in the Optimizely Connect Platform (OCP).
The Basics
A job is a unit of work that can be scheduled or manually triggered. A common use for jobs is to handle historical and incremental imports. In the example below, we will walk through handling both in a single job.
Paradigm
Lifecycle of a job in a simple paradigm:
- Job starts up with an ID (
this.invocation.jobId
will be the same if retrying a failed job once) prepare
is called to setup the Jobperform
is called with the state of the job repeatedly until it is set to complete- During perform, your responsibility is to do small units of work, then return the job state so your job can continue on the next set of work (for example, fully process one page of data at a time).
- A job might be interrupted by the system at any checkpoint or in the middle of work if it does not provide regular checkpoints.
The best way to use this paradigm is to treat it like a state machine where the state of the job dictates the work needed then transitions the Job to the next state. This works well for retrying jobs and ensuring it picks up in the same spot.
Best Practices
prepare
is called for both a fresh start as well as a resume. Make sure you handle both cases so your job can safely resume.perform
should perform a small amount of work < 60 seconds at a time, then return the current job state to resume from to avoid unexpected errors due to jobs being stopped and resumed.
️ Warning
If your job's
perform
loop runs for longer than 60 seconds, it may (though rarely) be terminated without warning. To avoid ending up in an unrecoverable state, follow the best practices.
- Long running API calls are OK, but know that a job may be evicted during the call. Make sure your job can pick up where it left off by recording the state immediately before making a long API call, even if that means it needs to make the API call again in a scenario where it needed to be evicted.
- Use the
sleep(time, {interruptible: true})
member function to safely sleep when waiting for a remote system to complete a task. This allows the job to be evicted if necessary during this sleep. It will be resumed with the previously recorded job state as soon as possible. - If you use the Batcher, make sure you flush BEFORE returning your job state so that if the job has to be evicted and resumed, you will not lose any data.
- Designing your job as a state machine will make it easy to progress through the various phases of your job while being able to resume from any checkpoint.
Helpful utilities in Jobs:
- CSV Parser
- Batcher for batching API calls
Managing State
Here's how you might define job state in a typical import job.
JobState
export enum JobState {
INITIALIZE,
IMPORTING
FINISHIMPORT,
DONE
}
Status
interface ImportJobStatus extends App.JobStatus {
state: {
jobState: JobState,
imports: Import[];
currentImport?: Import;
};
}
Prepare
/**
* Prepares to run a job. Prepare is called at the start of a job
* and again only if the job was interrupted and is being resumed.
* Use this function to read secrets and establish connections to simplify the job loop (perform).
* @param params a hash if params were supplied to the job run, otherwise an empty hash
* @param status if job was interrupted and should continue from the last known state
*/
public async prepare(params: ValueHash, status?: ImportJobStatus): Promise<ImportJobStatus> {
logger.info('Preparing Nightly Import Job with params:', params, 'and status', status);
// On a rerun, the Job will already have a status, return that
if (status) {
return status;
}
return {
state: {
jobState: JobState.INITIALIZE,
imports: [
Vendor.Contact
],
currentImport: undefined
},
complete: false
};
}
Perform
/**
* Performs a unit of work. Jobs should perform a small unit of work and then return the current state.
* Perform is called in a loop where the previously returned state will be given to the next iteration.
* Iteration will continue until the returned state.complete is set to true or the job is interrupted.
* @param status last known job state and status
* @returns The current JobStatus/state that can be used to perform the next iteration or resume a job if interrupted.
*/
public async perform(status: ImportJobStatus): Promise<ImportJobStatus> {
switch (status.state.jobState) {
case JobState.INITIALIZE:
// Initialize import status
status.state.jobState = JobState.IMPORTING;
break;
case JobState.IMPORTING:
// Do Work
// Set ImportStatus
logger.info('Setting State:', status.state);
if (this.noMoreToDo()) {
status.state.jobState = JobState.DONE;
}
break;
case JobState.FINISHIMPORT:
// Notify for ActivityLog
App.notifications.success(
'Integration',
'Collect',
'VendorName',
`Imported ${x} records`
);
status.state.jobState = JobState.DONE;
break;
case JobState.DONE:
status.complete = true; //<-- set job to complete
break;
}
return status;
}
Failure Scenarios
When a Job fails at any point, it is automatically retried once with the same jobId
. After that, it is not retried until the next scheduled run.
Managing Jobs
Once your app has been installed in one or more accounts, its jobs can be managed using OCP CLI.
ocp jobs list
: View a list of current and recent job executions. This command supports a range of parameters to sort and filter your result set.
ocp jobs runtimeStatus
: Returns a job's full runtime status. Note that this information is only available for currently running jobs.
ocp jobs trigger
: Manually trigger a job. You may want to run ocp directory listInstalls
first to fetch your tracker ID.
ocp jobs terminate
: Manually terminate a job.
Updated about 1 month ago