Links

Pipelines

A guide on how to use Event's modern and lightweight SDK to programmatically manage your data Pipelines
Event's SDK requires an Event API Key, which can be created via the settings page in your Developer Dashboard

Getting Started

To start using the Event SDK, install the package:
Node.js
npm install @event-inc/pipelines

Initialize

We'll start off by initializing the client, which requires your Event API key. You can specify the environment as either "sandbox" or "production".
Node.js
import { createClient } from '@event-inc/pipelines';
const client = createClient(process.env.EVENT_INC_API_KEY, {
environment: "sandbox"
});

Create a Pipeline

Create a Pipeline using the createPipeline function and use it to orchestrate data. Here's an example:
JavaScript
TypeScript
import { createPipeline } from '@event-inc/pipelines';
const pipeline = await createPipeline(client, {
label: "Sync Stripe Customers to MongoDB",
source: {
key: source.key,
events: ["customer.created"],
},
destination: {
key: destination.key,
action: "insertOne",
},
extractors: [
{
type: "http",
label: "shippingAddress",
config: {
url: "https://api.yourapp.com/user-shipping-address",
method: "GET"
},
settings: {
startToCloseTimeout: "10 minutes",
retry: {
maximumAttempts: 10
}
}
}
],
transformation: {
type: "javascript",
func: function transform(event, context) {
const body = JSON.parse(event.body);
const { name, email } = body.data.object;
const shippingAddress = context.shippingAddress;
return {
collection: "stripe-customers",
document: {
name,
email,
shippingAddress
},
};
},
},
settings: {
startToCloseTimeout: "30 minutes",
retry: {
maximumAttempts: 100,
},
},
});
import { createPipeline } from '@event-inc/pipelines';
import { Event } from "@event-inc/types";
import Stripe from "stripe";
const pipeline = await createPipeline<"stripe", "mongodb">(client, {
label: "Sync Stripe Customers to MongoDB",
source: {
key: source.key,
events: ["customer.created"],
},
destination: {
key: destination.key,
action: "insertOne",
},
extractors: [
{
type: "http",
label: "shippingAddress",
config: {
url: "https://api.yourapp.com/user-shipping-address",
method: "GET"
},
settings: {
startToCloseTimeout: "10 minutes",
retry: {
maximumAttempts: 10
},
}
}
],
transformation: {
type: "javascript",
func: function transform(event: Event, context: Record<string, unknown>) {
const body: Stripe.Customer = JSON.parse(event.body).data.object;
const { name, email } = body;
const shippingAddress = context.shippingAddress;
return {
collection: "stripe-customers",
document: {
name,
email,
shippingAddress
},
};
},
},
settings: {
startToCloseTimeout: "30 minutes",
retry: {
maximumAttempts: 10,
},
},
});
The createPipeline function requires the following parameters:
Label Required String
A human-readable label for your Pipeline
Source Required Object
The source of data for your Pipeline
  • key: the Source's unique key String
  • events: the trigger event for your Pipeline String[]
Destination Required Object
The destination of data for your Pipeline
  • key: the Destination's unique key String
  • action: the API action to call when triggered String
Extractors Optional Array
The enrichment layer for your Pipeline
  • type: the type of Extractor you'd like to use String
  • label: a human-readable name String
  • config:
    • url: the HTTP endpoint you'll use to retrieve data String
    • method: the method of the HTTP request String
  • settings:
    • startToCloseTimeout: the total duration of time to retry on failure String
    • retry
      • maximumAttempts: the total amount of retries to attempt Number
Transformation Required Object
The transformation layer for your Pipeline
  • type: the type of transformation you'll be writing String
  • func: the written transformation code Function
Settings Optional Object
Additional policies and settings to apply to your Pipeline
  • startToCloseTimeout: the duration of time to retry on failure String
  • retry:
    • maximumAttempts: the total amount of retry attempts String
After successfully creating the pipeline, you should receive a response providing details about the created pipeline.
{
_id: 'pipe_MTY4NjE2OTUzMDM4MQ::ZjA4MzllNjYtYWI0NS00ZDZiLWEzNDUtY2FhYjUyMTdiOWYw',
name: 'Sync Stripe Customers to MongoDB',
key: 'pipeline::stripe::stripe-customers2::mongodb::stripe-customer-936435::MTY4NjE2OTUzMDM4MQ',
source: {
id: 'conn_src_MTY4NjE2OTUyNjczMw::ZTU1MDBjMzEtYjQ3NS00NmU2LTk0NzgtM2UzZGYyMzI5MDJk',
key: 'source::stripe::stripe-customers2',
type: 'stripe',
group: 'stripe-customers2',
events: [ 'customer.created' ],
// ...
},
destinations: [
{
id: 'conn_dst_MTY4NjE2OTUzMDE3Mg::NTAxZGY0ZmItM2RiMS00ZTk1LTlhYTYtMDI4YzdkOTM4MTYy',
key: 'destination::mongodb::stripe-customer-936435',
name: 'Production MongoDB',
label: 'Production MongoDB',
type: 'mongodb',
group: 'stripe-customer-936435',
action: 'insertOne',
// ...
}
],
middleware: [
{
key: 'shippingAddress',
_type: 'extractor::http',
url: 'https://api.yourapp.com/user-shipping-address',
headers: '{}',
method: 'GET',
data: '{}',
startToCloseTimeout: '10 minutes',
policies : {
retry : {
maximumAttempts: 3
}
}
},
{
_type: 'transformer',
language: 'javascript',
code: 'function transform(event, context) {\n' +
' const body = JSON.parse(event.body).data.object;\n' +
' const { name, email } = body;\n' +
' const shippingAddress = context.shippingAddress;\n' +
' return {\n' +
' collection: "stripe-customers",\n' +
' document: {\n' +
' name,\n' +
' email,\n' +
' shippingAddress\n' +
' },\n' +
' };\n' +
' }'
}
],
config: {
startToCloseTimeout: '10 minutes',
policies: {
retry: {
maximumAttempts: 10,
}
}
},
signature: {
secrets: [
'whsk_test_1e5f5291808ed64019832c5768d9f61d',
'whsk_live_e6ef7b9c679aeb9fc239a64efa227e71'
],
algorithm: 'sha256',
},
active: true,
deleted: false,
createdAt: 1686169530381,
createdDate: '2023-06-07T20:25:30.381Z'
}