A JavaScript/TypeScript implementation of an RDF-based orchestrator for managing and executing data processing pipelines.
Install with `npm install @rdfc/orchestrator-js`.

## Table of Contents
- Features
- Usage
  - CLI
  - Programmatic API
- Configuration
- Architecture
- Development
  - Prerequisites
  - Building
  - Testing
  - Linting & Formatting
- Contributing
- License
## Features

- **Pipeline Management**: Define and manage data processing pipelines using RDF
- **TypeScript Support**: Built with TypeScript for better developer experience
- **Modular Architecture**: Easily extensible with custom processors and runners
- **Test Coverage**: Comprehensive test suite with Vitest
- **Developer Tools**: ESLint and Prettier for code quality
## Usage

### CLI

The orchestrator can be run using the provided CLI:

```bash
# Install the orchestrator
npm install @rdfc/orchestrator-js

# Run with a pipeline configuration
npx rdfc path/to/your/pipeline.ttl
```
The CLI tool loads the RDF pipeline configuration, starts the gRPC server, spawns the configured runners, initializes processors, and manages the entire pipeline lifecycle.
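If you want to trigger a pipeline from your own Node.js code, one simple option is to wrap the same CLI invocation. The snippet below is only a sketch: it assumes the package is installed locally and uses a placeholder pipeline path.

```typescript
import { spawn } from "node:child_process";

// Run `npx rdfc <pipeline>` from a script; orchestrator logs are streamed
// to this terminal via stdio inheritance.
const child = spawn("npx", ["rdfc", "path/to/your/pipeline.ttl"], {
  stdio: "inherit",
});

child.on("exit", (code) => {
  console.log(`Orchestrator exited with code ${code}`);
});
```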
## Configuration

Pipeline configurations are defined using RDF/Turtle format. Here's an example configuration:

```turtle
@prefix rdfc: <https://w3id.org/rdf-connect#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.

rdfc:reader
    rdfc:level "info";
    rdfc:label "test".
```
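Because the configuration is plain RDF, it can be inspected with ordinary RDF tooling. The sketch below uses the `n3` library and a hypothetical pipeline path; this is not necessarily how the orchestrator loads configurations internally.

```typescript
import { readFile } from "node:fs/promises";
import { Parser } from "n3";

// Parse the Turtle pipeline description into RDF quads.
const turtle = await readFile("path/to/your/pipeline.ttl", "utf8");
const quads = new Parser().parse(turtle);

// List every statement; a real tool would look up the processors,
// runners and channels that the orchestrator expects to find here.
for (const quad of quads) {
  console.log(quad.subject.value, quad.predicate.value, quad.object.value);
}
```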
## Development

### Prerequisites

- Node.js 16+
- npm 7+ or yarn
- TypeScript 4.7+
### Building

```bash
# Install dependencies
npm install
```

### Testing
```bash
# Run tests
npm test

# Run tests with coverage
npm test -- --coverage

# Run specific test file
npm test path/to/test/file.test.ts
```

### Linting & Formatting
```bash
# Run linter
npm run lint

# Fix linting issues
npm run lint -- --fix

# Format code
npm run format
```

### Project Structure
```
orchestrator-js/
├── bin/                      # Executable scripts
│   └── orchestrator.js       # Main CLI entry point and pipeline executor
├── lib/                      # Compiled JavaScript output
├── src/                      # TypeScript source files
│   ├── index.ts              # Main export file
│   ├── instantiator.ts       # Runner instantiation logic
│   ├── jsonld.ts             # JSON-LD utilities and RDF processing
│   ├── jsonld.ttl            # JSON-LD processor definitions
│   ├── logUtil.ts            # Logging utilities
│   ├── model.ts              # Data models and types
│   ├── model.ttl             # RDF model definitions
│   ├── orchestrator.ts       # Core orchestrator logic
│   ├── server.ts             # gRPC server implementation
│   ├── util.ts               # Utility functions
│   ├── pipeline.ttl          # Pipeline configuration schema
│   └── minimal.ttl           # Minimal example configuration
├── __tests__/                # Test files
│   ├── orchestrator.test.ts
│   ├── jsonld_derive.test.ts
│   ├── config.ttl
│   └── ...
├── .github/                  # GitHub workflows and templates
├── .husky/                   # Git hooks
├── package.json              # Project configuration and dependencies
├── tsconfig.json             # TypeScript configuration
├── jest.config.js            # Jest test configuration
├── eslint.config.mjs         # ESLint configuration
├── .prettierrc               # Prettier configuration
├── .editorconfig             # Editor configuration
└── README.md                 # This file
```
## Contributing
Contributions are welcome! Please follow these steps:
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/AmazingFeature`)
3. Commit your changes (`git commit -m 'Add some AmazingFeature'`)
4. Push to the branch (`git push origin feature/AmazingFeature`)
5. Open a Pull Request
We follow Conventional Commits for commit messages:
- `feat`: New feature
- `fix`: Bug fix
- `docs`: Documentation changes
- `style`: Code style changes (formatting, etc.)
- `refactor`: Code refactoring
- `test`: Adding or modifying tests
- `chore`: Build process or auxiliary tool changes

## License
This project is licensed under the MIT License - see the LICENSE file for details.
---
## Architecture
The system follows a modular architecture with the following main components:
- Orchestrator: Manages the overall pipeline execution
- Runners: Handle the execution of processing tasks
- Processors: Individual processing units that transform or analyze data
- Server/Client: Communication layer between components
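The split of responsibilities can be summarised with a few rough types. These are purely illustrative and are not the actual definitions in `src/model.ts`:

```typescript
// Illustrative only: rough shape of the main components and how they relate.
interface ProcessorRef {
  uri: string;                        // identifier of the processor in the RDF pipeline
  arguments: Record<string, unknown>; // arguments derived from the configuration
}

interface RunnerRef {
  uri: string;
  processors: ProcessorRef[];         // processors hosted by this runner
  send(message: unknown): void;       // messages travel over the gRPC mainStream
}

interface OrchestratorState {
  runners: RunnerRef[];
  // Routing table: which runner consumes which channel
  // (the diagrams below call this channelToInstantiator).
  channelToInstantiator: Map<string, RunnerRef>;
}
```
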
#### Initialization Sequence
Initialization sequence diagram
```mermaid
sequenceDiagram
    autonumber
    participant O as Orchestrator
    participant R as Runner
    participant P as Processor
    Note over O: Initialize gRPC server on port 50051 (by default)<br/>Load and parse RDF pipeline configuration
    O->>O: startInstantiators(addr, pipeline)
    loop For each instantiator in pipeline
        O->>O: expectRunner(instantiator)
        Note over O: Create promise to wait for runner connection
        O->>R: Spawn runner process with address
        rect rgba(255, 0, 0, .1)
            R->>O: stub.connect() as mainStream
        end
        rect rgba(0, 0, 255, .1)
            R->>O: mainStream(FromRunner{identify: RunnerIdentify{ uri }})
        end
        O->>O: Resolve runner connection promise
        rect rgba(0, 0, 255, .1)
            O->>R: Send pipeline configuration<br/>mainStream(ToRunner{ pipeline })
        end
    end
    Note over O,P: Initialize all processors
    loop For each processor in each runner
        O->>O: expectProcessor(instantiator)
        Note over O: Generate JSON-LD configuration for processor
        rect rgba(0, 0, 255, .1)
            O->>R: Start processor with configuration<br/>mainStream(ToRunner{proc: Processor{ uri, config, arguments }})
        end
        R->>P: Initialize processor
        P->>R: Processor ready
        rect rgba(0, 0, 255, .1)
            R->>O: Initialized message with processor URI<br/>mainStream(FromRunner{initialized: ProcessorInitialized{ uri, error? }})
        end
        O->>O: Resolve processor startup promise
    end
    Note over O,P: Start all runners
    loop For each runner
        rect rgba(0, 0, 255, .1)
            O->>R: Processors can start<br/>mainStream(ToRunner{ start })
        end
        loop For each processor in runner
            R->>P: Start processor execution
        end
    end
```
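Steps such as "create promise to wait for runner connection" and "resolve runner connection promise" describe a deferred-promise pattern. Here is a minimal sketch of that pattern; it is illustrative only and not the actual code in `src/orchestrator.ts`.

```typescript
// Illustrative deferred-promise pattern: register a pending promise per runner URI
// and resolve it once that runner's identify message arrives on the mainStream.
const pendingRunners = new Map<string, () => void>();

function expectRunner(uri: string): Promise<void> {
  return new Promise((resolve) => pendingRunners.set(uri, resolve));
}

// Called by the gRPC handler when FromRunner{ identify } comes in.
function onRunnerIdentified(uri: string): void {
  pendingRunners.get(uri)?.();
  pendingRunners.delete(uri);
}

// Spawn the runner process, then wait until it has connected and identified itself.
async function startRunner(uri: string, spawnProcess: () => void): Promise<void> {
  const connected = expectRunner(uri);
  spawnProcess();
  await connected;
}
```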
Message sequence diagram
```mermaid
sequenceDiagram
    autonumber
    participant P1 as Processor 1
    participant R1 as Runner 1
    participant O as Orchestrator
    participant R2 as Runner 2
    participant P2 as Processor 2
    Note over P1: Processor generates message for a channel
    P1->>R1: Message with data
    rect rgba(0, 0, 255, .1)
        R1->>O: Send message to orchestrator<br/>mainStream(FromRunner{msg: SendingMessage { localSequenceNumber, channel, data }})
    end
    Note over O: Orchestrator routes message to target instantiator
    O->>O: Look up channelToInstantiator[channel]<br/>Translate localSequenceNumber to globalSequenceNumber
    rect rgba(0, 0, 255, .1)
        O->>R2: Forward message to receiving runner<br/>mainStream(ToRunner{msg: ReceivingMessage{ globalSequenceNumber, channel, data }})
    end
    R2->>P2: Runner forwards message to target processor
    P2->>P2: Process message
    P2->>R2: Message processed
    rect rgba(0, 0, 255, .1)
        R2->>O: mainStream(FromRunner{processed: GlobalAck{ globalSequenceNumber, channel }})
    end
    rect rgba(0, 0, 255, .1)
        O->>R1: mainStream(ToRunner{processed: LocalAck{ localSequenceNumber, channel }})
    end
    Note over P1: Processor is allowed to send a new message
```
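The routing step shown above boils down to a channel lookup plus some sequence-number bookkeeping. The sketch below is illustrative only; the real logic lives in `src/orchestrator.ts` and may differ.

```typescript
// Illustrative routing sketch: translate the sender's local sequence number into
// a global one, remember where the ack must go back to, and forward the message.
interface RunnerStream {
  send(msg: unknown): void;
}

const channelToRunner = new Map<string, RunnerStream>(); // who consumes a channel
const ackRouting = new Map<
  number,
  { runner: RunnerStream; localSequenceNumber: number }
>();
let nextGlobalSequenceNumber = 0;

function routeMessage(
  from: RunnerStream,
  localSequenceNumber: number,
  channel: string,
  data: Uint8Array,
): void {
  const target = channelToRunner.get(channel);
  if (!target) throw new Error(`No runner registered for channel ${channel}`);

  const globalSequenceNumber = nextGlobalSequenceNumber++;
  ackRouting.set(globalSequenceNumber, { runner: from, localSequenceNumber });
  target.send({ msg: { globalSequenceNumber, channel, data } });
}

// When the receiving runner reports a GlobalAck, relay a LocalAck to the sender
// so its processor may emit the next message.
function onProcessed(globalSequenceNumber: number, channel: string): void {
  const origin = ackRouting.get(globalSequenceNumber);
  if (!origin) return;
  ackRouting.delete(globalSequenceNumber);
  origin.runner.send({
    processed: { localSequenceNumber: origin.localSequenceNumber, channel },
  });
}
```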
Streaming message sequence diagram
```mermaid
sequenceDiagram
    autonumber
    participant P1 as Processor 1
    participant R1 as Runner 1
    participant O as Orchestrator
    participant R2 as Runner 2
    participant P2 as Processor 2
    P1->>R1: Start streaming message
    rect rgba(255, 0, 0, .1)
        R1->>O: Initiate sending stream<br/>stub.sendStreamMessage() as sendingStream
    end
    R1->>O: Send identify message<br/>sendingStream(StreamChunk{id: StreamIdentify{ localSequenceNumber, channel, runner }})
    rect rgba(0, 0, 255, .1)
        O->>R2: Notify receiving runner of incoming stream message<br/>mainStream(ToRunner{streamMsg: ReceivingStreamMessage{ globalSequenceNumber, channel }})
    end
    rect rgba(255, 0, 0, .1)
        R2->>O: Initiate receiving stream<br/>stub.receiveStreamMessage() as receivingStream
    end
    R2->>O: Send identify message<br/>receivingStream(SendingStreamControl{ globalSequenceNumber })
    O->>R1: Send stream control message, indicating that the stream is ready to accept data<br/>sendingStream(ReceivingStreamControl{ streamSequenceNumber })
    Note over P1: Begin streaming data
    loop For Each Chunk
        P1->>R1: Send a chunk of data
        R1->>O: Send a chunk<br/>sendingStream(StreamChunk{data: DataChunk{ data }})
        O->>R2: Receive a chunk<br/>receivingStream(DataChunk{ data })
        R2->>P2: Forward chunks to processor
        P2->>P2: Handle chunk
        P2->>R2: Chunk handled
        R2->>O: Sequence number of the chunk in the stream<br/>receivingStream(SendingStreamControl{ streamSequenceNumber })
        O->>R1: sendingStream(ReceivingStreamControl{ streamSequenceNumber })
        Note over P1: Processor is allowed to send a new chunk
    end
    P1->>R1: End of stream
    R1->>O: sendingStream closed
    O->>R2: receivingStream closed
    rect rgba(0, 0, 255, .1)
        R2->>O: mainStream(FromRunner{processed: GlobalAck{ globalSequenceNumber, channel }})
    end
    rect rgba(0, 0, 255, .1)
        O->>R1: mainStream(ToRunner{processed: LocalAck{ localSequenceNumber, channel }})
    end
    Note over P1: Processor is allowed to send a new message
```
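The per-chunk control messages give the stream simple backpressure: the sending side only emits the next chunk once the previous one has been acknowledged end to end. A minimal sketch of that loop on the sending side, illustrative rather than the runner's actual implementation:

```typescript
// Illustrative backpressure loop: each chunk is sent only after the previous
// chunk's stream control acknowledgement has come back.
class ChunkSender {
  private pending = new Map<number, () => void>();
  private nextSeq = 0;

  constructor(private sendChunk: (seq: number, data: Uint8Array) => void) {}

  // Call this when ReceivingStreamControl{ streamSequenceNumber } arrives.
  onControl(streamSequenceNumber: number): void {
    this.pending.get(streamSequenceNumber)?.();
    this.pending.delete(streamSequenceNumber);
  }

  // Send chunks one at a time, waiting for the control message in between.
  async sendAll(chunks: Iterable<Uint8Array>): Promise<void> {
    for (const chunk of chunks) {
      const seq = this.nextSeq++;
      const acked = new Promise<void>((resolve) => this.pending.set(seq, resolve));
      this.sendChunk(seq, chunk);
      await acked; // block until this chunk is acknowledged
    }
  }
}
```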