
Write a job

Write a job for a sample Optimizely Connect Platform (OCP) app to process data from an Azure storage account and import it into Optimizely Data Platform (ODP).

Before writing a job that processes data from an Azure storage account and imports it into Optimizely Data Platform (ODP), complete the earlier steps of this guide, including Configure and authenticate the app.

Create a job that incrementally retrieves data from a file containing offline stores in an Azure storage container and imports it into ODP. You will learn how to:

  • Schedule a job
  • Structure a job to be interruptible
  • Define and handle job state
  • Prepare and process job execution
  • Trigger a job manually

For information on jobs and their functionalities, see the OCP jobs developer documentation.

Define data types

Define the data types to use in the src/data/Azure.ts file. The StorageFile data type represents the data that the app imports into ODP from the Azure storage account.

export interface StorageFile extends ValueHash {
  name: string;
  lastModified: number;
}

Use the OfflineStore data type to represent the data read from the Azure storage container. The OdpOfflineStore extends the ObjectPayload, and represents the data sent to ODP. Add this to the src/data/OfflineStore.ts file:

import { ObjectPayload } from '@zaiusinc/node-sdk';

export interface OfflineStore {
  id: string;
  name: string;
  location: string;
}

export interface OdpOfflineStore extends ObjectPayload {
  ocp_quickstart_offline_store_id: string;
  offline_store_name: string;
  offline_store_location: string;
}

Define a new job

Optimizely Connect Platform (OCP) provides job functionality to support incremental or historical imports/exports. Start by defining a new job in the app.yml file:

jobs:
  offline_store_import:
    entry_point: OfflineStoreImportJob
    description: Import offline stores from Azure Storage Account
    cron: 0 0 6 * * ?

The entry_point parameter defines the name of the file in which you implement the job, and the cron parameter defines the job's schedule. In this case, the job lives in the src/jobs/OfflineStoreImportJob.ts file and runs every day at 6:00 AM UTC.
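To make the schedule easier to read, the following sketch splits the cron expression into named fields. It assumes a Quartz-style six-field layout (seconds, minutes, hours, day of month, month, day of week), which matches the daily 6:00 AM schedule described above but is not confirmed by this guide. The describeCron helper is hypothetical and not part of the OCP App SDK.

```typescript
// Field order assumes a Quartz-style six-field expression (an assumption).
const CRON_FIELDS = ['seconds', 'minutes', 'hours', 'dayOfMonth', 'month', 'dayOfWeek'] as const;

type CronField = typeof CRON_FIELDS[number];

// Hypothetical helper: split a six-field cron expression into named fields.
function describeCron(expression: string): Record<CronField, string> {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected ${CRON_FIELDS.length} fields, got ${parts.length}`);
  }
  const result = {} as Record<CronField, string>;
  CRON_FIELDS.forEach((field, index) => {
    result[field] = parts[index];
  });
  return result;
}

const schedule = describeCron('0 0 6 * * ?');
// Under the assumed layout: seconds = 0, minutes = 0, hours = 6.
console.log(schedule.hours, schedule.minutes, schedule.seconds);
```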

Define and handle job state

When importing large amounts of data, it is important to consider how to divide the work up into smaller chunks, for example, setup, paginating through a result set, and then cleanup. By thinking of a job as a state machine, you can better keep track of its progress, as well as incur less risk should anything fail.

For the OfflineStoreImportJob, implement the following three states:

enum JobState {
  FETCH_OFFLINE_STORE_FILES,
  IMPORT_OFFLINE_STORE_FILES,
  COMPLETE
}

These states help manage the different stages of the job and ensure that the job can resume, if necessary.

The job begins with a call to prepare, which should initialize and return the job state. This state is then passed to (and returned by) each call of perform.

interface OfflineStoreImportJobStatus extends App.JobStatus {
  state: {
    files: StorageFile[];
    lastFileTimestamp: number;
    count: number;
    jobState: JobState;
  };
}

See below for a description of each property in this state:

  • files – Contains the files that the app needs to import. During the FETCH_OFFLINE_STORE_FILES state, the job queries these files from the storage container that you configured in the "Configure and authenticate the app" step.
  • lastFileTimestamp – The timestamp of the last file that the app processed. It is persisted between job runs and is used to avoid processing files that the app has already imported.
  • count – Represents the number of entries imported in the current run. Used for reporting purposes.
  • jobState – Represents the current state of the job.

Define code for the job

Implement a job by extending the Job class from the OCP App SDK. Define the job's structure in the src/jobs/OfflineStoreImportJob.ts file:

📘 Note: Required imports are included.

import * as App from '@zaiusinc/app-sdk';
import {OdpOfflineStore, OfflineStore} from '../data/OfflineStore';
import {Batcher, logger, notifications, storage, ValueHash} from '@zaiusinc/app-sdk';
import {Azure} from '../lib/Azure/Azure';
import {StorageFile} from '../data/Azure';
import {AzureOfflineStoreClient} from '../lib/Azure/OfflineStoreClient';
import {z} from '@zaiusinc/node-sdk';

export class OfflineStoreImportJob extends App.Job {
  private azure!: AzureOfflineStoreClient;
  private batcher: Batcher<OdpOfflineStore> = new Batcher(async (i) => await this.storeBatcher(i), 100);

  public async prepare(params: ValueHash, status?: OfflineStoreImportJobStatus): Promise<OfflineStoreImportJobStatus> {
    // Job preparation goes here
  }

  public async perform(status: OfflineStoreImportJobStatus): Promise<OfflineStoreImportJobStatus> {
    // Job invocation goes here
  }

  // Sends one batch of offline stores to the ODP custom object.
  private async storeBatcher(stores: OdpOfflineStore[]) {
    await z.object('ocp_quickstart_offline_stores', stores);
  }
}

Prepare job execution

Job execution starts with a call to the prepare method, where you can perform checks and setup, and return the initial job state.

public async prepare(params: ValueHash, status?: OfflineStoreImportJobStatus): Promise<OfflineStoreImportJobStatus> {
  logger.info('Preparing Offline Store Import:', params, 'and status', status);

  try {
    this.azure = await Azure.createOfflineStoreClient();
  } catch (error) {
    logger.error('Error creating offline store client', error);

    // Mark the job as complete so that we don't continue.
    return {complete: true} as OfflineStoreImportJobStatus;
  }

  if (status) {
    // when resuming, we will be provided the last state where we left off
    // Long running jobs are more likely to be forced to pause and resume
    // Shorter jobs are less likely, but may still be resumed
    return status;
  }
  const lastFileTimestamp = (await storage.kvStore.get('lastFileImport')).timestamp as number || 0;
  return {
    state: {
      lastFileTimestamp,
      count: 0,
      files: [],
      jobState: JobState.FETCH_OFFLINE_STORE_FILES
    },
    complete: false
  };
}

In the prepare stage, you initialize the Azure client, read the timestamp of the last processed file from the key-value store, and initialize the OfflineStoreImportJobStatus.

Query files from an Azure storage account and import data

Next, implement the perform method of the job. This method acts as a state machine, performing a different action for each JobState that you defined previously. OCP invokes this method repeatedly until the job is complete. To indicate that the job is complete, set the status.complete flag to true.
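The prepare/perform contract can be illustrated with a small in-memory simulation. This is only a sketch: DemoJob, DemoStatus, and runJob are hypothetical stand-ins, and the real scheduling, pausing, and resuming of jobs is handled by OCP, not by a loop like this one.

```typescript
// Simplified stand-ins for the job state; names are illustrative only.
type DemoState = 'FETCH' | 'IMPORT' | 'COMPLETE';

interface DemoStatus {
  complete: boolean;
  state: { files: string[]; count: number; jobState: DemoState };
}

class DemoJob {
  // Return the previous status when resuming, otherwise a fresh initial state.
  async prepare(previous?: DemoStatus): Promise<DemoStatus> {
    return previous ?? { complete: false, state: { files: [], count: 0, jobState: 'FETCH' } };
  }

  // Do one small unit of work per call and record progress in the status.
  async perform(status: DemoStatus): Promise<DemoStatus> {
    const state = status.state;
    switch (state.jobState) {
      case 'FETCH':
        state.files = ['a.json', 'b.json'];
        state.jobState = 'IMPORT';
        break;
      case 'IMPORT':
        if (state.files.shift() === undefined) {
          state.jobState = 'COMPLETE';
        } else {
          state.count++;
        }
        break;
      case 'COMPLETE':
        status.complete = true;
        break;
    }
    return status;
  }
}

// Hypothetical driver that mimics repeated invocation until completion.
async function runJob(job: DemoJob, resumeFrom?: DemoStatus): Promise<DemoStatus> {
  let status = await job.prepare(resumeFrom);
  while (!status.complete) {
    status = await job.perform(status);
  }
  return status;
}
```

Because every call to perform returns the full status, a run that is interrupted after any call can be resumed by passing the last returned status back to prepare.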

Populate files to import

In the FETCH_OFFLINE_STORE_FILES state, populate the files property with all files that the app has not processed since the last time the job ran. Then, set the jobState property to IMPORT_OFFLINE_STORE_FILES.

public async perform(status: OfflineStoreImportJobStatus): Promise<OfflineStoreImportJobStatus> {
  const state = status.state;
  switch (state.jobState) {
  case JobState.FETCH_OFFLINE_STORE_FILES:
    logger.info('Fetching names of offline store files');
    try {
      const files = await this.azure.listOfflineStoreBlobs();
      logger.debug('Found offline store files', files);
      const storesFilesToImport = files.filter((f) =>
        f.lastModified && f.lastModified > new Date(state.lastFileTimestamp).getTime()
      );
      logger.info('Found files to be imported: ', storesFilesToImport.map((s) => s.name));
      state.files = storesFilesToImport;
      state.jobState = JobState.IMPORT_OFFLINE_STORE_FILES;
    } catch (error: any) {
      logger.error('Failed to fetch names of offline store files', error);
      await notifications.error(
        'Offline Store Import',
        'Failed to fetch offline store blobs',
        `Reason: ${error}`
      );
      status.complete = true;
    }
    break;
  }
  return status;
}

To support this operation, add the ability to list blobs in your Azure storage container to the AzureOfflineStoreClient helper class:

import {Credentials, StorageAccountSettings, StorageFile} from '../../data/Azure';
import {ClientSecretCredential} from '@azure/identity';
import {BlobServiceClient, ContainerClient} from '@azure/storage-blob';

export class AzureOfflineStoreClient {
  private offlineStoreContainerClient: ContainerClient;

  public constructor(credentials: Credentials, settings: StorageAccountSettings) {
    const csc = new ClientSecretCredential(credentials.tenantId, credentials.clientId, credentials.clientSecret);
    const blobServiceClient = new BlobServiceClient(`https://${settings.accountName}.blob.core.windows.net`, csc);
    this.offlineStoreContainerClient = blobServiceClient.getContainerClient(settings.offlineStoreContainer);
  }

  public async listOfflineStoreBlobs(): Promise<StorageFile[]> {
    const blobs: StorageFile[] = [];
    for await (const blob of this.offlineStoreContainerClient.listBlobsFlat()) {
      blobs.push({name: blob.name, lastModified: blob.properties.lastModified.getTime()});
    }
    return blobs.sort((a, b) => a.lastModified - b.lastModified);
  }
}
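The file selection performed in the FETCH_OFFLINE_STORE_FILES state can also be isolated as a pure function, which makes the incremental logic easy to unit test without an Azure connection. The following is a minimal sketch; selectFilesToImport is a hypothetical helper, and the StorageFile interface is redeclared locally so that the example is self-contained.

```typescript
interface StorageFile {
  name: string;
  lastModified: number; // milliseconds since the Unix epoch
}

// Keep only files modified after the stored timestamp, oldest first.
function selectFilesToImport(files: StorageFile[], lastFileTimestamp: number): StorageFile[] {
  return files
    .filter((f) => f.lastModified > lastFileTimestamp)
    .sort((a, b) => a.lastModified - b.lastModified);
}

const candidates: StorageFile[] = [
  { name: 'stores-3.json', lastModified: 3000 },
  { name: 'stores-1.json', lastModified: 1000 },
  { name: 'stores-2.json', lastModified: 2000 },
];
console.log(selectFilesToImport(candidates, 1000).map((f) => f.name));
// prints [ 'stores-2.json', 'stores-3.json' ]
```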

Import data from files

In the previous step, you transitioned the job into the IMPORT_OFFLINE_STORE_FILES state. Next, process a single file, reading it from the Azure storage container and writing its contents to ODP in batches. When there are no more files left, transition the job to COMPLETE.

  case JobState.IMPORT_OFFLINE_STORE_FILES: {
    logger.info('Importing offline stores from files');
    // Take the oldest remaining file first (the list is sorted ascending by
    // lastModified) so that the stored timestamp only ever moves forward.
    const file = state.files.shift();
    if (!file) {
      state.jobState = JobState.COMPLETE;
      break;
    }

    try {
      logger.info('Downloading and importing offline store file: ', file.name);
      const stores = JSON.parse(await this.azure.getOfflineStoreBlob(file.name)) as OfflineStore[];
      for (const store of stores) {
        logger.debug('Importing offline store', store);
        await this.batcher.append(this.offlineStoreTransform(store));
        state.count++;
      }
      // Flush before recording the timestamp so that a file counts as
      // imported only after its records have been sent.
      await this.batcher.flush();
      await storage.kvStore.put('lastFileImport', {timestamp: file.lastModified});
    } catch (error: any) {
      logger.error(`Failed to import stores from file: ${file.name}`, error);
      await notifications.error(
        'Offline Store Import',
        'Failed to import offline stores',
        `Reason: ${error}`
      );
      status.complete = true;
    }
    break;
  }
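The IMPORT_OFFLINE_STORE_FILES case calls this.offlineStoreTransform, which is not shown in this section. A minimal sketch, assuming that it only maps the fields of an OfflineStore onto the corresponding OdpOfflineStore fields, might look as follows. The interfaces are redeclared locally (without the ObjectPayload base type) so that the example is self-contained; in the job, the function would be a private method of OfflineStoreImportJob.

```typescript
interface OfflineStore {
  id: string;
  name: string;
  location: string;
}

interface OdpOfflineStore {
  ocp_quickstart_offline_store_id: string;
  offline_store_name: string;
  offline_store_location: string;
}

// Assumed mapping: each source field is copied to its ODP counterpart unchanged.
function offlineStoreTransform(store: OfflineStore): OdpOfflineStore {
  return {
    ocp_quickstart_offline_store_id: store.id,
    offline_store_name: store.name,
    offline_store_location: store.location,
  };
}
```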

To support this operation, add a method that downloads a blob from the Azure storage container to the helper class in the src/lib/Azure/OfflineStoreClient.ts file:

public async getOfflineStoreBlob(name: string) {
  const response = await this.offlineStoreContainerClient.getBlobClient(name).downloadToBuffer();
  return response.toString();
}
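The job casts the parsed blob content to OfflineStore[] without checking it. If the uploaded files may be malformed, a validation step can reject bad input before anything is sent to ODP. The following is a sketch only; parseOfflineStores and isOfflineStore are hypothetical helpers that are not part of this guide's sample app.

```typescript
interface OfflineStore {
  id: string;
  name: string;
  location: string;
}

// Type guard: true only when all three expected string fields are present.
function isOfflineStore(value: unknown): value is OfflineStore {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const record = value as Record<string, unknown>;
  return typeof record.id === 'string'
    && typeof record.name === 'string'
    && typeof record.location === 'string';
}

// Parse a JSON document and fail fast if it is not an array of offline stores.
function parseOfflineStores(json: string): OfflineStore[] {
  const parsed: unknown = JSON.parse(json);
  if (!Array.isArray(parsed)) {
    throw new Error('Expected a JSON array of offline stores');
  }
  const invalidIndex = parsed.findIndex((entry) => !isOfflineStore(entry));
  if (invalidIndex !== -1) {
    throw new Error(`Entry ${invalidIndex} is not a valid offline store`);
  }
  return parsed as OfflineStore[];
}
```

An error thrown here would be caught by the existing try/catch in the IMPORT_OFFLINE_STORE_FILES case and reported through notifications.error.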

Add a factory function for the client to the Azure namespace in the src/lib/Azure/Azure.ts file:

import {AzureOfflineStoreClient} from './OfflineStoreClient';
/* ... */
export namespace Azure {
  export async function createOfflineStoreClient() {
    const credentials = await storage.settings.get<Credentials>('credentials');
    if (!await validateCredentials(credentials)) {
      logger.error('Invalid credentials.');
      throw new Error('Invalid Azure credentials.');
    }
    const settings = await storage.settings.get<StorageAccountSettings>('settings');

    return new AzureOfflineStoreClient(credentials, settings);
  }
}

Define job completion

Finally, handle the COMPLETE state. In this state, flush the batcher to ensure that all records have been sent to the ODP API. Also, use notifications to inform users about the success or failure of a job. Notifications are sent to the Account Settings > Activity Log page in ODP using the OCP App SDK notifier interface. This lets the app user see the status of jobs and take appropriate action. Lastly, set the status.complete flag to true to indicate that the job is complete.

    case JobState.COMPLETE:
      logger.info(`Offline Store Import complete, imported ${state.count} offline stores`);
      try {
        await this.batcher.flush();
      } catch (error: any) {
        logger.error('Failed to flush final batch', error);
      }

      await notifications.info(
        'Offline Store Import',
        'Imported data',
        `Imported ${state.count} offline stores`
      );
      status.complete = true;
      break;
    }
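The job flushes the batcher after each file and again on completion because a batcher holds records until a batch fills up. The following stand-in illustrates that behavior. It is not the SDK's Batcher implementation; SimpleBatcher and demo are hypothetical names, and the real class may differ in its details.

```typescript
// Illustrative stand-in only; the SDK's Batcher may behave differently.
class SimpleBatcher<T> {
  private items: T[] = [];
  private readonly operation: (batch: T[]) => Promise<void>;
  private readonly limit: number;

  constructor(operation: (batch: T[]) => Promise<void>, limit: number) {
    this.operation = operation;
    this.limit = limit;
  }

  // Queue one item and send the batch once it reaches the limit.
  async append(item: T): Promise<void> {
    this.items.push(item);
    if (this.items.length >= this.limit) {
      await this.flush();
    }
  }

  // Send whatever is queued, if anything.
  async flush(): Promise<void> {
    if (this.items.length === 0) {
      return;
    }
    const batch = this.items;
    this.items = [];
    await this.operation(batch);
  }
}

async function demo(): Promise<number[]> {
  const sizes: number[] = [];
  const batcher = new SimpleBatcher<number>(async (batch) => {
    sizes.push(batch.length);
  }, 3);
  for (let i = 0; i < 7; i++) {
    await batcher.append(i);
  }
  await batcher.flush(); // sends the one remaining item
  return sizes; // resolves to [3, 3, 1]
}
```

Without the final flush, the seventh record in this example would never be sent, which is why the COMPLETE state flushes once more before reporting success.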

Test the job

To test your job implementation, upload some data to your Azure storage container. The following example shows a dataset that you can upload to the offline stores container:

[
  {"id": "7651b3f5-5d5d-4e32-a651-7b47c029ce8f", "name": "Shop Emporium", "location": "New York City"},
  {"id": "a6cbf16e-f9d9-4c27-b2b2-11c8d7d68ddc", "name": "WooCommerce World", "location": "London"},
  {"id": "f42a8c78-40fc-4611-b3b3-b54c63eb068a", "name": "BigCommerce Bazaar", "location": "Paris"},
  {"id": "7d04b3f3-8861-433f-8e7e-02cda2f7d19e", "name": "Magento Market", "location": "Tokyo"},
  {"id": "4f4ca1aa-835e-4a26-8d1b-29c2b7d5b5e2", "name": "Squarespace Square", "location": "San Francisco"},
  {"id": "92652336-5f2a-4760-a6e3-3e1a6a1bb8ec", "name": "Volusion Village", "location": "Sydney"},
  {"id": "efb4e0bf-48e8-4d9c-9ce2-6a5574668d34", "name": "PrestaShop Place", "location": "Berlin"},
  {"id": "e50b90c1-8f7f-44e6-ae7b-b2c63e585f61", "name": "3dcart Corner", "location": "Toronto"},
  {"id": "48c9a384-57d7-44a6-a086-614d2c414138", "name": "OpenCart Outlet", "location": "Moscow"},
  {"id": "c13c6b17-b6c8-45d2-a270-98f9327b9c63", "name": "Weebly World", "location": "Hong Kong"}
]

You can use the Azure Portal or Azure Storage Explorer to upload the data to the storage container.

📘 Note: You can split this data into multiple files to test incremental updates.
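One way to prepare such files is to split the dataset programmatically. The following sketch uses a hypothetical splitIntoFiles helper that returns one JSON document per file. Because the job selects files by their lastModified time, upload the resulting files at different times to exercise the incremental behavior.

```typescript
// Hypothetical helper: split records into at most `fileCount` JSON documents.
function splitIntoFiles<T>(records: T[], fileCount: number): string[] {
  if (!Number.isInteger(fileCount) || fileCount < 1) {
    throw new Error('fileCount must be a positive integer');
  }
  const size = Math.ceil(records.length / fileCount);
  const files: string[] = [];
  for (let start = 0; start < records.length; start += size) {
    files.push(JSON.stringify(records.slice(start, start + size), null, 2));
  }
  return files;
}

const sampleStores = [
  { id: '1', name: 'Shop Emporium', location: 'New York City' },
  { id: '2', name: 'WooCommerce World', location: 'London' },
  { id: '3', name: 'BigCommerce Bazaar', location: 'Paris' },
];
// Produces two documents: the first with two stores, the second with one.
const documents = splitIntoFiles(sampleStores, 2);
console.log(documents.length);
```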

Next, build, publish, and test the job. Use the --bump-dev-version and --publish flags to do everything in one step. Run the following command in the OCP command-line interface (CLI):

ocp app prepare --bump-dev-version --publish

The app should already be installed in your ODP account from a previous step, and since you are only incrementing the dev version, the update should take place automatically. You can confirm this by running ocp directory list-installs ocp_quickstart in the OCP CLI.

OCP executes the job at 6:00 AM UTC every day. For testing purposes, manually trigger the job using the OCP CLI:

ocp jobs trigger ocp_quickstart offline_store_import [trackerId]

The above command initiates the job right away. To check the history of job runs, run the following command in the OCP CLI:

$ ocp jobs list ocp_quickstart

Job ID                                      Version            Job Function                Tracker ID        Status          Created At                  Updated At                  Duration
********-****-****-****-************        1.0.0-dev.4        offline_store_import        [trackerId]       RUNNING         2023-05-17T10:07:13Z        2023-05-17T10:07:13Z        18.178s

This command provides the ID of the job run. To view the logs for a specific job run, you can run the following command in the OCP CLI:

$ ocp app logs --jobId=[id]

2023-05-17T10:07:25.844Z INFO  Preparing Offline Store Import: {} and status 
2023-05-17T10:07:27.671Z INFO  Fetching names of offline store files
2023-05-17T10:07:28.691Z DEBUG Found offline store files [
  {
    name: 'quickstart_test_offlinestores.json',
    lastModified: 1684317162000
  }
]
2023-05-17T10:07:28.693Z INFO  Found files to be imported:  [ 'quickstart_test_offlinestores.json' ]
2023-05-17T10:07:28.703Z INFO  Importing offline stores from files
2023-05-17T10:07:28.703Z INFO  Downloading and importing offline store file:  quickstart_test_offlinestores.json
2023-05-17T10:07:28.865Z DEBUG Importing offline store {
  id: '7651b3f5-5d5d-4e32-a651-7b47c029ce8f',
  name: 'Shop Emporium',
  location: 'New York City'
}
2023-05-17T10:07:28.865Z DEBUG Importing offline store {
  id: 'a6cbf16e-f9d9-4c27-b2b2-11c8d7d68ddc',
  name: 'WooCommerce World',
  location: 'London'
}
...
2023-05-17T10:07:28.905Z INFO  Importing offline stores from files
2023-05-17T10:07:28.914Z INFO  Offline Store Import complete, imported 10 offline stores

These commands let you test the implementation of the OfflineStoreImportJob and debug any issues that arise.

To confirm the successful import of offline stores, use the ODP GraphQL Explorer (https://app.zaius.com/app#/graphql; see also https://support.optimizely.com/hc/en-us/articles/4407282629005-Personalize-with-pre-built-real-time-segments-through-GraphQL):

  1. In ODP, go to Account Settings > APIs > GraphQL Explorer.
  2. Select your custom object from the Explorer pane on the left.
  3. Execute the pre-generated query to view the imported offline stores.

If the import was successful, the imported offline stores appear in the query results.

Next, you can publish the app to the ODP App Directory.