Redis event broadcaster for Uploadista
IORedis-backed event broadcaster for Uploadista. Provides clustered event broadcasting with automatic failover and replica scaling.
The IORedis event broadcaster uses the IORedis library's advanced features for distributed event broadcasting. Perfect for:
- Redis Clustering: Built-in cluster support with auto-discovery
- Sentinel HA: Automatic failover with Redis Sentinel
- Replica Scaling: Distribute read load across replicas
- Large Deployments: 50+ server instances with efficient broadcasting
- Production High-Availability: Enterprise-grade resilience
Events published to a channel reach all subscribers across all servers, with automatic failover if nodes go down.
``bash`
npm install @uploadista/event-broadcaster-ioredis ioredisor
pnpm add @uploadista/event-broadcaster-ioredis ioredis
- Node.js 18+
- Redis 5.0+ or Redis Cluster
- Optional: Redis Sentinel for automatic failover
- Two IORedis connections (one for pub, one for sub)
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
import { Effect } from "effect";
// Create IORedis clients (one for pub, one for sub)
const redis = new Redis({
host: "localhost",
port: 6379,
});
const subscriberRedis = new Redis({
host: "localhost",
port: 6379,
});
const program = Effect.gen(function* () {
// Event broadcaster is automatically available
});
Effect.runSync(
program.pipe(
Effect.provide(
ioRedisEventBroadcaster({
redis,
subscriberRedis,
})
),
// ... other layers
)
);
`
- ✅ Redis Clustering: Automatic cluster discovery and failover
- ✅ Sentinel Support: Monitored automatic master failover
- ✅ Replica Reads: Distribute read load across replicas
- ✅ Cluster Scaling: Add nodes dynamically without reconfiguration
- ✅ Connection Pooling: Efficient resource management
- ✅ Advanced Retry Logic: Robust connection recovery
#### ioRedisEventBroadcaster(config: IoRedisEventBroadcasterConfig): Layer
Creates an Effect layer providing the EventBroadcasterService backed by IORedis.
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const redis = new Redis({ host: "localhost" });
const subscriberRedis = new Redis({ host: "localhost" });
const layer = ioRedisEventBroadcaster({
redis,
subscriberRedis,
});
`
Configuration:
`typescript`
interface IoRedisEventBroadcasterConfig {
redis: Redis; // Connection for publishing
subscriberRedis: Redis; // Connection for subscribing
}
#### createIoRedisEventBroadcaster(config: IoRedisEventBroadcasterConfig): EventBroadcaster
Factory function to create a broadcaster instance.
`typescript
import { createIoRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
const broadcaster = createIoRedisEventBroadcaster({
redis,
subscriberRedis,
});
`
The IORedis broadcaster implements the EventBroadcaster interface:
#### publish(channel: string, message: string): Effect
Broadcast a message to all subscribers (across cluster).
`typescript`
const program = Effect.gen(function* () {
yield* broadcaster.publish("uploads:complete", JSON.stringify({
uploadId: "abc123",
duration: 45000,
}));
});
#### subscribe(channel: string, handler: (message: string) => void): Effect
Subscribe to a channel (with cluster awareness).
`typescriptUpload complete: ${event.uploadId}
const program = Effect.gen(function* () {
yield* broadcaster.subscribe("uploads:complete", (message: string) => {
const event = JSON.parse(message);
console.log();`
});
});
#### unsubscribe(channel: string): Effect
Unsubscribe from a channel.
`typescript`
const program = Effect.gen(function* () {
yield* broadcaster.unsubscribe("uploads:complete");
});
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const redis = new Redis({
host: "localhost",
port: 6379,
password: process.env.REDIS_PASSWORD,
});
const subscriberRedis = new Redis({
host: "localhost",
port: 6379,
password: process.env.REDIS_PASSWORD,
});
const layer = ioRedisEventBroadcaster({
redis,
subscriberRedis,
});
`
With automatic discovery:
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const redis = new Redis.Cluster(
[
{ host: "cluster-node-1", port: 6379 },
{ host: "cluster-node-2", port: 6379 },
{ host: "cluster-node-3", port: 6379 },
],
{
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
}
);
const subscriberRedis = new Redis.Cluster(
[
{ host: "cluster-node-1", port: 6379 },
{ host: "cluster-node-2", port: 6379 },
{ host: "cluster-node-3", port: 6379 },
],
{
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
}
);
const layer = ioRedisEventBroadcaster({
redis: redis as any,
subscriberRedis: subscriberRedis as any,
});
`
For automatic failover:
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const redis = new Redis({
sentinels: [
{ host: "sentinel-1", port: 26379 },
{ host: "sentinel-2", port: 26379 },
{ host: "sentinel-3", port: 26379 },
],
name: "mymaster",
sentinelPassword: process.env.SENTINEL_PASSWORD,
password: process.env.REDIS_PASSWORD,
});
const subscriberRedis = new Redis({
sentinels: [
{ host: "sentinel-1", port: 26379 },
{ host: "sentinel-2", port: 26379 },
{ host: "sentinel-3", port: 26379 },
],
name: "mymaster",
sentinelPassword: process.env.SENTINEL_PASSWORD,
password: process.env.REDIS_PASSWORD,
});
const layer = ioRedisEventBroadcaster({
redis,
subscriberRedis,
});
`
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
const createBroadcaster = () => {
let redis: Redis;
let subscriberRedis: Redis;
if (process.env.REDIS_CLUSTER_NODES) {
// Cluster mode
const nodes = process.env.REDIS_CLUSTER_NODES.split(",").map(
(addr) => {
const [host, port] = addr.split(":");
return { host, port: parseInt(port) };
}
);
redis = new Redis.Cluster(nodes, {
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
});
subscriberRedis = new Redis.Cluster(nodes, {
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
});
} else {
// Single instance
redis = new Redis({
url: process.env.REDIS_URL || "redis://localhost:6379",
});
subscriberRedis = new Redis({
url: process.env.REDIS_URL || "redis://localhost:6379",
});
}
return ioRedisEventBroadcaster({
redis: redis as any,
subscriberRedis: subscriberRedis as any,
});
};
const layer = createBroadcaster();
`
Large deployment across many servers:
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import { uploadServer } from "@uploadista/server";
import Redis from "ioredis";
import { Effect } from "effect";
// 5-node cluster
const redis = new Redis.Cluster(
[
{ host: "redis-1.cluster", port: 6379 },
{ host: "redis-2.cluster", port: 6379 },
{ host: "redis-3.cluster", port: 6379 },
{ host: "redis-4.cluster", port: 6379 },
{ host: "redis-5.cluster", port: 6379 },
],
{
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
}
);
const subscriberRedis = new Redis.Cluster(
// Same cluster nodes
{
redisOptions: {
password: process.env.REDIS_PASSWORD,
},
}
);
const program = Effect.gen(function* () {
const server = yield* uploadServer;
// Subscribe for uploads completing
yield* broadcaster.subscribe("uploads:complete", (message: string) => {
const { uploadId, size } = JSON.parse(message);
console.log(Upload complete: ${uploadId} (${size} bytes));
// Trigger flow processing
// Each server independently processes
});
// Server 1 completes upload
yield* broadcaster.publish("uploads:complete", JSON.stringify({
uploadId: "upl_server1_123",
size: 1048576,
completedBy: "server-1",
}));
// ALL servers (including server 1) receive notification
});
Effect.runSync(
program.pipe(
Effect.provide(
ioRedisEventBroadcaster({
redis: redis as any,
subscriberRedis: subscriberRedis as any,
})
)
)
);
`
Automatic failover if primary goes down:
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
import { Effect } from "effect";
const redis = new Redis({
sentinels: [
{ host: process.env.SENTINEL_1, port: 26379 },
{ host: process.env.SENTINEL_2, port: 26379 },
{ host: process.env.SENTINEL_3, port: 26379 },
],
name: "uploadista-primary",
sentinelPassword: process.env.SENTINEL_PASSWORD,
password: process.env.REDIS_PASSWORD,
});
const subscriberRedis = new Redis({
sentinels: [
{ host: process.env.SENTINEL_1, port: 26379 },
{ host: process.env.SENTINEL_2, port: 26379 },
{ host: process.env.SENTINEL_3, port: 26379 },
],
name: "uploadista-primary",
sentinelPassword: process.env.SENTINEL_PASSWORD,
password: process.env.REDIS_PASSWORD,
});
const broadcaster = ioRedisEventBroadcaster({
redis,
subscriberRedis,
});
// If Redis primary fails:
// 1. Sentinel detects failure
// 2. Promotes replica automatically
// 3. IORedis reconnects to new primary
// 4. Events continue flowing (transparent failover)
`
`typescript
import { ioRedisEventBroadcaster } from "@uploadista/event-broadcaster-ioredis";
import Redis from "ioredis";
// Write to master, read from replicas
const redis = new Redis.Cluster(
[
{ host: "master", port: 6379 },
{ host: "replica-1", port: 6379 },
{ host: "replica-2", port: 6379 },
],
{
scaleReads: "slave", // Read from replicas
}
);
const subscriberRedis = new Redis.Cluster(
[
{ host: "master", port: 6379 },
{ host: "replica-1", port: 6379 },
{ host: "replica-2", port: 6379 },
],
{
scaleReads: "slave",
}
);
// Publishes always go to master
// Subscriptions distributed across replicas
const broadcaster = ioRedisEventBroadcaster({
redis: redis as any,
subscriberRedis: subscriberRedis as any,
});
`
| Operation | Latency | Throughput | Cluster Aware |
|-----------|---------|-----------|--------------|
| publish() | 1-2ms | 100k+ events/sec | ✅ |
| subscribe() | 2-5ms | N/A | ✅ |
| unsubscribe() | 1-2ms | N/A | ✅ |
| Failover | 1-5s | Auto-recovery | ✅ |
``
App Servers ──→ Pub Redis Master ──→ Replicate to ──→ Replica 1
Replica 2
Replica 3
``
Sentinel 1 ┐
Sentinel 2 ├──→ Monitor ──→ Primary Redis ──→ Replica 1
Sentinel 3 ┘ Replica 2
↓
On failure: Promote Replica 1
Apps reconnect automatically
``
Region 1: Cluster Nodes ─┐
Region 2: Cluster Nodes ├──→ Single logical cluster
Region 3: Cluster Nodes ─┘ (apps in any region)
`typescript
// Development: Single instance
const redis = new Redis();
// Production: Cluster
const redis = new Redis.Cluster(nodes);
`
`typescript`
const redis = new Redis({
maxRetriesPerRequest: 3,
retryStrategy: (times: number) => {
const delay = Math.min(times * 50, 2000);
return delay;
},
});
`bashCheck cluster nodes
redis-cli -c CLUSTER NODES
Deployment
$3
`yaml
version: "3"
services:
app:
environment:
REDIS_CLUSTER_NODES: redis1:6379,redis2:6379,redis3:6379 redis1:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf
redis2:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf
redis3:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf
`$3
`yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: redis-config
data:
redis.conf: |
cluster-enabled yes
cluster-config-file nodes.conf---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: redis-cluster
spec:
serviceName: redis
replicas: 6
template:
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
- containerPort: 16379
`Related Packages
- @uploadista/core - Core types
- @uploadista/event-broadcaster-redis - Standard Redis broadcaster
- @uploadista/event-broadcaster-memory - Single-process broadcaster
- @uploadista/kv-store-ioredis - IORedis KV store
- @uploadista/server - Upload server
Troubleshooting
$3
Verify cluster nodes are reachable:
`bash
redis-cli -c
> CLUSTER INFOShould show: cluster_state:ok
`$3
Check Sentinel configuration:
`bash
redis-cli -p 26379
> SENTINEL MASTERSShould list your master instance
`$3
Check cluster topology:
`bash
redis-cli -c
> CLUSTER SLOTSVerify even slot distribution
``See LICENSE in the main repository.
- EVENT_SYSTEM.md - Architecture guide
- Redis Broadcaster - Simpler Redis option
- ioredis Documentation - Official ioredis docs
- Redis Cluster Tutorial - Redis clustering
- Redis Sentinel Guide - HA with Sentinel