Data sync source
Build an Optimizely Connect Platform (OCP) app that provides custom sources, which users who install your app can use in data syncs in the OCP Sync Manager.
You can add a custom source to your OCP app by completing the following steps:
- Declare sources in the app manifest file (`app.yml`).
- Declare the schema for your sources (one schema per source) in `src/sources/schema/*.yml` or `src/sources/{SchemaFunction}.ts`.
- Implement the logic for the sources.
  - Retrieve data from external systems by implementing functions (`src/sources/{EntryPoint}.ts`) and jobs (`src/sources/{JobClassName}.ts`).
  - Manage the source lifecycle events (`src/sources/{Lifecycle}.ts`).
Prerequisite
Register and scaffold your app.
Declare sources in the app manifest file
You can define multiple sources in a single app. You must declare every source in the sources section of the app manifest file (app.yml), and each source must have its own schema and logic.
Each source supports the following properties:
- `description` – Human-readable description of the source's purpose.
- `schema` – A string referencing a static schema name, or an object with an `entry_point` for dynamic schemas.
- `function` – (Optional) Defines the entry point for source functionality.
  - `entry_point` – (Optional) Class name implementing the source logic.
- `lifecycle` – (Optional) Defines the entry point for source lifecycle management.
  - `entry_point` – (Optional) Class name implementing lifecycle hooks.
- `jobs` – (Optional) Defines background tasks for the source. Contains key-value pairs where each key is a job name and each value is a job configuration.
The following is an example sources section:
```yaml
sources:
  wordpress_post_sync:
    description: Sync Posts from WordPress
    schema: post
    function:
      entry_point: WordpressPostSource
    lifecycle:
      entry_point: WordpressPostLifecycle
    jobs:
      post_sync:
        entry_point: WordpressPostSyncJob
        description: Performs synchronization of posts
```

When a user installs your app, any sources you defined in the app display in the source's Object drop-down list on the Sync Manager page in OCP.
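If you want to sanity-check a parsed manifest in a unit test, the properties above map naturally onto TypeScript types. This is a minimal sketch under stated assumptions: `SourceManifestEntry` and `validateSourceEntry` are hypothetical names (not part of the OCP SDK), the YAML is assumed to be parsed into a plain object already, and `description` and `schema` are treated as required because the property list does not mark them optional.

```typescript
// Hypothetical types mirroring the sources section of app.yml (not SDK types)
interface EntryPointRef {
  entry_point: string;
}

type JobConfig = EntryPointRef & { description?: string };

interface SourceManifestEntry {
  description: string;
  // A static schema name, or an object pointing at a dynamic schema class
  schema: string | EntryPointRef;
  function?: EntryPointRef;
  lifecycle?: EntryPointRef;
  jobs?: Record<string, JobConfig>;
}

// Returns the problems found in one source entry (empty when it looks valid)
function validateSourceEntry(sourceName: string, entry: SourceManifestEntry): string[] {
  const problems: string[] = [];
  if (!entry.description) {
    problems.push(`${sourceName}: description is required`);
  }
  if (typeof entry.schema === 'string') {
    if (entry.schema.trim() === '') problems.push(`${sourceName}: schema name is empty`);
  } else if (!entry.schema.entry_point) {
    problems.push(`${sourceName}: dynamic schema needs an entry_point`);
  }
  const jobs: Record<string, JobConfig> = entry.jobs ?? {};
  for (const [jobName, job] of Object.entries(jobs)) {
    if (!job.entry_point) problems.push(`${sourceName}: job ${jobName} needs an entry_point`);
  }
  return problems;
}

// The example sources section, as it would look after YAML parsing
const sources: Record<string, SourceManifestEntry> = {
  wordpress_post_sync: {
    description: 'Sync Posts from WordPress',
    schema: 'post',
    function: { entry_point: 'WordpressPostSource' },
    lifecycle: { entry_point: 'WordpressPostLifecycle' },
    jobs: {
      post_sync: { entry_point: 'WordpressPostSyncJob', description: 'Performs synchronization of posts' },
    },
  },
};

for (const [sourceName, entry] of Object.entries(sources)) {
  console.log(sourceName, validateSourceEntry(sourceName, entry)); // wordpress_post_sync []
}
```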
Schema
The schema is where you define the structure of data that flows from your app to the receiving data sync. The source schema is used to configure the data sync field mappings.
Structure
A schema consists of fields and custom types. Fields can use either built-in primitive types (string, integer, boolean, or decimal) or reference custom types that you define.
Schema-level properties:
- `name` – Unique identifier matching the `schema` value in the manifest.
- `display_name` – User-friendly name.
- `description` – The schema's purpose.
- `fields` – Array of field definitions.
- `custom_types` – (Optional) Array of reusable custom type definitions.

Field properties:
- `name` – Field identifier.
- `display_name` – User-friendly field name.
- `description` – Field's purpose.
- `type` – Field type. Can be a primitive type, an array, or a custom type reference.
- `primary` – (Optional) Boolean. Designates the field as the unique identifier. Only fields with a primitive type can be primary.

Custom type properties:
- `name` – Unique identifier for the custom type.
- `display_name` – User-friendly name.
- `description` – What this type represents.
- `fields` – Array of field definitions (same structure as schema fields).
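To make these property lists concrete, the sketch below models them as TypeScript interfaces and checks two rules: the documented one that only primitive-typed fields can be primary, and an assumed one that field names are unique within a schema. The interface names and `validateSchema` are hypothetical and are not types exported by `@zaiusinc/app-sdk`.

```typescript
// Hypothetical interfaces mirroring the schema properties above (not SDK types)
interface SchemaField {
  name: string;
  display_name: string;
  description: string;
  type: string; // primitive, "[type]" array, or custom type name
  primary?: boolean;
}

interface CustomTypeDef {
  name: string;
  display_name: string;
  description: string;
  fields: SchemaField[];
}

interface SchemaDef {
  name: string;
  display_name: string;
  description: string;
  fields: SchemaField[];
  custom_types?: CustomTypeDef[];
}

// Primitive type names, including the int/float aliases
const PRIMITIVES = new Set(['string', 'integer', 'int', 'boolean', 'decimal', 'float']);

// Returns a list of problems; an empty list means both checks passed
function validateSchema(schema: SchemaDef): string[] {
  const problems: string[] = [];
  const seen = new Set<string>();
  for (const field of schema.fields) {
    // Assumption: field names must be unique within a schema
    if (seen.has(field.name)) problems.push(`duplicate field name: ${field.name}`);
    seen.add(field.name);
    // Documented rule: only primitive-typed fields can be primary
    if (field.primary && !PRIMITIVES.has(field.type)) {
      problems.push(`primary field ${field.name} has non-primitive type ${field.type}`);
    }
  }
  return problems;
}

const example: SchemaDef = {
  name: 'post',
  display_name: 'Post',
  description: 'External post information',
  fields: [
    { name: 'external_id', type: 'string', display_name: 'External ID', description: 'Primary identifier', primary: true },
    { name: 'tags', type: '[string]', display_name: 'Tags', description: 'Post tags' },
  ],
};
console.log(validateSchema(example).length); // 0
```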
Types
Source schemas support primitive types, arrays, and custom types.
| Type syntax | Description | Example |
|---|---|---|
| `string` | Single text value. | `"Product Name"` |
| `[string]` | Array of text values. | `["tag1", "tag2"]` |
| `integer` | Single whole number. | `42` |
| `[integer]` | Array of whole numbers. | `[1, 2, 3]` |
| `boolean` | True or false value. | `true` |
| `[boolean]` | Array of booleans. | `[true, false]` |
| `decimal` | Decimal number. | `19.99` |
| `[decimal]` | Array of decimals. | `[9.99, 19.99]` |
| `ExampleCustomType` | Custom type object. | `{id: 1, name: "..."}` |
| `[ExampleCustomType]` | Array of custom objects. | `[{...}, {...}]` |
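The table can be read as a set of runtime checks. The sketch below, assuming values arrive as parsed JSON, tests a sample value against a type string; `valueMatchesType` is a hypothetical helper, and custom types are only checked for being plain objects, not for their nested fields.

```typescript
// Runtime checks for each primitive type in the table
const PRIMITIVE_CHECKS = new Map<string, (v: unknown) => boolean>([
  ['string', (v) => typeof v === 'string'],
  ['integer', (v) => typeof v === 'number' && Number.isInteger(v)],
  ['boolean', (v) => typeof v === 'boolean'],
  ['decimal', (v) => typeof v === 'number' && Number.isFinite(v)],
]);

// Does a sample value match the given type syntax?
// Sketch only: any non-primitive name is treated as a custom type and
// accepted as a plain object; its nested fields are not checked.
function valueMatchesType(value: unknown, typeSyntax: string): boolean {
  const arrayMatch = /^\[(.+)\]$/.exec(typeSyntax);
  if (arrayMatch) {
    // [type] syntax: every element must match the inner type
    const inner = arrayMatch[1];
    return Array.isArray(value) && value.every((item) => valueMatchesType(item, inner));
  }
  const check = PRIMITIVE_CHECKS.get(typeSyntax);
  if (check) return check(value);
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

console.log(valueMatchesType(['tag1', 'tag2'], '[string]')); // true
console.log(valueMatchesType(42, 'integer')); // true
console.log(valueMatchesType(19.99, 'integer')); // false
console.log(valueMatchesType([{ id: 1, name: 'Ada' }], '[ExampleCustomType]')); // true
```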
Type definitions
Each category is defined as follows:

Primitive types – Basic data types that include the following:
- `string` – Text data.
- `boolean` – True or false values.
- `integer` – Whole numbers (also aliased as `int`).
- `decimal` – Floating-point numbers (also aliased as `float`).

Array types – Collections using bracket syntax, `[type]`.
- `[string]`, `[integer]`, `[boolean]`, `[decimal]` – Arrays of primitive types.
- `[CustomTypeName]` – Arrays of custom type objects.

Custom types – Reusable structured data types that can be referenced across multiple fields. Useful for complex, nested data structures.
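The rules above (aliases, bracket syntax, and custom type references) can be combined into a single parser. This is a minimal sketch: `parseFieldType` and `ParsedType` are hypothetical names, and it assumes a custom type reference must exactly match the `name` of a declared custom type.

```typescript
// Parsed form of a field type (hypothetical helper type, not part of the SDK)
interface ParsedType {
  base: string; // canonical primitive name or custom type name
  isArray: boolean; // true for the [type] syntax
  isPrimitive: boolean;
}

// Aliases from the list above: int -> integer, float -> decimal
const TYPE_ALIASES = new Map<string, string>([
  ['int', 'integer'],
  ['float', 'decimal'],
]);
const PRIMITIVE_TYPES = new Set(['string', 'boolean', 'integer', 'decimal']);

// Parse syntax such as "int", "[string]" or "[Author]" and make sure any
// custom type reference points at a declared custom type.
function parseFieldType(typeSyntax: string, customTypeNames: ReadonlySet<string>): ParsedType {
  const trimmed = typeSyntax.trim();
  const isArray = trimmed.startsWith('[') && trimmed.endsWith(']');
  const inner = isArray ? trimmed.slice(1, -1).trim() : trimmed;
  const base = TYPE_ALIASES.get(inner) ?? inner;
  if (PRIMITIVE_TYPES.has(base)) {
    return { base, isArray, isPrimitive: true };
  }
  if (customTypeNames.has(base)) {
    return { base, isArray, isPrimitive: false };
  }
  throw new Error(`Unknown type "${typeSyntax}": not a primitive or a declared custom type`);
}

const declared = new Set(['Author']);
console.log(parseFieldType('int', declared).base); // integer
console.log(parseFieldType('[Author]', declared).isArray); // true
```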
When to use
Arrays – Use when a field contains multiple values.
- Lists of primitive values (tags, IDs, prices).
- Collections of structured objects.
Custom types – Use when you have reusable structured data.
- Nested objects with multiple fields.
- Data structures used in multiple places.
- Complex hierarchical models.
Use cases
Arrays of primitives
- Tags/Categories – Use `[string]` for product tags, article categories, or keywords.
- Related IDs – Use `[string]` or `[integer]` for related product IDs or category IDs.
- Multi-value Attributes – Use `[string]` for colors, sizes, or other multi-select attributes.
- Price History – Use `[decimal]` for historical pricing data.
- Feature Flags – Use `[boolean]` for per-region or per-variant feature flags.
Custom types
- Reusable Address/Contact Information – Define once, then use for billing, shipping, or warehouse addresses.
- Pricing Structures – Create custom types for pricing tiers, discounts, or promotions.
- User/Author Profiles – Define user information that appears in multiple contexts.
- Nested Product Variations – Model product variants, options, or configurations.
- Hierarchical Categories – Define category structures with multiple levels.
Best practices
- Choose the right type.
  - Use primitive arrays such as `[string]` or `[integer]` for simple lists.
  - Use custom types when objects have multiple related fields.
- Use descriptive names – Field and custom type names should clearly indicate what they represent.
- Define once, reference everywhere – Avoid duplicating field definitions by using custom types.
- Keep types focused – Each custom type should represent a single, cohesive concept.
- Document relationships – Use clear descriptions to explain how fields relate to each other.
- Consider performance – Large arrays may impact processing.
Declare the schema
The schema defines the structure of data that flows through your app. Its fields display as options in the data sync field mappings. OCP calls your source class method with properties defined in your schema.
You can choose to implement a static or dynamic schema.
- Static schema – Use when your data structure is fixed and known in advance, when all sources of this type share the same data structure, and when you want to define the schema declaratively without code.
- Dynamic schema – Use when you need to fetch schema information from external systems, or when different instances of the same source might have different fields.
Declare static schema
You must create a matching `.yml` file in `src/sources/schemas` for each static schema name you set in the `schema` field of the `app.yml` file. The `name` value in the `.yml` file must match that schema name.
The following is an example for the post.yml schema:
```yaml
name: post
display_name: Post
description: External post information
fields:
  - name: external_id
    type: string
    display_name: External ID
    description: Primary identifier in external system
    primary: true
  - name: post_title
    type: string
    display_name: Post Title
    description: The title of the post
  - name: tags
    type: "[string]"
    display_name: Tags
    description: Post tags
  - name: related_post_ids
    type: "[string]"
    display_name: Related Posts
    description: IDs of related posts
  - name: author
    type: Author
    display_name: Author
    description: Post author information
  - name: contributors
    type: "[Author]"
    display_name: Contributors
    description: Additional contributors to the post
custom_types:
  - name: Author
    display_name: Author
    description: Author profile information
    fields:
      - name: author_id
        type: string
        display_name: Author ID
        description: Unique author identifier
      - name: name
        type: string
        display_name: Name
        description: Author's full name
      - name: email
        type: string
        display_name: Email
        description: Author's email address
```

Declare dynamic schema
You must create and export a class in src/sources/ that extends SourceSchemaFunction. The class name must match the schema entry_point in the app manifest.
Implement the getSourcesSchema() method, which returns a SourceSchema object.
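The example implementation in this section decides which fields to add by checking properties of a sample post one by one. A more generic variant, sketched here under the assumption that the sample is a flat JSON object, infers a type string from each value. `inferFieldType`, `inferFields`, and `InferredField` are hypothetical names; nested objects are skipped because they would need a custom type.

```typescript
// Minimal local field shape (hypothetical; the SDK's own type may differ)
interface InferredField {
  name: string;
  type: string;
  display_name: string;
  description: string;
  primary?: boolean;
}

// Map a sample JSON value to schema type syntax, or undefined if unsupported
function inferFieldType(value: unknown): string | undefined {
  if (typeof value === 'string') return 'string';
  if (typeof value === 'boolean') return 'boolean';
  if (typeof value === 'number') return Number.isInteger(value) ? 'integer' : 'decimal';
  if (Array.isArray(value) && value.length > 0) {
    const inner = inferFieldType(value[0]);
    // Only arrays of primitives are inferred; nested arrays are skipped
    return inner !== undefined && !inner.startsWith('[') ? `[${inner}]` : undefined;
  }
  // Objects, null, and empty arrays would need a custom type or a default
  return undefined;
}

// Build field definitions from a flat sample record
function inferFields(sample: Record<string, unknown>, primaryKey: string): InferredField[] {
  const fields: InferredField[] = [];
  for (const [key, value] of Object.entries(sample)) {
    const type = inferFieldType(value);
    if (type === undefined) continue; // skip values with no schema type
    const field: InferredField = {
      name: key,
      type,
      display_name: key,
      description: `Inferred from the sample value of ${key}`,
    };
    // Only primitive (non-array) fields can be primary
    if (key === primaryKey && !type.startsWith('[')) field.primary = true;
    fields.push(field);
  }
  return fields;
}

const samplePost = { id: '101', title: 'Hello', tags: [3, 7], sticky: false, author: { id: 1 } };
const inferred = inferFields(samplePost, 'id');
console.log(inferred.map((f) => `${f.name}:${f.type}`).join(', '));
// id:string, title:string, tags:[integer], sticky:boolean
```

Inferring from a single sample is fragile: a price of 20 is indistinguishable from an integer, and an empty array carries no element type, so hand-written rules like those in the example implementation are often the safer choice.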
The following shows an example of the app manifest:
```yaml
sources:
  wordpress_post_sync:
    ...
    schema:
      entry_point: WordpressPostSchema
    ...
```

The following is an example implementation:
```typescript
import * as App from '@zaiusinc/app-sdk';

// The class name must match the schema entry_point in app.yml (WordpressPostSchema)
export class WordpressPostSchema extends App.SourceSchemaFunction {
  public async getSourcesSchema(): Promise<App.SourceSchema> {
    try {
      // Fetch a sample post from the WordPress REST API
      const response = await fetch(`${this.config.wordpressUrl}/wp-json/wp/v2/posts?per_page=1`);
      if (!response.ok) {
        throw new Error(`WordPress API returned ${response.status}`);
      }
      const posts = await response.json();
      if (!Array.isArray(posts) || posts.length === 0) {
        throw new Error('No sample post available');
      }

      // Use the sample post to build the schema
      const samplePost = posts[0];

      // Build schema fields based on sample data. The explicit type lets
      // fields without `primary` be pushed onto the array later.
      const fields: App.SourceSchema['fields'] = [
        // Always include ID as the primary field
        {
          name: 'id',
          type: 'string',
          display_name: 'Post ID',
          description: 'WordPress post identifier',
          primary: true,
        },
      ];

      // Add fields based on what is in the sample post
      if (samplePost.title) {
        fields.push({
          name: 'title',
          type: 'string',
          display_name: 'Title',
          description: 'Post title',
        });
      }
      if (samplePost.content) {
        fields.push({
          name: 'content',
          type: 'string',
          display_name: 'Content',
          description: 'Post content',
        });
      }
      if (samplePost.tags && Array.isArray(samplePost.tags)) {
        fields.push({
          name: 'tags',
          type: '[string]',
          display_name: 'Tags',
          description: 'Post tags',
        });
      }
      if (samplePost.author) {
        fields.push({
          name: 'author',
          type: 'Author',
          display_name: 'Author',
          description: 'Post author information',
        });
      }
      if (samplePost.comments && Array.isArray(samplePost.comments)) {
        fields.push({
          name: 'comments',
          type: '[Comment]',
          display_name: 'Comments',
          description: 'Post comments',
        });
      }

      // Define the custom types referenced by the fields above
      const customTypes: NonNullable<App.SourceSchema['custom_types']> = [];
      if (samplePost.author) {
        customTypes.push({
          name: 'Author',
          display_name: 'Author',
          description: 'WordPress author information',
          fields: [
            {
              name: 'id',
              type: 'integer',
              display_name: 'Author ID',
              description: 'WordPress author identifier',
            },
            {
              name: 'name',
              type: 'string',
              display_name: 'Name',
              description: 'Author display name',
            },
            {
              name: 'email',
              type: 'string',
              display_name: 'Email',
              description: 'Author email address',
            },
          ],
        });
      }
      if (samplePost.comments && Array.isArray(samplePost.comments)) {
        customTypes.push({
          name: 'Comment',
          display_name: 'Comment',
          description: 'WordPress comment data',
          fields: [
            {
              name: 'id',
              type: 'integer',
              display_name: 'Comment ID',
              description: 'WordPress comment identifier',
            },
            {
              name: 'author_name',
              type: 'string',
              display_name: 'Commenter Name',
              description: 'Name of the commenter',
            },
            {
              name: 'content',
              type: 'string',
              display_name: 'Comment Text',
              description: 'Comment content',
            },
            {
              name: 'approved',
              type: 'boolean',
              display_name: 'Approved',
              description: 'Whether the comment is approved',
            },
          ],
        });
      }

      return {
        name: 'wordpress_post',
        display_name: 'WordPress Post',
        description: 'Blog posts from WordPress with author and comment data',
        fields,
        custom_types: customTypes.length > 0 ? customTypes : undefined,
      };
    } catch (error) {
      // Fallback schema if the API request fails or returns no posts
      return {
        name: 'wordpress_post',
        display_name: 'WordPress Post',
        description: 'Blog posts from WordPress',
        fields: [
          {
            name: 'id',
            type: 'string',
            display_name: 'Post ID',
            description: 'WordPress post identifier',
            primary: true,
          },
        ],
      };
    }
  }
}
```

Implement the logic
Manage the source lifecycle
Class structure
Create and export a class in `src/sources/` that extends `SourceLifecycle`. The class name must match the lifecycle `entry_point` in the manifest.
Methods to implement
Implement the following methods as `async` methods and handle errors gracefully:
- `onSourceCreate()` – Called when a source is created. Use this to configure webhooks.
- `onSourceUpdate()` – Called when a source configuration is updated.
- `onSourceDelete()` – Called when a source is deleted. Use this for cleanup.
- `onSourceEnable()` – Called when a source is enabled.
- `onSourcePause()` – Called when a source is paused.

All methods should return a `SourceCallbackResponse` with a `success` boolean and an optional `message`.
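Because every hook returns the same success flag and optional message, the try/catch pattern can be factored into one helper. This is a minimal sketch, assuming you are free to add module-level helpers to your lifecycle file: `CallbackResponse` is a local stand-in for the SDK response types, and `toCallbackResponse` is a hypothetical name.

```typescript
// Local stand-in for the shape shared by the lifecycle responses
interface CallbackResponse {
  success: boolean;
  message?: string;
}

// Hypothetical helper: run a lifecycle action and turn any thrown value
// into a failed response instead of letting it escape the hook.
async function toCallbackResponse(
  action: () => Promise<void>,
  failurePrefix: string,
): Promise<CallbackResponse> {
  try {
    await action();
    return { success: true };
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    return { success: false, message: `${failurePrefix}: ${reason}` };
  }
}

// Usage inside a hook (registerWebhook would be your own API call):
//   public async onSourceCreate() {
//     return toCallbackResponse(() => registerWebhook(url), 'Failed to create webhook');
//   }
toCallbackResponse(async () => {
  throw new Error('timeout');
}, 'Failed to create webhook').then((res) => console.log(res));
// { success: false, message: 'Failed to create webhook: timeout' }
```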
The following is an example implementation:
```typescript
import * as App from '@zaiusinc/app-sdk';

// Extracts a readable message from a caught value (which may not be an Error)
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// Handles lifecycle events for the WordPress Post source
export class WordpressPostLifecycle extends App.SourceLifecycle {
  // Called when a source is created (for example, register webhooks)
  public async onSourceCreate(): Promise<App.SourceCreateResponse> {
    try {
      const webhookUrl = this.config.webhookUrl;
      // Register a webhook pointing at webhookUrl with the external API here
      return {success: true};
    } catch (error) {
      return {
        success: false,
        message: `Failed to create webhook: ${errorMessage(error)}`,
      };
    }
  }

  // Called when a source configuration is updated
  public async onSourceUpdate(): Promise<App.SourceUpdateResponse> {
    try {
      // Update the webhook or other resources here
      return {success: true};
    } catch (error) {
      return {
        success: false,
        message: `Failed to update webhook: ${errorMessage(error)}`,
      };
    }
  }

  // Called when a source is deleted (for example, cleanup)
  public async onSourceDelete(): Promise<App.SourceDeleteResponse> {
    try {
      // Delete the webhook or clean up resources here
      return {success: true};
    } catch (error) {
      return {
        success: false,
        message: `Failed to delete webhook: ${errorMessage(error)}`,
      };
    }
  }

  // Called when a source is enabled
  public async onSourceEnable(): Promise<App.SourceEnableResponse> {
    // Implement logic needed when the source is enabled
    return {success: true};
  }

  // Called when a source is paused
  public async onSourcePause(): Promise<App.SourcePauseResponse> {
    // Implement logic needed when the source is paused
    return {success: true};
  }
}
```

Implement source functions (push model)
Use source functions to receive and process incoming data from external systems (push model).
Source functions
- handle incoming data (external systems push data to the app).
- are event-driven and are triggered by webhooks or API calls.
- are best for quick operations (seconds) and small to medium payloads per request.
- are stateless between requests.
The following are common use cases for source functions:
- Webhook endpoints for third-party platforms (for example, WordPress, Shopify, or CRM systems).
- Real-time event processing from external APIs.
- Data transformation pipelines where external data must be standardized.
- Operations with no need to maintain state.
Class structure
Create and export a class in src/sources/ that extends SourceFunction.
The class name must match the entry_point in the manifest (for example, WordpressPostSource).
Method to implement
perform()
- Processes incoming webhook data.
- Returns a `Response` object.
- Uses `this.source.emit()` to send data to configured destinations.
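Keeping payload validation and transformation in a plain function makes a push-model handler easy to unit test outside the OCP runtime. The sketch below assumes a WordPress-style body with `id` and `title.rendered`, as in the example implementation in this section; `transformPostPayload`, `PostRecord`, and `HandlerResult` are hypothetical names. Inside `perform()`, the status and body would feed `new App.Response(...)`, and the record would be passed to `this.source.emit()`.

```typescript
// Flattened record emitted to the sync (mirrors PostData in the example)
interface PostRecord {
  id: string;
  title: string;
}

// Outcome of handling one request: HTTP status, response body, optional record
interface HandlerResult {
  status: number;
  body: string;
  record?: PostRecord;
}

// Hypothetical pure helper: validate a WordPress-style payload and flatten it
function transformPostPayload(payload: unknown): HandlerResult {
  if (typeof payload !== 'object' || payload === null) {
    return { status: 400, body: 'Request body must be a JSON object' };
  }
  const post = payload as { id?: unknown; title?: { rendered?: unknown } };
  const title = post.title?.rendered;
  if (post.id === undefined || post.id === null || typeof title !== 'string') {
    return { status: 400, body: 'Missing id or title.rendered' };
  }
  return { status: 200, body: 'Success', record: { id: String(post.id), title } };
}

console.log(transformPostPayload({ id: 42, title: { rendered: 'Hello' } }).record);
// { id: '42', title: 'Hello' }
console.log(transformPostPayload({ id: 42 }).status); // 400
```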
The following is an example implementation:
```typescript
import * as App from '@zaiusinc/app-sdk';

export interface PostData {
  id: string;
  title: string;
  content?: string;
  author?: string;
}

// Handles incoming WordPress post data
export class WordpressPostSource extends App.SourceFunction {
  public async perform(): Promise<App.Response> {
    try {
      // Extract data from the request
      const webhookData = this.request.body;
      // Reject payloads that lack the fields the schema needs
      if (!webhookData || webhookData.id == null || !webhookData.title?.rendered) {
        return new App.Response(400, 'Missing post id or title');
      }
      // Transform data to match your schema
      const postData: PostData = {
        id: String(webhookData.id),
        title: webhookData.title.rendered,
      };
      // Emit the data to be processed
      const result = await this.source.emit({data: postData});
      if (!result.success) {
        return new App.Response(400, 'Failed to process data');
      }
      return new App.Response(200, 'Success');
    } catch (error) {
      return new App.Response(500, error instanceof Error ? error.message : String(error));
    }
  }
}
```

Implement source jobs (pull model)
Use source jobs to pull data from external systems on-demand (pull model).
Source jobs
- actively fetch data from external systems.
- are batch-oriented and process data in iterative steps.
- are designed for long-running operations (minutes to hours).
- handle large datasets using pagination and chunking.
- maintain state across processing steps.
- let you track progress and state.
The following are common use cases for source jobs:
- Historical data imports or backfilling.
- Resource-intensive operations that should run during off-peak hours.
- Background tasks that you can manually trigger to run for a source.
Class structure
- Create and export a class in `src/sources/` that extends `SourceJob`.
- The class name must match the job's `entry_point` in the manifest.
Methods to implement
prepare(params: ValueHash, status?: SourceJobStatus, resuming?: boolean): Promise<SourceJobStatus>
- Called at the start of a job and when a job is resumed after interruption.
- Used to establish connections, read secrets, and set up initial job state.
- Returns the initial job status to be used for the first perform call.
perform(status: SourceJobStatus): Promise<SourceJobStatus>
- Performs a unit of work and returns the updated state.
- Called repeatedly until `complete` is set to `true` in the returned status.
- Design this method to perform small, interruptible units of work.
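To see the `prepare()`/`perform()` contract in isolation, the sketch below runs a job against in-memory pages instead of an external API. The driver loop stands in for the OCP runtime, which calls `prepare()` once and then `perform()` until `complete` is `true`; `JobStatus`, `InMemoryPostJob`, and `runJob` are hypothetical and simplify the real `SourceJobStatus`.

```typescript
// Local stand-in for the SDK's SourceJobStatus (state plus complete flag)
interface JobStatus {
  state: { page: number; processed: number };
  complete: boolean;
}

// Hypothetical job that reads from in-memory pages instead of an external API
class InMemoryPostJob {
  public emitted: string[] = [];
  private readonly pages: string[][];

  constructor(pages: string[][]) {
    this.pages = pages;
  }

  // Called once: set up the initial state
  public async prepare(): Promise<JobStatus> {
    return { state: { page: 0, processed: 0 }, complete: false };
  }

  // Called repeatedly: emit one page, then advance the cursor
  public async perform(status: JobStatus): Promise<JobStatus> {
    const batch = this.pages[status.state.page];
    if (batch === undefined) {
      return { ...status, complete: true }; // no pages left
    }
    this.emitted.push(...batch);
    return {
      state: { page: status.state.page + 1, processed: status.state.processed + batch.length },
      complete: false,
    };
  }
}

// Simplified driver loop; the OCP runtime plays this role for real jobs
async function runJob(job: InMemoryPostJob): Promise<JobStatus> {
  let current = await job.prepare();
  while (!current.complete) {
    current = await job.perform(current);
  }
  return current;
}

const job = new InMemoryPostJob([['post-1', 'post-2'], ['post-3']]);
runJob(job).then((finalStatus) => {
  console.log(finalStatus.state.processed, job.emitted.join(',')); // 3 post-1,post-2,post-3
});
```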
The following is an example implementation:
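```typescript
import * as App from '@zaiusinc/app-sdk';

// Background job to sync WordPress posts
export class WordpressPostSyncJob extends App.SourceJob {
  public async prepare(
    params: App.ValueHash,
    status?: App.SourceJobStatus,
    resuming?: boolean,
  ): Promise<App.SourceJobStatus> {
    // Initialize or restore job state
    if (resuming && status) {
      return status; // Resume with previous state
    }
    // Set up initial state for a new job
    return {
      state: {
        page: 0,
        processed: 0,
        hasMore: true,
      },
      complete: false,
    };
  }

  public async perform(status: App.SourceJobStatus): Promise<App.SourceJobStatus> {
    const {page, processed, hasMore} = status.state;
    if (!hasMore) {
      // No more data to process, mark job as complete
      status.complete = true;
      return status;
    }
    try {
      // Fetch a batch of data using the WordPress API
      const response = await fetchPostsPage(page, 10);
      // Process the batch, sending each item to the destination
      for (const item of response.data) {
        await this.source.emit({data: item});
      }
      // Update state for the next iteration
      status.state = {
        page: page + 1,
        processed: processed + response.data.length,
        hasMore: response.hasMore,
      };
      return status;
    } catch (error) {
      // Log the error and end the job instead of retrying the same page forever
      console.error('Error in job:', error);
      status.complete = true;
      return status;
    }
  }
}

// Placeholder helper (not part of the SDK): fetches one page of posts.
// WORDPRESS_URL is an assumption; replace it with your site's URL.
// WordPress pages are 1-based, so the job's 0-based page index is shifted by one.
const WORDPRESS_URL = 'https://example.com';

async function fetchPostsPage(
  page: number,
  perPage: number,
): Promise<{data: App.ValueHash[]; hasMore: boolean}> {
  const response = await fetch(`${WORDPRESS_URL}/wp-json/wp/v2/posts?page=${page + 1}&per_page=${perPage}`);
  if (!response.ok) {
    throw new Error(`WordPress API returned ${response.status}`);
  }
  const data = (await response.json()) as App.ValueHash[];
  // WordPress reports the total number of pages in the X-WP-TotalPages header
  const totalPages = Number(response.headers.get('X-WP-TotalPages') ?? 0);
  return {data, hasMore: page + 1 < totalPages};
}
```

Manage job state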
SourceJobStatus interface
- `state` – Object storing the current job state. This state is passed between `perform()` calls.
- `complete` – Boolean flag indicating whether the job is finished.

Jobs run in a loop:
- `perform()` is called repeatedly until `complete` is set to `true`.
- Each call to `perform()` should handle a small unit of work and return updated state.
- The state returned by each `perform()` call is passed to the next call.
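Because a resumed job receives the last status it returned, everything needed to continue, such as a cursor, must live in `state`. This sketch simulates an interruption after one batch and then resumes the way the example `prepare()` does when `resuming` is true. The `prepare`, `perform`, and `run` functions here are plain hypothetical functions, not SDK methods.

```typescript
// Local stand-in for the job status: a cursor plus a progress counter
interface CursorStatus {
  state: { offset: number; processed: number };
  complete: boolean;
}

const ITEMS = ['a', 'b', 'c', 'd', 'e'];
const BATCH_SIZE = 2;

// prepare(): start fresh, or hand back the saved status when resuming
function prepare(saved?: CursorStatus, resuming?: boolean): CursorStatus {
  if (resuming && saved) return saved;
  return { state: { offset: 0, processed: 0 }, complete: false };
}

// perform(): process one batch starting at the saved offset
function perform(current: CursorStatus, sink: string[]): CursorStatus {
  const { offset, processed } = current.state;
  const batch = ITEMS.slice(offset, offset + BATCH_SIZE);
  if (batch.length === 0) return { ...current, complete: true };
  sink.push(...batch);
  return {
    state: { offset: offset + batch.length, processed: processed + batch.length },
    complete: false,
  };
}

// Run at most maxSteps perform() calls and return the last status for resuming
function run(start: CursorStatus, sink: string[], maxSteps = Infinity): CursorStatus {
  let current = start;
  let steps = 0;
  while (!current.complete && steps < maxSteps) {
    current = perform(current, sink);
    steps++;
  }
  return current;
}

const emitted: string[] = [];
const interrupted = run(prepare(), emitted, 1); // simulated interruption after one batch
const finished = run(prepare(interrupted, true), emitted);
console.log(emitted.join(','), finished.state.processed); // a,b,c,d,e 5
```

The cursor only advances after a batch has been emitted, so in a real job an interruption in the middle of a batch would likely replay that batch on resume. Emitting records that are safe to send twice keeps that harmless.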
Best practices
- Small work units – Design `perform()` to handle small units of work that can complete quickly.
- Stateful design – Store all necessary state in the `state` object to enable resumption.
- Error handling – Implement proper error handling to ensure jobs fail gracefully.
- Interruptibility – Use `performInterruptibleTask()` and `sleep()` for operations that may take time.
- Progress tracking – Update the state with progress information to show job advancement.
Handle errors
- Use appropriate HTTP status codes in your `Response` objects.
- Return detailed error messages to help with debugging.
- Implement proper `try`/`catch` blocks in your functions.
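One way to follow these guidelines is to throw errors that carry their intended status code and convert everything else to a 500 in one place. `HttpError` and `toErrorResponse` are hypothetical names; in `perform()`, the result would feed `new App.Response(status, message)`.

```typescript
// Hypothetical error type carrying the HTTP status it should map to
class HttpError extends Error {
  public readonly status: number;

  constructor(status: number, message: string) {
    super(message);
    this.name = 'HttpError';
    this.status = status;
    // Restore the prototype chain so instanceof works when compiling to ES5
    Object.setPrototypeOf(this, HttpError.prototype);
  }
}

// Convert any thrown value into a status code and message for a Response
function toErrorResponse(error: unknown): { status: number; message: string } {
  if (error instanceof HttpError) {
    return { status: error.status, message: error.message };
  }
  // Unexpected failures become 500; keep the message for debugging
  const message = error instanceof Error ? error.message : String(error);
  return { status: 500, message };
}

try {
  throw new HttpError(400, 'Missing title.rendered');
} catch (error) {
  const { status, message } = toErrorResponse(error);
  console.log(status, message); // 400 Missing title.rendered
}
```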
Complete and publish your app