# @griffith-made/ormjs

Universal Data Repository Pattern for Node.js - Write once, run anywhere.

A flexible, type-safe repository pattern implementation supporting multiple data sources: Databricks, MongoDB, TypeORM, REST APIs, and Message Queues.

## Features
- Universal Interface: Single repository pattern API across all data sources
- Multiple Implementations: Databricks, Mongoose, TypeORM, REST APIs, Message Queues
- Zero Config Overhead: Simple, intuitive API with sensible defaults
- Dependency Injection: Bring your own clients, loggers, and validation schemas
- Pure JavaScript: ES6 modules, no TypeScript required (but works great with it)
- Extensible: Easy to add custom repository implementations
- Production Ready: Battle-tested patterns used by enterprise applications
## Installation

```bash
npm install @griffith-made/ormjs
```

Install only what you need based on your data sources:

```bash
# For Databricks
npm install @databricks/sql
```

Note: Knex is NOT required for Databricks. ormjs includes a custom DatabricksQueryBuilder that provides the same fluent API without Knex's complexity. See "Why Not Knex?" below.

## Quick Start

### Databricks
```javascript
import { DatabricksRepository } from '@griffith-made/ormjs';
import { DBSQLClient } from '@databricks/sql';

// Create Databricks adapter
class DatabricksAdapter {
  constructor() {
    this.client = new DBSQLClient();
    this.session = null;
  }

  async connect() {
    const connectedClient = await this.client.connect({
      host: process.env.DATABRICKS_HOST,
      path: process.env.DATABRICKS_HTTP_PATH,
      token: process.env.DATABRICKS_TOKEN
    });
    this.session = await connectedClient.openSession();
  }

  async query(sql) {
    if (!this.session) await this.connect();
    const queryOperation = await this.session.executeStatement(sql);
    const result = await queryOperation.fetchAll();
    await queryOperation.close();
    return result;
  }
}

const databricksAdapter = new DatabricksAdapter();

// Create repository with adapter
const userRepo = new DatabricksRepository({
  tableName: 'catalog.schema.users',
  adapter: databricksAdapter // Pass adapter, not Knex!
});

// Use it!
const users = await userRepo.find({ active: true });
const user = await userRepo.findById('123');
const count = await userRepo.count({ role: 'admin' });
```

### MongoDB (Mongoose)
```javascript
import { MongooseRepository } from '@griffith-made/ormjs';
import mongoose from 'mongoose';

const UserModel = mongoose.model('User', new mongoose.Schema({
  name: String,
  email: String,
  active: Boolean
}));

const userRepo = new MongooseRepository({
  model: UserModel
});

const users = await userRepo.find({ active: true });
```

### REST API
```javascript
import { APIRepository } from '@griffith-made/ormjs';
import axios from 'axios';

const userRepo = new APIRepository({
  baseURL: 'https://api.example.com',
  resource: '/users',
  httpClient: axios
});

const users = await userRepo.find({ role: 'admin' });
const user = await userRepo.findById('123');
await userRepo.create({ name: 'John', email: 'john@example.com' });
```

## Core API
All repositories implement the same interface:
```javascript
// Read operations
await repo.find(where, options);        // Find all matching records
await repo.findById(id);                // Find by ID
await repo.findOne(where);              // Find single record
await repo.findWhereIn(column, values); // Find with WHERE IN
await repo.count(where);                // Count matching records
await repo.exists(where);               // Check if exists
await repo.distinct(column, where);     // Get distinct values
await repo.rawQuery(sql, params);       // Execute raw query

// Read operations with automatic retry (Databricks)
await repo.findWithRetry(where, options); // Automatic retry for transient errors

// Write operations (if supported by data source)
await repo.create(data);        // Create new record
await repo.update(where, data); // Update records
await repo.delete(where);       // Delete records
```

## Error Handling & Retry Logic

### Databricks Error Classes
ormjs provides specialized error classes for better error handling in Databricks operations:
```javascript
import {
  DatabricksError,
  DatabricksConnectionError,
  DatabricksTimeoutError,
  DatabricksSyntaxError,
  DatabricksAuthenticationError,
  getHttpStatusCode
} from '@griffith-made/ormjs';

try {
  const results = await repository.find({ status: 'active' });
} catch (error) {
  // Handle specific error types
  if (error instanceof DatabricksTimeoutError) {
    console.error('Query timed out after', error.duration, 'ms');
  } else if (error instanceof DatabricksSyntaxError) {
    console.error('Invalid SQL:', error.sql);
  } else if (error instanceof DatabricksConnectionError) {
    console.error('Connection failed, may be transient');
  }

  // Get appropriate HTTP status code for API responses
  const statusCode = getHttpStatusCode(error);
  res.status(statusCode).json({ error: error.message });
}
```

Available Error Classes:

- DatabricksError - Base error class
- DatabricksConnectionError - Connection failures (transient)
- DatabricksTimeoutError - Query timeouts (transient)
- DatabricksSyntaxError - SQL syntax errors (permanent)
- DatabricksAuthenticationError - Auth failures (permanent)

### Automatic Retry
Use findWithRetry() for automatic retry with exponential backoff on transient errors:

```javascript
import { DatabricksRepository } from '@griffith-made/ormjs';

class FlightRepository extends DatabricksRepository {
  constructor(adapter) {
    super({
      tableName: 'flights',
      adapter,
      maxRetries: 3,        // Default retry attempts
      baseRetryDelay: 1000, // Base delay in milliseconds
      defaultTimeout: 30000 // Query timeout
    });
  }

  async findDeparted() {
    // Automatically retries connection errors and timeouts
    return this.findWithRetry(
      { status: 'departed' },
      {
        limit: 100,
        maxRetries: 5,       // Override default
        baseRetryDelay: 2000 // Override default
      }
    );
  }
}
```

Retry Behavior:
- Only retries transient errors (connection, timeout)
- Does NOT retry permanent errors (syntax, authentication)
- Uses exponential backoff: 1s → 2s → 4s → 8s...
- Adds jitter to prevent thundering herd
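The schedule above can be sketched in a few lines. This is an illustration of the backoff-with-jitter idea only; `retryDelay` and `withRetry` are hypothetical helpers, not ormjs exports or its exact internals:

```javascript
// Sketch of the backoff schedule: exponential delay plus jitter.
// retryDelay/withRetry are illustrative helpers, not ormjs exports.
function retryDelay(attempt, baseRetryDelay = 1000) {
  const exponential = baseRetryDelay * 2 ** attempt; // 1s, 2s, 4s, 8s...
  const jitter = Math.random() * 0.25 * exponential; // spread retries out
  return exponential + jitter;
}

async function withRetry(fn, { maxRetries = 3, baseRetryDelay = 1000, isTransient = () => true } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      // Give up on permanent errors or when the retry budget is exhausted
      if (attempt >= maxRetries || !isTransient(error)) throw error;
      await new Promise((resolve) => setTimeout(resolve, retryDelay(attempt, baseRetryDelay)));
    }
  }
}
```

Passing an `isTransient` predicate is what keeps permanent errors (syntax, authentication) from being retried.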
### Correlation IDs
Track requests across your system with automatic correlation ID generation:
```javascript
import {
  generateCorrelationId,
  getTimestampFromCorrelationId
} from '@griffith-made/ormjs';

// Generate correlation ID
const correlationId = generateCorrelationId('query');
// Returns: "query-1763565275859-zaoq4o8"

// Use in queries - appears in all logs
const results = await repository.find(
  { status: 'active' },
  { correlationId }
);

// Extract the timestamp embedded in a correlation ID
const timestamp = getTimestampFromCorrelationId(correlationId);
```

### Error Classification
Classify and analyze errors programmatically:
```javascript
import {
  classifyError,
  isTransientError,
  getUserFriendlyMessage,
  getHttpStatusCode
} from '@griffith-made/ormjs';

try {
  await adapter.query('SELECT * FROM table');
} catch (error) {
  // Classify the error (sql and duration come from the surrounding query context)
  const classified = classifyError(error, sql, duration);

  // Check if retriable
  if (isTransientError(classified)) {
    // Implement custom retry logic
    console.log('Transient error, will retry');
  } else {
    // Permanent error, don't retry
    console.log('Permanent error:', classified.message);
  }

  // Get user-friendly message
  const friendlyMessage = getUserFriendlyMessage(classified);

  // Get HTTP status code
  const statusCode = getHttpStatusCode(classified);
  // Returns: 400 (syntax), 401 (auth), 503 (connection), 504 (timeout)
}
```

### Complete Example
```javascript
import {
  DatabricksRepository,
  DatabricksTimeoutError,
  getHttpStatusCode,
  generateCorrelationId
} from '@griffith-made/ormjs';

class UserRepository extends DatabricksRepository {
  async findActiveUsers(req, res) {
    const correlationId = generateCorrelationId('user-query');
    try {
      // Automatic retry for transient errors
      const users = await this.findWithRetry(
        { active: true },
        {
          correlationId,
          maxRetries: 3,
          timeout: 30000
        }
      );
      res.json({
        data: users,
        correlationId
      });
    } catch (error) {
      // Error already classified and logged with correlationId
      const statusCode = getHttpStatusCode(error);
      res.status(statusCode).json({
        error: error.message,
        correlationId,
        retriable: error.isTransient || false
      });
    }
  }
}
```

## Three Levels of Abstraction (Databricks)
@griffith-made/ormjs provides three ways to query Databricks, from highest to lowest level:
### Level 1: Repository Methods
Use built-in CRUD methods for 80% of queries:
```javascript
import { DatabricksRepository } from '@griffith-made/ormjs';

const projectRepo = new DatabricksRepository({
  tableName: 'catalog.schema.projects',
  adapter: databricksAdapter
});

// Find with filters
const active = await projectRepo.find(
  { status: 'active' },
  { limit: 50, orderBy: 'created_at', order: 'desc' }
);

// Find by ID
const project = await projectRepo.findById('PJT-001');

// Find one
const latest = await projectRepo.findOne({ status: 'active' });

// Count
const total = await projectRepo.count({ status: 'active' });

// Check existence
const exists = await projectRepo.exists({ project_id: 'PJT-001' });

// Distinct values
const statuses = await projectRepo.distinct('status');

// WHERE IN
const projects = await projectRepo.findWhereIn(
  'project_id',
  ['PJT-001', 'PJT-002', 'PJT-003']
);
```

### Level 2: Query Builder
Use the built-in query builder for the roughly 15% of queries that need more complex filtering or aggregation:
```javascript
// Access the query builder through the repository
const results = await projectRepo.query()
  .where({ status: 'active' })
  .whereBetween('budget', [100000, 500000])
  .select(['project_id', 'name', 'budget'])
  .orderBy('budget', 'desc')
  .limit(100)
  .offset(20);

// Aggregations
const stats = await projectRepo.query()
  .select(['status', 'COUNT(*) as count', 'AVG(budget) as avg_budget'])
  .groupBy('status')
  .orderBy('count', 'desc');

// Get the first result
const topProject = await projectRepo.query()
  .where({ status: 'active' })
  .orderBy('budget', 'desc')
  .first();
```

Query Builder Methods:

- .where(conditions) - WHERE clause
- .whereIn(column, values) - WHERE IN
- .whereBetween(column, [min, max]) - WHERE BETWEEN
- .select(columns) - SELECT specific columns
- .distinct(column) - DISTINCT
- .orderBy(column, direction) - ORDER BY
- .limit(n) - LIMIT
- .offset(n) - OFFSET
- .count('* as count') - COUNT aggregation
- .groupBy(columns) - GROUP BY
- .first() - Get first result only

### Level 3: Raw SQL
Use raw SQL for the remaining ~5% of advanced queries with joins, subqueries, or complex analytics:

```javascript
import databricksAdapter from './adapters/DatabricksAdapter.js';

// Example: a multi-table aggregation (illustrative table and column names)
const results = await databricksAdapter.query(`
  SELECT p.project_id, p.name, COUNT(t.task_id) AS task_count
  FROM catalog.schema.projects p
  LEFT JOIN catalog.schema.tasks t ON t.project_id = p.project_id
  GROUP BY p.project_id, p.name
  ORDER BY task_count DESC
  LIMIT 100
`);
```

### When to Use Each Level
| Level | Use Case | Example |
|-------|----------|---------|
| Repository Methods | Simple CRUD, filtering, counting | Find active users, count by status |
| Query Builder | Complex WHERE clauses, aggregations | Multi-field filters, GROUP BY queries |
| Raw SQL | Joins, subqueries, window functions | Complex analytics, multi-table queries |
## Why Not Knex?
The Problem: Knex doesn't support Databricks natively.
Knex is an excellent query builder for traditional databases (PostgreSQL, MySQL, SQLite), but it's designed around specific database drivers (pg, mysql2, sqlite3) that don't work with Databricks SQL Warehouse.
What We Tried:

- Using Knex's MySQL dialect (closest to Databricks SQL syntax)
- Creating custom connection wrappers around @databricks/sql
- Result: Knex kept calling driver-specific methods that don't exist in Databricks

The Solution: a custom DatabricksQueryBuilder

We built a lightweight query builder specifically for Databricks that:

- ✅ Provides a Knex-like fluent API (.where(), .select(), .orderBy(), etc.)
- ✅ Works directly with the @databricks/sql driver via the adapter pattern
- ✅ Generates SQL optimized for Databricks
- ✅ Is simpler than Knex (no unnecessary abstractions)
- ✅ Uses a promise-based "thenable" pattern (works with await)

Benefits Over Knex:
1. No impedance mismatch - Built specifically for Databricks
2. Simpler codebase - Only what Databricks needs
3. Better error messages - No Knex-specific abstraction errors
4. Smaller footprint - No heavyweight dependencies
Backward Compatibility: DatabricksRepository still accepts a knex parameter for legacy code, but the recommended approach is to pass an adapter:

```javascript
// ❌ Old (not recommended, requires Knex workarounds)
new DatabricksRepository({ tableName: 'users', knex: knexInstance });

// ✅ New (recommended, no Knex needed)
new DatabricksRepository({ tableName: 'users', adapter: databricksAdapter });
```

## Factory Pattern
Create repositories dynamically:
```javascript
import { createRepository, RepositoryFactory } from '@griffith-made/ormjs';
import databricksAdapter from './adapters/DatabricksAdapter.js';

// Using the convenience function
const repo = createRepository('databricks', {
  tableName: 'flights',
  adapter: databricksAdapter
});

// Using factory methods
const userRepo = RepositoryFactory.databricks('users', databricksAdapter);
const productRepo = RepositoryFactory.mongoose(ProductModel);
const orderRepo = RepositoryFactory.api('https://api.example.com', '/orders', axios);
```

## Extending Repositories
Create domain-specific repositories by extending base classes:
```javascript
import { DatabricksRepository } from '@griffith-made/ormjs';
import databricksAdapter from './adapters/DatabricksAdapter.js';
import logger from './utils/logger.js';

class FlightRepository extends DatabricksRepository {
  constructor() {
    super({
      tableName: 'catalog.schema.flights',
      adapter: databricksAdapter, // Pass adapter instance
      schema: null,               // Optional Zod schema
      logger
    });
  }

  // Add domain-specific methods
  async findByRoute(origin, destination, options = {}) {
    return this.find(
      { origin, destination },
      { limit: 50, ...options }
    );
  }

  async getRecentFlights(limit = 50) {
    return this.find({}, {
      orderBy: 'departure_time',
      order: 'desc',
      limit
    });
  }

  async getDelayedFlights() {
    return this.find({ status: 'delayed' });
  }

  // Use the query builder for complex queries
  async getFlightsByDateRange(startDate, endDate) {
    return this.query()
      .whereBetween('departure_time', [startDate, endDate])
      .orderBy('departure_time', 'asc')
      .limit(1000);
  }
}

// Create singleton instance
const flightRepository = new FlightRepository();
export default flightRepository;

// Use it (in another module):
// import flightRepository from './repositories/FlightRepository.js';
const sfoToLax = await flightRepository.findByRoute('SFO', 'LAX');
```

## Validation with Zod
Add runtime validation to your repositories:
```javascript
import { z } from 'zod';
import { DatabricksRepository } from '@griffith-made/ormjs';
import databricksAdapter from './adapters/DatabricksAdapter.js';

const UserSchema = z.object({
  id: z.string(),
  name: z.string(),
  email: z.string().email(),
  age: z.number().min(0).max(150),
  active: z.boolean(),
  created_at: z.string().or(z.date()).optional()
});

const userRepo = new DatabricksRepository({
  tableName: 'catalog.schema.users',
  adapter: databricksAdapter,
  schema: UserSchema // Data validated on every query
});

// Data is automatically validated
const users = await userRepo.find({ active: true });
// Throws ZodError if data doesn't match schema
```

## Custom Logging
Inject your own logger:
```javascript
import winston from 'winston';
import { DatabricksRepository } from '@griffith-made/ormjs';
import databricksAdapter from './adapters/DatabricksAdapter.js';

const logger = winston.createLogger({
  level: 'debug',
  format: winston.format.json(),
  transports: [new winston.transports.Console()]
});

const repo = new DatabricksRepository({
  tableName: 'catalog.schema.users',
  adapter: databricksAdapter,
  logger // Must have debug(), error(), info(), warn() methods
});

// The logger will output query execution details
const users = await repo.find({ active: true });
// Logs: "Executing find query { table: 'users', where: { active: true }, ... }"
```

## Documentation
- User Guide - Complete usage guide with examples
- Developer Guide - Contributing and local development
- Maintainer Guide - Publishing and version management
- Extending Guide - Creating custom repositories
## Supported Data Sources
| Data Source | Repository Class | Read | Write | Raw Query |
|-------------|------------------|------|-------|-----------|
| Databricks SQL | DatabricksRepository | ✅ | ❌ | ✅ (SQL) |
| MongoDB | MongooseRepository | ✅ | ✅ | ✅ (Aggregation) |
| PostgreSQL/MySQL | TypeORMRepository | ✅ | ✅ | ✅ (SQL) |
| REST APIs | APIRepository | ✅ | ✅ | ✅ (Custom) |
| Message Queues | QueueRepository | ✅ | ✅ (Send) | ✅ (Peek/Purge) |

## Why @griffith-made/ormjs?

### The Problem
Traditional ORMs tightly couple your business logic to specific databases:
```javascript
// TypeORM code - coupled to a specific database
const users = await UserEntity.find({ where: { active: true } });
// Can't easily switch to Databricks or an API
```

### The Solution
@griffith-made/ormjs provides a consistent interface across all data sources:
```javascript
// Works with Databricks, MongoDB, APIs, Queues...
const users = await userRepo.find({ active: true });
// Swap implementations without changing business logic
```

### Benefits
- Flexibility: Switch data sources without rewriting code
- Testability: Mock repositories easily for unit tests
- Migration-Friendly: Gradually migrate from SQL to NoSQL to APIs
- Polyglot Persistence: Use the right tool for each job
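The testability point is concrete: because every repository exposes the same interface, a plain object can stand in for one in unit tests. A sketch, where `listActiveUserNames` is a hypothetical service function under test:

```javascript
// A hypothetical service function that depends only on the repository interface
async function listActiveUserNames(userRepo) {
  const users = await userRepo.find({ active: true });
  return users.map((user) => user.name);
}

// In-memory stub standing in for any ormjs repository
const fakeUserRepo = {
  async find(where) {
    const rows = [
      { name: 'Ada', active: true },
      { name: 'Bob', active: false }
    ];
    // Naive equality matching, enough for a test double
    return rows.filter((row) =>
      Object.entries(where).every(([key, value]) => row[key] === value)
    );
  }
};

listActiveUserNames(fakeUserRepo).then((names) => {
  console.log(names); // [ 'Ada' ]
});
```

No database, no network, no mocking library - just an object with the same method signature.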
## Philosophy
@griffith-made/ormjs follows these principles:
1. Simplicity: Simple API that's easy to learn and use
2. Flexibility: Support many data sources with one interface
3. Pragmatism: No magic, no complex configuration
4. Dependency Injection: Bring your own tools (clients, loggers, validators)
5. Extensibility: Easy to extend for custom use cases
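As a sketch of what the shared interface demands, here is a minimal in-memory repository implementing a slice of it (find, findById, count). This is a standalone illustration under assumed semantics; real ormjs repositories extend the package's base classes and add options, validation, and logging:

```javascript
// Minimal in-memory implementation of the common repository interface.
// Standalone sketch - not an ormjs base class - showing how little is
// needed to satisfy callers of find/findById/count.
class InMemoryRepository {
  constructor(rows = []) {
    this.rows = rows;
  }

  #matches(row, where = {}) {
    return Object.entries(where).every(([key, value]) => row[key] === value);
  }

  async find(where = {}, { limit } = {}) {
    const found = this.rows.filter((row) => this.#matches(row, where));
    return limit ? found.slice(0, limit) : found;
  }

  async findById(id) {
    return this.rows.find((row) => row.id === id) ?? null;
  }

  async count(where = {}) {
    return (await this.find(where)).length;
  }
}

const repo = new InMemoryRepository([
  { id: '1', status: 'active' },
  { id: '2', status: 'archived' },
  { id: '3', status: 'active' }
]);

repo.count({ status: 'active' }).then((n) => console.log(n)); // 2
```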
## Architecture: Adapter vs Repository
### Aren't These the Same Thing?
No! Adapters and Repositories are complementary patterns at different abstraction levels:
- Adapter Pattern (Infrastructure Layer): Handles HOW to communicate with external systems
- Repository Pattern (Domain Layer): Handles WHAT data your business needs
### Adapter (Infrastructure Layer)
Responsibility: Wrap external drivers and handle infrastructure concerns
```javascript
// DatabricksAdapter: knows HOW to talk to Databricks
class DatabricksAdapter {
  async query(sql) {
    if (!this.session) await this.connect();
    const queryOperation = await this.session.executeStatement(sql);
    const result = await queryOperation.fetchAll();
    await queryOperation.close();
    return result;
  }
}

// Usage: Low-level, SQL-focused
const results = await databricksAdapter.query('SELECT * FROM users WHERE id = 123');
```

Concerns:
- Connection management (pooling, reconnection)
- Raw query execution
- Infrastructure health checks
- Driver-specific error handling
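The connection-management concern can be sketched with a small wrapper that lazily (re)connects and retries a query once when the session looks stale. The `driver` object below is a stand-in for the real @databricks/sql client, and the method names (`openSession`, `execute`, `healthCheck`) are illustrative, not ormjs or driver APIs:

```javascript
// Illustrative sketch of adapter-level connection management.
// `driver` is a stand-in for the real @databricks/sql client.
class ResilientAdapter {
  constructor(driver) {
    this.driver = driver;
    this.session = null;
  }

  async connect() {
    this.session = await this.driver.openSession();
  }

  async query(sql) {
    if (!this.session) await this.connect();
    try {
      return await this.session.execute(sql);
    } catch {
      // Assume the session went stale: reconnect and retry once.
      // (A real adapter would only do this for transient errors.)
      this.session = null;
      await this.connect();
      return this.session.execute(sql);
    }
  }

  // Simple infrastructure health check
  async healthCheck() {
    try {
      await this.query('SELECT 1');
      return true;
    } catch {
      return false;
    }
  }
}
```

Repositories built on top of this never see sessions or reconnects - that is the point of the layer.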
### Repository (Domain Layer)
Responsibility: Provide domain-oriented data access and business logic
```javascript
// FlightRepository: knows WHAT data the business needs
class FlightRepository extends DatabricksRepository {
  async findByRoute(origin, destination) {
    return this.find({ origin, destination });
  }
}

// Usage: High-level, business-focused
const flights = await flightRepository.findByRoute('SFO', 'LAX');
```

Concerns:
- Business logic (domain-specific query patterns)
- Schema validation
- Data transformation
- Domain rules and constraints
### Why Separate Them?
1. Single Responsibility Principle
- Adapter: "I know how to execute SQL on Databricks"
- Repository: "I know what flight data the business needs"
2. Replaceability
- Switch from Databricks to PostgreSQL? Replace the adapter, repositories stay the same
- Change business logic? Modify repositories, adapter stays the same
3. Testability
- Mock the adapter to test repository logic in isolation
- Mock repositories to test route handlers without hitting databases
4. Reusability
   - One adapter shared by many repositories:
     - FlightDataRepository uses DatabricksAdapter
     - ProjectRepository uses DatabricksAdapter
   - Both share the same connection pool and infrastructure

### Layered Architecture
```
Route Handler
  ↓ calls domain method
Domain Repository (FlightDataRepository)
  ↓ extends
Base Repository (DatabricksRepository from ormjs)
  ↓ uses
Query Builder (DatabricksQueryBuilder)
  ↓ uses
Adapter (DatabricksAdapter)
  ↓ wraps
Driver (@databricks/sql)
```

### An Analogy
Think of it like HTTP:

- Adapter = axios or fetch (knows HOW to make HTTP requests)
- Repository = your API client class (knows WHAT endpoints to call for business needs)

You wouldn't put all your business logic in axios. The same principle applies here!

### Without vs. With Separation
```javascript
// ❌ Without separation: Mixed concerns
class FlightRepository {
  async findByRoute(origin, dest) {
    // Business logic + connection management + SQL all mixed
    const client = new DBSQLClient();
    await client.connect({ host: '...', token: '...' });
    const session = await client.openSession();
    const query = await session.executeStatement(
      `SELECT * FROM flights WHERE origin = '${origin}' AND dest = '${dest}'`
    ); // SQL injection risk!
    return await query.fetchAll();
  }
}

// ✅ With separation: Clear responsibilities
class FlightRepository extends DatabricksRepository {
  async findByRoute(origin, dest) {
    // Just business logic - infrastructure handled elsewhere
    return this.find({ origin, destination: dest });
  }
}
```

Benefits:
- ✅ Connection pooling handled once in adapter
- ✅ SQL injection prevention via query builder
- ✅ Easy to test business logic
- ✅ Can swap Databricks for another source without changing business logic
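The "handled once" benefit can be seen directly: one adapter instance can back several repositories, so connection handling lives in exactly one place. A sketch with a stubbed adapter - `SqlRepository`, the stub, and the table names are all hypothetical, not ormjs exports:

```javascript
// One adapter, many repositories: both repositories issue SQL through the
// same adapter instance. SqlRepository and the stub are illustrative only.
class SqlRepository {
  constructor({ tableName, adapter }) {
    this.tableName = tableName;
    this.adapter = adapter;
  }

  async count() {
    const rows = await this.adapter.query(`SELECT COUNT(*) AS n FROM ${this.tableName}`);
    return rows[0].n;
  }
}

const sharedAdapter = {
  queriesSeen: [], // stand-in for a single pooled connection
  async query(sql) {
    this.queriesSeen.push(sql);
    return [{ n: 0 }];
  }
};

const flightRepo = new SqlRepository({ tableName: 'flights', adapter: sharedAdapter });
const projectRepo = new SqlRepository({ tableName: 'projects', adapter: sharedAdapter });

Promise.all([flightRepo.count(), projectRepo.count()]).then(() => {
  console.log(sharedAdapter.queriesSeen.length); // 2 - one query per repository
});
```

Swapping `sharedAdapter` for a PostgreSQL-backed one would leave both repositories untouched.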
## Examples

See the examples/ directory for complete working examples:
- Databricks: Analytics queries with adapter pattern
- Mongoose: MongoDB CRUD operations
- TypeORM: PostgreSQL with entity relationships
- API: REST API integration
- Queue: Message queue pub/sub patterns
- Custom: Building your own repository implementation
## Contributing

Contributions welcome! Please read our Developer Guide first.

## License

MIT

## Support

- Issues: Bitbucket Issues
- Documentation: docs/
- Examples: examples/
---
Made with ❤️ by Griffith Made