Events

Often, you need to write code that responds to creates, updates, or deletes in an API you're working with. For example, if a customer cancels their account, their Stripe `subscription.status` will change from `active` to `canceled`. In this instance, perhaps your team wants to write code that will deprovision their account, send them a confirmation email, and send a Slack message to their account manager.

Some web services use webhooks to notify you about events. But webhooks are ephemeral: there's a risk that you'll miss them, or that the service won't send them at all. And if you later find a bug in your webhook handler, there's no easy way to replay them.

Sequin's events are a durable event stream right in your Postgres database. Every sync contains a `_sync_event` table:

| Name | Notes |
| --- | --- |
| `id` (`string`) | |
| `inserted_at` (`timestamp with time zone`) | |
| `event_type` (`string`) | One of `insert`, `update`, or `delete` |
| `object_type` (`string`) | The name of the database table (e.g. `subscription`) |
| `payload` (`jsonb`) | The payload of the event (more below) |

You can read from this table directly, or use our client library.
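For example, here's a minimal sketch of reading the most recent events with node-postgres (`pg`). The `stripe` schema name and the connection string are illustrative and depend on how your sync is configured:

import { Client } from "pg";

// A minimal sketch of reading events directly, assuming a sync into a `stripe` schema.
const client = new Client({ connectionString: process.env.DATABASE_URL });

async function readRecentEvents() {
  await client.connect();
  // Grab the ten most recent events from the stream.
  const { rows } = await client.query(
    "select id, event_type, object_type, payload from stripe._sync_event order by id desc limit 10"
  );
  for (const event of rows) {
    console.log(event.event_type, event.object_type, event.payload);
  }
  await client.end();
}

readRecentEvents();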

Enabling

To enable events for a sync, flip the switch in the Advanced section of the editing panel:

[Image: the Events toggle in the Advanced section of the sync editing panel]

Client library

Sequin's client library for events lets you write code that gets triggered whenever a new event is inserted into the `_sync_event` table:

handler.on("event", (event: SequinEvent) => {
  switch (event.object_type) {
    case "subscription":
      if (
        event.event_type === "update" &&
        event.payload.changes.status === "active"
      ) {
        // Handle subscription that just turned active
      }
      break;
    default:
      // Ignore other object types
      break;
  }
});
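Tying this back to the cancellation scenario from the introduction, a handler might look like the sketch below. It assumes the same `handler.on` API shown above; `deprovisionAccount`, `sendCancellationEmail`, and `notifyAccountManager` are hypothetical functions in your own codebase:

handler.on("event", async (event: SequinEvent) => {
  // Hypothetical reaction to a Stripe subscription being canceled.
  if (
    event.object_type === "subscription" &&
    event.event_type === "update" &&
    event.payload.changes.status === "canceled"
  ) {
    const subscription = event.payload.record;
    await deprovisionAccount(subscription);
    await sendCancellationEmail(subscription);
    await notifyAccountManager(subscription);
  }
});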

We currently have a beta library for Node.

Support for more languages is coming soon. While we include instructions below to build your own events consumer, we'd prefer that you reach out to let us know so we can support you.

Payloads

The shape of an event `payload` depends on the `event_type`:

`insert`

{
  "record_id": <string>,
  // full jsonb of the new record
  "record": <jsonb>
}

`update`

{
  "record_id": <string>,
  // full jsonb of the record after update
  "record": <jsonb>,
  // full jsonb of the record before update
  "old_record": <jsonb>,
  // jsonb that contains only the updated fields
  "changes": <jsonb>
}

`delete`

{
  "record_id": <string>,
  // full jsonb of the record prior to deletion
  "record": <jsonb>
}
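If you're consuming these payloads in TypeScript, one way to model the three shapes is the sketch below. These type definitions are our own illustration, not something exported by Sequin's client library:

// Illustrative types for the payload shapes above; not part of Sequin's client library.
type InsertPayload = {
  record_id: string;
  record: Record<string, unknown>;
};

type UpdatePayload = {
  record_id: string;
  record: Record<string, unknown>;
  old_record: Record<string, unknown>;
  changes: Record<string, unknown>;
};

type DeletePayload = {
  record_id: string;
  record: Record<string, unknown>;
};

type EventPayload = InsertPayload | UpdatePayload | DeletePayload;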

Example payloads

Stripe subscription created

{
  "record_id": "sub_OiVTBEskMWApiYMT",
  "record": {
    "object": "subscription",
    "collection_method": "charge_automatically",
    "status": "active"
    // ...
  }
}

Stripe subscription updated

{
  "record_id": "sub_OiVTBEskMWApiYMT",
  "record": {
    "object": "subscription",
    "collection_method": "charge_automatically",
    "status": "active"
    // ...
  },
  "old_record": {
    "object": "subscription",
    "collection_method": "charge_automatically",
    "status": "trialing"
    // ...
  },
  "changes": {
    "status": "active"
  }
}

Airtable record deleted

{
  "record_id": "rec_Hx8PczKgotc0UKAf",
  "record": {
    "company_name": "LL&H",
    "current_employees_count": 2,
    "status": "Reached out"
    // ...
  }
}

Build your own

When working with event streams, there are two primary challenges:

  1. "Locking" an event so that only one instance of your code can work on it at a time.
  2. Using a cursor to ensure your system doesn't process the same event more than once.

If you want to work with the `_sync_event` table directly, we provide two constructs that can help you manage locking and cursors:

  1. The `_sync_event_cursor` table
  2. The Postgres function `_assign_event_cursor()`

Here's how you can process events using this table and function:

1. First, acquire a lock on a cursor:

select stripe._assign_event_cursor()

If that returns `null`, your code wasn't able to get a lock, so it should try again after some delay.

Otherwise, this function will return a cursor, which you can use to read events from the `_sync_event` table.

2. Read events from `_sync_event`

Using the cursor returned by `_assign_event_cursor`, you can grab new events like this:

-- While you can choose whatever limit/batch size you like, limit 1 is easiest:
-- with limit 1, you can process and ack each event individually.
-- $1 is the cursor returned by _assign_event_cursor()
select * from stripe._sync_event where id > $1 order by id asc limit 1

3. Process the event

Your code now has the event and can perform its function.

4. Update the cursor

After you successfully process the event, you need to update the cursor. You'll use the `id` of the event you just processed. Run the following to do so:

-- $1 is the *new* cursor, which is the id of the event you just processed
update stripe._sync_event_cursor set group_offset = $1

With the event processed, you can loop back to step #1.
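Putting the four steps together, a bare-bones consumer loop might look like the sketch below. It assumes a Stripe sync (hence the `stripe` schema) and uses node-postgres; the connection string, polling delay, and error handling are all placeholders you'd adapt to your own setup:

import { Client } from "pg";

// A bare-bones sketch of the four-step loop above, assuming a sync into a `stripe` schema.
const client = new Client({ connectionString: process.env.DATABASE_URL });

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

async function run() {
  await client.connect();
  while (true) {
    // 1. Acquire a lock on a cursor. A null result means another worker holds the lock.
    const lock = await client.query("select stripe._assign_event_cursor() as cursor");
    const cursor = lock.rows[0].cursor;
    if (cursor === null) {
      await sleep(1000);
      continue;
    }

    // 2. Read the next event after the cursor.
    const { rows } = await client.query(
      "select * from stripe._sync_event where id > $1 order by id asc limit 1",
      [cursor]
    );
    if (rows.length === 0) {
      await sleep(1000);
      continue;
    }
    const event = rows[0];

    // 3. Process the event (your code goes here).
    console.log(event.event_type, event.object_type, event.payload);

    // 4. Advance the cursor to the id of the event you just processed.
    await client.query("update stripe._sync_event_cursor set group_offset = $1", [event.id]);
  }
}

run();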

Customization

The full signature of the cursor function is this:

_assign_event_cursor (name varchar DEFAULT 'default', default_start text DEFAULT 'FROM_NOW')

The first argument, `name`, is what you want to call your cursor group. You only need to worry about this if you'll have multiple systems processing this table. Otherwise, you can keep this as the default, `'default'`.

If you use multiple cursor groups, be sure to scope your update to the group that processed the event:

-- $1 is the *new* cursor, which is the id of the event you just processed
-- $2 is the name of the cursor group that processed the event
update stripe._sync_event_cursor set group_offset = $1 where group_name = $2

The second argument, `default_start`, is where in the `_sync_event` stream a cursor group should start if it's a brand-new cursor. The default is `'FROM_NOW'`, which means "only process events from now going forward." This is usually what you want, so if you're not sure, keep the default.
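For example, a worker with its own cursor group might acquire and advance that group's cursor like the sketch below (again using node-postgres; `"billing-worker"` and the helper names are illustrative):

import { Client } from "pg";

// Illustrative helpers for working with a dedicated cursor group; the names here are our own.
async function acquireGroupCursor(client: Client, groupName: string): Promise<string | null> {
  // Pass the group name so the lock and cursor are scoped to this group.
  const { rows } = await client.query(
    "select stripe._assign_event_cursor($1) as cursor",
    [groupName]
  );
  return rows[0].cursor;
}

async function advanceGroupCursor(client: Client, groupName: string, eventId: string): Promise<void> {
  // Only advance the cursor for the group that processed the event.
  await client.query(
    "update stripe._sync_event_cursor set group_offset = $1 where group_name = $2",
    [eventId, groupName]
  );
}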
