NestJS custom transport strategy for PostgreSQL PubSub
npm install nestjs-pg-notifyNestJS custom transport strategy for PostgreSQL Pub/Sub.






PostgreSQL can be used as a Pub/Sub message broker.
Its functionality is similar to the Redis Pub/Sub, but has its own features and limitations.
The References section contains links that you may find useful to familiarize yourself with the PostgreSQL asynchronous notifications.
NestJS PG Notify implements Pub/Sub messaging paradigm using PostgreSQL as a NestJS custom transporter.
It wraps the pg-listen library under the hood.
It can be used in microservice and hybrid
NestJS applications. The example folder contains examples for both types of applications.
``bash`
$ npm i nestjs-pg-notify pg
`typescript
import { PgNotifyServer } from 'nestjs-pg-notify';
const app = await NestFactory.createMicroservice
strategy: new PgNotifyServer({
/**
* Required parameter
* Corresponds to the "pg" library's connection config
*/
connection: {
host: 'localhost',
port: 5432,
database: 'pgnotify',
user: 'pgnotify',
password: 'pgnotify',
},
/**
* Optional parameter
* Contains retry-strategy config passing the data to the "pg-listen" library
*/
strategy: {
retryInterval: 1_000,
retryTimeout: Infinity,
},
/**
* Optional parameter
* Overrides default logger
*/
logger: new Logger(),
})
});
`
NestJS PG Notify offers two decorators to register message handlers: @PgNotifyEventPattern() and @PgNotifyMessagePattern().@EventPattern()
These are an alternative to standard decorators: and @MessagePattern().
Message handler's binding can be used only within controller classes.
`typescript
import { PgNotifyContext, PgNotifyEventPattern, PgNotifyMessagePattern } from 'nestjs-pg-notify';
@Controller()
export class AppController {
@PgNotifyEventPattern({event: 'greeting'})
@UsePipes(new ValidationPipe())
onGreetingEvent(@Payload() payload: any, @Ctx() context: PgNotifyContext): void {
Logger.log(payload.message);
}
@PgNotifyMessagePattern('greeting')
@UsePipes(new ValidationPipe())
onGreetingRequest(@Payload() payload: any, @Ctx() context: PgNotifyContext): string {
Logger.log(payload.message);
return 'Hello!';
}
}
`
The standard decorator @Ctx() allows access to the context of the incoming request. In our case, the context object is an instance of PgNotifyContext.
The client proxy can be registered as a custom provider. The configuration is the same as the configuration of the PgNotifyServer.
`typescript
import { PgNotifyClient } from 'nestjs-pg-notify';
@Module({
providers: [
{
provide: 'PG_NOTIFY_CLIENT',
useFactory: (): ClientProxy => new PgNotifyClient({
connection: {
host: 'localhost',
port: 5432,
database: 'pgnotify',
user: 'pgnotify',
password: 'pgnotify',
},
strategy: {
retryInterval: 1_000,
retryTimeout: Infinity,
},
})
},
],
exports: [
'PG_NOTIFY_CLIENT',
]
})
export class AppModule {}
`
Then we can inject the client proxy.
`typescript
import { PgNotifyResponse } from 'nestjs-pg-notify';
export class AppService {
constructor(
@Inject('PG_NOTIFY_CLIENT')
private readonly client: ClientProxy,
) {}
sendRequest(): Observable
// Send request and expect response
return this.client.send('greeting', {message: 'Hello!'}).pipe(
timeout(2_000),
tap(response => Logger.debug(response)),
);
}
emitEvent(): Observable
// Emit event
return this.client.emit({event: 'greeting'}, {message: 'Hello!'});
}
}
`
The client proxy generates request identifier when we send requests using client.send().
The request identifier in the context of the incoming request means that we need to prepare an error response for the client.
We can use the PgNotifyResponse.error() factory in order to unify the structure of the response.
`typescript
import { PgNotifyContext, PgNotifyResponse } from 'nestjs-pg-notify';
@Catch()
export class ExceptionFilter implements ExceptionFilter {
catch(error: Error, host: ArgumentsHost): Observable
const {status, message} = parseError(error);
const context = host.switchToRpc().getContext
const requestId = context.getRequestId();
Logger.error(message, error.stack, 'PgNotifyExceptionFilter');
if (requestId) {
return of(PgNotifyResponse.error(message, status));
}
return of(undefined);
}
}
`
Then we can register the filter using the standard @UseFilters() decorator. It supports method-scope and controller-scope modes.
`typescript`
@Controller()
@UseFilters(ExceptionFilter)
export class AppController {
// ...
}
`typescript
import { PgNotifyContext } from 'nestjs-pg-notify';
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
public intercept(context: ExecutionContext, next: CallHandler): Observable
const pgNotifyContext = context
.switchToRpc()
.getContext
return next.handle().pipe(
tap(() => Logger.log(JSON.stringify(pgNotifyContext), LoggingInterceptor.name)),
);
}
}
`
To register interceptor we can use @UseInterceptors() decorator. It also supports method-scope and controller-scope modes.
`typescript``
@Controller()
@UseInterceptors(LoggingInterceptor)
export class AppController {
// ...
}
API documentation is available here.
1. PostgreSQL Documentation:
* Asynchronous Notification
* NOTIFY
* LISTEN
2. PgBouncer Documentation:
* Transaction pool mode does not support NOTIFY/LISTEN features
3. NestJS Documentation:
* Microservices
* Hybrid applications
* Custom transporters
4. Dependencies:
* pg-listen
This project is licensed under the MIT License.