Minimalist framework to collect and handle data in a Digital Twin project
Digital Twin Core is a minimalist TypeScript framework for collecting and processing data in Digital Twin projects. It provides building blocks to create scheduled collectors, harvesters and HTTP handlers while abstracting storage and database access.
- Collectors – fetch data from APIs (typically JSON) on a cron schedule, return it as a Buffer, store it and expose it via GET endpoints.
- Harvesters – transform data collected by collectors, store the results and expose them via GET endpoints.
- Handlers – expose GET endpoints that directly return the result of the method defined in the decorator.
- Assets Manager – upload, store and manage file assets with metadata, providing RESTful endpoints for CRUD operations.
- Custom Table Manager – manage structured data in custom database tables with automatic CRUD endpoints and custom business logic endpoints.
- Storage adapters – currently local filesystem and OVH Object Storage via S3 API.
- Database adapter – implemented with Knex to index metadata.
- Engine – orchestrates components, schedules jobs with BullMQ and exposes endpoints via Express.
- Authentication – pluggable authentication system supporting API gateway headers, JWT tokens, or no-auth mode.
```bash
npm install digitaltwin-core
# or with pnpm
pnpm add digitaltwin-core
```
The project requires Node.js 20 or later.
Compile the TypeScript sources to dist/:
```bash
npm run build
```
During development you can use the watcher:
```bash
npm run dev
```
The test suite uses Japa. Run all tests with:
```bash
npm test
```
Below is a minimal example showing how the engine is instantiated. Storage and database implementations are selected through the provided factories.
```ts
import { DigitalTwinEngine } from './src/engine/digital_twin_engine.js';
import { StorageServiceFactory } from './src/storage/storage_factory.js';
import { KnexDatabaseAdapter } from './src/database/adapters/knex_database_adapter.js';
import { Env } from './src/env/env.js';

// Validate environment variables and bootstrap services
const env = Env.validate({
  STORAGE_CONFIG: Env.schema.enum(['local', 'ovh'])
});

const storage = StorageServiceFactory.create();
const database = new KnexDatabaseAdapter({ client: 'sqlite3', connection: ':memory:' }, storage);

const engine = new DigitalTwinEngine({ storage, database });
engine.start();
```
The framework provides automatic component discovery to reduce boilerplate:
```typescript
import { DigitalTwinEngine, loadComponents } from 'digitaltwin-core'

// Before: manual imports for each component
// import { WeatherCollector } from './components/weather_collector.js'
// import { SensorCollector } from './components/sensor_collector.js'
// ...

// After: automatic discovery
const result = await loadComponents('./dist/components')
const engine = new DigitalTwinEngine({ storage, database })
engine.registerComponents(result)
await engine.start()
```
The loader discovers components based on file naming conventions:
- `*_collector.js` → Collectors
- `*_harvester.js` → Harvesters
- `*_handler.js` → Handlers
- `*_assets_manager.js` → Assets Managers
- `*_custom_table.js` → Custom Table Managers
```typescript
const result = await loadComponents('./dist/components', {
  recursive: true,     // Scan subdirectories (default)
  verbose: true,       // Log discovered components
  extensions: ['.js'], // File extensions
  exclude: ['.test.']  // Exclusion patterns
})

// Result includes metadata
console.log(result.summary)
// { total: 5, collectors: 2, harvesters: 1, handlers: 2, errors: 0 }

// Check for loading errors
if (result.errors.length > 0) {
  console.warn('Some components failed to load:', result.errors)
}
```
Runtime type detection utilities:
```typescript
import { isCollector, isHandler, detectComponentType } from 'digitaltwin-core'

if (isCollector(component)) {
  // TypeScript narrowing works here
  await component.collect()
}

const type = detectComponentType(component) // 'collector' | 'handler' | ...
```
Collectors are scheduled components that fetch data from external sources at regular intervals. They implement a collect() method that returns a Buffer, which is then stored and exposed via HTTP endpoints.
Key features:
- Cron-based scheduling
- Automatic storage and metadata indexing
- HTTP GET endpoint for retrieving latest data
- Event emission on successful collection
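As an illustration, here is a minimal collector sketch. It assumes the base class is exported as `Collector` and that `getConfiguration()` accepts keys analogous to the managers documented below (`name`, `description`, `cron`, `tags`); the weather API URL is hypothetical:

```typescript
import { Collector } from 'digitaltwin-core'

class WeatherCollector extends Collector {
  // Configuration keys modelled on the managers below (assumed shape)
  getConfiguration() {
    return {
      name: 'weather',
      description: 'Hourly weather snapshots',
      cron: '0 * * * *', // run at the top of every hour
      tags: ['weather']
    }
  }

  // collect() returns a Buffer; the engine stores it, indexes its
  // metadata and exposes it via a GET endpoint
  async collect(): Promise<Buffer> {
    const response = await fetch('https://api.example.com/weather') // hypothetical API
    const json = await response.json()
    return Buffer.from(JSON.stringify(json))
  }
}
```

Harvesters follow the same pattern but transform data that collectors have already gathered. A sketch under similar assumptions (a `Harvester` base class, a `source` key naming the upstream collector as suggested by the CLI's `make:harvester --source` option, and a `harvest()` hook whose exact name and signature are assumptions):

```typescript
import { Harvester } from 'digitaltwin-core'

class WeatherStatsHarvester extends Harvester {
  getConfiguration() {
    return {
      name: 'weather-stats',
      description: 'Daily aggregates of collected weather data',
      source: 'weather', // upstream collector (key name is an assumption)
      cron: '0 0 * * *'
    }
  }

  // Transform the collected payload into the stored/exposed result
  async harvest(input: Buffer): Promise<Buffer> {
    const readings: Array<{ temperature: number }> = JSON.parse(input.toString())
    const temps = readings.map((r) => r.temperature)
    const average = temps.reduce((a, b) => a + b, 0) / temps.length
    return Buffer.from(JSON.stringify({ average, samples: temps.length }))
  }
}
```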
The Assets Manager provides a complete solution for file asset management with metadata support. It's an abstract base class that can be extended for specific asset types.
Key features:
- File upload with metadata (description, source URL, owner, filename)
- RESTful CRUD operations via HTTP endpoints
- Content-type aware storage and retrieval
- Separate display and download endpoints
- Source URL validation for data provenance
- File extension validation for upload security
- Component isolation (each manager handles its own asset type)
Available endpoints:
- `GET /{assetType}` – List all assets with metadata
- `POST /{assetType}/upload` – Upload a new asset with metadata
- `GET /{assetType}/{id}` – Retrieve asset content for display
- `GET /{assetType}/{id}/download` – Download asset with attachment headers
- `PUT /{assetType}/{id}` – Update asset metadata
- `DELETE /{assetType}/{id}` – Delete asset
Example usage:
```typescript
class GLTFAssetsManager extends AssetsManager {
  getConfiguration() {
    return {
      name: 'gltf',
      description: 'GLTF 3D models manager',
      contentType: 'model/gltf-binary',
      extension: '.glb', // Optional: restricts uploads to .glb files only
      tags: ['assets', '3d', 'gltf']
    }
  }
}
```
File Extension Validation:
When the `extension` property is set in the configuration, the Assets Manager automatically validates uploaded files:
- The `POST /upload` and `POST /upload-batch` endpoints reject files that don't match the specified extension
- Validation is case-insensitive (`.GLB` and `.glb` are treated the same)
- If no extension is specified, all file types are accepted
- The error message clearly indicates the expected extension
```typescript
// Example with extension validation
class DocumentsManager extends AssetsManager {
  getConfiguration() {
    return {
      name: 'documents',
      description: 'PDF documents manager',
      contentType: 'application/pdf',
      extension: '.pdf' // Only PDF files allowed
    }
  }
}

// An upload attempt with the wrong extension returns:
// Status: 400
// Error: "Invalid file extension. Expected: .pdf"
```
The Custom Table Manager provides a powerful solution for managing structured data with custom database tables. It automatically generates CRUD endpoints and supports custom business logic endpoints.
Key features:
- Custom database table creation with configurable columns and SQL types
- Automatic CRUD endpoints (GET, POST, PUT, DELETE)
- Custom business logic endpoints with full request/response control
- Query validation and field requirements
- Built-in search and filtering capabilities
- Support for complex data relationships
Available endpoints (automatic):
- `GET /{tableName}` – List all records
- `POST /{tableName}` – Create a new record
- `GET /{tableName}/{id}` – Get a specific record
- `PUT /{tableName}/{id}` – Update a specific record
- `DELETE /{tableName}/{id}` – Delete a specific record
Example usage:
```typescript
// Shape returned by custom endpoint handlers (local alias; the
// framework's actual type name may differ)
type EndpointResponse = { status: number; content: string; headers: Record<string, string> }

class WMSLayersManager extends CustomTableManager {
  getConfiguration() {
    return {
      name: 'wms_layers',
      description: 'Manage WMS layers for mapping applications',
      columns: {
        'wms_url': 'text not null',
        'layer_name': 'text not null',
        'description': 'text',
        'active': 'boolean default true',
        'created_by': 'text',
        'projection': 'text default "EPSG:4326"'
      },
      // Custom endpoints for business logic
      endpoints: [
        { path: '/add-layers', method: 'post', handler: 'addMultipleLayers' },
        { path: '/activate/:id', method: 'put', handler: 'toggleLayerStatus' },
        { path: '/search', method: 'get', handler: 'searchLayers' },
        { path: '/by-projection/:projection', method: 'get', handler: 'findByProjection' }
      ]
    }
  }

  // Custom endpoint: Add multiple layers at once
  async addMultipleLayers(req: any): Promise<EndpointResponse> {
    try {
      const { layers } = req.body
      const results = []
      for (const layerData of layers) {
        // Use built-in validation
        const id = await this.create({
          wms_url: layerData.url,
          layer_name: layerData.name,
          description: layerData.description || '',
          active: true,
          created_by: layerData.user || 'system'
        })
        results.push({ id, name: layerData.name })
      }
      return {
        status: 200,
        content: JSON.stringify({
          message: `Successfully added ${results.length} layers`,
          layers: results
        }),
        headers: { 'Content-Type': 'application/json' }
      }
    } catch (error) {
      return {
        status: 400,
        content: JSON.stringify({ error: error.message }),
        headers: { 'Content-Type': 'application/json' }
      }
    }
  }

  // Custom endpoint: Toggle layer active status
  async toggleLayerStatus(req: any): Promise<EndpointResponse> {
    try {
      const { id } = req.params
      const layer = await this.findById(parseInt(id))
      if (!layer) {
        return {
          status: 404,
          content: JSON.stringify({ error: 'Layer not found' }),
          headers: { 'Content-Type': 'application/json' }
        }
      }
      const newStatus = !layer.active
      await this.update(parseInt(id), { active: newStatus })
      return {
        status: 200,
        content: JSON.stringify({
          message: `Layer ${newStatus ? 'activated' : 'deactivated'}`,
          layer_id: id,
          active: newStatus
        }),
        headers: { 'Content-Type': 'application/json' }
      }
    } catch (error) {
      return {
        status: 500,
        content: JSON.stringify({ error: error.message }),
        headers: { 'Content-Type': 'application/json' }
      }
    }
  }

  // Custom endpoint: Advanced search with validation
  async searchLayers(req: any): Promise<EndpointResponse> {
    try {
      const { query, active_only, projection } = req.query
      const conditions: Record<string, any> = {}
      if (active_only === 'true') {
        conditions.active = true
      }
      if (projection) {
        conditions.projection = projection
      }
      // Use built-in search with validation
      const layers = await this.findByColumns(conditions, {
        validate: (conditions) => {
          if (query && query.length < 3) {
            throw new Error('Search query must be at least 3 characters long')
          }
        }
      })
      // Filter by text search if provided
      let results = layers
      if (query) {
        results = layers.filter(layer =>
          layer.layer_name.toLowerCase().includes(query.toLowerCase()) ||
          layer.description?.toLowerCase().includes(query.toLowerCase())
        )
      }
      return {
        status: 200,
        content: JSON.stringify({
          results,
          total: results.length,
          query: { query, active_only, projection }
        }),
        headers: { 'Content-Type': 'application/json' }
      }
    } catch (error) {
      return {
        status: 400,
        content: JSON.stringify({ error: error.message }),
        headers: { 'Content-Type': 'application/json' }
      }
    }
  }

  // findByProjection(req) omitted for brevity
}
```
Generated endpoints for the above example:
- Standard CRUD: `GET /wms_layers`, `POST /wms_layers`, etc.
- Custom business logic: `POST /wms_layers/add-layers`, `PUT /wms_layers/activate/:id`, `GET /wms_layers/search`, `GET /wms_layers/by-projection/:projection`
SQL Types supported:
- `text` / `text not null` – Variable-length text
- `varchar(255)` / `varchar(100) not null` – Fixed-length text
- `integer` / `integer not null` – Whole numbers
- `boolean` / `boolean default true` – True/false values
- `datetime` / `timestamp` – Date and time values
- `real` / `decimal` / `float` – Decimal numbers
Built-in query methods:
- `findAll()` – Get all records
- `findById(id)` – Get a specific record
- `findByColumn(column, value)` – Search by a single column
- `findByColumns(conditions, validation)` – Advanced search with validation
- `create(data)` – Create a new record
- `update(id, data)` – Update an existing record
- `delete(id)` – Delete a record
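These helpers compose naturally inside custom endpoints. A minimal sketch (the endpoint name and column names are hypothetical; only the helper signatures listed above and the response shape from the example above are assumed):

```typescript
// Inside a CustomTableManager subclass: deactivate every layer created by a user.
async deactivateUserLayers(req: any) {
  // findByColumn(column, value) performs a single-column lookup
  const layers = await this.findByColumn('created_by', req.params.user)
  for (const layer of layers) {
    await this.update(layer.id, { active: false })
  }
  return {
    status: 200,
    content: JSON.stringify({ deactivated: layers.length }),
    headers: { 'Content-Type': 'application/json' }
  }
}
```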
The framework includes VineJS integration for type-safe request validation:
```typescript
import { validate, AssetUploadSchema } from 'digitaltwin-core'

// In your component
async handleUpload(req: any) {
  const data = await validate(AssetUploadSchema, req.body)
  // data is now typed and validated
}
```
Validation errors return HTTP 422 with detailed error messages:
```json
{
  "error": "Validation failed",
  "details": [
    { "field": "description", "message": "The description field must be a string" }
  ]
}
```
The framework provides custom error classes for structured error handling:
```typescript
import { CollectorError, ValidationError, StorageError } from 'digitaltwin-core'

// Errors include context
throw new CollectorError('Failed to fetch data', 'weather-collector', originalError)

// Validation errors return 422
throw new ValidationError('Invalid input', details)
```
All component errors are caught and logged with context (component name, stack trace). Non-critical operations use safeAsync to log errors without crashing:
```typescript
import { safeAsync } from 'digitaltwin-core'

// Won't throw, just logs on failure
await safeAsync(() => cleanup(), 'cleanup temporary files', logger)
```
The engine supports graceful shutdown with configurable timeout:
```typescript
const engine = new DigitalTwinEngine({ database, storage })

// Configure shutdown timeout (default: 30s)
engine.setShutdownTimeout(60000)

// Check if shutting down
if (engine.isShuttingDown()) {
  // Don't accept new work
}

// Graceful stop
await engine.stop()
```
Register custom health checks for monitoring:
```typescript
engine.registerHealthCheck('external-api', async () => {
  const response = await fetch('https://api.example.com/health')
  return { status: response.ok ? 'up' : 'down' }
})

// Built-in checks: database, redis (if configured)
const names = engine.getHealthCheckNames() // ['database', 'redis', 'external-api']

// Remove check
engine.removeHealthCheck('external-api')
```
Generate OpenAPI 3.0 specs from your components:
```typescript
import { OpenAPIGenerator } from 'digitaltwin-core'

const spec = OpenAPIGenerator.generate({
  info: { title: 'My API', version: '1.0.0' },
  components: [collector, assetsManager, handler]
})

// Output as JSON or YAML
const json = OpenAPIGenerator.toJSON(spec)
const yaml = OpenAPIGenerator.toYAML(spec)
```
HTTP compression is disabled by default because API gateways (Apache APISIX, Kong, Nginx, etc.) typically handle compression at the gateway level.
For standalone deployments without a gateway, enable compression via environment variable:
```bash
export DIGITALTWIN_ENABLE_COMPRESSION=true
```
When enabled, the server uses gzip compression for JSON responses larger than 1KB, reducing bandwidth by 60-80%.
The framework supports multiple authentication modes:
- Gateway (default): Uses headers from API gateways (Apache APISIX, KrakenD)
- JWT: Direct JWT token validation
- None: Disabled for development/testing
Gateway mode (default): no configuration is needed. The framework reads the `x-user-id` and `x-user-roles` headers set by your API gateway.
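For instance, a request forwarded by the gateway might look as follows (a sketch; the port and endpoint are hypothetical):

```typescript
// Simulate a request as an API gateway would forward it: the gateway
// authenticates the caller, then injects the identity headers the framework reads.
const response = await fetch('http://localhost:3000/weather', {
  headers: {
    'x-user-id': 'user-123',
    'x-user-roles': 'admin,editor'
  }
})
```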
JWT mode:
```bash
export AUTH_MODE=jwt
export JWT_SECRET=your-secret-key
# Or, for RSA keys: set JWT_PUBLIC_KEY or JWT_PUBLIC_KEY_FILE
```
To disable authentication (for development/testing):
```bash
export DIGITALTWIN_DISABLE_AUTH=true
# or
export AUTH_MODE=none
```
For detailed configuration options, see src/auth/README.md.
Use create-digitaltwin to quickly bootstrap new projects:
```bash
npm init digitaltwin my-project
cd my-project
npm install
npm run dev
```
Generated projects include digitaltwin-cli for component generation:
```bash
node dt make:collector WeatherCollector --description "Weather data collector"
node dt make:handler ApiHandler --method post
node dt make:harvester DataProcessor --source weather-collector
```
- `src/` – framework sources
  - `auth/` – authentication providers and user management
  - `components/` – base classes for collectors, harvesters, handlers and assets managers
  - `engine/` – orchestration logic (includes component type guards)
  - `loader/` – component auto-discovery utilities
  - `storage/` – storage service abstractions and adapters
  - `database/` – metadata database adapter
  - `env/` – environment configuration helper
  - `errors/` – custom error classes
  - `validation/` – request validation with VineJS
  - `utils/` – utility functions (graceful shutdown, HTTP responses, etc.)
- `tests/` – unit tests
---
This project is licensed under the MIT License.