npm install @nitra/natsNATS JetStream helper для Node.js.1
Простий API для публікації, обробки та моніторингу повідомлень у черзі з гнучким управлінням consumer-ами через конфігурацію та CLI інструменти.
---
``sh`
bun add @nitra/natsабо
npm install @nitra/nats
---
Пакет використовує такі змінні середовища:
- NATS_URL — адреса сервера NATS (наприклад, nats://localhost:4222)NATS_STREAM
- — назва stream (за замовчуванням dev)
---
Всі функції працюють із subject у форматі:
``
projectName:subjectName
Приклади:
- myProject:jobsservice:notifications
-
---
`js
import { publish } from '@nitra/nats'
// Публікація повідомлення
await publish('project:subject', { id: 1, foo: 'bar' })
await publish('service:notifications', { message: 'Hello!' })
`
- Повідомлення публікується у subject dev.project:subject (або ${NATS_STREAM}.project:subject)ensureConsumer
- Consumer-и потрібно створювати окремо через або CLI
---
`js
import { ensureConsumer } from '@nitra/nats'
// Створення простого consumer-а
await ensureConsumer({
streamName: 'dev',
durableName: 'project:subject',
filterSubjects: ['project:subject'],
deliverPolicy: 'all',
ackPolicy: 'explicit'
})
// Створення групового consumer-а для кількох subject-ів
await ensureConsumer({
streamName: 'dev',
durableName: 'worker-group',
filterSubjects: ['project:orders', 'project:payments'],
deliverPolicy: 'all',
ackPolicy: 'explicit'
})
`
Створіть YAML файл з конфігурацією consumer-а:
`yaml`consumer.yaml
apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
name: 'nats:test'
namespace: dev
spec:
streamName: dev
durableName: 'nats:test'
filterSubjects:
- 'nats:subject'
- 'nats:subject2'
deliverPolicy: all
ackPolicy: explicit
Застосуйте конфігурацію:
`bashЗ змінними середовища
NODE_ENV=development NATS_URL=nats://localhost:4222 NATS_STREAM=dev node cli.js consumer.yaml
---
Обробка повідомлення (worker)
`js
import { read, finish } from '@nitra/nats'// Читання для стандартного durable consumer (durable_name = subject)
const data = await read('project:subject')
// ...обробка data...
await finish() // підтвердження (ack) повідомлення
// Читання для кастомного durable consumer
const data2 = await read('worker-group')
await finish()
`- Якщо не викликати
finish(), повідомлення буде повернуто у чергу (nak) при завершенні процесу або помилці.
- Durable consumer створюється автоматично при першій публікації.---
Кількість непрочитаних повідомлень для durable consumer
`js
import { getPendingCount } from '@nitra/nats'const count = await getPendingCount('project:subject') // для стандартного durable
console.log('pending:', count)
const count2 = await getPendingCount('worker-group') // для кастомного durable
console.log('pending for group:', count2)
`---
Як це працює
- publish(subject, data):
- Публікує повідомлення у subject
${stream}.${subject}
- Перевіряє формат subject (має бути project:subject)- ensureConsumer(spec):
- Створює consumer якщо не існує
- Оновлює
filter_subjects якщо вони змінились
- Перестворює consumer якщо змінились deliverPolicy або ackPolicy
- Автоматично створює stream якщо потрібно- read(durableName):
- Читає одне повідомлення з черги для durable consumer
- finish():
- Підтверджує (ack) повідомлення
- getPendingCount(durableName):
- Повертає кількість непрочитаних повідомлень для durable consumer
---
Важливо
- STREAM у NATS за замовчуванням
dev (або значення змінної NATS_STREAM)
- Consumer-и потрібно створювати явно через ensureConsumer або CLI
- Subject має відповідати формату project:subject
- Пакет розрахований на single-message workflow (одне повідомлення на читання за раз)
- ensureConsumer розумно оновлює конфігурацію без втрати повідомлень
- CLI підтримує YAML конфігурації для декларативного управління consumer-ами---
Приклад повного workflow
`js
import { publish, ensureConsumer, read, finish, getPendingCount } from '@nitra/nats'// 1. Створення consumer-ів
await ensureConsumer({
streamName: 'dev',
durableName: 'project:subject',
filterSubjects: ['project:subject'],
deliverPolicy: 'all',
ackPolicy: 'explicit'
})
await ensureConsumer({
streamName: 'dev',
durableName: 'worker-group',
filterSubjects: ['project:subject', 'project:orders'],
deliverPolicy: 'all',
ackPolicy: 'explicit'
})
// 2. Публікація повідомлень
await publish('project:subject', { id: 1, action: 'create' })
await publish('project:orders', { orderId: 123, amount: 100 })
// 3. Перевірка pending повідомлень
const count1 = await getPendingCount('project:subject')
const count2 = await getPendingCount('worker-group')
console.log(
pending: ${count1}, worker-group: ${count2})// 4. Обробка повідомлень
const data1 = await read('project:subject')
console.log('received:', data1)
await finish()
const data2 = await read('worker-group')
console.log('group received:', data2)
await finish()
`---
CLI Інструмент
CLI підтримує роботу з YAML конфігураціями consumer-ів у форматі JetStream Consumer API.
$3
`bash
Застосування конфігурації consumer-а з YAML файлу
NATS_URL=nats://localhost:4222 node cli.js consumer.yamlЧерез npx після публікації пакету
npx @nitra/nats consumer.yaml
`$3
`yaml
apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
name: my-consumer
namespace: dev
spec:
streamName: dev
durableName: my-consumer
filterSubjects:
- project:orders
- project:payments
deliverPolicy: all
ackPolicy: explicit
``CLI автоматично:
- Створить consumer якщо не існує
- Оновить filter_subjects якщо вони змінились
- Перестворить consumer якщо змінились deliverPolicy або ackPolicy
- Створить stream якщо потрібно
---
MIT