SNS adapter for message-queue-toolkit
npm install @message-queue-toolkit/snsAWS SNS (Simple Notification Service) implementation for the message-queue-toolkit. Provides a robust, type-safe abstraction for publishing messages to SNS topics and consuming them via SQS queue subscriptions, with support for both standard and FIFO topics.
- Installation
- Features
- Core Concepts
- Quick Start
- Standard Topic Publisher
- SNS-to-SQS Consumer
- FIFO Topic Publisher
- FIFO Topic Consumer
- Configuration
- Topic Creation
- Topic Locator
- Publisher Options
- Consumer Options
- SNS-Specific Features
- Topic Subscriptions
- Fan-Out Pattern
- Message Filtering
- Cross-Account Publishing
- Advanced Features
- FIFO Topics
- FIFO Topic Requirements
- Message Groups
- FIFO-Specific Configuration
- Testing
- API Reference
``bash`
npm install @message-queue-toolkit/sns @message-queue-toolkit/sqs @message-queue-toolkit/core
Peer Dependencies:
- @aws-sdk/client-sns - AWS SDK for SNS@aws-sdk/client-sqs
- - AWS SDK for SQS (required for consumers)@aws-sdk/client-sts
- - AWS SDK for STS (for ARN resolution)zod
- - Schema validation
- ✅ Type-safe message handling with Zod schema validation
- ✅ Standard and FIFO topic support
- ✅ Automatic topic and subscription creation
- ✅ Fan-out pattern - publish once, consume from multiple queues
- ✅ Message filtering - subscribe to specific message types
- ✅ Message deduplication (publisher and consumer level)
- ✅ Payload offloading for large messages (S3 integration)
- ✅ Automatic retry logic with exponential backoff (inherited from SQS consumer)
- ✅ Dead Letter Queue (DLQ) support
- ✅ Handler spies for testing
- ✅ Pre-handlers and barriers for complex message processing
- ✅ Cross-account and cross-region publishing
SNS publishers send messages to topics (not queues). A topic is a logical access point that acts as a communication channel. Publishers are responsible for:
- Message validation against Zod schemas
- Automatic serialization
- Optional deduplication (preventing duplicate sends)
- Optional payload offloading (for messages > 256KB)
- FIFO-specific concerns (MessageGroupId, MessageDeduplicationId)
SNS consumers subscribe to topics via SQS queues (SNS-to-SQS pattern). This provides:
- Decoupling - topics don't know about subscribers
- Fan-out - one message published to multiple queues
- Persistence - messages queued until processed
- Retry logic - inherited from SQS consumer capabilities
- Message filtering - only receive relevant messages
Consumers are actually AbstractSnsSqsConsumer which extends AbstractSqsConsumer from the SQS package, inheriting all SQS consumer capabilities.
`text`
Publisher → SNS Topic → [Subscriptions] → SQS Queues → Consumers
↓
(optional filtering)
Key Difference from SQS:
- SQS: Direct queue-to-queue communication (1:1)
- SNS: Pub/Sub pattern with fan-out (1:N)
Messages use the same schema requirements as SQS. Each message must have:
- A unique message type field (discriminator for routing) - configurable via messageTypeResolver (required)messageIdField
- A message ID field (for tracking and deduplication) - configurable via (default: 'id')messageTimestampField
- A timestamp field (added automatically if missing) - configurable via (default: 'timestamp')
See the SQS README - Message Schemas section for full details.
`typescript
import { AbstractSnsPublisher } from '@message-queue-toolkit/sns'
import { SNSClient } from '@aws-sdk/client-sns'
import { STSClient } from '@aws-sdk/client-sts'
import z from 'zod'
// Define your message schemas
const UserCreatedSchema = z.object({
id: z.string(),
messageType: z.literal('user.created'),
userId: z.string(),
email: z.string().email(),
timestamp: z.string().optional(),
})
const UserUpdatedSchema = z.object({
id: z.string(),
messageType: z.literal('user.updated'),
userId: z.string(),
changes: z.record(z.unknown()),
timestamp: z.string().optional(),
})
type UserCreated = z.infer
type UserUpdated = z.infer
type SupportedMessages = UserCreated | UserUpdated
// Create your publisher class
class UserEventsPublisher extends AbstractSnsPublisher
constructor(snsClient: SNSClient, stsClient: STSClient) {
super(
{
snsClient,
stsClient,
logger: console,
errorReporter: { report: (error) => console.error(error) },
},
{
messageSchemas: [UserCreatedSchema, UserUpdatedSchema],
messageTypeResolver: { messageTypePath: 'messageType' },
creationConfig: {
topic: {
Name: 'user-events-topic',
},
},
deletionConfig: {
deleteIfExists: false,
},
}
)
}
}
// Use the publisher
const snsClient = new SNSClient({ region: 'us-east-1' })
const stsClient = new STSClient({ region: 'us-east-1' })
const publisher = new UserEventsPublisher(snsClient, stsClient)
await publisher.init()
// Publish to the topic - all subscribers will receive this message
await publisher.publish({
id: '123',
messageType: 'user.created',
userId: 'user-456',
email: 'user@example.com',
})
await publisher.close()
`
Consumers subscribe to SNS topics via SQS queues:
`typescript
import { AbstractSnsSqsConsumer } from '@message-queue-toolkit/sns'
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
import type { Either } from '@lokalise/node-core'
type ExecutionContext = {
userService: UserService
}
class UserEventsConsumer extends AbstractSnsSqsConsumer<
SupportedMessages,
ExecutionContext
> {
constructor(
snsClient: SNSClient,
sqsClient: SQSClient,
stsClient: STSClient,
userService: UserService
) {
super(
{
snsClient,
sqsClient,
stsClient,
logger: console,
errorReporter: { report: (error) => console.error(error) },
consumerErrorResolver: {
resolveError: () => ({ resolve: 'retryLater' as const }),
},
transactionObservabilityManager: {
start: () => {},
stop: () => {},
},
},
{
messageTypeResolver: { messageTypePath: 'messageType' },
handlers: new MessageHandlerConfigBuilder
.addConfig(
UserCreatedSchema,
async (message, context): Promise
await context.userService.createUser(message.userId, message.email)
return { result: 'success' }
}
)
.addConfig(
UserUpdatedSchema,
async (message, context): Promise
await context.userService.updateUser(message.userId, message.changes)
return { result: 'success' }
}
)
.build(),
// Queue configuration (SQS queue that subscribes to SNS topic)
creationConfig: {
queue: {
QueueName: 'user-events-consumer-queue',
},
topic: {
Name: 'user-events-topic', // Must match publisher's topic name
},
},
// Subscription configuration
subscriptionConfig: {
updateAttributesIfExists: false,
},
deletionConfig: {
deleteIfExists: false,
},
},
{ userService } // Execution context
)
}
}
// Use the consumer
const consumer = new UserEventsConsumer(snsClient, sqsClient, stsClient, userService)
await consumer.start() // Creates queue, subscribes to topic, starts consuming
// Later, to stop
await consumer.close()
`
What happens during start():
1. Creates SNS topic (if using creationConfig)creationConfig
2. Creates SQS queue (if using )
3. Subscribes queue to topic
4. Configures queue permissions to allow SNS to publish
5. Starts consuming messages from the queue
FIFO topics guarantee message ordering and exactly-once delivery:
`typescript
class UserEventsFifoPublisher extends AbstractSnsPublisher
constructor(snsClient: SNSClient, stsClient: STSClient) {
super(
{
snsClient,
stsClient,
logger: console,
errorReporter: { report: (error) => console.error(error) },
},
{
messageSchemas: [UserCreatedSchema, UserUpdatedSchema],
messageTypeResolver: { messageTypePath: 'messageType' },
fifoTopic: true, // Enable FIFO mode
// Option 1: Use a field from the message as MessageGroupId
messageGroupIdField: 'userId',
// Option 2: Use a default MessageGroupId for all messages
// defaultMessageGroupId: 'user-events',
creationConfig: {
topic: {
Name: 'user-events-topic.fifo', // Must end with .fifo
Attributes: {
FifoTopic: 'true',
ContentBasedDeduplication: 'false', // or 'true' for automatic deduplication
},
},
},
}
)
}
}
// Publishing to FIFO topic
const fifoPublisher = new UserEventsFifoPublisher(snsClient, stsClient)
await fifoPublisher.init()
// Messages with the same userId will be processed in order
await fifoPublisher.publish({
id: '123',
messageType: 'user.created',
userId: 'user-456', // Used as MessageGroupId
email: 'user@example.com',
})
await fifoPublisher.publish({
id: '124',
messageType: 'user.updated',
userId: 'user-456', // Same group - processed after the first message
changes: { name: 'John Doe' },
})
// You can also explicitly provide MessageGroupId
await fifoPublisher.publish(
{
id: '125',
messageType: 'user.created',
userId: 'user-789',
email: 'other@example.com',
},
{
MessageGroupId: 'custom-group-id',
MessageDeduplicationId: 'unique-dedup-id', // Optional
}
)
`
`typescript
class UserEventsFifoConsumer extends AbstractSnsSqsConsumer<
SupportedMessages,
ExecutionContext
> {
constructor(
snsClient: SNSClient,
sqsClient: SQSClient,
stsClient: STSClient,
userService: UserService
) {
super(
{
snsClient,
sqsClient,
stsClient,
logger: console,
errorReporter: { report: (error) => console.error(error) },
consumerErrorResolver: {
resolveError: () => ({ resolve: 'retryLater' as const }),
},
transactionObservabilityManager: {
start: () => {},
stop: () => {},
},
},
{
fifoQueue: true, // Enable FIFO mode for SQS queue
messageTypeResolver: { messageTypePath: 'messageType' },
handlers: new MessageHandlerConfigBuilder
.addConfig(UserCreatedSchema, handleUserCreated)
.addConfig(UserUpdatedSchema, handleUserUpdated)
.build(),
creationConfig: {
queue: {
QueueName: 'user-events-consumer-queue.fifo', // Must end with .fifo
Attributes: {
FifoQueue: 'true',
ContentBasedDeduplication: 'false',
},
},
topic: {
Name: 'user-events-topic.fifo', // Must match publisher's FIFO topic
Attributes: {
FifoTopic: 'true',
ContentBasedDeduplication: 'false',
},
},
},
subscriptionConfig: {
updateAttributesIfExists: false,
},
// Optional: Configure concurrent consumers for parallel processing of different groups
concurrentConsumersAmount: 3, // Process 3 different message groups in parallel
},
{ userService }
)
}
}
`
When using creationConfig, the topic will be created automatically if it doesn't exist:
`typescript
{
creationConfig: {
topic: {
Name: 'my-topic',
Attributes: {
// Standard Topic attributes
DisplayName: 'My Notification Topic',
// FIFO Topic attributes (only for .fifo topics)
FifoTopic: 'true', // Must be 'true' for FIFO topics
ContentBasedDeduplication: 'false', // Automatic deduplication based on message body
// Encryption
KmsMasterKeyId: 'alias/aws/sns', // KMS key for encryption
// Delivery policy
DeliveryPolicy: JSON.stringify({
healthyRetryPolicy: {
minDelayTarget: 20,
maxDelayTarget: 20,
numRetries: 3,
numMaxDelayRetries: 0,
numNoDelayRetries: 0,
numMinDelayRetries: 0,
backoffFunction: 'linear'
}
}),
},
Tags: [
{ Key: 'Environment', Value: 'production' },
{ Key: 'Team', Value: 'backend' },
],
},
queue: {
QueueName: 'my-consumer-queue',
// See SQS README for full queue configuration options
},
updateAttributesIfExists: true, // Update attributes if topic/queue exists
forceTagUpdate: false, // Force tag update even if unchanged
// Queue permissions for SNS publishing (automatically configured)
queueUrlsWithSubscribePermissionsPrefix: 'https://sqs.us-east-1.amazonaws.com/123456789012/',
},
}
`
When using locatorConfig, you connect to an existing topic without creating it:
`typescript
{
locatorConfig: {
// Option 1: By topic ARN
topicArn: 'arn:aws:sns:us-east-1:123456789012:my-topic',
// Option 2: By topic name (ARN will be resolved using STS)
// topicName: 'my-topic',
// Optional: Existing queue URL or name
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
// or
// queueName: 'my-queue',
// Optional: Existing subscription ARN
subscriptionArn: 'arn:aws:sns:us-east-1:123456789012:my-topic:uuid',
},
}
`
When your SNS topic or SQS queue may not exist at startup (e.g., created by another service or infrastructure-as-code pipeline), you can enable polling to wait for resources to become available instead of failing immediately:
`typescript
{
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
subscriptionArn: 'arn:aws:sns:us-east-1:123456789012:my-topic:uuid',
// Enable startup resource polling
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000, // Check every 5 seconds (default)
timeoutMs: 60000, // Timeout after 60 seconds
// timeoutMs: 'NO_TIMEOUT', // Or poll indefinitely
// Optional: Non-blocking mode
nonBlocking: false, // Default: false (blocking)
},
},
}
`
#### Blocking Mode (Default)
In blocking mode, init() or start() will wait until both the topic and queue are available:
`typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs...',
subscriptionArn: 'arn:aws:sns:...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
},
},
})
// This will block until topic and queue are available (or timeout)
await consumer.start()
`
If the timeout is reached before resources are available, a StartupResourcePollingTimeoutError is thrown.
#### Non-Blocking Mode
In non-blocking mode, init() returns immediately even if resources aren't available. Background polling continues and you're notified via callbacks:
`typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs...',
subscriptionArn: 'arn:aws:sns:...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
nonBlocking: true, // Return immediately
},
},
})
// Returns immediately, even if resources don't exist yet
await consumer.start()
// Check if resources are ready
if (!consumer.resourcesReady) {
console.log('Resources not yet available, waiting in background...')
}
`
#### Callbacks for Non-Blocking Mode
When using non-blocking mode, you can provide callbacks to be notified when resources become available or when polling fails:
`typescript
// Using initSnsSqs directly for more control
import { initSnsSqs } from '@message-queue-toolkit/sns'
const result = await initSnsSqs(
sqsClient,
snsClient,
stsClient,
{
topicName: 'my-topic',
queueUrl: 'https://sqs...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
nonBlocking: true,
},
},
creationConfig,
subscriptionConfig,
{
// Called when all resources become available
onResourcesReady: ({ topicArn, queueUrl }) => {
console.log('Resources are now available!')
console.log('Topic ARN:', topicArn)
console.log('Queue URL:', queueUrl)
},
// Called if background polling fails (e.g., timeout or unexpected error)
onResourcesError: (error, context) => {
if (context.isFinal) {
// Polling has stopped and will not retry
console.error('Failed to wait for resources:', error.message)
// Handle error - maybe alert, retry, or graceful degradation
} else {
// Transient error, polling will continue
console.warn('Transient error while polling:', error.message)
}
},
},
)
if (result.resourcesReady) {
console.log('Resources were immediately available')
} else {
console.log('Waiting for resources in background...')
}
`
#### Subscription Creation Mode
When you want to create a subscription (no existing subscriptionArn), startup resource polling will wait for the topic to exist before attempting to subscribe:
`typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic', // Topic created by another service
// No subscriptionArn - we'll create the subscription
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
},
},
creationConfig: {
queue: { QueueName: 'my-consumer-queue' }, // Queue will be created
},
})
// This will:
// 1. Poll until the topic exists
// 2. Create the SQS queue
// 3. Subscribe the queue to the topic
// 4. Start consuming
await consumer.start()
`
`typescript
{
// Required - Message Schema Configuration
messageSchemas: [Schema1, Schema2], // Array of Zod schemas
messageTypeResolver: { messageTypePath: 'messageType' }, // Field containing message type discriminator
// Topic Configuration (one of these required)
creationConfig: { / ... / }, // Create topic if doesn't exist
locatorConfig: { / ... / }, // Use existing topic
// Optional - FIFO Configuration
fifoTopic: false, // Set to true for FIFO topics
messageGroupIdField: 'userId', // Field to use as MessageGroupId
defaultMessageGroupId: 'default', // Default MessageGroupId if field not present
// Optional - Message Field Configuration (same as SQS)
messageIdField: 'id', // Default: 'id'
messageTimestampField: 'timestamp', // Default: 'timestamp'
messageDeduplicationIdField: 'deduplicationId', // Default: 'deduplicationId'
messageDeduplicationOptionsField: 'deduplicationOptions', // Default: 'deduplicationOptions'
// Optional - Features
logMessages: false, // Log all published messages
handlerSpy: true, // Enable handler spy for testing
// Optional - Deduplication (same as SQS)
enablePublisherDeduplication: false,
messageDeduplicationConfig: { / ... / },
// Optional - Payload Offloading (same as SQS)
payloadStoreConfig: { / ... / },
// Optional - Deletion
deletionConfig: { / ... / },
}
`
See the SQS README - Publisher Options section for full details on shared options.
SNS consumers use the same options as SQS consumers, plus SNS-specific subscription configuration:
`typescript
{
// All SQS consumer options are supported
// See SQS README for full consumer options
// Required - Message Handling Configuration
handlers: MessageHandlerConfigBuilder.build(),
messageTypeResolver: { messageTypePath: 'messageType' },
// Topic & Queue Configuration
creationConfig: {
topic: { / SNS topic config / },
queue: { / SQS queue config / },
},
// or
locatorConfig: {
topicArn: 'arn:aws:sns:...',
queueUrl: 'https://sqs...',
subscriptionArn: 'arn:aws:sns:...',
},
// SNS-Specific - Subscription Configuration
subscriptionConfig: {
updateAttributesIfExists: false, // Update subscription attributes if exists
// Optional: Message filtering
filterPolicy: {
messageType: ['user.created', 'user.updated'], // Only receive these types
},
// Optional: Raw message delivery (disable SNS envelope)
rawMessageDelivery: false,
// Optional: Redrive policy (DLQ for undeliverable messages)
redrivePolicy: {
deadLetterTargetArn: 'arn:aws:sqs:us-east-1:123456789012:my-dlq',
},
},
// Optional - FIFO Configuration
fifoQueue: false,
// Optional - Other options inherited from SQS
concurrentConsumersAmount: 1,
maxRetryDuration: 345600, // 4 days
deadLetterQueue: { / ... / },
consumerOverrides: { / ... / },
// ... see SQS README for full list
}
`
See the SQS README - Consumer Options section for full details on shared options.
SNS topics can have multiple subscribers. Each subscriber receives a copy of every message published to the topic:
`typescript
// Publisher publishes to one topic
class NotificationPublisher extends AbstractSnsPublisher
constructor(snsClient: SNSClient, stsClient: STSClient) {
super(dependencies, {
messageSchemas: [NotificationSchema],
creationConfig: {
topic: { Name: 'notifications' },
},
})
}
}
// Multiple consumers subscribe to the same topic
class EmailConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildHandlers(sendEmail),
creationConfig: {
queue: { QueueName: 'email-notifications' },
topic: { Name: 'notifications' }, // Same topic
},
})
}
}
class SMSConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildHandlers(sendSMS),
creationConfig: {
queue: { QueueName: 'sms-notifications' },
topic: { Name: 'notifications' }, // Same topic
},
})
}
}
class PushConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildHandlers(sendPush),
creationConfig: {
queue: { QueueName: 'push-notifications' },
topic: { Name: 'notifications' }, // Same topic
},
})
}
}
// One publish reaches all three consumers
await publisher.publish({
id: '123',
messageType: 'notification.created',
userId: 'user-456',
message: 'Your order has shipped!',
})
// → Email sent
// → SMS sent
// → Push notification sent
`
The fan-out pattern enables broadcasting messages to multiple independent processing pipelines:
`typescript
// Single event publisher
class OrderEventPublisher extends AbstractSnsPublisher
constructor(snsClient: SNSClient, stsClient: STSClient) {
super(dependencies, {
messageSchemas: [OrderCreatedSchema, OrderUpdatedSchema],
creationConfig: {
topic: { Name: 'order-events' },
},
})
}
}
// Different services consume the same events independently
// Inventory service - updates stock levels
class InventoryConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildInventoryHandlers(),
creationConfig: {
queue: { QueueName: 'inventory-order-events' },
topic: { Name: 'order-events' },
},
})
}
}
// Analytics service - tracks order metrics
class AnalyticsConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildAnalyticsHandlers(),
creationConfig: {
queue: { QueueName: 'analytics-order-events' },
topic: { Name: 'order-events' },
},
})
}
}
// Shipping service - prepares shipments
class ShippingConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildShippingHandlers(),
creationConfig: {
queue: { QueueName: 'shipping-order-events' },
topic: { Name: 'order-events' },
},
})
}
}
// Benefits:
// - Each service processes independently
// - Failure in one doesn't affect others
// - Easy to add new consumers without changing publisher
// - Each consumer can have its own retry/DLQ configuration
`
Subscribers can filter messages to receive only specific types:
`typescript
class UserCreatedConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildHandlers(),
creationConfig: {
queue: { QueueName: 'user-created-processor' },
topic: { Name: 'user-events' },
},
subscriptionConfig: {
// Only receive user.created events
filterPolicy: {
messageType: ['user.created'],
},
},
})
}
}
class UserModificationConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildHandlers(),
creationConfig: {
queue: { QueueName: 'user-modification-processor' },
topic: { Name: 'user-events' },
},
subscriptionConfig: {
// Only receive update and delete events
filterPolicy: {
messageType: ['user.updated', 'user.deleted'],
},
},
})
}
}
// Advanced filtering with message attributes
class HighPriorityConsumer extends AbstractSnsSqsConsumer
constructor(deps) {
super(deps, {
handlers: buildHandlers(),
creationConfig: {
queue: { QueueName: 'high-priority-orders' },
topic: { Name: 'order-events' },
},
subscriptionConfig: {
filterPolicy: {
messageType: ['order.created'],
priority: ['high', 'critical'], // Filters on message attribute
},
},
})
}
}
`
Benefits:
- Reduces unnecessary message processing
- Lowers SQS costs (fewer messages received)
- Filtering happens at SNS level (before queuing)
- Each subscriber can have different filters
SNS supports publishing from one AWS account to topics in another:
`typescript
// Account A - Publisher
class CrossAccountPublisher extends AbstractSnsPublisher
constructor(snsClient: SNSClient, stsClient: STSClient) {
super(dependencies, {
messageSchemas: [MessageSchema],
locatorConfig: {
// Topic in Account B
topicArn: 'arn:aws:sns:us-east-1:222222222222:shared-topic',
},
})
}
}
// Account B - Consumer (topic owner)
// Topic policy must allow Account A to publish:
{
creationConfig: {
topic: {
Name: 'shared-topic',
Attributes: {
Policy: JSON.stringify({
Version: '2012-10-17',
Statement: [
{
Effect: 'Allow',
Principal: {
AWS: 'arn:aws:iam::111111111111:root', // Account A
},
Action: 'SNS:Publish',
Resource: 'arn:aws:sns:us-east-1:222222222222:shared-topic',
},
],
}),
},
},
},
}
`
SNS consumers inherit all advanced features from SQS consumers. See the SQS README for detailed documentation on:
- Custom Message Field Names - Adapt to existing schemas
- Dead Letter Queue (DLQ) - Handle permanently failing messages
- Message Retry Logic - Exponential backoff and retry limits
- Message Deduplication - Publisher and consumer-level deduplication
- Payload Offloading - S3 storage for large messages
- Message Handlers - Type-safe handler configuration
- Pre-handlers and Barriers - Middleware and message dependencies
- Handler Spies - Testing async message flows
All these features work identically for SNS consumers since they extend the SQS consumer implementation.
FIFO (First-In-First-Out) topics provide message ordering and exactly-once delivery, similar to FIFO queues.
1. Topic name must end with .fifo
`typescript`
Name: 'my-topic.fifo' // ✅ Valid
Name: 'my-topic' // ❌ Invalid for FIFO
2. FifoTopic attribute must be 'true'
`typescript`
Attributes: {
FifoTopic: 'true',
}
3. Subscribed queues must also be FIFO
`typescript`
creationConfig: {
topic: {
Name: 'events.fifo',
Attributes: { FifoTopic: 'true' },
},
queue: {
QueueName: 'consumer.fifo', // Must be FIFO
Attributes: { FifoQueue: 'true' },
},
}
4. MessageGroupId required for all messages
`typescript
// Option 1: From message field
messageGroupIdField: 'userId'
// Option 2: Default value
defaultMessageGroupId: 'default-group'
// Option 3: Explicit in publish call
await publisher.publish(message, {
MessageGroupId: 'custom-group',
})
`
5. DLQ must also be FIFO
`typescript`
deadLetterQueue: {
creationConfig: {
queue: {
QueueName: 'my-dlq.fifo', // Must be FIFO
Attributes: { FifoQueue: 'true' },
},
},
}
Message groups work the same way as in FIFO queues. See the SQS README - Message Groups section for detailed information on:
- Group assignment and parallelism
- Best practices for group design
- Balancing group sizes
- Sizing concurrent consumers
SNS FIFO Fan-Out Example:
`typescript
// FIFO publisher publishes to FIFO topic
const fifoPublisher = new OrderEventsFifoPublisher(snsClient, stsClient)
await fifoPublisher.publish({
id: '123',
messageType: 'order.created',
customerId: 'customer-A',
orderId: 'order-1',
}, {
MessageGroupId: 'customer-A', // All messages for customer-A ordered
})
// Multiple FIFO consumers subscribe to the same FIFO topic
const inventoryConsumer = new InventoryFifoConsumer(deps)
const analyticsConsumer = new AnalyticsFifoConsumer(deps)
const shippingConsumer = new ShippingFifoConsumer(deps)
// Each consumer receives messages in order within each group
// Different consumers can process different groups in parallel
`
`typescript
// Publisher
{
fifoTopic: true,
// Choose one or more:
messageGroupIdField: 'userId', // Use field from message
defaultMessageGroupId: 'default', // Fallback value
// or provide in publish call
}
// Consumer
{
fifoQueue: true,
concurrentConsumersAmount: 3, // Process 3 groups in parallel
// Note: Retry behavior inherits from SQS FIFO queues
// - No DelaySeconds support (AWS limitation)
// - Messages retry immediately
// - Order is preserved
}
// Topic Configuration
{
creationConfig: {
topic: {
Name: 'my-topic.fifo',
Attributes: {
FifoTopic: 'true',
// Optional: Automatic deduplication based on message body
ContentBasedDeduplication: 'false', // or 'true'
},
},
},
}
// Queue Configuration
{
creationConfig: {
queue: {
QueueName: 'my-queue.fifo',
Attributes: {
FifoQueue: 'true',
// Optional: Queue-level deduplication settings
ContentBasedDeduplication: 'false',
DeduplicationScope: 'queue', // or 'messageGroup'
FifoThroughputLimit: 'perQueue', // or 'perMessageGroupId'
},
},
},
}
`
FIFO Limitations:
- Throughput: 3,000 messages/second per topic (or 300/second per message group)
- No delays: FIFO queues don't support DelaySeconds
- Strict ordering: Within a message group, messages are delivered in exact order
- FIFO-to-FIFO only: FIFO topics can only fan out to FIFO queues
See the SQS README - FIFO Queues section for comprehensive FIFO documentation.
SNS testing works the same as SQS testing. Handler spies enable testing of async pub/sub flows:
`typescript
import { describe, it, expect, beforeEach, afterEach } from 'vitest'
describe('SNS Publisher and Consumer', () => {
let publisher: UserEventsPublisher
let consumer: UserEventsConsumer
beforeEach(async () => {
publisher = new UserEventsPublisher(snsClient, stsClient, { handlerSpy: true })
consumer = new UserEventsConsumer(snsClient, sqsClient, stsClient, userService, {
handlerSpy: true
})
await publisher.init()
await consumer.start()
})
afterEach(async () => {
await consumer.close()
await publisher.close()
})
it('publishes to topic and consumes from subscribed queue', async () => {
// Publish to SNS topic
await publisher.publish({
id: '123',
messageType: 'user.created',
userId: 'user-456',
email: 'test@example.com',
})
// Wait for publisher spy
await publisher.handlerSpy.waitForMessageWithId('123', 'published')
// Wait for consumer spy
const consumedMessage = await consumer.handlerSpy.waitForMessageWithId('123', 'consumed')
expect(consumedMessage.userId).toBe('user-456')
expect(userService.createUser).toHaveBeenCalledWith('user-456', 'test@example.com')
})
})
`
See the SQS README - Testing section for comprehensive testing documentation including:
- Integration tests with LocalStack
- Unit tests with handler spies
- Testing indirect message publishing
- Complex workflow testing
`typescript
class AbstractSnsPublisher
constructor(
dependencies: SNSDependencies,
options: SNSPublisherOptions
)
async init(): Promise
async close(): Promise
async publish(
message: MessagePayloadType,
options?: SNSMessageOptions
): Promise
readonly handlerSpy: HandlerSpy
readonly topicArn: string
}
`
`typescript
class AbstractSnsSqsConsumer<
MessagePayloadType extends object,
ExecutionContext,
PrehandlerOutput = undefined
> extends AbstractSqsConsumer<...> {
constructor(
dependencies: SNSSQSConsumerDependencies,
options: SNSSQSConsumerOptions
executionContext: ExecutionContext
)
async init(): Promise
async start(): Promise
async close(abort?: boolean): Promise
readonly handlerSpy: HandlerSpy
readonly topicArn: string
readonly subscriptionArn: string
readonly queueUrl: string
readonly queueName: string
}
`
`typescript
// Message options for publishing
type SNSMessageOptions = {
MessageGroupId?: string // Required for FIFO topics
MessageDeduplicationId?: string // Optional for FIFO topics
}
// Dependencies
type SNSDependencies = {
snsClient: SNSClient
stsClient: STSClient
logger: Logger
errorReporter: ErrorReporter
messageMetricsManager?: MessageMetricsManager
}
// Consumer dependencies (extends SNSDependencies and SQSDependencies)
type SNSSQSConsumerDependencies = SNSDependencies & SQSDependencies & {
consumerErrorResolver: ErrorResolver
transactionObservabilityManager: TransactionObservabilityManager
}
// Subscription options
type SNSSubscriptionOptions = {
updateAttributesIfExists?: boolean
filterPolicy?: Record
rawMessageDelivery?: boolean
redrivePolicy?: {
deadLetterTargetArn: string
}
}
`
`typescript
// Topic validation
function isFifoTopicName(topicName: string): boolean
function validateFifoTopicName(topicName: string, isFifoTopic: boolean): void
// Topic operations
async function assertTopic(
snsClient: SNSClient,
stsClient: STSClient,
topicOptions: CreateTopicCommandInput,
extraParams?: ExtraSNSCreationParams
): Promise
async function deleteTopic(
snsClient: SNSClient,
stsClient: STSClient,
topicName: string
): Promise
async function getTopicAttributes(
snsClient: SNSClient,
topicArn: string
): Promise
// Subscription operations
async function subscribeToTopic(
snsClient: SNSClient,
topicArn: string,
queueArn: string,
options?: SNSSubscriptionOptions
): Promise
async function findSubscriptionByTopicAndQueue(
snsClient: SNSClient,
topicArn: string,
queueArn: string
): Promise
// Message reading
function deserializeSNSMessage(
message: SQSMessage
): Either
// Message size calculation (same as SQS)
function calculateOutgoingMessageSize(message: unknown): number
``
MIT
Contributions are welcome! Please see the main repository for guidelines.
- Main Repository
- Core Package
- SQS Package - SNS consumers extend SQS consumers
- AWS SNS Documentation
- FIFO Topic Documentation
- SNS Message Filtering