Saxe Edge - MQTT batch processing with offline buffering for Node-RED
This plugin provides a configuration node and example flows for implementing the Saxe Edge ST1 protocol: buffered sensor data collection, GPS aggregation, batch processing, and reliable MQTT uplink with offline resilience.
- Offline Buffering: SQLite-based outbox for resilient data storage during network outages
- Batch Processing: Automatic batching with 3-limit enforcement (records, bytes, time span)
- GPS Aggregation: Combines separate latitude/longitude readings into atomic ST1 GPS format (v5/v6)
- Poison Record Handling: Isolates invalid records to dead_letter table
- Sequence Tracking: Monotonic sequence numbers per device for cloud-side deduplication
- MQTT QoS 1: Reliable delivery with PUBACK confirmation before record deletion
- Configurable: Shared credentials per organization/site
1. Open Node-RED editor
2. Click the menu (☰) → Manage palette
3. Go to the "Install" tab
4. Search for node-red-contrib-saxe-edge
5. Click "Install"
```bash
cd ~/.node-red
npm install node-red-contrib-saxe-edge
```

Restart Node-RED after installation.
1. In the Node-RED editor, deploy any flow to make the config node available
2. Add any node that requires configuration (or use the example flows)
3. Click the pencil icon to add a new `saxe-edge-config`
4. Configure:
   - Organization: Your org identifier (e.g., `acme_corp`)
   - Site: Your site identifier (e.g., `factory_oslo`)
   - MQTT Broker: Broker URL (e.g., `wss://mqtt.saxe-cloud.com:443` or `mqtt://localhost:1883`)
   - MQTT User/Password: Optional credentials
   - Batch Size: Max records per batch cycle (default: 100)
   - Database Path: SQLite database location (default: `/edge-data/outbox.db`)
5. Click "Add" or "Update"
The config node will automatically:
- Generate a unique Edge UUID (persisted to `edge-uuid.txt`)
- Set global context variables for use in flows
- Publish online/offline presence to `ST1/edge/{org}/{site}/{edge_uuid}/online`
Example flows are provided in the examples/ directory:
1. Copy the contents of examples/saxe-edge-batch.json
2. In Node-RED, click menu → Import
3. Paste the JSON
4. Click "Import"
The example flows include:
- ST1 - Batch: Core batch processing (database init, batch cycle, MQTT uplink)
- ST1 - Ingest: GPS aggregation and sensor data ingestion examples
Send sensor data to the ingest flow with the following message format:
```javascript
msg.payload = {
  value: 23.5,
  unit: "°C"
};
msg.domain = "devices"; // Optional, use for device status
msg.subsystem = "line1/zoneA"; // Optional path (e.g., "area/cell")
msg.device = "PLC-01"; // Device identifier
msg.metric = "temperature"; // Metric name
```

Device status example:
```javascript
msg.payload = {
  status: "online",
  ts: new Date().toISOString()
};
msg.domain = "devices";
msg.device = "PLC-01";
msg.metric = "status";
```

GPS example (send latitude + longitude as separate messages):
```javascript
msg.payload = { value: 59.9139, unit: "deg" };
msg.device = "gps";
msg.metric = "latitude"; // or "longitude"
```

The flow will:
1. Transform to ST1 format
2. Buffer in SQLite outbox
3. Batch on a 1 second interval (internal timer)
4. Publish to MQTT with QoS 1
5. Delete records after PUBACK confirmation
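
Steps 4 and 5 are what make the uplink safe against outages: a record leaves the outbox only after the broker has acknowledged it. The sketch below illustrates that ordering. `uplinkBatch`, `publish`, and `deleteRecords` are hypothetical names used for illustration and are not part of the plugin's API; `publish` is assumed to invoke its callback once the QoS 1 PUBACK has arrived (or with an error if it has not).

```javascript
// Hypothetical sketch of "publish first, delete only after PUBACK".
// `deps.publish` and `deps.deleteRecords` are injected stand-ins, not plugin functions.
function uplinkBatch(batch, deps, done) {
  const message = JSON.stringify(batch.payload);
  deps.publish(batch.topic, message, { qos: 1 }, (err) => {
    if (err) {
      // No acknowledgement: keep the records in the outbox for a later cycle.
      done({ delivered: false, reason: err.message });
      return;
    }
    // Acknowledged by the broker: the buffered copies can now be removed.
    deps.deleteRecords(batch.recordIds);
    done({ delivered: true });
  });
}
```

If the publish callback reports an error, nothing is deleted, so the same records can be claimed and sent again once the connection returns.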
| Property | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| Organization | string | Yes | - | Organization identifier for MQTT topic |
| Site | string | Yes | - | Site identifier for MQTT topic |
| MQTT Broker | string | Yes | - | Broker URL (mqtt://, mqtts://, ws://, wss://) |
| MQTT User | string | No | - | Optional username for broker authentication |
| MQTT Password | string | No | - | Optional password for broker authentication |
| Batch Size | number | Yes | 100 | Max records to claim per batch cycle |
| Database Path | string | Yes | /edge-data/outbox.db | Path to SQLite database file |
The config node sets these global context variables for use in flows:
- `EDGE_UUID`: Unique edge device identifier
- `ORG`: Organization identifier
- `SITE`: Site identifier
- `MQTT_BROKER`: MQTT broker URL
- `MQTT_USE_TLS`: Boolean, true for mqtts:// or wss://
- `BATCH_SIZE`: Max records per batch
- `DB_PATH`: Database file path
Access in function nodes:
```javascript
const edgeUuid = global.get('EDGE_UUID');
const org = global.get('ORG');
const site = global.get('SITE');
```

The plugin creates three tables: `outbox`, `seq_state`, and `dead_letter`.

`outbox` table:

| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | Auto-increment ID |
| topic | TEXT | MQTT topic |
| payload | TEXT | JSON payload |
| created_at | TEXT | ISO 8601 timestamp |
| sent | INTEGER | 0=pending, 1=sent, 2=inflight |
Indexes: `(sent, id)`, `(created_at)`

`seq_state` table:

| Column | Type | Description |
|--------|------|-------------|
| group_key | TEXT PK | Unique per {org}/{site}/{subsystem}/{device} |
| seq | INTEGER | Last sequence number used |

`dead_letter` table:

| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | Auto-increment ID |
| original_id | INTEGER | Original outbox record ID |
| topic | TEXT | Original MQTT topic |
| payload | TEXT | Original JSON payload |
| error | TEXT | Error description |
| created_at | TEXT | Original creation timestamp |
| moved_at | TEXT | Timestamp when moved to dead_letter |
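
To illustrate what the sequence table is for, here is a minimal in-memory sketch of per-group sequence assignment. It is an assumption-laden stand-in: the plugin keeps this state in SQLite, whereas the sketch uses a `Map`, and the helper names (`groupKey`, `nextSeq`), the `/` separator, and the choice to start counting at 1 are illustrative only.

```javascript
// In-memory stand-in for the sequence table (assumption: the plugin persists this in SQLite).
const seqState = new Map();

// Build a key per {org}/{site}/{subsystem}/{device}; the exact key format is an assumption.
function groupKey({ org, site, subsystem, device }) {
  return [org, site, subsystem, device].filter(Boolean).join('/');
}

// Return the next monotonic sequence number for a group and remember it.
function nextSeq(key) {
  const seq = (seqState.get(key) || 0) + 1; // starting at 1 is an assumption
  seqState.set(key, seq);
  return seq;
}
```

Because each group has its own counter, a consumer that remembers the last `seq` it saw per group can discard a batch that arrives twice.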
Sensor data:

```
ST1/batch/sensors/{org}/{site}/{subsystem_path?}/{device}
```

Example: `ST1/batch/sensors/acme_corp/factory_oslo/line1/zoneA/PLC-01`

Device status:

```
ST1/batch/devices/{org}/{site}/{subsystem_path?}/{device}/status
```

Example: `ST1/batch/devices/acme_corp/factory_oslo/line1/zoneA/PLC-01/status`

GPS:

```
ST1/batch/sensors/{org}/{site}/gps
```

Example: `ST1/batch/sensors/acme_corp/factory_oslo/gps`

Edge presence:

```
ST1/edge/{org}/{site}/{edge_uuid}/online
```

Presence payload includes `{online, ts, edge_uuid, org, site}`.
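
The topic patterns above can be assembled with a small helper. This is a sketch only: `buildBatchTopic` and `buildPresenceTopic` are hypothetical names, the default domain of `sensors` is an assumption based on `msg.domain` being optional, and appending `/status` for every message in the `devices` domain is inferred from the single device status example.

```javascript
// Hypothetical helper that mirrors the documented ST1 batch topic patterns.
function buildBatchTopic({ domain = 'sensors', org, site, subsystem, device }) {
  const parts = ['ST1', 'batch', domain, org, site];
  // The subsystem path is optional and may contain several segments ("line1/zoneA").
  if (subsystem) parts.push(...subsystem.split('/').filter(Boolean));
  parts.push(device);
  // Assumption: device-domain messages always publish under ".../status".
  if (domain === 'devices') parts.push('status');
  return parts.join('/');
}

// Presence topic for one edge device.
function buildPresenceTopic({ org, site, edgeUuid }) {
  return `ST1/edge/${org}/${site}/${edgeUuid}/online`;
}
```

With `device: "gps"` and no subsystem, the helper yields the GPS topic shown above.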
```json
{
  "v": 1,
  "org": "acme_corp",
  "site": "factory_oslo",
  "subsystem": "line1/zoneA",
  "device": "PLC-01",
  "edge_uuid": "87a70b51-b370-427c-bc7e-06ee62dcea61",
  "seq": 42,
  "records": [
    {
      "ts": "2026-01-08T10:00:00Z",
      "metric": "temperature",
      "value": 23.5,
      "unit": "°C"
    }
  ]
}
```

Note: edge_uuid is automatically included in all batch payloads (v1.0.9+). This enables the cloud to track which edge device sent the data, supporting scenarios with multiple edge devices per site and enabling offline status indication in frontend applications.
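
As a rough illustration of how such an envelope could be put together, the sketch below builds a batch payload from a group description, a sequence number, and a list of records. `formatSt1Batch` and the shape of its `group` argument are hypothetical; omitting `subsystem` and `device` when they are not set is an assumption.

```javascript
// Hypothetical formatter for an ST1 batch envelope (field names taken from the JSON example).
function formatSt1Batch(group, seq, records) {
  const payload = { v: 1, org: group.org, site: group.site };
  // Assumption: optional fields are left out rather than sent as null.
  if (group.subsystem) payload.subsystem = group.subsystem;
  if (group.device) payload.device = group.device;
  payload.edge_uuid = group.edgeUuid;
  payload.seq = seq;
  payload.records = records;
  return payload;
}
```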
```json
{
  "v": 1,
  "org": "acme_corp",
  "site": "factory_oslo",
  "edge_uuid": "87a70b51-b370-427c-bc7e-06ee62dcea61",
  "seq": 12,
  "records": [
    {
      "ts": "2026-01-08T10:00:00Z",
      "lat": 59.9139,
      "lon": 10.7522
    }
  ]
}
```

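
Each GPS record carries `lat` and `lon` together even though the readings arrive as separate messages. One way to pair them is sketched below. `createGpsAggregator` is a hypothetical helper, and the pairing policy (emit once both values are present, then reset) is an assumption; the example flow may pair readings differently, for instance within a time window.

```javascript
// Hypothetical aggregator that pairs separate latitude/longitude readings.
// `now` is injectable so the timestamp can be controlled in tests.
function createGpsAggregator(now = () => new Date().toISOString()) {
  const pending = {};
  return function onReading(metric, value) {
    if (metric !== 'latitude' && metric !== 'longitude') return null;
    pending[metric] = value; // a newer reading replaces an unpaired older one
    if (pending.latitude === undefined || pending.longitude === undefined) {
      return null; // still waiting for the other half
    }
    const record = { ts: now(), lat: pending.latitude, lon: pending.longitude };
    delete pending.latitude;
    delete pending.longitude;
    return record;
  };
}
```

Calling the returned function with a latitude yields `null`; the following longitude yields one combined `{ts, lat, lon}` record.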
Batches are closed when ANY of these limits is reached:
1. Max Records: 5000 records per batch
2. Max Bytes: 1 MB payload size
3. Max Time Span: 5 seconds between first and last record
Batch interval: 1 second (internal timer in saxe-edge-buffer)
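
The three limits can be read as a simple rule: before adding a record, close the current batch if the record would break any limit. The sketch below shows that rule under stated assumptions: records are already sorted by `ts`, size is measured as the UTF-8 length of each record's JSON (ignoring the envelope), and 1 MB is taken as 1024 × 1024 bytes. `splitIntoBatches` and `LIMITS` are hypothetical names, not the plugin's implementation.

```javascript
// Hypothetical three-limit batcher (limits taken from the list above).
const LIMITS = { maxRecords: 5000, maxBytes: 1024 * 1024, maxSpanMs: 5000 };

function splitIntoBatches(records, limits = LIMITS) {
  const batches = [];
  let current = [];
  let bytes = 0;
  let firstTs = 0;
  for (const record of records) {
    const size = Buffer.byteLength(JSON.stringify(record), 'utf8');
    const ts = Date.parse(record.ts);
    // Close the open batch if adding this record would break ANY of the limits.
    const wouldExceed =
      current.length > 0 &&
      (current.length + 1 > limits.maxRecords ||
        bytes + size > limits.maxBytes ||
        ts - firstTs > limits.maxSpanMs);
    if (wouldExceed) {
      batches.push(current);
      current = [];
      bytes = 0;
    }
    if (current.length === 0) firstTs = ts; // time span is measured from the first record
    current.push(record);
    bytes += size;
  }
  if (current.length > 0) batches.push(current);
  return batches;
}
```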
If you see "SQLITE_READONLY" errors:

```bash
# Ensure directory exists and is writable
mkdir -p /edge-data
chmod 755 /edge-data
```

### MQTT connection issues
1. Check broker URL format: Must include protocol (mqtt://, mqtts://, ws://, wss://)
2. Verify credentials: Ensure username/password are correct
3. Test connectivity: Use mosquitto_pub to verify broker is reachable
4. Check TLS: For mqtts:// or wss://, ensure certificates are valid
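
The first check can be automated before the URL ever reaches the config node. The sketch below uses the standard `URL` class available in Node.js; `checkBrokerUrl` is a hypothetical helper, and the `useTls` result only mirrors the documented rule that `mqtts://` and `wss://` imply TLS.

```javascript
// Hypothetical pre-flight check for the broker URL.
const ALLOWED_PROTOCOLS = ['mqtt:', 'mqtts:', 'ws:', 'wss:'];

function checkBrokerUrl(brokerUrl) {
  let parsed;
  try {
    parsed = new URL(brokerUrl);
  } catch (err) {
    return { ok: false, reason: 'not a valid URL (is the protocol missing?)' };
  }
  if (!ALLOWED_PROTOCOLS.includes(parsed.protocol)) {
    // "localhost:1883" lands here: it parses, but "localhost:" is not an MQTT scheme.
    return { ok: false, reason: `unsupported protocol "${parsed.protocol}"` };
  }
  const useTls = parsed.protocol === 'mqtts:' || parsed.protocol === 'wss:';
  return { ok: true, useTls };
}
```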
### Data not sending
1. Check outbox: Query the database to see if records are accumulating
   ```bash
   sqlite3 /edge-data/outbox.db "SELECT COUNT(*) FROM outbox WHERE sent = 0;"
   ```
2. Check debug output: Enable debug nodes in the batch flow
3. Verify MQTT connection: Check if the MQTT node shows "connected" status
4. Check batch cycle: Ensure the 1-second inject node is running

## Docker vs Plugin
This plugin supports two deployment models:
### Docker
For greenfield deployments with full control:

```yaml
services:
  nodered:
    image: nodered/node-red:latest
    volumes:
      - nodered-data:/data
      - ./edge-data:/edge-data
    environment:
      - ORG=acme_corp
      - SITE=factory_oslo
      - MQTT_BROKER=wss://mqtt.saxe-cloud.com:443
      - MQTT_USER=edge_user
      - MQTT_PASSWORD=secret
      - BATCH_SIZE=100

# Named volumes must be declared at the top level
volumes:
  nodered-data:
```

Docker setup includes:
- Auto-installation of plugin via npm
- Auto-configuration of config node from environment variables
- Simulation flows for testing
### Plugin
For existing Node-RED instances:
1. Install plugin via palette manager
2. Create config node manually
3. Import example flows
4. Connect your sensors
## Architecture

```
┌─────────────┐
│   Sensors   │
│  (Modbus,   │
│  GPS, etc.) │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ ST1 Ingest  │ ← Transform to ST1 format
│    Flow     │ ← GPS aggregation (lat/lon → {ts,lat,lon})
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   SQLite    │ ← Append to outbox table
│   Outbox    │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Batch Cycle │ ← Every 1 second
│ (1s timer)  │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Claim Recs  │ ← Atomic UPDATE...RETURNING
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Build Batch │ ← Group by device, enforce 3 limits
└──────┬──────┘
       │
       ▼
┌─────────────┐
│Handle Poison│ ← Move invalid → dead_letter
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Apply Seq  │ ← UPSERT seq_state, assign seq
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Format ST1  │ ← Build final JSON payload
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ MQTT Publish│ ← QoS 1 to cloud
│   (QoS 1)   │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   PUBACK    │ ← Wait for confirmation
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ DELETE Recs │ ← Remove from outbox
└─────────────┘
```

## Examples
See the `examples/` directory for:
- `saxe-edge-batch.json`: Core batch processing flow
- `sensor-simulation.json`: Example sensor data generation (for testing)
- `gps-simulation.json`: Example GPS data generation (for testing)

## Development
```bash
# Clone repository
git clone https://github.com/saxe/node-red-contrib-saxe-edge.git
cd node-red-contrib-saxe-edge

# Install dependencies
npm install

# Link for local development
npm link

# In your Node-RED directory
cd ~/.node-red
npm link node-red-contrib-saxe-edge

# Restart Node-RED
```

## Testing
```bash
npm test
```

Saxe Noncommercial No-Derivatives License. See LICENSE.
Commercial use is permitted only for Saxe and Saxe Customers as defined in the
license.
Noncommercial users may use and distribute unmodified copies only.
- Issues: https://github.com/saxe/node-red-contrib-saxe-edge/issues
- Documentation: https://github.com/saxe/node-red-contrib-saxe-edge/blob/main/README.md
- Specification: See ST1_MQTT_SPEC_v6.md for ST1 protocol details and EDGE_OFFLINE_BUFFER_SPEC_v5.md for edge buffering
See CHANGELOG.md for version history and migration guides.
We do not accept external contributions. Please contact Saxe if you need changes.
---
Saxe Edge - Reliable IoT data collection at the edge.