Database adapters for Nexxus
npm install @mayhem93/nexxus-database-lib

> Database abstraction layer for Nexxus - Pluggable adapters for different database systems
---
The Database package provides a unified interface for data persistence across different database systems. It comes with a built-in Elasticsearch adapter and allows developers to implement adapters for any database of their choice.
Key Responsibility: Abstract database operations (CRUD, search, bulk operations) behind a consistent API while translating Nexxus-specific constructs (FilterQuery, JsonPatch) into native database queries.
---
- Built-in Elasticsearch adapter
- Extend DatabaseAdapter for other databases (PostgreSQL, MongoDB, etc.)
- Consistent API regardless of underlying database
- Converts FilterQuery DSL to native database queries
- Database-agnostic filtering logic
- Support for complex nested queries ($and, $or, operators)
- JsonPatch to database update translation
- Bulk update support with scripted operations
- Partial field returns (dot notation support)
- Batch create, update, delete for performance
- Transaction-like semantics where supported
- Efficient bulk indexing
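
To make the "consistent API regardless of underlying database" point concrete, here is a minimal sketch of the pattern, not the package's actual classes. `SketchAdapter`, `InMemoryAdapter`, `SketchItem`, and the option shapes are hypothetical and only loosely modeled on the `createItem`/`getItem`/`deleteItem` calls documented in this README. Application code depends on the abstract type alone, so a backend can be swapped without touching callers.

```typescript
// Hypothetical, simplified stand-in for the real DatabaseAdapter base class.
type SketchItem = { id: string } & Record<string, unknown>;

abstract class SketchAdapter {
  abstract createItem(options: { index: string; item: SketchItem }): Promise<SketchItem>;
  abstract getItem(options: { index: string; id: string }): Promise<SketchItem | undefined>;
  abstract deleteItem(options: { index: string; id: string }): Promise<boolean>;
}

// A toy backend that keeps each "index" in a Map; handy for unit tests.
class InMemoryAdapter extends SketchAdapter {
  private indices = new Map<string, Map<string, SketchItem>>();

  private bucket(index: string): Map<string, SketchItem> {
    let existing = this.indices.get(index);
    if (!existing) {
      existing = new Map<string, SketchItem>();
      this.indices.set(index, existing);
    }
    return existing;
  }

  async createItem(options: { index: string; item: SketchItem }): Promise<SketchItem> {
    this.bucket(options.index).set(options.item.id, options.item);
    return options.item;
  }

  async getItem(options: { index: string; id: string }): Promise<SketchItem | undefined> {
    return this.bucket(options.index).get(options.id);
  }

  async deleteItem(options: { index: string; id: string }): Promise<boolean> {
    return this.bucket(options.index).delete(options.id);
  }
}

// Caller code only sees the abstract type.
async function createAndFetch(db: SketchAdapter): Promise<SketchItem | undefined> {
  await db.createItem({ index: 'tasks', item: { id: 'task-123', title: 'Implement feature' } });
  return db.getItem({ index: 'tasks', id: 'task-123' });
}
```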
---
```
Application Code
        ↓
DatabaseAdapter (Abstract)
        ↓
┌──────────────────────────────┐
│ ElasticsearchAdapter         │ (Built-in)
│ PostgresAdapter              │ (Custom)
│ MongoDBAdapter               │ (Custom)
│ Neo4jAdapter                 │ (Custom)
└──────────────────────────────┘
        ↓
Underlying Database
```
---
- Full-text search capabilities
- Horizontal scalability
- JSON-native document storage
- Real-time indexing for instant queries
- Aggregations for analytics
- Single-node and cluster support
- Index management with settings/mappings
- Bulk operations with error handling
- Script-based updates for nested fields
- Source filtering for partial returns
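
The semantics of partial returns with dot notation can be sketched in a few lines. In Elasticsearch the filtering happens server-side through source filtering; the helper below only illustrates the expected result shape on the client. `pickFields` and `isPlainObject` are hypothetical names, not part of the package.

```typescript
type Doc = Record<string, unknown>;

function isPlainObject(value: unknown): value is Doc {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Hypothetical helper: copy only the requested dot-notation paths from a document.
function pickFields(doc: Doc, fields: string[]): Doc {
  const result: Doc = {};
  for (const field of fields) {
    const keys = field.split('.');
    // Walk down the source document; give up on this field if a segment is missing.
    let value: unknown = doc;
    for (const key of keys) {
      value = isPlainObject(value) ? value[key] : undefined;
      if (value === undefined) break;
    }
    if (value === undefined) continue;
    // Rebuild the same nesting in the result object.
    let target = result;
    for (const key of keys.slice(0, -1)) {
      const existing = target[key];
      if (isPlainObject(existing)) {
        target = existing;
      } else {
        const created: Doc = {};
        target[key] = created;
        target = created;
      }
    }
    target[keys[keys.length - 1]] = value;
  }
  return result;
}

const fullTask: Doc = {
  id: 'task-123',
  title: 'Implement feature',
  status: 'todo',
  assignee: { email: 'dev@example.com', name: 'Dev' },
};

// Keeps only the id and the nested assignee.email field.
const partialTask = pickFields(fullTask, ['id', 'assignee.email']);
```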
---
```typescript
await database.createItem({
  index: 'tasks',
  item: {
    id: 'task-123',
    title: 'Implement feature',
    status: 'todo',
    assignee: { email: 'dev@example.com' }
  },
  returnFields: ['id', 'title', 'createdAt']
});
```
```typescript
// Get by ID
const task = await database.getItem({
  index: 'tasks',
  id: 'task-123'
});

// Get multiple
const tasks = await database.getItems({
  index: 'tasks',
  ids: ['task-123', 'task-456']
});
```
```typescript
const results = await database.searchItems({
  index: 'tasks',
  filters: new NexxusFilterQuery({
    "$and": [
      { "status": { "$eq": "todo" } },
      { "assignee.email": { "$eq": "dev@example.com" } }
    ]
  }),
  sort: [{ field: 'createdAt', order: 'desc' }],
  limit: 20,
  offset: 0
});
```
```typescript
// Single update with JsonPatch
await database.updateItem({
  index: 'tasks',
  id: 'task-123',
  item: new NexxusJsonPatch({
    op: 'replace',
    path: ['status'],
    value: ['completed'],
    metadata: { /* ... */ }
  }),
  returnFields: ['status', 'updatedAt']
});
```
```typescript
// Update multiple items with different patches
// (patch1 and patch2 are NexxusJsonPatch instances)
await database.updateItems({
  index: 'tasks',
  items: [
    {
      id: 'task-123',
      item: patch1
    },
    {
      id: 'task-456',
      item: patch2
    }
  ],
  returnFields: ['status', 'updatedAt']
});
```
```typescript
// Single delete
await database.deleteItem({
  index: 'tasks',
  id: 'task-123'
});

// Bulk delete
await database.deleteItems({
  index: 'tasks',
  ids: ['task-123', 'task-456', 'task-789']
});
```
---
```typescript
{
  "$and": [
    { "status": { "$in": ["todo", "in_progress"] } },
    { "priority": { "$gte": 5 } },
    { "$or": [
      { "assignee.email": { "$eq": "dev@example.com" } },
      { "team": { "$eq": "backend" } }
    ]}
  ]
}
```
```json
{
  "bool": {
    "must": [
      { "terms": { "status": ["todo", "in_progress"] } },
      { "range": { "priority": { "gte": 5 } } },
      {
        "bool": {
          "should": [
            { "term": { "assignee.email": "dev@example.com" } },
            { "term": { "team": "backend" } }
          ]
        }
      }
    ]
  }
}
```
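
The mapping from the FilterQuery above to the Elasticsearch `bool` query can be sketched as a small recursive function. This is an illustration under assumptions, not the adapter's actual translator: `filterToEs` and `conditionToEs` are hypothetical names, the filter is treated as a plain object rather than a `NexxusFilterQuery` instance, and only the operators that appear in this README (plus the other range comparisons) are handled.

```typescript
type FilterNode = Record<string, unknown>;
type EsQuery = Record<string, unknown>;

// Translate one "{ field: { $op: value } }" condition into an Elasticsearch clause.
function conditionToEs(field: string, condition: Record<string, unknown>): EsQuery {
  const clauses: EsQuery[] = Object.keys(condition).map((op): EsQuery => {
    const value = condition[op];
    switch (op) {
      case '$eq':
        return { term: { [field]: value } };
      case '$in':
        return { terms: { [field]: value } };
      case '$gt':
      case '$gte':
      case '$lt':
      case '$lte':
        // "$gte" becomes the Elasticsearch range key "gte", and so on.
        return { range: { [field]: { [op.slice(1)]: value } } };
      default:
        throw new Error(`Unsupported operator: ${op}`);
    }
  });
  // Several operators on one field must all hold.
  return clauses.length === 1 ? clauses[0] : { bool: { must: clauses } };
}

// Recursively translate $and / $or groups and field conditions.
function filterToEs(filter: FilterNode): EsQuery {
  const clauses: EsQuery[] = Object.keys(filter).map((key): EsQuery => {
    const value = filter[key];
    if (key === '$and') {
      return { bool: { must: (value as FilterNode[]).map((child) => filterToEs(child)) } };
    }
    if (key === '$or') {
      return { bool: { should: (value as FilterNode[]).map((child) => filterToEs(child)) } };
    }
    return conditionToEs(key, value as Record<string, unknown>);
  });
  return clauses.length === 1 ? clauses[0] : { bool: { must: clauses } };
}

// Same filter as in the example above.
const esQuery = filterToEs({
  $and: [
    { status: { $in: ['todo', 'in_progress'] } },
    { priority: { $gte: 5 } },
    { $or: [{ 'assignee.email': { $eq: 'dev@example.com' } }, { team: { $eq: 'backend' } }] },
  ],
});
```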
---
```typescript
{
  op: "replace",
  path: ["status", "assignee.email"],
  value: ["completed", "new-dev@example.com"]
}
```
```javascript
ctx._source.status = params.status;
ctx._source.assignee.email = params.assignee_email;
```
Features:
- Handles nested object updates
- Dot notation to nested structure conversion
- Validation of paths against schema
- Safe parameter binding (prevents injection)
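
A rough sketch of how a `replace` patch could be turned into a script plus bound parameters, following the example above. `patchToScript`, `ReplacePatch`, and the path pattern are hypothetical and shown only to illustrate the idea that values travel as parameters while paths are validated before they reach the script body; real schema validation is not modeled here.

```typescript
interface ReplacePatch {
  op: 'replace';
  path: string[];
  value: unknown[];
}

// Only plain dotted identifiers are accepted, because paths end up in the script text.
const SAFE_PATH = /^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*$/;

function patchToScript(patch: ReplacePatch): { source: string; params: Record<string, unknown> } {
  if (patch.path.length !== patch.value.length) {
    throw new Error('path and value must have the same length');
  }
  const statements: string[] = [];
  const params: Record<string, unknown> = {};
  patch.path.forEach((path, i) => {
    if (!SAFE_PATH.test(path)) throw new Error(`Invalid path: ${path}`);
    // "assignee.email" is bound as params.assignee_email.
    const paramName = path.replace(/\./g, '_');
    if (Object.prototype.hasOwnProperty.call(params, paramName)) {
      throw new Error(`Parameter name collision: ${paramName}`);
    }
    // The value is never interpolated into the script, only referenced by name.
    params[paramName] = patch.value[i];
    statements.push(`ctx._source.${path} = params.${paramName};`);
  });
  return { source: statements.join(' '), params };
}

const script = patchToScript({
  op: 'replace',
  path: ['status', 'assignee.email'],
  value: ['completed', 'new-dev@example.com'],
});
```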
---
```typescript
import pg from 'pg';
import { DatabaseAdapter } from '@mayhem93/nexxus-database';
// The NexxusDb* option types and NexxusFilterQuery must be imported as well.

export class PostgresDatabaseAdapter extends DatabaseAdapter {
  private pool!: pg.Pool;

  async connect(config: any) {
    this.pool = new pg.Pool(config);
  }

  async disconnect() {
    await this.pool.end();
  }

  async createItem(options: NexxusDbCreateOptions) {
    // Table names cannot be bound as parameters, so options.index must come from trusted code.
    const query = `
      INSERT INTO ${options.index} (data)
      VALUES ($1)
      RETURNING *
    `;
    const result = await this.pool.query(query, [JSON.stringify(options.item)]);
    return result.rows[0];
  }

  async searchItems(options: NexxusDbSearchOptions) {
    // Translate FilterQuery to SQL WHERE clause
    const whereClause = this.filterQueryToSQL(options.filters);
    // Bind limit and offset as parameters instead of interpolating them.
    const query = `
      SELECT * FROM ${options.index}
      WHERE ${whereClause}
      LIMIT $1
      OFFSET $2
    `;
    const result = await this.pool.query(query, [options.limit, options.offset]);
    return result.rows;
  }

  private filterQueryToSQL(filter: NexxusFilterQuery): string {
    // Convert FilterQuery DSL to SQL
    // Example: { "status": { "$eq": "active" } } → "status = 'active'"
    throw new Error('filterQueryToSQL is not implemented yet');
  }

  // Implement other abstract methods...
}
```
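One possible shape for the `filterQueryToSQL` step is sketched below. It is an assumption-laden illustration, not the package's implementation: `filterToSql`, `fieldExpr`, and `SqlFragment` are hypothetical names, the filter is treated as a plain object, documents are assumed to live in a JSONB column called `data` (as in the `INSERT` above), and `$in` is assumed to receive string arrays. Unlike the stub above, it returns the SQL text together with bound values so that user input never ends up in the query string.

```typescript
type SqlFilter = Record<string, unknown>;
interface SqlFragment { text: string; values: unknown[] }

const FIELD_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*$/;
const COMPARATORS = new Map<string, string>([
  ['$eq', '='], ['$gt', '>'], ['$gte', '>='], ['$lt', '<'], ['$lte', '<='],
]);

// Render a dotted field as a JSONB text extraction,
// e.g. "assignee.email" becomes: data #>> '{assignee,email}'
function fieldExpr(field: string, numeric: boolean): string {
  if (!FIELD_PATTERN.test(field)) throw new Error(`Invalid field name: ${field}`);
  const expr = `data #>> '{${field.split('.').join(',')}}'`;
  // #>> yields text, so numeric comparisons need an explicit cast.
  return numeric ? `(${expr})::numeric` : expr;
}

// Recursively build a WHERE fragment; `values` accumulates the bound parameters.
function filterToSql(filter: SqlFilter, values: unknown[] = []): SqlFragment {
  const parts = Object.keys(filter).map((key) => {
    const value = filter[key];
    if (key === '$and' || key === '$or') {
      const joiner = key === '$and' ? ' AND ' : ' OR ';
      const children = (value as SqlFilter[]).map((child) => filterToSql(child, values).text);
      return `(${children.join(joiner)})`;
    }
    const condition = value as Record<string, unknown>;
    return Object.keys(condition)
      .map((op) => {
        const operand = condition[op];
        if (op === '$in') {
          values.push(operand);
          return `${fieldExpr(key, false)} = ANY($${values.length})`;
        }
        const comparator = COMPARATORS.get(op);
        if (!comparator) throw new Error(`Unsupported operator: ${op}`);
        values.push(operand);
        return `${fieldExpr(key, typeof operand === 'number')} ${comparator} $${values.length}`;
      })
      .join(' AND ');
  });
  return { text: parts.join(' AND '), values };
}

const where = filterToSql({
  $and: [{ status: { $eq: 'todo' } }, { priority: { $gte: 5 } }],
});
// where.text and where.values can be passed to pool.query(`... WHERE ${where.text}`, where.values)
```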
```typescript
const database = new PostgresDatabaseAdapter();
await database.connect({
  host: 'localhost',
  port: 5432,
  database: 'nexxus'
});
```
---
```typescript
{
  database: {
    adapter: "elasticsearch",
    nodes: ["http://localhost:9200"],
    auth: {
      username: "elastic",
      password: "changeme"
    },
    tls: {
      rejectUnauthorized: false
    }
  }
}
```
```typescript
{
  database: {
    adapter: new PostgresDatabaseAdapter(),
    config: {
      host: "localhost",
      port: 5432,
      database: "nexxus",
      user: "nexxus_user",
      password: "secret"
    }
  }
}
```
---
```
src/
├── lib/
│   ├── ElasticsearchDb.ts     # Built-in Elasticsearch adapter
│   ├── DatabaseAdapter.ts     # Abstract base class
│   └── DatabaseService.ts     # Service wrapper
│
└── index.ts                   # Public exports
```
---
Base class for all database adapters.
Abstract Methods:
- connect(config: any): Promise
- disconnect(): Promise
- createItem(options: NexxusDbCreateOptions): Promise
- getItem(options: NexxusDbGetOptions): Promise
- getItems(options: NexxusDbGetItemsOptions): Promise
- searchItems(options: NexxusDbSearchOptions): Promise
- updateItem(options: NexxusDbUpdateOptions): Promise
- updateItems(options: NexxusDbUpdateItemsOptions): Promise
- deleteItem(options: NexxusDbDeleteOptions): Promise
- deleteItems(options: NexxusDbDeleteItemsOptions): Promise
Elasticsearch implementation of DatabaseAdapter.
Features:
- Index management (create, delete, exists)
- Bulk operations (create, update, delete)
- Script-based updates for nested fields
- Source filtering with dot notation
- Error handling with detailed logging
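
Bulk requests to Elasticsearch take an alternating list of action and document entries, and the response reports failures per item even when the request as a whole succeeds. The sketch below shows that shape without calling any client. `buildBulkIndexOperations`, `collectBulkFailures`, and the trimmed-down response interfaces are hypothetical helpers, and using the item's `id` as `_id` while keeping it in the document is an assumption about how the adapter stores items.

```typescript
interface IndexableDoc { id: string; [key: string]: unknown }

// Build the alternating action/document list used by the bulk API.
function buildBulkIndexOperations(index: string, docs: IndexableDoc[]): object[] {
  const operations: object[] = [];
  for (const doc of docs) {
    operations.push({ index: { _index: index, _id: doc.id } });
    operations.push(doc);
  }
  return operations;
}

// Minimal subset of the bulk response shape that this sketch reads.
interface BulkItemResult {
  _id?: string | null;
  status: number;
  error?: { type: string; reason?: string | null };
}
interface BulkResponseLike {
  errors: boolean;
  items: Array<Record<string, BulkItemResult>>;
}

// Collect per-item failures so they can be logged or retried.
function collectBulkFailures(response: BulkResponseLike): Array<{ id: string; reason: string }> {
  const failures: Array<{ id: string; reason: string }> = [];
  if (!response.errors) return failures;
  for (const item of response.items) {
    // Each entry is keyed by its action name ("index", "update", "delete", ...).
    for (const action of Object.keys(item)) {
      const result = item[action];
      if (result.error) {
        failures.push({
          id: result._id ?? 'unknown',
          reason: result.error.reason ?? result.error.type,
        });
      }
    }
  }
  return failures;
}

const bulkOperations = buildBulkIndexOperations('tasks', [
  { id: 'task-123', title: 'Implement feature' },
  { id: 'task-456', title: 'Write tests' },
]);
```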
---
Runtime:
- @elastic/elasticsearch (built-in adapter)
- @mayhem93/nexxus-core (FilterQuery, JsonPatch, models)
DevDependencies:
- TypeScript
- Node.js type definitions
---
```typescript
// In @mayhem93/nexxus-api
import { DatabaseAdapter } from '@mayhem93/nexxus-database';

const results = await database.searchItems({
  index: 'tasks',
  filters: new NexxusFilterQuery({ /* ... */ })
});

// In @mayhem93/nexxus-worker
import { NexxusElasticsearchDb } from '@mayhem93/nexxus-database';

const db = new NexxusElasticsearchDb();
await db.connect(config);
await db.createItem({ /* ... */ });
```
---
```typescript
class PostgresDatabaseAdapter extends DatabaseAdapter {
  // Store models as JSONB columns
  // Translate FilterQuery to SQL WHERE clauses
  // Use JSON path operators for nested queries
}
```
```typescript
class MongoDBAdapter extends DatabaseAdapter {
  // Native document storage (similar to Elasticsearch)
  // FilterQuery maps cleanly to MongoDB query operators
  // JsonPatch to MongoDB update operators
}
```
```typescript
class Neo4jAdapter extends DatabaseAdapter {
  // Store models as nodes with properties
  // FilterQuery to Cypher WHERE clauses
  // Relationships for nested objects
}
```
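
As an illustration of the "JsonPatch to MongoDB update operators" note in the MongoDB skeleton above, a `replace` patch maps naturally onto MongoDB's `$set` operator, which accepts dotted field paths for nested fields. The helper name `patchToMongoUpdate` and the `MongoReplacePatch` shape are hypothetical and follow the JsonPatch example earlier in this README; other patch operations are not covered.

```typescript
interface MongoReplacePatch {
  op: 'replace';
  path: string[];
  value: unknown[];
}

// Turn a "replace" patch into a MongoDB update document using $set.
function patchToMongoUpdate(patch: MongoReplacePatch): { $set: Record<string, unknown> } {
  if (patch.path.length !== patch.value.length) {
    throw new Error('path and value must have the same length');
  }
  const set: Record<string, unknown> = {};
  patch.path.forEach((path, i) => {
    // Field names starting with "$" would be read as operators, so reject them.
    if (path.length === 0 || path.split('.').some((segment) => segment === '' || segment.startsWith('$'))) {
      throw new Error(`Invalid path: ${path}`);
    }
    set[path] = patch.value[i];
  });
  return { $set: set };
}

const mongoUpdate = patchToMongoUpdate({
  op: 'replace',
  path: ['status', 'assignee.email'],
  value: ['completed', 'new-dev@example.com'],
});
// mongoUpdate could then be passed as the update argument of collection.updateOne(filter, update).
```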
---
- Use bulk operations for multiple items (10-100x faster)
- Elasticsearch bulk API processes 1000+ docs/second
- Batching reduces network overhead
- Elasticsearch refresh interval affects write performance
- Configure index settings per use case (real-time vs. throughput)
- Use index templates for consistent settings
- Use FilterQuery validation to catch errors early
- Leverage database-specific optimizations (indices, caching)
- Return only needed fields with `returnFields`
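
The batching advice above can be sketched as a small helper that splits a large id list into fixed-size groups and issues one bulk call per group. `chunk`, `deleteInBatches`, the `BulkDeleter` interface, and the default batch size of 500 are hypothetical; only the `deleteItems({ index, ids })` call shape comes from this README. A suitable batch size depends on document size and cluster configuration.

```typescript
// Split a list into fixed-size batches.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Hypothetical stand-in for the part of the adapter used here.
interface BulkDeleter {
  deleteItems(options: { index: string; ids: string[] }): Promise<unknown>;
}

// Delete ids in sequential batches so no single request grows unbounded.
// Returns the number of bulk requests that were issued.
async function deleteInBatches(
  db: BulkDeleter,
  index: string,
  ids: string[],
  batchSize = 500,
): Promise<number> {
  let requests = 0;
  for (const batch of chunk(ids, batchSize)) {
    await db.deleteItems({ index, ids: batch });
    requests += 1;
  }
  return requests;
}
```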
---
🚧 Work in Progress - Additional adapters and optimizations planned.
Coming Soon:
- Connection pooling configuration
- Transaction support (where applicable)
- Query result caching
- Database-specific optimizations
---
- @mayhem93/nexxus-core - FilterQuery, JsonPatch, model definitions
- @mayhem93/nexxus-api - Uses database for reads and queued writes
- @mayhem93/nexxus-worker - Writer worker persists to database
---
MPL-2.0