Express-like routing for DynamoDB Streams with type-safe handlers, validation, attribute filtering, batch processing, and SQS deferral
A TypeScript library providing Express-like routing for DynamoDB Stream events. Register type-safe handlers for INSERT, MODIFY, and REMOVE operations using discriminator functions or schema validators like Zod. Defer heavy processing to SQS with a simple `.defer(handlerId)` chain to keep your stream processing fast and reliable.
```bash
npm install @rogerchi/ddb-stream-router
```
or
```bash
yarn add @rogerchi/ddb-stream-router
```
Peer dependencies:
```bash
npm install @aws-sdk/util-dynamodb @aws-sdk/client-sqs
```
- Express-like API - Familiar `.onInsert()`, `.onModify()`, `.onRemove()`, `.onTTLRemove()`, `.use()` methods
- Type Safety - Full TypeScript inference from discriminators and parsers
- Flexible Matching - Use type guards or schema validators (Zod, etc.)
- Attribute Filtering - React to specific attribute changes in MODIFY events
- Batch Processing - Group records and process them together
- Middleware Support - Intercept records before handlers
- Deferred Processing - Automatically enqueue to SQS for async processing
- Global Tables - Filter records by region
- Partial Batch Failures - Lambda retry support via `reportBatchItemFailures`
- TTL Removal Handling - Separate handlers for TTL-triggered vs user-initiated deletions
```typescript
import { StreamRouter } from '@rogerchi/ddb-stream-router';
import type { DynamoDBStreamHandler } from 'aws-lambda';

interface User {
  pk: string;
  sk: string;
  name: string;
  email: string;
}

// Type guard discriminator
const isUser = (record: unknown): record is User =>
  typeof record === 'object' &&
  record !== null &&
  'pk' in record &&
  (record as { pk: string }).pk.startsWith('USER#');

const router = new StreamRouter();

router
  .onInsert(isUser, async (newUser, ctx) => {
    console.log(`User created: ${newUser.name}`);
  })
  .onModify(isUser, async (oldUser, newUser, ctx) => {
    console.log(`User updated: ${oldUser.name} -> ${newUser.name}`);
  })
  .onRemove(isUser, async (deletedUser, ctx) => {
    console.log(`User deleted: ${deletedUser.name}`);
  });

// Simplified export with built-in batch failure support
export const handler = router.streamHandler;

// Or with custom options:
// export const handler: DynamoDBStreamHandler = async (event) => {
//   return router.process(event, { reportBatchItemFailures: true });
// };
```
The discriminator/parser is matched against different images based on event type:
| Event Type | Image Used for Matching |
|------------|------------------------|
| INSERT | newImage |
| MODIFY | newImage |
| REMOVE | oldImage |
For MODIFY events, the newImage is used for matching because you typically want to route based on the current state of the record. If a record's type changed (e.g., pk prefix changed from USER# to ADMIN#), the handler for the new type will be invoked.
Validation Target: Use the `validationTarget` option to control which image(s) are validated:
- `"newImage"` (default) - Validates the new image
- `"oldImage"` - Validates the old image (useful for processing based on previous state)
- `"both"` - Both images must match (useful for type migration validation)
```typescript
// Only process if BOTH old and new images match
router.onModify(isUser, async (oldUser, newUser, ctx) => {
  // Both states are guaranteed to be valid Users
}, { validationTarget: "both" });
```
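To make the matching rules concrete, here is a minimal sketch of how the image(s) to validate could be chosen from the event type and `validationTarget`. It is not the library's implementation: `imagesToValidate` and the `Images` type are hypothetical names, and the sketch assumes `validationTarget` only affects MODIFY events.

```typescript
// Hypothetical helper, not part of @rogerchi/ddb-stream-router.
type EventName = 'INSERT' | 'MODIFY' | 'REMOVE';
type ValidationTarget = 'newImage' | 'oldImage' | 'both';

interface Images {
  oldImage?: unknown;
  newImage?: unknown;
}

// Returns the image(s) a discriminator would have to accept.
function imagesToValidate(
  eventName: EventName,
  images: Images,
  target: ValidationTarget = 'newImage',
): unknown[] {
  if (eventName === 'INSERT') return [images.newImage];
  if (eventName === 'REMOVE') return [images.oldImage];
  // MODIFY (assumption): the target decides which image(s) must match.
  if (target === 'both') return [images.oldImage, images.newImage];
  return [target === 'oldImage' ? images.oldImage : images.newImage];
}

// Plain boolean check on the pk prefix, used only for this demo.
const hasUserPrefix = (image: unknown): boolean =>
  typeof image === 'object' &&
  image !== null &&
  typeof (image as { pk?: unknown }).pk === 'string' &&
  (image as { pk: string }).pk.startsWith('USER#');

// A record whose pk prefix changed from USER# to ADMIN#.
const migrated: Images = {
  oldImage: { pk: 'USER#1' },
  newImage: { pk: 'ADMIN#1' },
};

const matchesDefault = imagesToValidate('MODIFY', migrated).every(hasUserPrefix);
const matchesOld = imagesToValidate('MODIFY', migrated, 'oldImage').every(hasUserPrefix);
const matchesBoth = imagesToValidate('MODIFY', migrated, 'both').every(hasUserPrefix);
```

In this sketch the migrated record matches a user handler only when the old image is the validation target, which is the behavior the table and option list describe for a record whose type changed.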
```typescript
import { z } from 'zod';
import { StreamRouter } from '@rogerchi/ddb-stream-router';

const UserSchema = z.object({
  pk: z.string().startsWith('USER#'),
  sk: z.string(),
  name: z.string(),
  email: z.string(),
});

type User = z.infer<typeof UserSchema>;

const router = new StreamRouter();

// Schema validates and parses data before handler receives it
router.onInsert(UserSchema, async (newUser: User, ctx) => {
  // newUser is guaranteed to match the schema
  console.log(`Valid user: ${newUser.email}`);
});
```
React only when specific attributes change:
```typescript
// Only trigger when email changes
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    await sendEmailVerification(newUser.email);
  },
  { attribute: 'email', changeType: 'changed_attribute' }
);

// Nested attributes with dot notation
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log(`Theme: ${oldUser.preferences.theme} -> ${newUser.preferences.theme}`);
  },
  { attribute: 'preferences.theme', changeType: 'changed_attribute' }
);

// Watching parent catches all nested changes
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    // Triggers when preferences.theme OR preferences.notifications changes
    console.log('Any preference changed');
  },
  { attribute: 'preferences' }
);

// Trigger when tags are added to a collection
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log('New tags added');
  },
  { attribute: 'tags', changeType: 'new_item_in_collection' }
);

// Multiple change types (OR logic)
router.onModify(
  isUser,
  async (oldUser, newUser, ctx) => {
    console.log('Tags modified');
  },
  { attribute: 'tags', changeType: ['new_item_in_collection', 'remove_item_from_collection'] }
);
```
Change types:
- `new_attribute` - Attribute added
- `remove_attribute` - Attribute removed
- `changed_attribute` - Attribute value changed
- `new_item_in_collection` - Item added to List or Set (SS/NS/BS)
- `remove_item_from_collection` - Item removed from List or Set
- `changed_item_in_collection` - Items both added and removed in List/Set
Supported collection types:
- Arrays (DynamoDB Lists)
- Sets (DynamoDB String Sets, Number Sets, Binary Sets)
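The change types above can be illustrated with a small classifier. This is only a sketch under stated assumptions, not the library's detection logic: `getPath`, `asItems`, and `detectChange` are hypothetical helpers, items are compared by their JSON form, and a collection change is reported with a collection change type rather than `changed_attribute`.

```typescript
// Hypothetical sketch, not the library's implementation.
type ChangeType =
  | 'new_attribute'
  | 'remove_attribute'
  | 'changed_attribute'
  | 'new_item_in_collection'
  | 'remove_item_from_collection'
  | 'changed_item_in_collection';

// Resolve a dot-notation path such as 'preferences.theme'.
function getPath(image: unknown, path: string): unknown {
  let current: unknown = image;
  for (const key of path.split('.')) {
    if (typeof current !== 'object' || current === null) return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Treat arrays (Lists) and Sets as collections of comparable items.
function asItems(value: unknown): string[] | undefined {
  if (Array.isArray(value)) return value.map((v) => JSON.stringify(v));
  if (value instanceof Set) return Array.from(value).map((v) => JSON.stringify(v));
  return undefined;
}

// Returns the change type for one attribute path, or undefined if unchanged.
function detectChange(
  oldImage: unknown,
  newImage: unknown,
  path: string,
): ChangeType | undefined {
  const before = getPath(oldImage, path);
  const after = getPath(newImage, path);
  if (before === undefined && after === undefined) return undefined;
  if (before === undefined) return 'new_attribute';
  if (after === undefined) return 'remove_attribute';

  const oldItems = asItems(before);
  const newItems = asItems(after);
  if (oldItems && newItems) {
    const added = newItems.some((item) => !oldItems.includes(item));
    const removed = oldItems.some((item) => !newItems.includes(item));
    if (added && removed) return 'changed_item_in_collection';
    if (added) return 'new_item_in_collection';
    if (removed) return 'remove_item_from_collection';
    return undefined; // same members (reordering is ignored in this sketch)
  }
  // Scalars and maps: compare by JSON form (key order matters here).
  return JSON.stringify(before) === JSON.stringify(after)
    ? undefined
    : 'changed_attribute';
}
```

Because a parent path such as `preferences` is compared as a whole value, a change to any nested key shows up as a change to the parent, which matches the "watching parent" example.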
DynamoDB TTL automatically deletes expired items, creating REMOVE events with special userIdentity metadata. The router lets you handle TTL removals separately from user-initiated deletions.
```typescript
// Handle TTL-triggered removals only
router.onTTLRemove(isSession, async (oldImage, ctx) => {
  console.log(`Session expired via TTL: ${oldImage.sessionId}`);
  await cleanupExpiredSession(oldImage);
});

// Handle user-initiated deletions only (exclude TTL)
router.onRemove(isSession, async (oldImage, ctx) => {
  console.log(`User logged out: ${oldImage.userId}`);
  await handleUserLogout(oldImage);
}, { excludeTTL: true });

// Default: onRemove receives both TTL and user-initiated removals
router.onRemove(isUser, async (oldImage, ctx) => {
  console.log(`User removed: ${oldImage.userId}`);
});
```
TTL removals are identified by:
- `userIdentity.type === "Service"`
- `userIdentity.principalId === "dynamodb.amazonaws.com"`
See examples/ttl-removal.ts for more examples.
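For reference, the two conditions above can be expressed as a standalone check on a raw stream record. `isTTLRemoval` and `StreamRecordLike` are hypothetical names used for illustration; the router performs its own detection internally.

```typescript
// Minimal shape of the fields this check reads from a stream record.
interface StreamRecordLike {
  eventName?: 'INSERT' | 'MODIFY' | 'REMOVE';
  userIdentity?: { type?: string; principalId?: string };
}

// Hypothetical helper mirroring the two userIdentity conditions.
function isTTLRemoval(record: StreamRecordLike): boolean {
  return (
    record.eventName === 'REMOVE' &&
    record.userIdentity?.type === 'Service' &&
    record.userIdentity?.principalId === 'dynamodb.amazonaws.com'
  );
}

// A TTL-triggered removal carries the service identity.
const ttlRecord: StreamRecordLike = {
  eventName: 'REMOVE',
  userIdentity: { type: 'Service', principalId: 'dynamodb.amazonaws.com' },
};

// A user-initiated deletion has no userIdentity field.
const userDelete: StreamRecordLike = { eventName: 'REMOVE' };
```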
```typescript
// Logging middleware
router.use(async (record, next) => {
  console.log(`Processing ${record.eventName}: ${record.eventID}`);
  await next();
});

// Skip certain records
router.use(async (record, next) => {
  if (record.eventSourceARN?.includes('test-table')) {
    return; // Don't call next() to skip
  }
  await next();
});

// Error handling
router.use(async (record, next) => {
  try {
    await next();
  } catch (error) {
    await recordMetric('stream.error', 1);
    throw error;
  }
});
```
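The `next()` contract used above follows the general Express-style pattern. As a rough sketch of that pattern (a generic illustration, not the router's internals), middleware can be composed so that the final handler only runs when every middleware calls `next()`. `runChain`, `Middleware`, and `demo` are hypothetical names.

```typescript
type Next = () => Promise<void>;
type Middleware<R> = (record: R, next: Next) => Promise<void>;

// Runs middleware in registration order; `final` runs only if every
// middleware calls next().
async function runChain<R>(
  record: R,
  middleware: Middleware<R>[],
  final: (record: R) => Promise<void>,
): Promise<void> {
  const dispatch = async (index: number): Promise<void> => {
    if (index === middleware.length) return final(record);
    await middleware[index](record, () => dispatch(index + 1));
  };
  await dispatch(0);
}

// Demo: a logging middleware followed by one that skips test tables.
async function demo(table: string): Promise<string[]> {
  const calls: string[] = [];
  const chain: Middleware<{ table: string }>[] = [
    async (_record, next) => {
      calls.push('log');
      await next();
    },
    async (record, next) => {
      if (record.table === 'test-table') return; // skip: next() is not called
      await next();
    },
  ];
  await runChain({ table }, chain, async () => {
    calls.push('handler');
  });
  return calls;
}
```

Because each middleware awaits `next()`, an error thrown further down the chain propagates back through it, which is what makes the try/catch error-handling middleware possible.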
Process multiple records together:
```typescript
// All matching records in one handler call
router.onInsert(
  isInventoryChange,
  async (records) => {
    console.log(`Processing ${records.length} changes`);
    for (const { newImage, ctx } of records) {
      // process each record
    }
  },
  { batch: true }
);

// Group by attribute value
router.onInsert(
  isAuditLog,
  async (records) => {
    const userId = records[0].newImage.userId;
    console.log(`${records.length} logs for user ${userId}`);
  },
  { batch: true, batchKey: 'userId' }
);

// Group by primary key
router.onModify(
  isItem,
  async (records) => {
    // All records for the same pk+sk
  },
  { batch: true, batchKey: { partitionKey: 'pk', sortKey: 'sk' } }
);
```
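Conceptually, `batchKey` partitions the matching records into groups before the handler is called once per group. The following is a minimal sketch of that grouping, assuming the key is either a single attribute name or a partition/sort key pair as in the examples above. `groupKeyFor` and `groupRecords` are hypothetical helpers, not library functions.

```typescript
type Image = Record<string, unknown>;
type BatchKey = string | { partitionKey: string; sortKey: string };

// Derive the grouping key for one image (hypothetical helper).
function groupKeyFor(image: Image, batchKey: BatchKey): string {
  if (typeof batchKey === 'string') return String(image[batchKey]);
  // Composite key: serialize both parts so 'a'+'bc' and 'ab'+'c' stay distinct.
  return JSON.stringify([image[batchKey.partitionKey], image[batchKey.sortKey]]);
}

// Partition images into groups that share the same key, preserving order.
function groupRecords<T extends Image>(images: T[], batchKey: BatchKey): Map<string, T[]> {
  const groups = new Map<string, T[]>();
  for (const image of images) {
    const key = groupKeyFor(image, batchKey);
    const group = groups.get(key) ?? [];
    group.push(image);
    groups.set(key, group);
  }
  return groups;
}

const logs = [
  { userId: 'u1', action: 'login' },
  { userId: 'u2', action: 'login' },
  { userId: 'u1', action: 'logout' },
];
const byUser = groupRecords(logs, 'userId');

const items = [
  { pk: 'ITEM#1', sk: 'v1' },
  { pk: 'ITEM#1', sk: 'v2' },
  { pk: 'ITEM#1', sk: 'v1' },
];
const byPrimaryKey = groupRecords(items, { partitionKey: 'pk', sortKey: 'sk' });
```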
Offload heavy processing to SQS to keep stream processing fast:
```typescript
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { StreamRouter, createSQSClient } from '@rogerchi/ddb-stream-router';

const router = new StreamRouter({
  deferQueue: process.env.DEFER_QUEUE_URL,
  sqsClient: createSQSClient(new SQSClient({}), SendMessageCommand),
});

// Immediate handler
router.onInsert(isOrder, async (order, ctx) => {
  console.log('Order received');
});

// Deferred handler - enqueues to SQS instead of executing immediately
router
  .onInsert(isOrder, async (order, ctx) => {
    await sendConfirmationEmail(order);
    await generateInvoice(order);
  })
  .defer('order-email-handler', { delaySeconds: 30 });

// Stream handler
export const streamHandler = router.streamHandler;

// SQS handler (can be same or different Lambda function)
export const sqsHandler = router.sqsHandler;
```
The handler ID in .defer(id) is required to match records when processing from SQS. It ensures stable routing across deployments and supports cross-function processing.
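To show why a stable handler ID matters, here is a simplified in-memory sketch of the idea: the stream side serializes the handler ID with the record, and the SQS side uses that ID to find the handler to run. Everything here is an assumption for illustration. The `DeferredMessage` shape, `registerDeferred`, `toMessageBody`, and `processMessageBody` are hypothetical, and the library's actual SQS payload and lookup may differ.

```typescript
type DeferredHandler = (image: unknown) => Promise<void>;

// Hypothetical message shape; the library's actual SQS payload may differ.
interface DeferredMessage {
  handlerId: string;
  image: unknown;
}

const registry = new Map<string, DeferredHandler>();

// Register a handler under a stable, unique ID.
function registerDeferred(handlerId: string, handler: DeferredHandler): void {
  if (registry.has(handlerId)) throw new Error(`Duplicate handler ID: ${handlerId}`);
  registry.set(handlerId, handler);
}

// Stream side: serialize the work instead of running the handler.
function toMessageBody(handlerId: string, image: unknown): string {
  const message: DeferredMessage = { handlerId, image };
  return JSON.stringify(message);
}

// SQS side: look the handler up by its ID and run it.
async function processMessageBody(body: string): Promise<void> {
  const message = JSON.parse(body) as DeferredMessage;
  const handler = registry.get(message.handlerId);
  if (!handler) throw new Error(`No handler registered for ${message.handlerId}`);
  await handler(message.image);
}
```

Because the lookup depends only on the ID string, the consumer can be a different Lambda function or a later deployment, as long as it registers a handler under the same ID.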
Handler signatures adapt based on your DynamoDB stream configuration:
| Stream View Type | INSERT | MODIFY | REMOVE |
|-----------------|--------|--------|--------|
| KEYS_ONLY | (keys, ctx) | (keys, ctx) | (keys, ctx) |
| NEW_IMAGE | (newImage, ctx) | (undefined, newImage, ctx) | (undefined, ctx) |
| OLD_IMAGE | (undefined, ctx) | (oldImage, undefined, ctx) | (oldImage, ctx) |
| NEW_AND_OLD_IMAGES | (newImage, ctx) | (oldImage, newImage, ctx) | (oldImage, ctx) |
Process only records from the current region:
```typescript
const router = new StreamRouter({ sameRegionOnly: true });
```
```typescript
const router = new StreamRouter({
  // Stream view type (default: 'NEW_AND_OLD_IMAGES')
  streamViewType: 'NEW_AND_OLD_IMAGES',
  // Auto-unmarshall DynamoDB JSON (default: true)
  unmarshall: true,
  // Only process same-region records (default: false)
  sameRegionOnly: false,
  // Default SQS queue for deferred handlers
  deferQueue: 'https://sqs...',
  // SQS client for deferred processing
  sqsClient: { sendMessage: async (params) => { ... } },
  // Return batchItemFailures format for streamHandler/sqsHandler (default: true)
  reportBatchItemFailures: true,
  // Optional logger for trace logging (e.g., console, pino, Powertools Logger)
  logger: console,
});
```
```typescript
const result = await router.process(event);
// { processed: 10, succeeded: 9, failed: 1, errors: [...] }

// Or with partial batch failures for Lambda retry
const retryResult = await router.process(event, { reportBatchItemFailures: true });
// { batchItemFailures: [{ itemIdentifier: 'sequence-number' }] }
```
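The `batchItemFailures` shape is the partial batch response format that Lambda expects from stream consumers when `ReportBatchItemFailures` is enabled on the event source mapping. The sketch below shows how such a response could be assembled by hand from per-record failures. It is an illustration rather than the router's code, and `processWithPartialFailures` and `RecordLike` are hypothetical names.

```typescript
// Minimal record shape: only the sequence number is needed here.
interface RecordLike {
  dynamodb: { SequenceNumber: string };
}

interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Runs `handle` for each record and reports the sequence numbers that failed.
async function processWithPartialFailures<R extends RecordLike>(
  records: R[],
  handle: (record: R) => Promise<void>,
): Promise<BatchResponse> {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of records) {
    try {
      await handle(record);
    } catch {
      // Report the failed record so Lambda can retry from it.
      batchItemFailures.push({ itemIdentifier: record.dynamodb.SequenceNumber });
    }
  }
  return { batchItemFailures };
}
```

An empty `batchItemFailures` array tells Lambda the whole batch succeeded.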
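```typescript
class StreamRouter {
  constructor(options?: StreamRouterOptions);
  onInsert(...): HandlerRegistration;
  onModify(...): HandlerRegistration;
  onRemove(...): HandlerRegistration;
  onTTLRemove(...): HandlerRegistration;
  use(middleware): this;
  process(event, options?): Promise<...>;
  processDeferred(sqsEvent, options?): Promise<...>;
}
```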
```typescript
interface HandlerRegistration {
  // id: unique identifier for this deferred handler (used to match records in processDeferred)
  defer(id: string, options?: { queue?: string; delaySeconds?: number }): StreamRouter;
  onInsert(...): HandlerRegistration;
  onModify(...): HandlerRegistration;
  onRemove(...): HandlerRegistration;
  onTTLRemove(...): HandlerRegistration;
}
```
```typescript
interface HandlerContext {
  eventName: 'INSERT' | 'MODIFY' | 'REMOVE';
  eventID?: string;
  eventSourceARN?: string;
}
```
Apache-2.0