Distributed Command Bus library supporting RPC and job queuing with BullMQ for Redis/DragonflyDB
Distributed Command Bus library supporting RPC and job queuing with BullMQ for Redis/DragonflyDB.
pp-command-bus to zaawansowana biblioteka do obsługi rozproszonych komend zgodna ze wzorcem CQRS (Command Query Responsibility Segregation). Zapewnia wysoką wydajność, automatyczną optymalizację i zaawansowane funkcje produkcyjne.
- ✅ Fire-and-forget commands - wysyłanie komend bez oczekiwania na wynik
- ✅ RPC (Remote Procedure Call) - synchroniczne wywołania przez Redis Pub/Sub z oczekiwaniem na odpowiedź
- ✅ Automatyczna kompresja - gzip dla payloadów RPC >1KB (konfigurowalne)
- ✅ Auto-optymalizacja - dynamiczne dostosowywanie concurrency na podstawie CPU/RAM
- ✅ Process isolation - izolacja odpowiedzi RPC między procesami Node.js
- ✅ Job queuing - kolejkowanie zadań z retry, backoff i delayed execution
- ✅ BullMQ integration - wydajna kolejka zadań na Redis/DragonflyDB
- ✅ TypeScript - pełne wsparcie typów i strict mode
- ✅ Memory leak protection - zaawansowana diagnostyka i cleanup
- ✅ Command logging - opcjonalne logowanie komend do plików JSONL
- ✅ RPC Job Cancellation - automatyczne usuwanie niepodjętych jobów RPC przy timeout
- ✅ Modularna architektura - komponenty zgodne z zasadami SOLID, DDD i SRP
```
┌─────────────────────────────────────────────────────────────────────┐
│ CommandBus │
│ ┌────────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ QueueManager │ │ RpcCoordinator│ │ WorkerOrchestrator │ │
│ │ - Cache kolejek│ │ - Pub/Sub │ │ - Dynamiczne concurrency│ │
│ │ - BullMQ Queue │ │ - Process ID │ │ - Worker benchmark │ │
│ └────────┬───────┘ └──────┬───────┘ └────────┬────────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ┌────────▼──────────────────▼────────────────────▼───────────────┐ │
│ │ JobProcessor (Command Handler Execution) │ │
│ │ - Kompresja/dekompresja payloadów │ │
│ │ - Wykonywanie handlerów │ │
│ │ - Wysyłanie odpowiedzi RPC przez Pub/Sub │ │
│ └──────────────────────────┬──────────────────────────────────────┘ │
└─────────────────────────────┼────────────────────────────────────────┘
│
┌─────────▼──────────┐
│ PayloadCompression │
│ Service │
│ - gzip compression │
│ - threshold: 1KB │
└─────────────────────┘
#### 1. CommandBus (główna klasa orkiestrująca)
- Odpowiedzialność: Główny punkt wejścia dla użytkownika, orkiestracja wszystkich komponentów
- Metody publiczne:
- dispatch(command) - wysyłanie fire-and-forgetcall(command, timeout)
- - synchroniczne RPChandle(commandClass, handler)
- - rejestracja handlerówclose()
- - graceful shutdown
- Zarządzanie połączeniami: 3 dedykowane połączenia Redis (Queue, Worker, RPC)
#### 2. RpcCoordinator (zarządzanie RPC)
- Odpowiedzialność: Zarządzanie cyklem życia wywołań RPC przez Redis Pub/Sub
- Kluczowe funkcje:
- Shared Subscriber - jeden subscriber dla wszystkich RPC calls (pattern matching)
- Process Isolation - UUID procesu Node.js dla izolacji odpowiedzi między procesami
- Timeout Management - automatyczne cleanup po timeout
- Job Cancellation - usuwanie niepodjętych jobów przy timeout (optymalizacja zasobów)
- Multiplexing - routing odpowiedzi do odpowiednich promises
- Kanały: rpc:response:{processId}:{correlationId}
#### 3. WorkerOrchestrator (orkiestracja workerów)
- Odpowiedzialność: Zarządzanie workerami BullMQ i dynamiczna optymalizacja
- Kluczowe funkcje:
- WorkerBenchmark - automatyczny benchmark przy rejestracji handlera
- WorkerMetricsCollector - event-driven metrics collection
- Dynamic Concurrency - dostosowywanie concurrency +/-20% co 30s
- Event Handlers - obsługa zdarzeń: active, completed, failed, stalled
- Limity concurrency: min 10, max 2000
#### 4. JobProcessor (wykonywanie handlerów)
- Odpowiedzialność: Wykonywanie handlerów komend i obsługa odpowiedzi RPC
- Flow przetwarzania:
1. Dekompresja payloadu (jeśli skompresowany)
2. Sprawdzenie flagi cancellation (pomijanie anulowanych RPC)
3. Rekonstrukcja obiektów Date
4. Opcjonalne logowanie komendy
5. Wykonanie handlera
6. Wysłanie odpowiedzi RPC przez Pub/Sub (jeśli RPC)
- Kompresja odpowiedzi: automatyczna kompresja przez PayloadCompressionService
- RPC Cancellation: pomijanie jobów oznaczonych jako anulowane (timeout)
#### 5. QueueManager (zarządzanie kolejkami)
- Odpowiedzialność: Cache kolejek BullMQ dla optymalizacji pamięci
- Funkcje:
- getOrCreateQueue(commandName) - lazy loading kolejekcloseAllQueues()
- - graceful shutdown wszystkich kolejek{CommandName}
- Naming: jako nazwa kolejki
#### 6. PayloadCompressionService (kompresja)
- Odpowiedzialność: Automatyczna kompresja/dekompresja payloadów gzip
- Threshold: 1024 bajty (1KB) domyślnie, konfigurowalne przez ENV
- Metody:
- compressCommand(command) - dodaje flagę __compresseddecompressCommand(command)
- - dekompresja i usunięcie flagicompress(data)
- - generyczna kompresja do base64decompress(data, compressed)
- - generyczna dekompresja
- Współdzielony serwis: jedna instancja dla całego CommandBus
#### 7. RpcJobCancellationService (anulowanie jobów RPC)
- Odpowiedzialność: Zarządzanie anulowaniem niepodjętych jobów RPC przy timeout
- Kluczowe funkcje:
- markAsCancelled(correlationId) - oznacza job jako anulowany w Redis
- isCancelled(correlationId) - sprawdza flagę cancellation
- clearCancellation(correlationId) - usuwa flagę po przetworzeniu/usunięciu
- tryRemoveJob(jobId, queueName, callback) - próba usunięcia joba z kolejki
- TTL kluczy Redis: 24 godziny (automatyczne wygaśnięcie)
- Graceful degradation: błędy Redis nie blokują przetwarzania (zwraca false)
- Klucze Redis: rpc:cancelled:{correlationId}
#### 8. CommandLogger (opcjonalne logowanie)
- Odpowiedzialność: Persystencja komend do plików JSONL
- Format: {timestamp}.jsonl - rotacja co godzinęCOMMAND_BUS_LOG=./command-logs
- Zawartość: pełny payload komendy z metadanymi
- Aktywacja: przez ENV
#### 9. AutoConfigOptimizer (auto-optymalizacja)
- Odpowiedzialność: Obliczanie optymalnego concurrency na podstawie zasobów systemowych
- Heurystyka:
- I/O-heavy workload (Redis/BullMQ)
- concurrency = CPU cores * 2 + (availableMemory / 512MB)COMMAND_BUS_AUTO_OPTIMIZE=false
- Zakładane 512MB RAM per worker
- Aktywacja: domyślnie włączone (ENV wyłącza)
#### 10. RedisConnectionFactory (fabryka połączeń Redis)
- Odpowiedzialność: Tworzenie połączeń Redis z wbudowaną obsługą błędów i eventów
- Zgodność z SRP: CommandBus nie zarządza bezpośrednio połączeniami
- Kluczowe funkcje:
- create(options, name) - tworzy połączenie z pełną obsługą eventówcreateForWorker(options, name)
- - dodaje maxRetriesPerRequest: null dla BullMQerror
- Obsługa eventów: , close, reconnecting, connect, readyAggregateError
- Formatowanie błędów (Node.js dual-stack IPv4/IPv6)on('error')
- Event Handlers:
- - logowanie błędów połączenia (zapobiega Unhandled error event)on('close')
- - informacja o zamknięciu połączeniaon('reconnecting')
- - informacja o ponownym łączeniuon('connect')
- / on('ready') - potwierdzenie nawiązania połączenia
#### Flow 1: dispatch() - Fire-and-forget
`
User Code
│
├─→ 1. commandBus.dispatch(command)
│
├─→ 2. PayloadCompressionService.compressCommand(command)
│ └─→ Jeśli payload >1KB → gzip → base64 → __compressed: true
│
├─→ 3. QueueManager.getOrCreateQueue(commandName)
│ └─→ Cache hit/miss → zwraca Queue
│
├─→ 4. queue.add(commandName, compressedCommand, options)
│ └─→ BullMQ dodaje job do Redis
│
└─→ 5. Promise
Worker Side (asynchronicznie)
│
├─→ 6. Worker pobiera job z kolejki
│
├─→ 7. JobProcessor.process(job)
│ ├─→ Dekompresja (jeśli __compressed)
│ ├─→ Rekonstrukcja Date
│ ├─→ Opcjonalne logowanie (CommandLogger)
│ └─→ Wykonanie handlera
│
└─→ 8. Worker kończy job (success/fail)
`
#### Flow 2: call() - RPC przez Redis Pub/Sub
`
User Code
│
├─→ 1. commandBus.call(command, timeout)
│
├─→ 2. RpcCoordinator.registerCall(correlationId, commandName, timeout)
│ ├─→ Oczekiwanie na gotowość shared subscriber (5s timeout)
│ ├─→ Utworzenie Promise
│ ├─→ Zapisanie pending call w Map
│ └─→ Zwrócenie responsePromise (bez blokowania)
│
├─→ 3. PayloadCompressionService.compressCommand(command)
│ └─→ Jeśli payload >1KB → gzip → __compressed: true
│
├─→ 4. RpcCoordinator.prepareRpcCommand(compressedCommand)
│ └─→ Dodaje __rpcMetadata: { correlationId, responseChannel, timestamp }
│
├─→ 5. QueueManager.getOrCreateQueue(commandName)
│ └─→ Cache hit/miss → zwraca Queue
│
├─→ 6. queue.add(commandName, commandWithMetadata, options)
│ └─→ BullMQ dodaje job do Redis
│
└─→ 7. await responsePromise (czeka na odpowiedź z Worker)
Worker Side
│
├─→ 8. Worker pobiera job z kolejki
│
├─→ 9. JobProcessor.process(job)
│ ├─→ Dekompresja payloadu (jeśli __compressed)
│ ├─→ Rekonstrukcja Date
│ ├─→ Wykonanie handlera → result
│ │
│ └─→ 10. JobProcessor.sendRpcResponse(rpcMetadata, result, null)
│ ├─→ PayloadCompressionService.compress({ correlationId, result, error })
│ ├─→ Wrapper: { data, compressed }
│ └─→ redis.publish(responseChannel, JSON.stringify(wrapper))
│
Shared Subscriber (RpcCoordinator)
│
├─→ 11. pmessage event → handleRpcMessage(channel, message)
│ ├─→ Ekstraktuj correlationId z channel
│ ├─→ Weryfikuj processInstanceId (process isolation)
│ ├─→ Znajdź pending call w Map
│ ├─→ PayloadCompressionService.decompress(wrapper.data, wrapper.compressed)
│ ├─→ resolve(result) lub reject(error)
│ └─→ Cleanup: clearTimeout, delete z Map
│
└─→ 12. User Code otrzymuje wynik → Promise
`
`bash`
npm install pp-command-bus
- Node.js >= 14
- Redis >= 5 lub DragonflyDB
- TypeScript >= 4.5 (opcjonalnie)
`typescript
import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';
// Konfiguracja CommandBus
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: console, // ILogger interface
logLevel: 'log', // debug | log | warn | error
concurrency: 5,
maxAttempts: 1,
});
// Utwórz instancję CommandBus
const commandBus = new CommandBus(config);
`
`typescript`
class CreateUserCommand extends Command<{ email: string; name: string }> {
constructor(payload: { email: string; name: string }) {
super(payload);
}
}
`typescriptCreating user: ${name} (${email})
commandBus.handle(CreateUserCommand, async (command) => {
const { name, email } = command.__payload;
console.log();
// Twoja logika biznesowa
const user = await createUser(email, name);
// Zwróć wynik (opcjonalnie)
return { userId: user.id };
});
`
`typescript
const command = new CreateUserCommand({
email: 'jan.kowalski@example.com',
name: 'Jan Kowalski'
});
// Wyślij komendę bez oczekiwania na wynik
await commandBus.dispatch(command);
`
`typescript
// Wywołaj komendę i poczekaj na wynik
const result = await commandBus.call<{ userId: string }>(command, 5000); // timeout 5s
console.log(User created with ID: ${result.userId});`
Biblioteka wspiera konfigurację poprzez zmienne środowiskowe z prefiksem COMMAND_BUS_ (z fallbackiem do starszych nazw EVENT_BUS_*):
| Zmienna | Typ | Wartość Domyślna | Opis |
|---------|-----|------------------|------|
| REDIS_URL | string | redis://localhost:6379 | URL połączenia Redis/DragonflyDB (wspiera username, password, db) |REDIS_RETRY_DELAY
| | number | 5000 | Opóźnienie między próbami reconnect do Redis w milisekundach |REDIS_MAX_RETRIES
| | number | 0 | Maksymalna liczba prób reconnect (0 = nieskończoność) |LOG_LEVEL
| | enum | log | Poziom logowania: debug, log, warn, error |COMMAND_BUS_CONCURRENCY
| | number | 1 (lub auto) | Liczba równoległych workerów do przetwarzania komend |COMMAND_BUS_MAX_ATTEMPTS
| | number | 1 | Maksymalna liczba prób przetworzenia zadania |COMMAND_BUS_BACKOFF_DELAY
| | number | 2000 | Opóźnienie między próbami w milisekundach |COMMAND_BUS_QUEUE_MODE
| | enum | fifo | Tryb przetwarzania kolejki: fifo (First In First Out) lub lifo (Last In First Out) |COMMAND_BUS_LOG
| | string | _(puste)_ | Ścieżka do katalogu logów komend (JSONL format, rotacja co godzinę) |COMMAND_BUS_AUTO_OPTIMIZE
| | boolean | true | Włącz auto-optymalizację concurrency na podstawie CPU/RAM |COMMAND_BUS_COMPRESSION_THRESHOLD
| | number | 1024 | Próg kompresji gzip dla payloadów RPC w bajtach (1KB domyślnie) |
Dla kompatybilności wstecznej obsługiwane są również prefiksy EVENT_BUS_*:
- EVENT_BUS_CONCURRENCY → COMMAND_BUS_CONCURRENCYEVENT_BUS_MAX_ATTEMPTS
- → COMMAND_BUS_MAX_ATTEMPTSEVENT_BUS_BACKOFF_DELAY
- → COMMAND_BUS_BACKOFF_DELAYEVENT_BUS_QUEUE_MODE
- → COMMAND_BUS_QUEUE_MODEEVENT_BUS_LOG
- → COMMAND_BUS_LOG
`bashPołączenie Redis
REDIS_URL=redis://username:password@localhost:6379/0
$3
1. Parametry konstruktora - najwyższy priorytet
2. Zmienne środowiskowe - średni priorytet
3. Wartości domyślne - najniższy priorytet
`typescript
// Przykład: parametry konstruktora nadpisują ENV
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
concurrency: 20, // Nadpisuje COMMAND_BUS_CONCURRENCY
autoOptimize: false, // Wyłącza auto-optymalizację
});
`Konfiguracja zaawansowana
$3
Auto-optymalizacja automatycznie oblicza optymalną wartość concurrency na podstawie zasobów systemowych:
`typescript
// Auto-optymalizacja włączona (domyślnie)
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
autoOptimize: true, // Domyślnie true
// concurrency zostanie obliczone jako: CPU cores * 2 + (availableMemory / 512MB)
});// Wyłączenie auto-optymalizacji
const config2 = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
autoOptimize: false,
concurrency: 10, // Ręczna wartość
});
`Algorytm auto-optymalizacji:
`
concurrency = (CPU cores * 2) + Math.floor(availableMemory / 512MB)Przykład:
- 8 CPU cores
- 16GB RAM dostępne
- concurrency = (8 * 2) + Math.floor(16384 / 512) = 16 + 32 = 48
`$3
Automatyczna kompresja gzip dla payloadów RPC większych niż threshold:
`typescript
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
compressionThreshold: 2048, // 2KB (domyślnie 1KB)
});// Przykład: payload 3KB zostanie automatycznie skompresowany
const largeCommand = new ProcessReportCommand(largeData); // 3KB
const result = await commandBus.call(largeCommand); // Automatyczna kompresja/dekompresja
`Korzyści kompresji:
- Redukcja transferu danych przez Redis
- Szybsze przesyłanie dużych payloadów
- Niższe zużycie pamięci Redis
- Transparent dla użytkownika (automatyczna dekompresja)
$3
Logowanie komend do plików JSONL (rotacja co godzinę):
`typescript
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
commandLog: './command-logs', // Ścieżka do katalogu logów
});// Struktura plików:
// ./command-logs/2025-01-27T10.jsonl
// ./command-logs/2025-01-27T11.jsonl
`Format JSONL (JSON Lines):
`json
{"__name":"CreateUserCommand","__id":"uuid","__time":1706347200000,"email":"jan@example.com","name":"Jan"}
{"__name":"ProcessOrderCommand","__id":"uuid","__time":1706347201000,"orderId":"12345"}
`$3
`typescript
const config = new CommandBusConfig({
redisUrl: 'redis://username:password@localhost:6379/0',
// Parsuje się do:
// - host: localhost
// - port: 6379
// - username: username (opcjonalnie)
// - password: password (opcjonalnie)
// - db: 0 (opcjonalnie)
});
`$3
`typescript
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
concurrency: 10, // Liczba równoległych workerów (lub auto)
maxAttempts: 5, // Maksymalna liczba prób przetworzenia zadania
backoffDelay: 3000, // Opóźnienie między próbami (3s)
});
`$3
`typescript
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
queueMode: 'fifo', // 'fifo' (First In First Out) lub 'lifo' (Last In First Out)
});
`$3
Logger jest automatycznie opakowywany przez wewnętrzny wrapper, który filtruje logi według
logLevel:`typescript
// Prosty logger - używa console
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: console,
logLevel: 'log', // debug | log | warn | error
});// Własny logger - musi implementować metody: log, error, warn, debug
class MyLogger {
log(message: string, ...args: unknown[]): void {
// Twoja implementacja
}
error(message: string, ...args: unknown[]): void {
// Twoja implementacja
}
warn(message: string, ...args: unknown[]): void {
// Twoja implementacja
}
debug(message: string, ...args: unknown[]): void {
// Twoja implementacja
}
}
const config2 = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: new MyLogger(),
logLevel: 'debug', // Wszystkie poziomy
});
`API Reference
$3
####
dispatch(command: Command): PromiseWysyła komendę do kolejki bez oczekiwania na wynik (fire-and-forget).
Flow:
1. Kompresja payloadu (jeśli >threshold)
2. Pobranie/utworzenie kolejki z cache
3. Dodanie job do BullMQ
4. Natychmiastowy return
`typescript
await commandBus.dispatch(new CreateUserCommand({
email: 'email@example.com',
name: 'Jan Kowalski'
}));
`####
callWywołuje komendę synchronicznie i czeka na odpowiedź przez Redis Pub/Sub (RPC). Domyślny timeout: 30000ms (30s).
Flow:
1. Rejestracja pending call z timeoutem
2. Kompresja payloadu (jeśli >threshold)
3. Przygotowanie metadanych RPC (correlationId, responseChannel)
4. Dodanie job do BullMQ
5. Oczekiwanie na odpowiedź przez shared subscriber
6. Dekompresja odpowiedzi
7. Zwrócenie wyniku lub błędu
`typescript
const result = await commandBus.call<{ userId: string }>(command, 5000);
`####
handleRejestruje handler dla określonej klasy komendy. Tylko jeden handler per typ komendy.
Automatyczne akcje:
1. Rejestracja handlera w Map
2. Utworzenie workera BullMQ
3. Uruchomienie benchmarku dla optymalnego concurrency
4. Utworzenie metrics collector
5. Setup event handlers
`typescript
commandBus.handle(CreateUserCommand, async (command) => {
const { email, name } = command.__payload;
const user = await createUser(email, name);
return { userId: user.id };
});
`####
close(): PromiseZamyka wszystkie połączenia i workery z graceful shutdown.
Cleanup:
1. Zamknięcie wszystkich workerów BullMQ
2. Zamknięcie wszystkich kolejek z cache
3. Zamknięcie RpcCoordinator (reject pending calls)
4. Zamknięcie 3 połączeń Redis (Queue, Worker, RPC)
`typescript
await commandBus.close();
`$3
Klasa bazowa dla wszystkich komend. Każda komenda dziedziczy po
Command gdzie T to typ danych biznesowych:`typescript
class MyCommand extends Command<{ data: string }> {
constructor(payload: { data: string }) {
super(payload);
}
}// Uzycie
const cmd = new MyCommand({ data: 'hello' });
console.log(cmd.__payload.data); // 'hello'
`#### Struktura Komendy
Kazda komenda po serializacji ma nastepujaca strukture:
`typescript
{
"__name": "MyCommand", // Nazwa klasy komendy
"__id": "550e8400-e29b-41d4-a716-446655440000", // UUID komendy
"__time": 1706347200000, // Timestamp utworzenia (ms)
"__payload": { // Dane biznesowe komendy
"data": "hello"
}
}
`#### Opis Pol Komendy
| Pole | Typ | Opis |
|------|-----|------|
|
__name | string | Nazwa klasy komendy (automatycznie ustawiana z constructor.name). Uzywana do routowania do odpowiedniego handlera. |
| __id | string | Unikalny identyfikator komendy (UUID v4). Generowany automatycznie przy tworzeniu komendy. Uzywany do korelacji RPC i logowania. |
| __time | number | Timestamp utworzenia komendy w milisekundach (Date.now()). Uzywany do audytu i debugowania. |
| __payload | T | Dane biznesowe komendy. Wszystkie dane specyficzne dla komendy powinny byc przechowywane tutaj. Typ T jest generyczny i definiowany przy tworzeniu klasy komendy. |#### Przyklad Definicji Komendy
`typescript
// Komenda z prostym payloadem
class CreateUserCommand extends Command<{ name: string; email: string }> {
constructor(payload: { name: string; email: string }) {
super(payload);
}
}// Komenda ze zlozonym payloadem
class ProcessOrderCommand extends Command<{
orderId: string;
items: Array<{ productId: string; quantity: number }>;
shippingAddress: {
street: string;
city: string;
postalCode: string;
};
}> {
constructor(payload: {
orderId: string;
items: Array<{ productId: string; quantity: number }>;
shippingAddress: { street: string; city: string; postalCode: string };
}) {
super(payload);
}
}
// Uzycie
const createUser = new CreateUserCommand({
name: 'Jan Kowalski',
email: 'jan@example.com'
});
console.log(createUser.__name); // 'CreateUserCommand'
console.log(createUser.__id); // '550e8400-e29b-41d4-...'
console.log(createUser.__time); // 1706347200000
console.log(createUser.__payload.name); // 'Jan Kowalski'
console.log(createUser.__payload.email); // 'jan@example.com'
`#### Dostep do Danych w Handlerze
`typescript
commandBus.handle(CreateUserCommand, async (command) => {
// Dostep do danych biznesowych przez __payload
const { name, email } = command.__payload; // Dostep do metadanych komendy
console.log(
Processing command ${command.__id} (${command.__name}));
console.log(Created at: ${new Date(command.__time).toISOString()}); // Logika biznesowa
const user = await createUser(name, email);
return { userId: user.id };
});
`Metody statyczne:
-
reconstructDates(obj) - rekonstrukcja obiektow Date z serializowanych danych (uzywane wewnetrznie przez JobProcessor)$3
Konfiguracja CommandBus z opcjami:
`typescript
interface CommandBusConfigOptions {
redisUrl?: string; // URL Redis (domyślnie: 'redis://localhost:6379' lub REDIS_URL)
logger?: ILogger; // Logger (domyślnie console)
logLevel?: 'debug' | 'log' | 'warn' | 'error'; // Poziom logowania (domyślnie 'log')
concurrency?: number; // Liczba workerów (domyślnie 1 lub auto)
maxAttempts?: number; // Maksymalna liczba prób (domyślnie 1)
backoffDelay?: number; // Opóźnienie między próbami w ms (domyślnie 2000)
queueMode?: 'fifo' | 'lifo'; // Tryb kolejki (domyślnie 'fifo')
commandLog?: string; // Ścieżka do katalogu logów komend (opcjonalnie)
autoOptimize?: boolean; // Auto-optymalizacja concurrency (domyślnie true)
compressionThreshold?: number; // Próg kompresji w bajtach (domyślnie 1024)
}
`Przykłady Użycia
$3
`typescript
import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';// Definicja komendy z payloadem
class CalculateCommand extends Command<{
a: number;
b: number;
operation: 'add' | 'multiply';
}> {
constructor(payload: { a: number; b: number; operation: 'add' | 'multiply' }) {
super(payload);
}
}
// Konfiguracja
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
});
const commandBus = new CommandBus(config);
// Rejestracja handlera
commandBus.handle(CalculateCommand, async (command) => {
const { a, b, operation } = command.__payload;
console.log(
Calculating: ${a} ${operation} ${b}); switch (operation) {
case 'add':
return a + b;
case 'multiply':
return a * b;
}
});
// Fire-and-forget
await commandBus.dispatch(new CalculateCommand({ a: 5, b: 3, operation: 'add' }));
// RPC - czekamy na wynik
const result = await commandBus.call(
new CalculateCommand({ a: 5, b: 3, operation: 'multiply' }),
5000 // timeout 5s
);
console.log(
Result: ${result}); // Result: 15
`$3
`typescript
// Wiele rownoległych RPC calls
const [result1, result2, result3] = await Promise.all([
commandBus.call(new CalculateCommand({ a: 10, b: 5, operation: 'add' })),
commandBus.call(new GetUserInfoCommand({ userId: 'user-1' })),
commandBus.call(new ValidateUserCommand({ email: 'jan@example.com', age: 30 })),
]);console.log('All results:', { result1, result2, result3 });
`$3
`typescript
commandBus.handle(CreateUserCommand, async (command) => {
try {
const { email, name } = command.__payload; // Walidacja
if (!email.includes('@')) {
throw new Error('Invalid email format');
}
const user = await createUser(email, name);
return { userId: user.id };
} catch (error) {
// Loguj blad
console.error('Failed to create user:', error);
// Rzuc blad - BullMQ sprobuje ponownie (maxAttempts)
throw error;
}
});
// Obsluga bledow w RPC
try {
const result = await commandBus.call(
new CreateUserCommand({ email: 'invalid-email', name: 'Jan' })
);
} catch (error) {
console.error('RPC failed:', error.message);
}
`$3
`typescript
process.on('SIGTERM', async () => {
console.log('Shutting down CommandBus...');
await commandBus.close();
process.exit(0);
});process.on('SIGINT', async () => {
console.log('Shutting down CommandBus...');
await commandBus.close();
process.exit(0);
});
`Zaawansowane Funkcje
$3
WorkerOrchestrator automatycznie dostosowuje concurrency na podstawie metryk:
- Benchmark przy starcie - optymalny concurrency dla każdego workera
- Event-driven metrics - zbieranie metryk z workerów
- Dynamiczne dostosowanie - +/-20% co 30s (cooldown)
- Limity - min 10, max 2000
`typescript
// Automatyczne - benchmark ustali optymalną wartość
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
// concurrency zostanie ustalone przez benchmark (np. 15-20)
});
`$3
Każdy proces Node.js ma unikalny UUID - odpowiedzi RPC są izolowane między procesami:
`
Process A (UUID: abc-123):
- Kanał: rpc:response:abc-123:*
- Otrzymuje tylko swoje odpowiedziProcess B (UUID: def-456):
- Kanał: rpc:response:def-456:*
- Otrzymuje tylko swoje odpowiedzi
`$3
Jeden shared subscriber dla wszystkich RPC calls zamiast N subskrybentów:
Tradycyjne podejście (N subskrybentów):
`
1000 RPC calls → 1000 Redis subscriptions → duże obciążenie
`Shared Subscriber (1 subskrybent):
`
1000 RPC calls → 1 Redis pattern subscription → multiplexing w pamięci
`$3
PayloadCompressionService automatycznie kompresuje duże payloady:
`typescript
// Mała komenda (<1KB) - brak kompresji
const smallCommand = new CreateUserCommand('jan@example.com', 'Jan');
await commandBus.call(smallCommand); // Bez kompresji// Duża komenda (>1KB) - automatyczna kompresja
const largeCommand = new ProcessReportCommand(largeData); // 5KB
await commandBus.call(largeCommand); // Automatyczna kompresja gzip → base64
`Flagi kompresji:
-
__compressed: true - payload został skompresowany
- Automatyczna dekompresja w JobProcessor
- Transparent dla użytkownika$3
Persystencja wszystkich komend do plików JSONL:
`typescript
const config = new CommandBusConfig({
commandLog: './command-logs',
});// Każda komenda jest logowana do pliku:
// ./command-logs/2025-01-27T10.jsonl
`Use cases:
- Auditing i compliance
- Replay komend
- Debugging produkcyjnych problemów
- Analiza przepływu komend
$3
Automatyczne usuwanie niepodjętych jobów RPC przy timeout:
`
RPC Timeout Flow:User Code RpcCoordinator Redis JobProcessor
│ │ │ │
│── call(cmd) ─────▶│ │ │
│ │── registerCall ──▶│ │
│ │── queue.add ─────▶│ │
│ │ │ │
│ [timeout] │ │ │
│ │ │ │
│ │── markCancelled ─▶│ SET rpc:cancelled:xxx
│ │── tryRemoveJob ──▶│ (próba usunięcia)
│◀── Error ─────────│ │ │
│ │ │ │
│ │ │ [job picked up] │
│ │ │──────────────────▶│
│ │ │ │── isCancelled?
│ │ │◀──────────────────│ → true
│ │ │ │── SKIP handler
│ │ │ │── clearCancellation
`Kluczowe cechy:
- Dwufazowe anulowanie: Flaga Redis + próba usunięcia joba z kolejki
- Graceful degradation: Błędy Redis nie blokują przetwarzania
- TTL 24h: Automatyczne wygaśnięcie kluczy cancellation
- Aktywne czyszczenie: Klucze usuwane po przetworzeniu lub usunięciu joba
- Kompatybilność wsteczna: Funkcjonalność jest opcjonalna
Korzyści:
- Oszczędność zasobów - nieprzetworzone joby nie obciążają workerów
- Brak efektów ubocznych - handler nie wykonuje się dla timeout'owanych RPC
- Lepsza diagnostyka - logi pokazują pominięte joby
Best Practices
$3
Biblioteka jest napisana w TypeScript z strict mode. Wykorzystaj typy dla lepszego DX:
`typescript
interface UserCreatedResult {
userId: string;
createdAt: Date;
}const result = await commandBus.call(command);
`$3
Handlery powinny byc idempotentne (wielokrotne wykonanie = ten sam rezultat):
`typescript
commandBus.handle(CreateUserCommand, async (command) => {
const { email, name } = command.__payload; // Sprawdz czy uzytkownik juz istnieje
const existing = await findUserByEmail(email);
if (existing) {
return { userId: existing.id }; // Zwroc istniejacego
}
// Utworz nowego
const user = await createUser(email, name);
return { userId: user.id };
});
`$3
`typescript
// Ustaw timeout dostosowany do czasu przetwarzania komendy
const shortCommand = new QuickCommand();
const result1 = await commandBus.call(shortCommand, 1000); // 1s dla szybkich operacjiconst longRunningCommand = new ProcessReportCommand();
const result2 = await commandBus.call(longRunningCommand, 60000); // 60s dla długich operacji
`$3
`typescript
commandBus.handle(CreateUserCommand, async (command) => {
const { email, name } = command.__payload; // Walidacja na poczatku
if (!email || !email.includes('@')) {
throw new Error('Invalid email');
}
if (!name || name.length < 2) {
throw new Error('Invalid name');
}
// Logika biznesowa
return await createUser(email, name);
});
`$3
`typescript
commandBus.handle(CreateUserCommand, async (command) => {
const { email, name } = command.__payload; console.log('Processing CreateUserCommand', {
commandId: command.__id,
timestamp: command.__time,
email: email,
});
// Logika biznesowa
const user = await createUser(email, name);
console.log('User created successfully', {
commandId: command.__id,
userId: user.id,
});
return { userId: user.id };
});
`Testing
$3
`typescript
import { CommandBus, CommandBusConfig, Command } from 'pp-command-bus';// Definicja komendy testowej
class CreateUserCommand extends Command<{ email: string; name: string }> {
constructor(payload: { email: string; name: string }) {
super(payload);
}
}
describe('User Commands', () => {
let commandBus: CommandBus;
beforeAll(async () => {
const config = new CommandBusConfig({
redisUrl: 'redis://localhost:6379',
logger: console,
logLevel: 'error', // Tylko bledy w testach
});
commandBus = new CommandBus(config);
// Zarejestruj handler
commandBus.handle(CreateUserCommand, async (command) => {
return { userId: 'test-user-id' };
});
});
afterAll(async () => {
await commandBus.close();
});
it('should create user', async () => {
const command = new CreateUserCommand({
email: 'test@example.com',
name: 'Test User'
});
const result = await commandBus.call<{ userId: string }>(command, 5000);
expect(result.userId).toBeDefined();
expect(result.userId).toBe('test-user-id');
});
it('should handle multiple commands in parallel', async () => {
const commands = [
new CreateUserCommand({ email: 'user1@example.com', name: 'User 1' }),
new CreateUserCommand({ email: 'user2@example.com', name: 'User 2' }),
new CreateUserCommand({ email: 'user3@example.com', name: 'User 3' }),
];
const results = await Promise.all(
commands.map((cmd) => commandBus.call<{ userId: string }>(cmd, 5000))
);
expect(results).toHaveLength(3);
results.forEach((result) => {
expect(result.userId).toBeDefined();
});
});
});
`Troubleshooting
$3
Upewnij się że Redis działa:
`bash
redis-cli ping
Powinno zwrócić: PONG
`$3
Zwiększ timeout lub sprawdź czy handler został zarejestrowany:
`typescript
// Zwiększ timeout
const result = await commandBus.call(command, 60000); // 60s// Sprawdź czy handler został zarejestrowany PRZED wywołaniem
commandBus.handle(MyCommand, async (command) => {
// Handler implementation
return { result: 'success' };
});
// Teraz możesz wywołać komendę
const result = await commandBus.call(new MyCommand());
`$3
Upewnij się że handler został zarejestrowany przed wysłaniem komendy:
`typescript
// ❌ ŹLE - handler po dispatch
await commandBus.dispatch(new MyCommand());
commandBus.handle(MyCommand, async (cmd) => { ... }); // Za późno!// ✅ DOBRZE - handler przed dispatch
commandBus.handle(MyCommand, async (cmd) => { ... });
await commandBus.dispatch(new MyCommand()); // Teraz OK
`$3
1. Wyłącz command logging jeśli nie jest potrzebne
2. Zmniejsz concurrency jeśli workery używają dużo pamięci
3. Zwiększ compressionThreshold jeśli duże payloady powodują problemy
`typescript
const config = new CommandBusConfig({
commandLog: undefined, // Wyłącz logging
concurrency: 5, // Zmniejsz concurrency
compressionThreshold: 512, // Kompresuj już od 512B
});
`$3
Worker został zatrzymany (prawdopodobnie crashed). BullMQ automatycznie przeniesie job do innego workera.
Przyczyny:
- Out of memory
- Uncaught exception w handlerze
- Timeout w handlerze
Rozwiązanie:
- Dodaj try/catch w handlerze
- Zwiększ pamięć dla procesu
- Zmniejsz concurrency
Struktura Projektu
`
src/
├── command-bus/ # Główna logika CommandBus
│ ├── config/ # Konfiguracja CommandBus
│ │ ├── command-bus-config.ts
│ │ └── auto-config-optimizer.ts
│ ├── job/ # Przetwarzanie jobów i opcje
│ │ ├── job-processor.ts
│ │ └── job-options-builder.ts
│ ├── logging/ # Command logging do plików
│ │ └── command-logger.ts
│ ├── queue/ # Queue management i cache
│ │ └── queue-manager.ts
│ ├── rpc/ # RPC coordination
│ │ ├── rpc-coordinator.ts
│ │ ├── payload-compression.service.ts
│ │ └── rpc-job-cancellation.service.ts # Anulowanie jobów RPC przy timeout
│ ├── worker/ # Worker orchestration
│ │ ├── worker-orchestrator.ts
│ │ ├── worker-benchmark.ts
│ │ └── worker-metrics-collector.ts
│ ├── types/ # Typy TypeScript
│ │ └── index.ts
│ ├── command.ts # Klasa bazowa Command
│ └── index.ts # CommandBus główna klasa
├── shared/ # Wspólne komponenty
│ ├── config/ # Base config z Redis
│ │ └── base-config.ts
│ ├── logging/ # Logger wrapper z poziomami
│ │ ├── logger.ts
│ │ └── log-level.ts
│ ├── redis/ # Fabryka połączeń Redis (SRP)
│ │ ├── redis-connection-factory.ts # Tworzenie połączeń z event handlers
│ │ ├── redis-error-formatter.ts # Formatowanie błędów (AggregateError)
│ │ └── index.ts # Eksporty modułu
│ └── types.ts # Współdzielone typy
├── examples/ # Przykłady użycia
│ ├── rpc.demo.ts # Demo RPC calls
│ ├── rpc-throughput.demo.ts # Demo wydajności RPC
│ ├── rpc-compression.demo.ts # Demo kompresji
│ └── rpc-resilience.demo.ts # Demo reconnect/failover (5 min test)
└── index.ts # Główny export pakietu
`Migracja z pp-event-bus 1.x
Jeśli używałeś Command Bus z pakietu
pp-event-bus w wersji 1.x:$3
`typescript
import { CommandBus, Command, CommandBusConfig } from 'pp-event-bus';
`$3
`bash
npm install pp-command-bus
``typescript
import { CommandBus, Command, CommandBusConfig } from 'pp-command-bus';
`Pełna zgodność API - jedyna zmiana to źródło importu.
Wersjonowanie i Releases
Projekt używa Semantic Versioning oraz automatycznych release'ów dzięki semantic-release.
$3
Używamy Conventional Commits:
`bash
Nowa funkcjonalność (minor release)
feat: dodano wsparcie dla delayed commandsPoprawka błędu (patch release)
fix: naprawiono memory leak w RPCBreaking change (major release)
feat!: zmieniono API CommandBusConfigInne typy (patch release)
docs: zaktualizowano dokumentację API
perf: zoptymalizowano przetwarzanie komend
refactor: uproszczono kod RPC handler
test: dodano testy dla command logging
`$3
-
feat: - nowa funkcjonalność (→ minor release)
- fix: - poprawka błędu (→ patch release)
- perf: - optymalizacja wydajności (→ patch release)
- docs: - zmiany w dokumentacji (→ patch release)
- style: - formatowanie kodu (→ patch release)
- refactor: - refaktoryzacja (→ patch release)
- test: - dodanie/poprawka testów (→ patch release)
- build: - zmiany w buildzie/zależnościach (→ patch release)
- ci: - zmiany w CI/CD (→ patch release)
- chore: - inne zmiany (bez release)
- ! lub BREAKING CHANGE:` - breaking change (→ major release)Szczegółowa dokumentacja architektury systemu, wzorców projektowych i zasad SOLID:
- ARCHITECTURE.md - Kompletny opis architektury, diagramy komponentów, przepływy danych
- QueueManager - zarządzanie kolejkami BullMQ i cache kolejek
- WorkerOrchestrator - orkiestracja workerów, dynamiczne concurrency, benchmark
- JobProcessor - przetwarzanie jobów i wykonanie handlerów
- RpcCoordinator - zarządzanie wywołaniami RPC przez Redis Pub/Sub
- RpcJobCancellationService - anulowanie niepodjętych jobów RPC przy timeout
- JobOptionsBuilder - konfiguracja opcji dla jobów BullMQ
- CommandLogger - persystencja komend do plików JSONL
- PayloadCompressionService - automatyczna kompresja gzip
- AutoConfigOptimizer - optymalizacja concurrency na podstawie zasobów
MIT
- GitLab Repository
- Issue Tracker
- BullMQ Documentation
- Semantic Versioning
- Conventional Commits