Declarative streaming library powered by Effect.js, inspired by Apache Camel and Benthos
```bash
npm install effect-connect
```

Build type-safe data pipelines with YAML configuration for message processing.
- Declarative YAML Configuration - Define pipelines without code
- Type-Safe - Built with TypeScript and Effect.js for compile-time safety
- Stream Processing - Handle high-throughput message streams efficiently
- Backpressure Control - Prevent overwhelming downstream systems
- Dead Letter Queue (DLQ) - Graceful failure handling with automatic retries
- Built-in Observability - Automatic metrics, tracing, and correlation IDs
- Modular Architecture - Pluggable inputs, processors, and outputs
- Production-Ready - Connection pooling, batch processing, error categorization
```bash
# Using npx (recommended)
npx effect-connect run my-pipeline.yaml
```

```bash
# Global install
npm install -g effect-connect
effect-connect run my-pipeline.yaml
```

```bash
# Local install
npm install effect-connect
npx effect-connect run my-pipeline.yaml
```
Create a pipeline configuration file (e.g., `my-pipeline.yaml`):
Example 1: HTTP Webhook Forwarder
```yaml
input:
  http:
    port: 8080
    path: "/webhook"

pipeline:
  processors:
    - metadata:
        correlation_id_field: "correlationId"
        add_timestamp: true
    - log:
        level: info

output:
  http:
    url: "https://api.example.com/events"
    method: POST
    headers:
      Content-Type: "application/json"
```
Example 2: SQS to SQS Pipeline
```yaml
input:
  aws_sqs:
    url: "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue"
    region: "us-east-1"

pipeline:
  processors:
    - metadata:
        correlation_id_field: "correlationId"

output:
  aws_sqs:
    url: "https://sqs.us-east-1.amazonaws.com/123456789012/output-queue"
    region: "us-east-1"
```
```bash
# Using npx (recommended)
npx effect-connect run my-pipeline.yaml
```

Testing Your Pipeline
For HTTP input pipelines, send test requests:
```bash
# Start the pipeline
effect-connect run my-pipeline.yaml

# In another terminal, send a test request
curl -X POST http://localhost:8080/webhook \
  -H "Content-Type: application/json" \
  -d '{"event": "user_signup", "user_id": 12345}'
```

CLI Commands
```bash
# Run a pipeline
effect-connect run my-pipeline.yaml

# Run with debug logging
effect-connect run my-pipeline.yaml --debug

# Show help
effect-connect --help

# Show version
effect-connect --version
```

Debug Mode
Enable detailed debug logging to troubleshoot pipeline configuration and execution:
```bash
# Enable debug mode
effect-connect run my-pipeline.yaml --debug
```

Debug mode provides:
- Configuration Details: View the parsed YAML configuration
- Pipeline Building: See how inputs, processors, and outputs are constructed
- Component Initialization: Track when components start and connect
- Processing Flow: Monitor message flow through the pipeline
Example debug output:
```
DEBUG MODE ENABLED
[23:06:11.565] DEBUG (#1): Loaded config: {
  "input": {
    "http": {
      "port": 8080,
      "host": "0.0.0.0",
      "path": "/webhook"
    }
  },
  ...
}
[23:06:11.565] DEBUG (#1): buildPipeline received config: {...}
[23:06:11.565] DEBUG (#1): buildInput received config: {...}
```

Programmatic Usage
You can also use Effect Connect as a library in your TypeScript/JavaScript projects:
```typescript
import { loadConfig, buildPipeline, run } from "effect-connect"
import { Effect } from "effect"

const program = Effect.gen(function* () {
  // Load configuration
  const config = yield* loadConfig("my-pipeline.yaml")

  // Build pipeline
  const pipeline = yield* buildPipeline(config)

  // Run pipeline
  const result = yield* run(pipeline)

  return result
})

// Execute
Effect.runPromise(program)
```
Local Development
For local development with LocalStack and Redis, see the Local Development Guide.
Configuration Example
```yaml
input:
  aws_sqs:
    url: "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue"
    region: "us-east-1"
    # See docs/inputs/sqs.md for all options

pipeline:
  backpressure:
    max_concurrent_messages: 10
    max_concurrent_outputs: 5
  processors:
    - metadata:
        correlation_id_field: "correlationId"
        # See docs/processors/metadata.md
    - mapping:
        expression: |
          {
            "fullName": $uppercase(firstName) & " " & $uppercase(lastName),
            "email": $lowercase(email)
          }
        # See docs/processors/mapping.md

output:
  redis_streams:
    url: "rediss://production-redis.example.com:6379"
    stream: "processed-messages"
    max_length: 10000
    tls: true
    # See docs/outputs/redis-streams.md

# Optional: Dead Letter Queue for failures
dlq:
  aws_sqs:
    url: "https://sqs.us-east-1.amazonaws.com/123456789012/dlq-queue"
    region: "us-east-1"
    # See docs/advanced/dlq.md
```

Components
Inputs
- HTTP - Receive webhook POST requests
- AWS SQS - Read from AWS SQS queues
- Redis Streams - Read from Redis Streams (simple or consumer-group mode)
Processors
- Metadata - Add correlation IDs and timestamps
- Uppercase - Transform fields to uppercase
- Mapping - JSONata transformations (complex data manipulation)
- Logging - Log message flow for debugging
Outputs
- HTTP - Send to HTTP/HTTPS endpoints (webhooks, APIs)
- AWS SQS - Send to SQS queues (single or batch mode)
- Redis Streams - Send to Redis Streams with length management
Advanced Features
- Dead Letter Queue (DLQ) - Handle failures with automatic retries and error enrichment
- Backpressure Control - Control message throughput and concurrency
- Bloblang Integration - Use Benthos Bloblang syntax (for migrations)
Example Configurations
Explore ready-to-use configurations in configs/:
- http-webhook-example.yaml - HTTP webhook server forwarding to HTTP endpoint
- example-pipeline.yaml - Basic pipeline (SQS → Processors → Redis)
- dlq-example.yaml - Dead Letter Queue configuration
- backpressure-example.yaml - Backpressure and batch timeout
- advanced-connection.yaml - Production connection settings
Project Structure
```
effect-connect/
├── src/
│   ├── core/            # Pipeline orchestration, types, config loader
│   ├── inputs/          # SQS, Redis Streams
│   ├── processors/      # Metadata, Uppercase, Mapping, Logging
│   ├── outputs/         # SQS, Redis Streams
│   └── cli.ts           # CLI entry point
├── docs/
│   ├── inputs/          # Detailed input documentation
│   ├── processors/      # Detailed processor documentation
│   ├── outputs/         # Detailed output documentation
│   ├── advanced/        # DLQ, Backpressure, Bloblang guides
│   └── COMPONENTS.md    # Component development guide
├── configs/             # Example pipeline configurations
└── tests/
    ├── unit/            # Unit tests (154 passing)
    └── e2e/             # End-to-end tests
```

Development
Testing
Effect Connect uses a scalable testing strategy that avoids N×N test explosion:
```typescript
import { Effect } from "effect"
import {
  createGenerateInput,
  createCaptureOutput,
  createPipeline,
  runPipeline
} from "effect-connect"

// Generate test messages
const input = createGenerateInput({
  count: 5,
  template: {
    id: "msg-{{index}}",
    value: "{{random}}"
  }
})

// Capture output for assertions
const output = await Effect.runPromise(createCaptureOutput())

// Test your component
const pipeline = createPipeline({
  name: "test",
  input,
  processors: [yourProcessor],
  output
})

await Effect.runPromise(runPipeline(pipeline))

const messages = await Effect.runPromise(output.getMessages())
expect(messages).toHaveLength(5)
```

Key Benefits:
- ✅ Test components in isolation
- ✅ No external dependencies needed
- ✅ Linear test growth: N components = ~3N tests (not N²)
- ✅ Fast execution: 228 tests in < 10 seconds
See docs/TESTING.md for complete testing guide.
Running Tests
```bash
# All tests
npm test

# Unit tests only (228 tests)
npm run test:unit

# E2E tests only
npm run test:e2e

# With coverage
npm run test:coverage
```

Building
```bash
npm run build
```

Linting
```bash
npm run lint
```

Architecture
Effect Connect uses a functional, type-safe architecture powered by Effect.js:
```
┌───────────────────────────────────────────────────────────┐
│                         Pipeline                          │
│                                                           │
│  Input Stream ──▶ Processor₁ ──▶ Processor₂ ──▶ Output    │
│       │               │              │            │       │
│  Effect.Stream      Effect         Effect       Effect    │
│                                                           │
│  Backpressure ◀─────────────────────────────────────      │
│  DLQ ◀──────────────────────────────────────────────      │
└───────────────────────────────────────────────────────────┘
```

Design Principles
- Effect.js Foundation: All components use the Effect monad for error handling
- Stream Processing: Inputs produce a Stream; processors transform via Effect
- Type Safety: Full TypeScript types with Effect.js schema validation
- Resource Management: Automatic cleanup with Effect's resource management
- Observability: Built-in metrics, tracing, and correlation IDs

For more details, see the Component Development Guide.
Effect.js Integration
Effect Connect is built on Effect.js, a powerful library for functional programming in TypeScript:
- Error Handling: Type-safe errors with automatic retry logic
- Resource Management: Automatic cleanup of connections and resources
- Concurrency: Built-in backpressure and concurrent processing
- Composability: Pipeline components compose naturally with Effect operators
- Observability: Automatic spans, traces, and metrics collection
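As a rough illustration of the composability point, processors can be modeled as message-to-message functions chained in the order they appear in the YAML `processors:` list. This is a toy sketch in plain TypeScript — not the library's actual API, and the processor names here are hypothetical:

```typescript
// Toy model of processor composition (illustrative only; effect-connect's
// real processors are Effect-based and these implementations are hypothetical).
type Message = { payload: Record<string, unknown>; metadata: Record<string, string> }
type Processor = (msg: Message) => Message

// Chain processors left-to-right, like the YAML `processors:` list.
const compose = (...ps: Processor[]): Processor =>
  (msg) => ps.reduce((m, p) => p(m), msg)

// Sketch of a metadata-style processor: attach a correlation ID.
const addCorrelationId: Processor = (msg) => ({
  ...msg,
  metadata: { ...msg.metadata, correlationId: "test-id-123" },
})

// Sketch of an uppercase-style processor: uppercase all string fields.
const uppercaseStrings: Processor = (msg) => ({
  ...msg,
  payload: Object.fromEntries(
    Object.entries(msg.payload).map(
      ([k, v]): [string, unknown] => [k, typeof v === "string" ? v.toUpperCase() : v]
    )
  ),
})

const pipeline = compose(addCorrelationId, uppercaseStrings)
const result = pipeline({ payload: { event: "user_signup" }, metadata: {} })
console.log(result.payload.event)          // "USER_SIGNUP"
console.log(result.metadata.correlationId) // "test-id-123"
```

In the real library each stage runs as an Effect, so failures flow into retry logic and the DLQ instead of throwing, and backpressure bounds how many messages are in flight.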
Schema Validation
Configurations are validated using Effect Schema:
```typescript
import { Schema } from "effect"

const SqsInputConfig = Schema.Struct({
  url: Schema.String,
  region: Schema.String,
  endpoint: Schema.optional(Schema.String),
  wait_time_seconds: Schema.optional(Schema.Number),
  max_number_of_messages: Schema.optional(Schema.Number),
})
```

This provides:
- Type-safe configuration parsing
- Helpful error messages for invalid configs
- Auto-completion in IDEs
- Compile-time validation
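To make "helpful error messages" concrete, here is a hand-rolled toy checker that mimics the shape of feedback schema validation gives — this is illustrative only, not Effect Schema or effect-connect's real code:

```typescript
// Toy config validator (hypothetical; the library actually uses Effect Schema).
// It collects every problem and reports them together, field by field.
type SqsConfig = { url: string; region: string; wait_time_seconds?: number }

function validateSqsConfig(raw: unknown): SqsConfig {
  const obj = (raw ?? {}) as Record<string, unknown>
  const errors: string[] = []
  if (typeof obj.url !== "string")
    errors.push(`url: expected string, got ${typeof obj.url}`)
  if (typeof obj.region !== "string")
    errors.push(`region: expected string, got ${typeof obj.region}`)
  if (obj.wait_time_seconds !== undefined && typeof obj.wait_time_seconds !== "number")
    errors.push(`wait_time_seconds: expected number, got ${typeof obj.wait_time_seconds}`)
  if (errors.length > 0)
    throw new Error("Invalid aws_sqs input config:\n  " + errors.join("\n  "))
  return obj as unknown as SqsConfig
}

const ok = validateSqsConfig({
  url: "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",
  region: "us-east-1",
})
console.log(ok.region) // "us-east-1"
```

Effect Schema produces this kind of per-field report automatically, and additionally infers the TypeScript type from the schema definition itself.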
Use Cases
- Webhook Forwarding - Receive webhooks and forward to multiple destinations with transformation
- Event-Driven Architectures - Process events between microservices
- Data Pipelines - ETL and data transformation workflows
- Message Queue Processing - Reliable message consumption and production
- Stream Processing - Real-time data processing with backpressure
- Integration Patterns - Connect different systems and protocols
- API Gateway Patterns - Route and transform HTTP requests to backend services
Why Effect Connect?
| Feature | Effect Connect | Benthos | Apache Camel |
|---------|------------------|---------|--------------|
| Language | TypeScript | Go | Java/Kotlin |
| Type Safety | ✅ (Effect.js) | ❌ | ✅ (with Kotlin) |
| Configuration | YAML | YAML | Java/XML/YAML |
| Streaming | Effect.js Streams | Native | Camel Streams |
| Error Handling | Effect monad | Go errors | Exceptions |
| Observability | Built-in | ✅ | ✅ |
| Best For | Node.js projects | Go projects | JVM projects |
Future Enhancements
- [x] HTTP input and output
- [ ] More inputs (Kafka, File, Kinesis, WebSocket)
- [ ] More processors (Filter, Transform, Enrich, Split/Join)
- [ ] More outputs (Postgres, S3, Elasticsearch, gRPC)
- [ ] Circuit breaker pattern
- [ ] Web UI for pipeline management
- [ ] OpenTelemetry exporter integration
- [ ] Kafka Connect compatibility
- [ ] GraphQL processor
- [ ] Rate limiting processor
- [ ] Caching layer
Documentation
- Complete Component Catalog - Detailed documentation for all components
- Local Development Setup - LocalStack and Docker Compose guide
- Component Development Guide - Build custom components
- Example Configurations - Ready-to-use pipeline examples
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)

License

MIT

Acknowledgments
- Inspired by Apache Camel
- Inspired by Benthos / Redpanda Connect
- Built with Effect.js
- Powered by JSONata for transformations