Redis Streams-based transporter for NestJS microservices, enabling message passing via Redis Streams
npm install @nestjs-redis/streams-transporter
Custom NestJS microservices transporter using Redis Streams with event and request/response patterns




 
---
- Redis Streams–based transport: Events and requests stored as stream entries; replies written to per-client reply streams
- Event and request/response: Fire-and-forget events (dispatchEvent) and request/response via send() with routing callbacks
- Consumer groups: Server uses XREADGROUP + XACK for at-least-once delivery and horizontal scaling
- Configurable options: Stream prefix, consumer group/name, block timeout, batch size, max stream length (MAXLEN trim), retry delay
- NestJS integration: RedisStreamsContext (stream name, message id, consumer group/name) passed to handlers; optional onProcessingStartHook / onProcessingEndHook
- Type-safe: Event/request/response type guards and resolved options
``bash`
npm install @nestjs-redis/streams-transporter redis
`typescript
// main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { RedisStreamServer } from '@nestjs-redis/streams-transporter';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice
AppModule,
{
strategy: new RedisStreamServer({
url: 'redis://localhost:6379',
streamPrefix: '_microservices',
consumerGroup: 'nestjs-streams',
consumerName: 'my-consumer',
}),
},
);
await app.listen();
}
bootstrap();
`
`typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { RedisStreamClient } from '@nestjs-redis/streams-transporter';
import { AppController } from './app.controller';
import { AppService } from './app.service';
@Module({
imports: [
ClientsModule.register([
{
name: 'STREAMS_SERVICE',
customClass: RedisStreamClient,
options: {
url: 'redis://localhost:6379',
streamPrefix: '_microservices',
},
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
`
`typescript
// app.controller.ts
import { Controller, Get, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';
@Controller()
export class AppController {
constructor(
@Inject('STREAMS_SERVICE') private readonly client: ClientProxy,
) {}
@Get('echo')
async echo() {
return firstValueFrom(this.client.send('user.echo', { hello: 'world' }));
}
}
`
`typescript
// app.controller.ts
import { Controller } from '@nestjs/common';
import { EventPattern, MessagePattern, Payload } from '@nestjs/microservices';
import { Ctx } from '@nestjs/microservices';
import { RedisStreamsContext } from '@nestjs-redis/streams-transporter';
@Controller()
export class AppController {
@MessagePattern('user.echo')
echo(@Payload() data: object, @Ctx() ctx: RedisStreamsContext) {
return { ok: true, data };
}
@EventPattern('user.created')
onUserCreated(@Payload() data: object, @Ctx() ctx: RedisStreamsContext) {
// fire-and-forget; no reply
console.log('User created', data, ctx.getStreamName(), ctx.getMessageId());
}
}
`
RedisStreamServer and RedisStreamClient accept RedisStreamsOptions, which extends Redis client options and adds:
| Option | Default | Description |
| ----------------- | ------------------------------------- | ------------------------------------------------------------------------- |
| streamPrefix | '_microservices' | Prefix for request streams (e.g. {prefix}:user.echo) and reply streams. |consumerGroup
| | 'nestjs-streams' | Consumer group name for server XREADGROUP. |consumerName
| | '' (then consumer-${process.pid}) | Consumer name in the group. |blockTimeout
| | 100 | Block timeout (ms) for XREAD / XREADGROUP. |batchSize
| | 50 | Max entries per read (COUNT). |maxStreamLength
| | 10000 | Max length for streams; XADD ... TRIM MAXLEN ~ is used on add. |retryDelay
| | 250 | Delay (ms) before retrying after a read/connection error. |
Use resolveRedisStreamsOptions(options) to get a fully resolved options object (all optional fields filled with defaults).
- RedisStreamClient – NestJS ClientProxy implementation. Connects to Redis, publishes events/requests to streams, listens for replies on a dedicated reply stream and dispatches to routingMap callbacks. close() flushes callbacks with an error, deletes the reply stream, and quits the client.RedisStreamServer
- – NestJS CustomTransportStrategy. Creates consumer groups, consumes via XREADGROUP, ACKs with XACK, invokes message/event handlers with RedisStreamsContext, and writes replies to the client’s reply stream.RedisStreamsContext
- – Context passed to handlers (like Nest’s RPC context). Methods: getStreamName(), getMessageId(), getConsumerGroup(), getConsumerName().RedisStreamsOptions
- / RedisStreamsResolvedOptions – Options and resolved type; resolveRedisStreamsOptions(options) – returns resolved options.
Stream entry shape:
- Events: e: '1', data (JSON).e: '0'
- Requests: , id, replyTo, data (JSON).id
- Responses: , data or err (JSON), isDisposed: '1'`.
- Root repo: CSenshi/nestjs-redis
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Please see the root contributing guidelines.
MIT © CSenshi