Message bus system with event registry, channels, and subscriptions
Install with `npm install @jucie-engine/message`.

A robust message-passing system for the Jucie engine that enables typed, reliable communication between different parts of an application using Web Workers' MessageChannel API.
The Message extension provides a MessageBus (primary/central hub) and Channel (secondary/client) architecture for inter-process communication. It supports typed events, message routing, subscription management, and error handling.
```
┌─────────────────┐    MessageChannel    ┌─────────────────┐
│   MessageBus    │◄────────────────────►│     Channel     │
│    (Primary)    │                      │   (Secondary)   │
│                 │                      │                 │
│ • Manages ports │                      │ • Connects to   │
│ • Routes msgs   │                      │   MessageBus    │
│ • Handles subs  │                      │ • Publishes     │
│ • Validates     │                      │ • Subscribes    │
└─────────────────┘                      └─────────────────┘
```
- MessageBus: Central message router that manages multiple channels and routes messages between them
- Channel: Client that connects to MessageBus and can publish/subscribe to typed events
- Port: Low-level communication layer handling MessageChannel handshaking and message transport
- EventRegistry: Manages event schemas, validation, and symbol-to-string conversion
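
To make the hub-and-client split concrete, here is a minimal in-process sketch of the routing idea. It is not the extension's implementation: `TinyBus` and `TinyChannel` are hypothetical stand-ins, there is no MessagePort transport or schema validation, and skipping the sender during delivery is an assumption rather than documented behavior.

```javascript
// Hypothetical stand-ins for MessageBus and Channel (illustration only).
class TinyBus {
  constructor() {
    this.channels = new Map(); // channel name -> TinyChannel
  }

  createChannel(name) {
    const channel = new TinyChannel(name, this);
    this.channels.set(name, channel);
    return channel;
  }

  // Deliver a message to every channel except the one that sent it.
  // (Assumption: the real bus may also deliver to the sender.)
  route(senderName, event, payload) {
    for (const [name, channel] of this.channels) {
      if (name !== senderName) channel.receive(event, payload);
    }
  }
}

class TinyChannel {
  constructor(name, bus) {
    this.name = name;
    this.bus = bus;
    this.subscribers = new Map(); // event name -> array of callbacks
  }

  subscribe(event, subscriber) {
    const list = this.subscribers.get(event) ?? [];
    list.push(subscriber);
    this.subscribers.set(event, list);
  }

  // Publishing hands the message to the hub, which decides who receives it.
  publish(event, ...payload) {
    this.bus.route(this.name, event, payload);
  }

  receive(event, payload) {
    for (const subscriber of this.subscribers.get(event) ?? []) {
      subscriber(...payload);
    }
  }
}

const bus = new TinyBus();
const main = bus.createChannel('main');
const worker = bus.createChannel('worker1');

const received = [];
worker.subscribe('user:login', (username) => received.push(username));
main.publish('user:login', 'john_doe');
console.log(received); // worker1 saw the login published by main
```

The point of the sketch is only that channels never talk to each other directly: every message passes through the hub, which is where routing and validation can be centralized.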
```javascript
import { Engine } from '@jucie-engine/core';
import { MessageBus, Channel } from '@jucie-engine/message';

// userEvents and systemEvents are event schemas built with defineEvents
const engine = Engine.create()
  .install(MessageBus.configure({
    mode: 'development',
    schemas: [userEvents, systemEvents]
  }))
  .install(Channel.configure());
```
Define typed events using the defineEvents function:
```javascript
import { defineEvents } from '@jucie-engine/message';

const userEvents = defineEvents('user', () => ({
  login: ['String', 'String'], // username, password
  logout: ['String'],          // username
  profile: {
    update: ['Object'],
    delete: ['String']
  }
}));

const systemEvents = defineEvents('system', () => ({
  ready: '*',                  // wildcard - accepts any payload
  error: ['String', 'Object'],
  config: null                 // no payload validation
}));
```
- Array: `['String', 'Object']` - Validates argument count and types
- Wildcard: `'*'` - Accepts any payload
- Null: `null` - No payload validation
- Nested: Objects create namespaced events
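
The schema kinds listed above can be illustrated with a small validator. This is a sketch under assumptions, not the extension's validation code: `validatePayload` and `typeName` are hypothetical helpers, and matching names such as `'String'` against a value's built-in type tag is a guess at how the type names are interpreted.

```javascript
// Hypothetical validator: returns a list of problems (an empty list means valid).
function validatePayload(schema, payload) {
  // '*' accepts anything; null means validation is switched off.
  if (schema === '*' || schema === null) return [];

  if (!Array.isArray(schema)) {
    return ['unsupported schema: ' + String(schema)];
  }

  const problems = [];
  if (payload.length !== schema.length) {
    problems.push(`expected ${schema.length} argument(s), got ${payload.length}`);
  }

  schema.forEach((expected, index) => {
    if (index >= payload.length) return; // already reported as a count mismatch
    const actual = typeName(payload[index]);
    if (actual !== expected) {
      problems.push(`argument ${index}: expected ${expected}, got ${actual}`);
    }
  });
  return problems;
}

// Assumption: 'String', 'Object', ... correspond to the value's built-in type tag,
// e.g. Object.prototype.toString.call('x') === '[object String]'.
function typeName(value) {
  return Object.prototype.toString.call(value).slice(8, -1);
}

// Matches the user.login schema: two strings, so no problems are reported.
console.log(validatePayload(['String', 'String'], ['username', 'password']));

// Second argument is a number, so one problem is reported.
console.log(validatePayload(['String', 'String'], ['username', 123]));

// Wildcard schema: any payload is accepted.
console.log(validatePayload('*', [{ anything: true }]));
```

Returning a list of problems instead of throwing fits a validator that logs warnings, but the extension's actual reporting mechanism may differ.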
```javascript
// MessageBus (Primary) - typically on main thread
const messageBusEngine = Engine.create()
  .install(MessageBus.configure({
    mode: 'development',
    schemas: [userEvents]
  }))
  .install(Channel.configure());

// Channel (Secondary) - typically in worker/iframe
const channelEngine = Engine.create()
  .install(Channel.configure());
```
```javascript
// Create a channel from MessageBus
const port = messageBusEngine.messageBus.createChannel('worker1');

// Connect Channel to MessageBus
await channelEngine.channel.connect(port);

// Verify connection and event schemas are available
// (expect-style assertion, as used in the test suite)
expect(channelEngine.channel.events).toBeDefined();
```
```javascript
// Subscribe to events
channelEngine.channel.subscribe(
  channelEngine.channel.events.user.login,
  (username, password) => {
    console.log('User login:', username);
  }
);

// Publish events
channelEngine.channel.publish(
  channelEngine.channel.events.user.login,
  'john_doe',
  'secret123'
);
```
#### One-time Subscriptions
```javascript
channelEngine.channel.subscribeOnce(
  channelEngine.channel.events.system.ready,
  () => console.log('System is ready!')
);
```
#### Namespaced Events
```javascript
// Subscribe to namespaced event
channelEngine.channel.namespace('admin').subscribe(
  channelEngine.channel.events.user.login,
  (username) => console.log('Admin login:', username)
);

// Publish to namespaced event
channelEngine.channel.namespace('admin').publish(
  channelEngine.channel.events.user.login,
  'admin_user'
);
```
#### Targeted Messaging
```javascript
// Send message to specific channel only
channelEngine.channel.to('worker1').publish(
  channelEngine.channel.events.user.logout,
  'username'
);
```
#### Multiple Subscribers
```javascript
// Any registered event works here; system.ready is used as an example
const event = channelEngine.channel.events.system.ready;

const subscriber1 = (data) => console.log('Sub1:', data);
const subscriber2 = (data) => console.log('Sub2:', data);

// Both subscribers will receive the message
channelEngine.channel.subscribe(event, subscriber1);
channelEngine.channel.subscribe(event, subscriber2);
```
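
The subscription patterns above (several subscribers per event, one-time subscribers) can be sketched with a plain lookup table. This is a hedged illustration, not how the extension stores subscriptions: `SubscriptionTable` and its `dispatch` method are hypothetical, and events are plain strings here instead of the symbols exposed on `channel.events`.

```javascript
// Hypothetical subscription table: event name -> Set of subscriber functions.
class SubscriptionTable {
  constructor() {
    this.byEvent = new Map();
  }

  subscribe(event, subscriber) {
    if (!this.byEvent.has(event)) this.byEvent.set(event, new Set());
    this.byEvent.get(event).add(subscriber);
  }

  unsubscribe(event, subscriber) {
    this.byEvent.get(event)?.delete(subscriber);
  }

  // Wrap the subscriber so it removes itself before running the first time.
  subscribeOnce(event, subscriber) {
    const wrapper = (...payload) => {
      this.unsubscribe(event, wrapper);
      subscriber(...payload);
    };
    this.subscribe(event, wrapper);
  }

  dispatch(event, ...payload) {
    // Iterate over a copy so subscribers can unsubscribe during dispatch.
    for (const subscriber of [...(this.byEvent.get(event) ?? [])]) {
      subscriber(...payload);
    }
  }
}

const table = new SubscriptionTable();
const log = [];
table.subscribe('system:ready', (data) => log.push('Sub1:' + data));
table.subscribe('system:ready', (data) => log.push('Sub2:' + data));
table.subscribeOnce('system:ready', (data) => log.push('Once:' + data));

table.dispatch('system:ready', 'a'); // all three subscribers run
table.dispatch('system:ready', 'b'); // only Sub1 and Sub2 run
console.log(log);
```

Using a `Set` per event keeps registration idempotent for the same function reference and makes `unsubscribe(event, subscriber)` a constant-time delete.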
```javascript
// Check if channel is active
const isActive = messageBusEngine.messageBus.isChannelActive('worker1');

// Enable/disable channels
await messageBusEngine.messageBus.disableChannel('worker1');
await messageBusEngine.messageBus.enableChannel('worker1');

// Use external MessageChannel (Node.js worker_threads shown here)
const { MessageChannel } = await import('worker_threads');
const messageChannel = new MessageChannel();
messageBusEngine.messageBus.useChannel('external', messageChannel.port1);
```
The system provides comprehensive error handling:
```javascript
// Unknown events throw errors
try {
  channelEngine.channel.publish('unknown:event', 'data');
} catch (error) {
  console.error('Unknown event:', error.message);
}

// Connection errors are handled gracefully
const result = await channelEngine.channel.connect(invalidPort);
if (result === false) {
  console.error('Connection failed');
}
```
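
If the same try/catch appears around many publish calls, it can be factored into a helper. The following is a minimal sketch, assuming only that `publish` throws on unknown events as described above; `safePublish` and `fakeChannel` are hypothetical names, and the stand-in channel exists only so the snippet runs without the extension installed.

```javascript
// Hypothetical helper: publish without letting a publish error escape.
function safePublish(channel, event, ...payload) {
  try {
    channel.publish(event, ...payload);
    return { ok: true };
  } catch (error) {
    return { ok: false, reason: error.message };
  }
}

// Stand-in channel for illustration only; it rejects events it does not know.
const fakeChannel = {
  known: new Set(['user:login']),
  publish(event) {
    if (!this.known.has(event)) throw new Error(`Unknown event: ${event}`);
  },
};

const accepted = safePublish(fakeChannel, 'user:login', 'john_doe', 'secret123');
const rejected = safePublish(fakeChannel, 'unknown:event', 'data');
console.log(accepted.ok, rejected.ok, rejected.reason);
```

Returning a result object keeps call sites free of exception handling while still exposing the failure reason for logging.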
Events are automatically validated against their schemas:
```javascript
// This will pass validation
channelEngine.channel.publish(
  channelEngine.channel.events.user.login,
  'username', // String
  'password'  // String
);

// This will fail validation and log warnings
channelEngine.channel.publish(
  channelEngine.channel.events.user.login,
  'username', // String ✓
  123         // Number ✗ (expected String)
);
```
```javascript
MessageBus.configure({
  mode: 'development',                // 'development' | 'production' | 'test'
  schemas: [userEvents, systemEvents] // Event schema definitions
})
```
```javascript
Channel.configure({
  port: messagePort,  // Optional: pre-configured MessagePort
  subscriptions: [    // Optional: pre-configured subscriptions
    {
      name: 'user.login',
      action: 'subscribe',
      event: 'user:login',
      namespace: 'admin',
      subscriber: (username) => console.log(username)
    }
  ]
})
```
The extension includes comprehensive test coverage:
```bash
npm test
```
Tests cover:
- Basic connection flow
- Message publishing and subscribing
- Multiple subscribers and one-time subscriptions
- Namespace functionality
- Targeted messaging
- Error handling
- Channel management
- Event validation
- Complex event schemas
#### MessageBus
- `createChannel(name, callback?)` - Create a new channel
- `useChannel(name, port, callback?)` - Use external MessageChannel
- `disableChannel(name)` - Disable a channel
- `enableChannel(name)` - Enable a channel
- `isChannelActive(name)` - Check if channel is active

#### Channel
- `connect(port, callback?)` - Connect to MessageBus
- `publish(event, ...payload)` - Publish an event
- `subscribe(event, subscriber)` - Subscribe to an event
- `subscribeOnce(event, subscriber)` - Subscribe once to an event
- `unsubscribe(event, subscriber)` - Unsubscribe from an event
- `namespace(name)` - Create namespaced channel
- `to(...channels)` - Create targeted channel

#### EventRegistry
- `processSchemas(schemas)` - Process event schemas
- `validateEvent(event, payload)` - Validate event payload
- `getEventObject()` - Get nested event object with symbols
- `getPath(symbol)` - Convert symbol to string path
- `getSymbol(path)` - Convert string path to symbol
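
The symbol-to-path conversion can be pictured as a pair of maps built while walking a schema definition. The sketch below is an assumption-laden illustration, not the EventRegistry source: `buildRegistry` is a hypothetical function, and the `:` separator is inferred from the `'user:login'` event string used in the `Channel.configure` example.

```javascript
// Hypothetical two-way registry between event symbols and 'a:b:c' string paths.
function buildRegistry(namespace, definition) {
  const pathBySymbol = new Map();
  const symbolByPath = new Map();

  function walk(node, prefix) {
    const events = {};
    for (const [key, value] of Object.entries(node)) {
      const path = `${prefix}:${key}`;
      // Plain objects are groups; arrays, '*' and null are leaf schemas.
      const isGroup = value !== null && typeof value === 'object' && !Array.isArray(value);
      if (isGroup) {
        events[key] = walk(value, path); // nested object -> nested namespace
      } else {
        const symbol = Symbol(path);
        pathBySymbol.set(symbol, path);
        symbolByPath.set(path, symbol);
        events[key] = symbol;
      }
    }
    return events;
  }

  return {
    events: walk(definition, namespace),
    getPath: (symbol) => pathBySymbol.get(symbol),
    getSymbol: (path) => symbolByPath.get(path),
  };
}

const registry = buildRegistry('user', {
  login: ['String', 'String'],
  profile: { update: ['Object'], delete: ['String'] },
});

// Symbol -> path for a nested event.
console.log(registry.getPath(registry.events.profile.update));

// Path -> symbol returns the same symbol exposed on the events object.
console.log(registry.getSymbol('user:login') === registry.events.login);
```

Symbols give each event a unique identity inside one process, while string paths are what can actually be sent across a MessagePort, which is presumably why both directions of the lookup are exposed.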
1. Define schemas early - Set up event schemas before creating channels
2. Use typed events - Leverage schema validation for better debugging
3. Handle errors gracefully - Always wrap channel operations in try-catch
4. Clean up subscriptions - Use unsubscribe functions to prevent memory leaks
5. Use namespaces - Organize events with namespaces for better structure
6. Test thoroughly - Use the comprehensive test suite as a reference
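
For the cleanup practice in particular, one possible pattern is to record every subscription as it is made and release them together. This is a sketch, assuming only the `subscribe(event, subscriber)` and `unsubscribe(event, subscriber)` signatures from the API list; `createSubscriptionScope` and `countingChannel` are hypothetical and not part of the extension.

```javascript
// Hypothetical helper that remembers subscriptions so they can be released together.
function createSubscriptionScope(channel) {
  const active = []; // [event, subscriber] pairs registered through this scope
  return {
    subscribe(event, subscriber) {
      channel.subscribe(event, subscriber);
      active.push([event, subscriber]);
    },
    dispose() {
      for (const [event, subscriber] of active) {
        channel.unsubscribe(event, subscriber);
      }
      active.length = 0; // make repeated dispose() calls harmless
    },
  };
}

// Stand-in channel that only counts live subscriptions (illustration only).
const countingChannel = {
  live: 0,
  subscribe() { this.live += 1; },
  unsubscribe() { this.live -= 1; },
};

const scope = createSubscriptionScope(countingChannel);
scope.subscribe('user:login', () => {});
scope.subscribe('user:logout', () => {});
const liveBeforeDispose = countingChannel.live;

scope.dispose();
console.log(liveBeforeDispose, countingChannel.live);
```

Tying subscriptions to a scope object makes it harder to forget an individual `unsubscribe` call when a component or worker shuts down.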
See the test files for complete working examples:
- `__tests__/MessageBus.test.js` - Integration tests
- `__tests__/EventRegistry.test.js` - Event schema tests
- `__tests__/Port.test.js` - Low-level communication tests