Generic MQTT Client supporting MQTT 3.1, 3.1.1, and 5.0 with TCP, TLS, WebSocket, and WSS transports
npm install mqtt-client-tssA modular, enterprise-grade MQTT client library written in TypeScript. Designed for reliability and performance, this library supports MQTT 3.1, 3.1.1, and 5.0 protocols with TCP, TLS/SSL, WebSocket, and Secure WebSocket transports.
- Features
- Installation
- Quick Start
- Connection Options
- API Reference
- Creating a Client
- Connecting
- Publishing Messages
- Subscribing to Topics
- Unsubscribing
- Disconnecting
- Events
- Transport Examples
- Protocol Versions
- MQTT 5.0 Features
- Advanced Features
- Metrics Collection
- Rate Limiting
- Message Queue
- Connection Pool
- Topic Matcher
- Retry Manager
- Message Store
- Logger
- Modular Architecture
- TypeScript Support
- Building from Source
- License
``bash`
npm install mqtt-client-tss
The library requires the ws package for WebSocket support:
`bash`
npm install ws
For TypeScript projects, install the type definitions:
`bash`
npm install --save-dev @types/ws @types/node typescript
`typescript
import { createMqttClient } from 'mqtt-client-tss';
const client = createMqttClient({
host: 'broker.hivemq.com',
port: 1883,
transport: 'tcp',
clientId: 'my-app-001',
});
client.on('connect', () => {
console.log('Connected to broker');
});
client.on('message', (topic, payload, message) => {
console.log(${topic}: ${payload.toString()});
});
client.on('error', (error) => {
console.error('Connection error:', error.message);
});
async function main() {
await client.connect();
await client.subscribe('sensors/#', { qos: 1 });
await client.publish('sensors/temperature', '23.5', { qos: 1 });
}
main();
`
`typescript
interface MqttClientOptions {
// Required
host: string;
port: number;
// Transport (default: 'tcp')
transport?: 'tcp' | 'tls' | 'ws' | 'wss';
// Protocol version (default: 4, which is MQTT 3.1.1)
protocolVersion?: 3 | 4 | 5;
// Client identification
clientId?: string;
username?: string;
password?: string;
// Session management
cleanSession?: boolean; // MQTT 3.x
cleanStart?: boolean; // MQTT 5.0
sessionExpiryInterval?: number; // MQTT 5.0, in seconds
// Timing
keepAlive?: number; // Seconds (default: 60)
connectTimeout?: number; // Milliseconds (default: 30000)
// Reconnection
reconnect?: boolean; // Enable auto-reconnect (default: true)
reconnectInterval?: number; // Milliseconds (default: 1000)
maxReconnectAttempts?: number; // (default: 10)
// TLS options
rejectUnauthorized?: boolean;
ca?: string | Buffer | Array
cert?: string | Buffer;
key?: string | Buffer;
passphrase?: string;
tlsOptions?: tls.ConnectionOptions;
// WebSocket options
wsPath?: string; // (default: '/mqtt')
wsHeaders?: Record
wsProtocols?: string | string[];
// Will message
will?: {
topic: string;
payload: string | Buffer;
qos?: 0 | 1 | 2;
retain?: boolean;
properties?: MqttProperties;
};
// MQTT 5.0 connection properties
properties?: MqttProperties;
// Advanced options
queueSize?: number; // Offline queue size (default: 1000)
rateLimit?: number; // Messages per second
enableMetrics?: boolean; // Enable metrics collection (default: true)
enableLogging?: boolean; // Enable logging (default: false)
logLevel?: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'silent';
}
`
`typescript
import { createMqttClient, MqttClient } from 'mqtt-client-tss';
// Using factory function
const client = createMqttClient({
host: 'localhost',
port: 1883,
});
// Using constructor
const client = new MqttClient({
host: 'localhost',
port: 1883,
});
`
`typescript`
await client.connect();
The connect method returns a Promise that resolves when the connection is established and the CONNACK packet is received. It rejects if the connection fails or times out.
`typescript
// QoS 0 - Fire and forget
await client.publish('topic/test', 'Hello World');
// QoS 1 - At least once delivery
await client.publish('topic/important', 'Important message', { qos: 1 });
// QoS 2 - Exactly once delivery
await client.publish('topic/critical', 'Critical data', { qos: 2 });
// With retain flag
await client.publish('device/status', 'online', { qos: 1, retain: true });
// With MQTT 5.0 properties
await client.publish('data/sensor', JSON.stringify({ temp: 25.5 }), {
qos: 1,
properties: {
messageExpiryInterval: 3600,
contentType: 'application/json',
userProperties: [
{ key: 'device-id', value: 'sensor-001' }
]
}
});
`
`typescript
// Single topic
await client.subscribe('sensors/temperature');
// Single topic with QoS
await client.subscribe('sensors/+/temperature', { qos: 1 });
// Multiple topics with same options
await client.subscribe(['sensors/#', 'alerts/#'], { qos: 1 });
// Multiple topics with different options
const grants = await client.subscribe({
'sensors/temperature': { qos: 0 },
'sensors/humidity': { qos: 1 },
'alerts/critical': { qos: 2 }
});
// Check granted QoS levels
grants.forEach(grant => {
console.log(${grant.topic}: QoS ${grant.qos});`
});
MQTT 5.0 subscription options:
`typescript`
await client.subscribe('commands/#', {
qos: 1,
nl: true, // No Local - do not receive own messages
rap: true, // Retain As Published - preserve retain flag
rh: 1 // Retain Handling: 0=always, 1=if new sub, 2=never
});
`typescript
// Single topic
await client.unsubscribe('sensors/temperature');
// Multiple topics
await client.unsubscribe(['sensors/#', 'alerts/#']);
`
`typescript
// Normal disconnect
await client.disconnect();
// MQTT 5.0 disconnect with reason code
import { ReasonCode } from 'mqtt-client-tss';
await client.disconnect(ReasonCode.NormalDisconnection, {
reasonString: 'Client shutting down'
});
// Force close without sending DISCONNECT packet
client.end(true);
`
`typescript
// Connection established
client.on('connect', ({ sessionPresent, returnCode, properties }) => {
console.log('Connected, session present:', sessionPresent);
});
// Connection closed
client.on('close', () => {
console.log('Disconnected');
});
// Reconnecting
client.on('reconnect', ({ attempt, delay }) => {
console.log(Reconnect attempt ${attempt} in ${delay}ms);
});
// Reconnection failed
client.on('reconnect_failed', () => {
console.log('Max reconnection attempts reached');
});
// Message received
client.on('message', (topic, payload, message) => {
console.log(Topic: ${topic});Payload: ${payload.toString()}
console.log();QoS: ${message.qos}
console.log();Retain: ${message.retain}
console.log();Properties:
if (message.properties) {
console.log(, message.properties);
}
});
// Error occurred
client.on('error', (error) => {
console.error('Error:', error.message);
});
// Ping response (keep-alive)
client.on('pingresp', () => {
console.log('Ping response received');
});
// MQTT 5.0 disconnect from broker
client.on('disconnect', ({ reasonCode, properties }) => {
console.log('Broker disconnected us:', reasonCode);
});
`
`typescript`
const client = createMqttClient({
host: 'broker.hivemq.com',
port: 1883,
transport: 'tcp',
});
`typescript
import * as fs from 'fs';
const client = createMqttClient({
host: 'broker.example.com',
port: 8883,
transport: 'tls',
rejectUnauthorized: true,
ca: fs.readFileSync('ca.crt'),
cert: fs.readFileSync('client.crt'),
key: fs.readFileSync('client.key'),
passphrase: 'key-password',
});
`
`typescript`
const client = createMqttClient({
host: 'broker.hivemq.com',
port: 8000,
transport: 'ws',
wsPath: '/mqtt',
});
`typescript`
const client = createMqttClient({
host: 'broker.hivemq.com',
port: 8884,
transport: 'wss',
wsPath: '/mqtt',
wsHeaders: {
'Authorization': 'Bearer your-token'
},
});
`typescript`
const client = createMqttClient({
host: 'broker.example.com',
port: 1883,
protocolVersion: 3,
cleanSession: true,
});
`typescript`
const client = createMqttClient({
host: 'broker.example.com',
port: 1883,
protocolVersion: 4,
cleanSession: true,
});
`typescript`
const client = createMqttClient({
host: 'broker.example.com',
port: 1883,
protocolVersion: 5,
cleanStart: true,
sessionExpiryInterval: 3600,
properties: {
receiveMaximum: 100,
topicAliasMaximum: 10,
requestResponseInformation: 1,
},
});
| Property | Type | Description |
|----------|------|-------------|
| sessionExpiryInterval | number | Session expiry in seconds |
| receiveMaximum | number | Max inflight QoS 1/2 messages |
| maximumPacketSize | number | Max packet size in bytes |
| topicAliasMaximum | number | Max topic alias value |
| requestResponseInformation | 0 or 1 | Request response info from broker |
| requestProblemInformation | 0 or 1 | Request problem info on failures |
| userProperties | Array | Custom key-value pairs |
| authenticationMethod | string | Authentication method name |
| authenticationData | Buffer | Authentication data |
| Property | Type | Description |
|----------|------|-------------|
| payloadFormatIndicator | 0 or 1 | 0=bytes, 1=UTF-8 string |
| messageExpiryInterval | number | Message expiry in seconds |
| topicAlias | number | Topic alias to use |
| responseTopic | string | Response topic for request/response |
| correlationData | Buffer | Correlation data for request/response |
| contentType | string | MIME content type |
| userProperties | Array | Custom key-value pairs |
`typescript
import { ReasonCode } from 'mqtt-client-tss';
ReasonCode.Success // 0x00
ReasonCode.NormalDisconnection // 0x00
ReasonCode.UnspecifiedError // 0x80
ReasonCode.MalformedPacket // 0x81
ReasonCode.ProtocolError // 0x82
ReasonCode.NotAuthorized // 0x87
ReasonCode.ServerUnavailable // 0x88
ReasonCode.TopicFilterInvalid // 0x8F
ReasonCode.TopicNameInvalid // 0x90
ReasonCode.QuotaExceeded // 0x97
`
The library includes built-in metrics collection for monitoring performance:
`typescript
import { createMqttClient, createMetricsCollector } from 'mqtt-client-tss';
const client = createMqttClient({
host: 'localhost',
port: 1883,
enableMetrics: true,
});
// Get metrics after some activity
const stats = client.getMetrics();
console.log(stats);
// Output:
// {
// messages: { published: 150, received: 42, publishRate: '2.50/s', receiveRate: '0.70/s' },
// bytes: { published: '15.23 KB', received: '4.12 KB', publishRate: '253.83 B/s', receiveRate: '68.67 B/s' },
// latency: { average: '12.34ms', p50: '10.00ms', p95: '25.00ms', p99: '45.00ms' },
// connection: { reconnections: 0, errors: 0, uptime: '1h 23m 45s', connectedAt: '2024-01-15T10:30:00.000Z' }
// }
`
You can also use the MetricsCollector independently:
`typescript
import { createMetricsCollector } from 'mqtt-client-tss';
const metrics = createMetricsCollector();
metrics.recordPublish(256);
metrics.recordReceive(128);
metrics.recordLatency(15);
console.log(metrics.getAverageLatency());
console.log(metrics.getLatencyPercentile(95));
console.log(metrics.getPublishRate());
`
Prevent overwhelming the broker with a token bucket rate limiter:
`typescript
import { createMqttClient } from 'mqtt-client-tss';
// Limit to 100 messages per second
const client = createMqttClient({
host: 'localhost',
port: 1883,
rateLimit: 100,
});
// Messages exceeding the rate will be queued automatically
`
Using the rate limiter independently:
`typescript
import { createRateLimiter, createSlidingWindowRateLimiter } from 'mqtt-client-tss';
// Token bucket rate limiter
const limiter = createRateLimiter({
tokensPerSecond: 100,
bucketSize: 150, // Allow bursts up to 150
});
async function sendMessage() {
await limiter.acquire();
// Send message
}
// Check without blocking
if (limiter.tryAcquire()) {
// Token available, send immediately
}
// Sliding window rate limiter (more accurate for short bursts)
const slidingLimiter = createSlidingWindowRateLimiter({
windowMs: 1000, // 1 second window
maxRequests: 100, // Max 100 requests per window
});
`
Messages published while disconnected are queued and sent upon reconnection:
`typescript
import { createMqttClient } from 'mqtt-client-tss';
const client = createMqttClient({
host: 'localhost',
port: 1883,
reconnect: true,
queueSize: 1000, // Queue up to 1000 messages
});
// If disconnected, this message is queued
await client.publish('status', 'online', { qos: 1 });
// Message will be sent automatically when reconnected
`
Using the message queue independently:
`typescript
import { createMessageQueue } from 'mqtt-client-tss';
const queue = createMessageQueue({
maxSize: 1000,
maxAge: 60000, // Messages expire after 1 minute
maxRetries: 3,
priorityLevels: 3, // Higher QoS = higher priority
});
queue.enqueue('topic', Buffer.from('payload'), { qos: 1 });
const message = queue.dequeue();
if (message) {
// Process message
}
// Get queue statistics
const stats = queue.getStats();
`
Manage multiple connections for load balancing and failover:
`typescript
import { createConnectionPool, createMqttClient } from 'mqtt-client-tss';
const pool = createConnectionPool({
minConnections: 2,
maxConnections: 10,
loadBalancing: 'round-robin', // or 'least-connections', 'weighted', 'random'
healthCheckInterval: 30000,
});
// Add connections
pool.addConnection(
{ host: 'broker1.example.com', port: 1883 },
() => createMqttClient({ host: 'broker1.example.com', port: 1883 }).connect(),
{ weight: 2, priority: 1 }
);
pool.addConnection(
{ host: 'broker2.example.com', port: 1883 },
() => createMqttClient({ host: 'broker2.example.com', port: 1883 }).connect(),
{ weight: 1, priority: 0 }
);
// Start health checks
pool.startHealthChecks();
// Acquire a connection from the pool
const { id, connection } = await pool.acquire();
await connection.publish('topic', 'message');
pool.release(id);
// Get pool statistics
const stats = pool.getStats();
// Clean up
await pool.close();
`
MQTT wildcard topic matching utility:
`typescript
import { TopicMatcher, createTopicMatcher } from 'mqtt-client-tss';
// Static method for simple matching
TopicMatcher.matches('sensors/+/temperature', 'sensors/living-room/temperature'); // true
TopicMatcher.matches('sensors/#', 'sensors/a/b/c'); // true
TopicMatcher.matches('sensors/+', 'sensors/a/b'); // false
// Instance for subscription management
const matcher = createTopicMatcher();
matcher.addSubscription('sensors/#', 'subscriber-1');
matcher.addSubscription('alerts/+/critical', 'subscriber-2');
const patterns = matcher.getMatchingPatterns('sensors/temperature');
// ['sensors/#']
const subscribers = matcher.getSubscriberIds('alerts/fire/critical');
// ['subscriber-2']
// Validation
TopicMatcher.isValidTopic('sensors/temperature'); // true
TopicMatcher.isValidTopic('sensors/+/temperature'); // false (wildcards not allowed in topics)
TopicMatcher.isValidFilter('sensors/+/temperature'); // true
TopicMatcher.isValidFilter('sensors/#/more'); // false (# must be last)
// Shared subscriptions (MQTT 5.0)
TopicMatcher.isSharedSubscription('$share/group1/sensors/#'); // true
const parsed = TopicMatcher.parseSharedSubscription('$share/group1/sensors/#');
// { groupId: 'group1', topic: 'sensors/#' }
`
Intelligent retry logic with exponential backoff:
`typescript
import { createRetryManager, RetryError } from 'mqtt-client-tss';
const retry = createRetryManager({
maxRetries: 5,
initialDelay: 1000,
maxDelay: 30000,
multiplier: 2,
jitter: true,
jitterFactor: 0.2,
onRetry: (attempt, error, delay) => {
console.log(Retry ${attempt} after ${delay}ms: ${error.message});
}
});
// Execute with automatic retry
try {
const result = await retry.execute(async () => {
// Your operation that might fail
return await fetchData();
});
} catch (error) {
if (error instanceof RetryError) {
console.log(Failed after ${error.attempts} attempts);Last error: ${error.lastError.message}
console.log();
}
}
// Wrap a function for reuse
const fetchWithRetry = retry.wrap(fetchData);
const result = await fetchWithRetry();
// Get the retry schedule
const schedule = retry.getSchedule();
// [1000, 2000, 4000, 8000, 16000]
const maxDelay = retry.getMaxTotalDelay();
// 31000
`
Persistence layer for QoS 1 and QoS 2 messages:
`typescript
import { MemoryMessageStore, FileMessageStore, createMessageStore } from 'mqtt-client-tss';
// In-memory store (messages lost on restart)
const memoryStore = new MemoryMessageStore();
// File-based store (persistent across restarts)
const fileStore = new FileMessageStore('./mqtt-messages.json');
// Factory function
const store = createMessageStore({ type: 'file', filePath: './messages.json' });
// Store a message
await store.put({
packetId: 1,
topic: 'test',
payload: Buffer.from('hello'),
qos: 1,
retain: false,
timestamp: Date.now(),
retries: 0,
state: 'pending'
});
// Retrieve a message
const message = await store.get(1);
// Get all pending messages (for resending on reconnect)
const pending = await store.getAll();
// Delete after acknowledgment
await store.delete(1);
`
Flexible logging with multiple levels:
`typescript
import { createLogger, silentLogger } from 'mqtt-client-tss';
const logger = createLogger({
level: 'debug',
prefix: 'MyApp',
timestamps: true,
colors: true,
});
logger.trace('Very detailed info');
logger.debug('Debugging info');
logger.info('General info');
logger.warn('Warning message');
logger.error('Error occurred');
// Create child logger with sub-prefix
const subLogger = logger.child('Connection');
subLogger.info('Connected'); // Output: [MyApp:Connection] Connected
// Silent logger for testing
const client = createMqttClient({
host: 'localhost',
port: 1883,
enableLogging: false, // Uses silent logger
});
`
The library is organized into independent modules that can be imported separately:
``
mqtt-client-tss/
src/
index.ts # Main entry point, exports everything
types.ts # TypeScript type definitions
client/
mqtt-client.ts # Main MQTT client implementation
index.ts
protocol/
packet-builder.ts # Constructs MQTT packets
packet-parser.ts # Parses incoming MQTT packets
property-encoder.ts # Encodes MQTT 5.0 properties
property-decoder.ts # Decodes MQTT 5.0 properties
index.ts
utils/
logger.ts # Logging utility
topic-matcher.ts # Wildcard topic matching
message-store.ts # Message persistence
metrics.ts # Performance metrics
rate-limiter.ts # Rate limiting
message-queue.ts # Offline message queue
retry-manager.ts # Retry with backoff
connection-pool.ts # Connection pooling
index.ts
`typescript
// Import only what you need
import { MqttClient, createMqttClient } from 'mqtt-client-tss';
import { TopicMatcher } from 'mqtt-client-tss';
import { createRateLimiter } from 'mqtt-client-tss';
// Or import from specific modules
import { PacketBuilder, PacketParser } from 'mqtt-client-tss/dist/protocol';
import { Logger, MetricsCollector } from 'mqtt-client-tss/dist/utils';
`
The library is written in TypeScript and provides complete type definitions:
`typescript
import {
MqttClient,
createMqttClient,
MqttClientOptions,
MqttMessage,
MqttProperties,
PublishOptions,
SubscribeOptions,
SubscriptionGrant,
QoS,
TransportType,
MqttProtocolVersion,
ReasonCode,
ConnectionState,
PacketType,
} from 'mqtt-client-tss';
const options: MqttClientOptions = {
host: 'localhost',
port: 1883,
transport: 'tcp' as TransportType,
protocolVersion: 5 as MqttProtocolVersion,
};
const client: MqttClient = createMqttClient(options);
client.on('message', (topic: string, payload: Buffer, message: MqttMessage) => {
const qos: QoS = message.qos;
const properties: MqttProperties | undefined = message.properties;
});
const publishOpts: PublishOptions = {
qos: 1,
retain: false,
properties: {
messageExpiryInterval: 3600,
},
};
const grants: SubscriptionGrant[] = await client.subscribe('test/#', { qos: 1 });
`
`bashClone the repository
git clone
cd mqtt-client-tss
The compiled JavaScript files will be in the
dist directory, along with TypeScript declaration files (.d.ts) and source maps.File Structure After Build
`
dist/
index.js # Main entry point
index.d.ts # Type declarations
types.js
types.d.ts
client/
mqtt-client.js
mqtt-client.d.ts
index.js
index.d.ts
protocol/
packet-builder.js
packet-builder.d.ts
packet-parser.js
packet-parser.d.ts
property-encoder.js
property-encoder.d.ts
property-decoder.js
property-decoder.d.ts
index.js
index.d.ts
utils/
logger.js
logger.d.ts
topic-matcher.js
topic-matcher.d.ts
message-store.js
message-store.d.ts
metrics.js
metrics.d.ts
rate-limiter.js
rate-limiter.d.ts
message-queue.js
message-queue.d.ts
retry-manager.js
retry-manager.d.ts
connection-pool.js
connection-pool.d.ts
index.js
index.d.ts
``MIT License
Contributions are welcome. Please feel free to submit issues and pull requests.