Jobs

This topic describes the lifecycle, best practices, and various states of jobs in the Optimizely Connect Platform (OCP).

The Basics

A job is a unit of work that can be scheduled or triggered by another part of the app such as a form. A common use for jobs is to handle historical and incremental imports. In the example below, we will walk through handling both in a single job.

Most APIs give you a way to page through results or know where you left off and ask for the next N records.
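As a rough illustration of that paging pattern, the sketch below walks a data set one page at a time while remembering only a cursor between calls. The fetchPage function, the Page shape, and the numeric cursor are hypothetical stand-ins for a vendor API, not part of OCP.

```typescript
// Hypothetical stand-in for a vendor API that supports cursor-based paging.
interface Page<T> {
  records: T[];
  nextCursor?: number; // undefined when there are no more records
}

const ALL_CONTACTS: string[] = ['ann', 'bob', 'cy', 'dee', 'eve'];

// Returns up to `limit` records starting at `cursor`.
function fetchPage(cursor: number, limit: number): Page<string> {
  const records = ALL_CONTACTS.slice(cursor, cursor + limit);
  const next = cursor + records.length;
  return {records, nextCursor: next < ALL_CONTACTS.length ? next : undefined};
}

// Walks every page, carrying only the cursor from one call to the next.
function importAll(limit: number): string[] {
  const imported: string[] = [];
  let cursor: number | undefined = 0;
  while (cursor !== undefined) {
    const page: Page<string> = fetchPage(cursor, limit);
    imported.push(...page.records);
    cursor = page.nextCursor;
  }
  return imported;
}
```

In a job, the cursor is the piece of information worth keeping in the job state, since it is all that is needed to ask for the next page after an interruption.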

Paradigm

Lifecycle of a job in a simple paradigm:

  1. The job starts with an ID (this.invocation.jobId stays the same when a failed job is retried once).
  2. prepare is called to set up the job.
  3. perform is called repeatedly with the current job state until the job is marked complete.
    • During perform, your responsibility is to do small units of work, then return the job state so your job can continue on the next set of work (for example, fully process one page of data at a time).
    • A job can be interrupted at any checkpoint, or in the middle of its work if it does not provide regular checkpoints.

The best way to use this paradigm is to treat it as a state machine: the current state of the job dictates the work to do, and finishing that work transitions the job to the next state. This works well for retries, because a resumed job picks up in the same spot.
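The idea can be sketched without any platform code. The example below is a simplified, synchronous model, not the OCP runtime: the Phase type, SketchStatus, step, and run are hypothetical names, and the "interruption" is simulated by stopping after a fixed number of steps and serializing the last returned status.

```typescript
// Simplified, synchronous stand-ins for the job types; all names are illustrative.
type Phase = 'INITIALIZE' | 'IMPORTING' | 'DONE';

interface SketchStatus {
  state: {phase: Phase; page: number; imported: number};
  complete: boolean;
}

const TOTAL_PAGES = 3;
const RECORDS_PER_PAGE = 10;

// One small unit of work: look at the phase, do the work, pick the next phase.
function step(status: SketchStatus): SketchStatus {
  const state = {...status.state};
  let complete = false;
  switch (state.phase) {
    case 'INITIALIZE':
      state.phase = 'IMPORTING';
      break;
    case 'IMPORTING':
      state.imported += RECORDS_PER_PAGE; // "process" one page
      state.page += 1;
      if (state.page >= TOTAL_PAGES) {
        state.phase = 'DONE';
      }
      break;
    case 'DONE':
      complete = true;
      break;
  }
  return {state, complete};
}

// Calls step until the job completes or maxSteps is reached (a simulated interruption).
function run(status: SketchStatus, maxSteps: number = Infinity): SketchStatus {
  let current = status;
  for (let i = 0; i < maxSteps && !current.complete; i++) {
    current = step(current);
  }
  return current;
}

const fresh: SketchStatus = {
  state: {phase: 'INITIALIZE', page: 0, imported: 0},
  complete: false
};

// Interrupt after two steps, persist the checkpoint as JSON, then resume from it.
const checkpoint = JSON.stringify(run(fresh, 2));
const resumed = run(JSON.parse(checkpoint) as SketchStatus);
```

Because each step depends only on the status it receives, the resumed run ends in the same state as an uninterrupted one.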

Best Practices

  • prepare is called for both a fresh start as well as a resume. Make sure you handle both cases so your job can safely resume.
  • perform should do a small amount of work (less than 60 seconds) at a time, then return the current job state to resume from. This avoids unexpected errors when a job is stopped and resumed.

Warning

If your job's perform loop runs for longer than 60 seconds, it may (though rarely) be terminated without warning. To avoid ending up in an unrecoverable state, follow the best practices.

  • Long-running API calls are OK, but a job may be evicted during the call. Make sure your job can pick up where it left off by recording its state immediately before making a long API call, even if that means repeating the API call after an eviction.
  • Use the sleep(time, {interruptible: true}) member function to safely sleep when waiting for a remote system to complete a task. This allows the job to be evicted if necessary during this sleep. It will be resumed with the previously recorded job state as soon as possible.
  • If you use the Batcher, make sure you flush BEFORE returning your job state so that if the job has to be evicted and resumed, you will not lose any data.
  • Designing your job as a state machine will make it easy to progress through the various phases of your job while being able to resume from any checkpoint.
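To illustrate why flushing before returning state matters, here is a minimal sketch. SketchBatcher is a hypothetical, synchronous stand-in written for this example (it is not the platform's Batcher, whose API may differ), and CursorStatus, SOURCE, and performPage are likewise assumed names.

```typescript
// Hypothetical stand-in for a batcher: buffers records and writes them on flush.
class SketchBatcher<T> {
  private buffer: T[] = [];
  private readonly sink: (batch: T[]) => void;

  constructor(sink: (batch: T[]) => void) {
    this.sink = sink;
  }

  append(record: T): void {
    this.buffer.push(record);
  }

  flush(): void {
    if (this.buffer.length > 0) {
      this.sink(this.buffer);
      this.buffer = [];
    }
  }
}

interface CursorStatus {
  state: {cursor: number};
  complete: boolean;
}

const SOURCE: number[] = [1, 2, 3, 4, 5];
const delivered: number[] = []; // records that actually reached the destination
const batcher = new SketchBatcher<number>((batch) => {
  delivered.push(...batch);
});

// Processes one page, flushes, and only then returns the advanced cursor.
function performPage(status: CursorStatus, pageSize: number): CursorStatus {
  const start = status.state.cursor;
  const page = SOURCE.slice(start, start + pageSize);
  for (const record of page) {
    batcher.append(record);
  }
  batcher.flush(); // flush BEFORE the state below is returned and recorded
  const cursor = start + page.length;
  return {state: {cursor}, complete: cursor >= SOURCE.length};
}
```

With this ordering, the returned cursor never runs ahead of what was delivered. If the flush were skipped, an eviction after the state was recorded would resume from the new cursor and the buffered records would be lost.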

Helpful utilities in Jobs:

Managing State

JobState

export enum JobState {
  INITIALIZE,
  IMPORTING,
  FINISHIMPORT,
  DONE,
  SKIP
}

Status

interface ImportJobStatus extends App.JobStatus {
  state: {
    jobState: JobState;
    imports: Import[];
    currentImport?: Import;
  };
}

Prepare

/**
   * Prepares to run a job. Prepare is called at the start of a job
   * and again only if the job was interrupted and is being resumed.
   * Use this function to read secrets and establish connections to simplify the job loop (perform).
   * @param params a hash if params were supplied to the job run, otherwise an empty hash
   * @param status if job was interrupted and should continue from the last known state
   */
  public async prepare(params: ValueHash, status?: ImportJobStatus): Promise<ImportJobStatus> {
    // Make sure two Jobs aren't running at the same time
    const check = await storage.kvStore.get<{running: boolean, jobId: string}>('check');
    // A missing lock entry is treated as "not running"
    if (check?.running && check.jobId !== this.invocation.jobId) {
      return {state: {jobState: JobState.SKIP, imports: []}, complete: false};
    } else {
      await storage.kvStore.put('check', {running: true, jobId: this.invocation.jobId}, {ttl: (86400 / 2)});
    }
    
    logger.info('Preparing Nightly Import Job with params:', params, 'and status', status);
 
    // On a rerun, the Job will already have a status, return that
    if (status) {
      return status;
    }
    
    return {state: {
      jobState: JobState.INITIALIZE,
      imports: [
        Vendor.Contact
      ],
      currentImport: undefined
    }, complete: false};
  }

Perform

/**
   * Performs a unit of work. Jobs should perform a small unit of work and then return the current state.
   * Perform is called in a loop where the previously returned state will be given to the next iteration.
   * Iteration will continue until the returned state.complete is set to true or the job is interrupted.
   * @param status last known job state and status
   * @returns The current JobStatus/state that can be used to perform the next iteration or resume a job if interrupted.
   */
  public async perform(status: ImportJobStatus): Promise<ImportJobStatus> {

    switch (status.state.jobState) {
      case JobState.INITIALIZE:
        // Initialize import status
        status.state.jobState = JobState.IMPORTING;
        break;
      case JobState.IMPORTING:
        // Do Work
        // Set ImportStatus
        logger.info('Setting State:', status.state);
        if (this.noMoreToDo()) {
          status.state.jobState = JobState.FINISHIMPORT;
        }
        break;
      case JobState.FINISHIMPORT:
        // Notify for ActivityLog
        App.notifications.success(
          'Integration',
          'Collect',
          'VendorName',
          `Imported ${x} records`
        );
        status.state.jobState = JobState.DONE;
        break;
      case JobState.DONE:
        status.complete = true; //<-- set job to complete
        await storage.kvStore.put('check', {running: false, jobId: ''});
        break;
      case JobState.SKIP:
        status.complete = true;
        logger.info('Skipping Import, job already running');
        break;
    }
    return status;
  }

Failure Scenarios

When a Job fails at any point, it is automatically retried once with the same jobId. After that, it is not retried until the next scheduled run.
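Because the retry reuses the jobId, a lock check like the one in prepare can tell a retry of the same run apart from a competing run. A minimal sketch of that decision as a pure function follows; RunLock and mayProceed are hypothetical names introduced here, and the lock shape mirrors the {running, jobId} value stored in the example above.

```typescript
// Shape of the lock value kept in the key-value store in the example above.
interface RunLock {
  running: boolean;
  jobId: string;
}

// Decides whether this invocation may proceed, given the stored lock (if any).
function mayProceed(lock: RunLock | undefined, jobId: string): boolean {
  if (!lock || !lock.running) {
    return true; // nothing else is running
  }
  return lock.jobId === jobId; // same ID means this is the retry of our own run
}
```

For example, mayProceed({running: true, jobId: 'a'}, 'a') allows the retry to continue, while mayProceed({running: true, jobId: 'a'}, 'b') tells a different run to skip.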

