Pipelines
A guide on how to use Event's modern and lightweight SDK to programmatically manage your data Pipelines
Event's SDK requires an Event API key, which can be created via the settings page in your Developer Dashboard.
To start using the Event SDK, install the package:
Node.js
npm install @event-inc/pipelines
We'll start off by initializing the client, which requires your Event API key. You can specify the environment as either "sandbox" or "production".
Node.js
import { createClient } from '@event-inc/pipelines';
// Initialize the Event client. The API key is read from the EVENT_INC_API_KEY
// environment variable, and the client targets the "sandbox" environment.
const client = createClient(
  process.env.EVENT_INC_API_KEY,
  { environment: "sandbox" },
);
Create a Pipeline using the `createPipeline` function and use it to orchestrate data. Here's an example, shown first in JavaScript and then in TypeScript:
import { createPipeline } from '@event-inc/pipelines';
// Create a Pipeline that listens for "customer.created" events on the source
// and performs an "insertOne" action on the destination.
// NOTE: `client`, `source` and `destination` are not defined in this snippet;
// they are assumed to have been created beforehand.
const pipeline = await createPipeline(client, {
  label: "Sync Stripe Customers to MongoDB",
  source: {
    key: source.key,
    events: ["customer.created"],
  },
  destination: {
    key: destination.key,
    action: "insertOne",
  },
  // Extractors: an HTTP GET against the given URL. The result is presumably
  // exposed to the transformation on `context` under the extractor's label
  // (the transform below reads `context.shippingAddress`) - confirm with SDK docs.
  extractors: [
    {
      type: "http",
      label: "shippingAddress",
      config: {
        url: "https://api.yourapp.com/user-shipping-address",
        method: "GET",
      },
      settings: {
        startToCloseTimeout: "10 minutes",
        retry: {
          maximumAttempts: 10,
        },
      },
    },
  ],
  // The transformation parses the JSON event body, picks the customer's name
  // and email from `data.object`, and returns the target collection together
  // with the document to insert.
  transformation: {
    type: "javascript",
    func: function transform(event, context) {
      const body = JSON.parse(event.body);
      const { name, email } = body.data.object;
      const shippingAddress = context.shippingAddress;
      return {
        collection: "stripe-customers",
        document: {
          name,
          email,
          shippingAddress,
        },
      };
    },
  },
  // Top-level settings for the Pipeline, separate from the extractor's own
  // settings above.
  settings: {
    startToCloseTimeout: "30 minutes",
    retry: {
      // Same value as the TypeScript variant of this example.
      maximumAttempts: 10,
    },
  },
});
import { createPipeline } from '@event-inc/pipelines';
import { Event } from "@event-inc/types";
import Stripe from "stripe";
// Typed variant of the Pipeline creation example.
// NOTE(review): the type arguments "stripe" and "mongodb" presumably select the
// source and destination connection types - confirm against the SDK typings.
// `client`, `source` and `destination` are not defined in this snippet; they
// are assumed to have been created beforehand.
const pipeline = await createPipeline<"stripe", "mongodb">(client, {
label: "Sync Stripe Customers to MongoDB",
// Source connection and the event names this Pipeline subscribes to.
source: {
key: source.key,
events: ["customer.created"],
},
// Destination connection and the action to perform on it.
destination: {
key: destination.key,
action: "insertOne",
},
// One HTTP extractor: a GET request to the given URL, with its own timeout
// and retry settings. The transform below reads `context.shippingAddress`,
// which matches this extractor's label.
extractors: [
{
type: "http",
label: "shippingAddress",
config: {
url: "https://api.yourapp.com/user-shipping-address",
method: "GET"
},
settings: {
startToCloseTimeout: "10 minutes",
retry: {
maximumAttempts: 10
},
}
}
],
// The transform parses the JSON event body, takes `data.object` (annotated as
// Stripe.Customer; JSON.parse returns `any`, so this is not validated at
// runtime), and returns the target collection plus the document to insert.
transformation: {
type: "javascript",
func: function transform(event: Event, context: Record<string, unknown>) {
const body: Stripe.Customer = JSON.parse(event.body).data.object;
const { name, email } = body;
const shippingAddress = context.shippingAddress;
return {
collection: "stripe-customers",
document: {
name,
email,
shippingAddress
},
};
},
},
// Top-level settings for the Pipeline, separate from the extractor's own
// settings above.
settings: {
startToCloseTimeout: "30 minutes",
retry: {
maximumAttempts: 10,
},
},
});
The `createPipeline` function requires the following parameters:
extractors: the enrichment layer for your Pipeline. Each Extractor accepts the following fields:
- type: the type of Extractor you'd like to use
String
- label: a human-readable name
String
- config:
- url: the HTTP endpoint you'll use to retrieve data
String
- method: the method of the HTTP request
String
- settings:
- startToCloseTimeout: the total duration of time to retry on failure
String
- retry:
- maximumAttempts: the maximum number of retries to attempt
Number
After successfully creating the pipeline, you should receive a response providing details about the created pipeline.
{
_id: 'pipe_MTY4NjE2OTUzMDM4MQ::ZjA4MzllNjYtYWI0NS00ZDZiLWEzNDUtY2FhYjUyMTdiOWYw',
name: 'Sync Stripe Customers to MongoDB',
key: 'pipeline::stripe::stripe-customers2::mongodb::stripe-customer-936435::MTY4NjE2OTUzMDM4MQ',
source: {
id: 'conn_src_MTY4NjE2OTUyNjczMw::ZTU1MDBjMzEtYjQ3NS00NmU2LTk0NzgtM2UzZGYyMzI5MDJk',
key: 'source::stripe::stripe-customers2',
type: 'stripe',
group: 'stripe-customers2',
events: [ 'customer.created' ],
// ...
},
destinations: [
{
id: 'conn_dst_MTY4NjE2OTUzMDE3Mg::NTAxZGY0ZmItM2RiMS00ZTk1LTlhYTYtMDI4YzdkOTM4MTYy',
key: 'destination::mongodb::stripe-customer-936435',
name: 'Production MongoDB',
label: 'Production MongoDB',
type: 'mongodb',
group: 'stripe-customer-936435',
action: 'insertOne',
// ...
}
],
middleware: [
{
key: 'shippingAddress',
_type: 'extractor::http',
url: 'https://api.yourapp.com/user-shipping-address',
headers: '{}',
method: 'GET',
data: '{}',
startToCloseTimeout: '10 minutes',
policies : {
retry : {
maximumAttempts: 3
}
}
},
{
_type: 'transformer',
language: 'javascript',
code: 'function transform(event, context) {\n' +
' const body = JSON.parse(event.body).data.object;\n' +
' const { name, email } = body;\n' +
' const shippingAddress = context.shippingAddress;\n' +
' return {\n' +
' collection: "stripe-customers",\n' +
' document: {\n' +
' name,\n' +
' email,\n' +
' shippingAddress\n' +
' },\n' +
' };\n' +
' }'
}
],
config: {
startToCloseTimeout: '10 minutes',
policies: {
retry: {
maximumAttempts: 10,
}
}
},
signature: {
secrets: [
'whsk_test_1e5f5291808ed64019832c5768d9f61d',
'whsk_live_e6ef7b9c679aeb9fc239a64efa227e71'
],
algorithm: 'sha256',
},
active: true,
deleted: false,
createdAt: 1686169530381,
createdDate: '2023-06-07T20:25:30.381Z'
}
Last modified 3mo ago