Functional Data Access Layer for Kysely - query functions, context passing, composition, plugin integration
npm install @kysera/dalFunctional Data Access Layer for Kysera - Query functions with automatic plugin support.
@kysera/dal provides a functional approach to database access as an alternative to traditional repository patterns. Write query functions that are composable, type-safe, and easy to test.
The DAL works through @kysera/executor to provide automatic plugin support. When you pass a KyseraExecutor (instead of a raw Kysely instance) to your query functions, all plugins (soft-delete, RLS, audit, etc.) are automatically applied while maintaining a clean functional API.
- Query Functions - Pure functions instead of repository methods
- Type Inference - Return types automatically inferred from queries
- Context Passing - Explicit database context (no dependency injection)
- Plugin Support - Automatic plugin interception via @kysera/executor
- Transaction Support - First-class transactions with automatic plugin propagation
- Composition Utilities - Combine queries using compose, chain, parallel, etc.
- Zero Dependencies - Only @kysera/executor dependency (peers on Kysely)
- Fully Typed - Complete TypeScript support with strict mode
``bash
npm install @kysera/dal @kysera/executor kysely
Quick Start
`typescript
import { Kysely } from 'kysely'
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { createQuery, withTransaction } from '@kysera/dal'// Define your database schema
interface Database {
users: {
id: number
email: string
name: string
deleted_at: Date | null
}
}
const db = new Kysely({
/ config /
})
// Create executor with plugins
const executor = await createExecutor(db, [softDeletePlugin()])
// Define query functions
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').select(['id', 'email', 'name']).where('id', '=', id).executeTakeFirst()
)
const createUser = createQuery((ctx, data: { email: string; name: string }) =>
ctx.db.insertInto('users').values(data).returningAll().executeTakeFirstOrThrow()
)
// Use directly - soft-delete filter automatically applied
const user = await getUserById(executor, 1)
// Use in transactions - plugins propagate automatically
const result = await withTransaction(executor, async ctx => {
const newUser = await createUser(ctx, {
email: 'test@example.com',
name: 'Test User'
})
return newUser
})
`Core Concepts
$3
Query functions are the building blocks of the Functional DAL. They accept a database context and arguments, returning a Promise.
`typescript
import { createQuery } from '@kysera/dal'// Simple select query
const findUserByEmail = createQuery((ctx, email: string) =>
ctx.db.selectFrom('users').selectAll().where('email', '=', email).executeTakeFirst()
)
// Insert query
const insertPost = createQuery((ctx, data: { title: string; body: string; user_id: number }) =>
ctx.db.insertInto('posts').values(data).returningAll().executeTakeFirstOrThrow()
)
// Update query
const updateUserName = createQuery((ctx, id: number, name: string) =>
ctx.db.updateTable('users').set({ name }).where('id', '=', id).returningAll().executeTakeFirst()
)
// Delete query
const deletePost = createQuery((ctx, id: number) =>
ctx.db.deleteFrom('posts').where('id', '=', id).executeTakeFirst()
)
`$3
The DAL works through
@kysera/executor for plugin support. Instead of implementing its own plugin system, the DAL leverages the executor's plugin interception mechanism. When you pass a KyseraExecutor (instead of a raw Kysely instance) to your query functions, all plugins are automatically applied at the query builder level.#### Basic Plugin Integration
`typescript
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { rlsPlugin } from '@kysera/rls'
import { createQuery } from '@kysera/dal'// Create executor with plugins
const executor = await createExecutor(db, [
softDeletePlugin(),
rlsPlugin({
schema: {
users: { tenantIdColumn: 'tenant_id' },
posts: { tenantIdColumn: 'tenant_id' }
},
getCurrentTenantId: () => currentTenantId
})
])
// Define queries - plugins apply automatically
const getUsers = createQuery(ctx => ctx.db.selectFrom('users').selectAll().execute())
// Soft-delete filter and RLS automatically applied
const users = await getUsers(executor)
// Only returns users where:
// - deleted_at IS NULL (soft-delete plugin)
// - tenant_id = currentTenantId (RLS plugin)
`#### How Plugin Propagation Works
1. Query Creation: When you pass a
KyseraExecutor to a query function, the context preserves the executor with all its plugins
2. Transaction Wrapping: withTransaction() automatically wraps transaction instances with the same plugins as the parent executor
3. Automatic Interception: All query builders (selectFrom, insertInto, etc.) are intercepted by plugins before execution
4. Type Safety: Full TypeScript support - the database schema type is preserved through all transformations#### Multiple Plugins
`typescript
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { rlsPlugin } from '@kysera/rls'
import { auditPlugin } from '@kysera/audit'const executor = await createExecutor(db, [
softDeletePlugin(), // Priority: 100
rlsPlugin({
/ ... /
}), // Priority: 90
auditPlugin({
/ ... /
}) // Priority: 80
])
// All plugins apply in priority order (higher = runs first)
const getUsers = createQuery(ctx => ctx.db.selectFrom('users').selectAll().execute())
const users = await getUsers(executor)
`$3
Execute multiple queries atomically within a transaction. Plugins automatically propagate to the transaction context.
`typescript
import { withTransaction, createTransactionalQuery } from '@kysera/dal'// Regular transaction
const result = await withTransaction(executor, async ctx => {
const user = await createUser(ctx, userData)
const profile = await createProfile(ctx, { userId: user.id, ...profileData })
return { user, profile }
})
// Query that REQUIRES a transaction
const transferFunds = createTransactionalQuery(
async (ctx, fromId: number, toId: number, amount: number) => {
await ctx.db
.updateTable('accounts')
.set(eb => ({ balance: eb('balance', '-', amount) }))
.where('id', '=', fromId)
.execute()
await ctx.db
.updateTable('accounts')
.set(eb => ({ balance: eb('balance', '+', amount) }))
.where('id', '=', toId)
.execute()
return { success: true }
}
)
// This will work
await withTransaction(executor, ctx => transferFunds(ctx, 1, 2, 100))
// This will throw: "Query requires a transaction"
await transferFunds(executor, 1, 2, 100)
`Composition
$3
Compose two query functions sequentially, passing the result of the first to the second:
`typescript
import { createQuery, compose } from '@kysera/dal'const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirstOrThrow()
)
const getPostsByUserId = createQuery((ctx, userId: number) =>
ctx.db.selectFrom('posts').selectAll().where('user_id', '=', userId).execute()
)
const getUserWithPosts = compose(getUserById, async (ctx, user) => ({
...user,
posts: await getPostsByUserId(ctx, user.id)
}))
const result = await getUserWithPosts(executor, 1)
// { id: 1, email: '...', name: '...', posts: [...] }
`$3
Chain multiple transformations on a query result:
`typescript
import { createQuery, chain } from '@kysera/dal'const getUser = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirstOrThrow()
)
const getUserComplete = chain(
getUser,
async (ctx, user) => ({ ...user, posts: await getPosts(ctx, user.id) }),
async (ctx, data) => ({ ...data, followers: await getFollowers(ctx, data.id) }),
async (ctx, data) => ({ ...data, stats: await getStats(ctx, data.id) })
)
const fullUser = await getUserComplete(executor, 1)
// { ...user, posts: [...], followers: [...], stats: {...} }
`$3
Execute multiple queries concurrently and combine their results:
`typescript
import { createQuery, parallel } from '@kysera/dal'const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirst()
)
const getUserStats = createQuery((ctx, id: number) =>
ctx.db.selectFrom('user_stats').selectAll().where('user_id', '=', id).executeTakeFirst()
)
const getNotifications = createQuery((ctx, id: number) =>
ctx.db.selectFrom('notifications').selectAll().where('user_id', '=', id).execute()
)
const getDashboardData = parallel({
user: getUserById,
stats: getUserStats,
notifications: getNotifications
})
const dashboard = await getDashboardData(executor, userId)
// { user: {...}, stats: {...}, notifications: [...] }
`$3
Execute a query conditionally based on runtime logic:
`typescript
import { createQuery, conditional } from '@kysera/dal'const getPremiumFeatures = createQuery((ctx, userId: number) =>
ctx.db.selectFrom('premium_features').selectAll().where('user_id', '=', userId).execute()
)
const getFeatures = conditional(
(ctx, userId: number, isPremium: boolean) => isPremium,
getPremiumFeatures,
[] // Fallback: empty array for non-premium users
)
const features = await getFeatures(executor, userId, true) // Executes query
const emptyFeatures = await getFeatures(executor, userId, false) // Returns []
`$3
Transform array results with a mapper function:
`typescript
import { createQuery, mapResult } from '@kysera/dal'const getAllUsers = createQuery(ctx => ctx.db.selectFrom('users').selectAll().execute())
const getUserNames = mapResult(getAllUsers, user => user.name)
const names = await getUserNames(executor) // string[]
`API Reference
$3
####
createQueryCreate a typed query function.
Parameters:
-
queryFn: (ctx: DbContext - Query implementationReturns:
QueryFunctionExample:
`typescript
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').select(['id', 'email', 'name']).where('id', '=', id).executeTakeFirst()
)// Usage with KyseraExecutor (plugins applied)
const user = await getUserById(executor, 1)
// Usage with context (inside transaction)
await withTransaction(executor, async ctx => {
const user = await getUserById(ctx, 1)
return user
})
`####
createTransactionalQueryCreate a query function that requires a transaction context.
Parameters:
-
queryFn: (ctx: DbContext - Query implementationReturns:
QueryFunctionThrows: Error if called outside a transaction
Example:
`typescript
const transferFunds = createTransactionalQuery(
async (ctx, fromId: number, toId: number, amount: number) => {
await ctx.db
.updateTable('accounts')
.set(eb => ({ balance: eb('balance', '-', amount) }))
.where('id', '=', fromId)
.execute() await ctx.db
.updateTable('accounts')
.set(eb => ({ balance: eb('balance', '+', amount) }))
.where('id', '=', toId)
.execute()
return { success: true }
}
)
// This will work
await withTransaction(executor, ctx => transferFunds(ctx, 1, 2, 100))
// This will throw an error
await transferFunds(executor, 1, 2, 100) // Error: Query requires transaction
`$3
####
createContextCreate a database context from any database instance.
Parameters:
-
db: Kysely - Database instanceReturns:
DbContextImportant: To enable plugin support in the DAL, pass a
KyseraExecutor (not a raw Kysely instance) to createContext. The executor wraps the database with plugin interception.Example:
`typescript
import { createContext } from '@kysera/dal'
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'// Create executor with plugins
const executor = await createExecutor(db, [softDeletePlugin()])
// Create context from executor (not raw db!)
const ctx = createContext(executor)
// Queries now have plugins applied
const user = await findUserById(ctx, 1) // soft-delete filter applied
`Without plugins (raw Kysely):
`typescript
// This works but plugins won't be applied
const ctx = createContext(db) // Raw Kysely instance
const user = await findUserById(ctx, 1) // No plugin filters
`####
withTransactionExecute a function within a transaction.
Parameters:
-
db: Kysely - Database instance
- fn: (ctx: DbContext - Function to execute
- options?: TransactionOptions - Transaction options (optional)Returns:
PromiseExample:
`typescript
// Basic usage
const result = await withTransaction(executor, async ctx => {
const user = await createUser(ctx, userData)
const profile = await createProfile(ctx, { userId: user.id, ...profileData })
return { user, profile }
})// With KyseraExecutor (plugins propagated)
const result = await withTransaction(executor, async ctx => {
// All queries in transaction have plugins applied
const users = await getUsers(ctx)
return users
})
`####
withContextExecute a function with a database context (no transaction).
Parameters:
-
db: Kysely - Database instance
- fn: (ctx: DbContext - Function to executeReturns:
PromiseExample:
`typescript
const users = await withContext(executor, async ctx => {
return getAllUsers(ctx)
})
`####
isInTransactionCheck if context is within a transaction.
Parameters:
-
ctx: DbContext - Database contextReturns:
booleanExample:
`typescript
const myQuery = createQuery((ctx, id: number) => {
if (isInTransaction(ctx)) {
console.log('Running inside transaction')
}
return ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirst()
})
`$3
####
composeCompose two query functions sequentially.
Parameters:
-
first: QueryFunction - First query
- second: (ctx: DbContext - Second queryReturns:
QueryFunction####
chainChain multiple transformations on a query result.
Parameters:
-
query: QueryFunction - Initial query
- ...transforms: Array<(ctx: DbContext - Transform functionsReturns:
QueryFunction (where N is the last transform result type)Overloads: Supports 1-3 transform functions with full type inference
####
parallelExecute multiple queries in parallel.
Parameters:
-
queries: Record - Object of query functionsReturns:
QueryFunction####
conditionalExecute a query conditionally.
Parameters:
-
condition: (ctx: DbContext - Condition function
- query: QueryFunction - Query to execute if true
- fallback?: TFallback - Value to return if falseReturns:
QueryFunction####
mapResultMap over array results.
Parameters:
-
query: QueryFunction - Query returning array
- mapper: (item: TItem, index: number) => TResult - Mapper functionReturns:
QueryFunctionTypeScript Types
$3
The following types are re-exported from
@kysera/executor for convenience:`typescript
import type {
ExecutorConfig,
KyseraExecutorMarker,
PluginValidationDetails
} from '@kysera/dal'
`-
ExecutorConfig - Configuration options for executor creation
- KyseraExecutorMarker - Type marker interface for KyseraExecutor
- PluginValidationDetails - Validation error details from plugin system@kysera/executor documentation for detailed type information.$3
Database context interface.
`typescript
interface DbContext> {
readonly db: Kysely | Transaction | KyseraExecutor | KyseraTransaction
readonly isTransaction: boolean
}
`$3
Query function signature.
`typescript
type QueryFunction = (
ctxOrDb: DbContext | Kysely | KyseraExecutor,
...args: TArgs
) => Promise
`$3
Transaction execution options.
`typescript
interface TransactionOptions {
isolationLevel?: 'read uncommitted' | 'read committed' | 'repeatable read' | 'serializable'
}
`Note: Isolation level configuration is dialect-specific and should typically be set at the connection pool level.
$3
`typescript
// Infer result type from query function
type InferResult = T extends QueryFunction ? R : never// Infer arguments type from query function
type InferArgs = T extends QueryFunction ? A : never
// Infer database type from query function
type InferDB = T extends QueryFunction ? DB : never
`$3
Result type for parallel query execution.
`typescript
type ParallelResult>> = {
[K in keyof T]: T[K] extends QueryFunction ? R : never
}
`Transaction Features
$3
The DAL provides savepoint support for nested transaction rollback points:
`typescript
import { withTransaction, withSavepoint } from '@kysera/dal'await withTransaction(executor, async ctx => {
// Create a user
const user = await createUser(ctx, { email: 'test@example.com', name: 'Test' })
// Create a savepoint before risky operation
try {
await withSavepoint(ctx, 'before_post', async spCtx => {
const post = await createPost(spCtx, { userId: user.id, title: 'Test Post' })
// This might fail
await someRiskyOperation(spCtx, post.id)
})
} catch (error) {
// Rollback to savepoint - user creation is preserved
console.log('Post creation failed, but user was still created')
}
return user
})
`Savepoint Validation:
Savepoint names must be positive integers (1, 2, 3, etc.) for PostgreSQL compatibility:
`typescript
// ✅ Valid savepoint names
await withSavepoint(ctx, '1', async spCtx => { / ... / })
await withSavepoint(ctx, '2', async spCtx => { / ... / })
await withSavepoint(ctx, '999', async spCtx => { / ... / })// ❌ Invalid savepoint names (will throw error)
await withSavepoint(ctx, 'my-savepoint', async spCtx => { / ... / }) // Not a positive integer
await withSavepoint(ctx, '0', async spCtx => { / ... / }) // Zero not allowed
await withSavepoint(ctx, '-1', async spCtx => { / ... / }) // Negative not allowed
`$3
When a transaction or savepoint is rolled back due to an error, the DAL logs the rollback operation but preserves the original error:
`typescript
import { withTransaction } from '@kysera/dal'try {
await withTransaction(executor, async ctx => {
await createUser(ctx, { email: 'test@example.com', name: 'Test' })
// This will cause rollback
throw new Error('Something went wrong')
})
} catch (error) {
console.error(error.message) // "Something went wrong"
// Transaction was rolled back automatically
// Rollback is logged internally for debugging
}
`Internal logging:
When a rollback occurs, the DAL logs it using
console.error:`
Transaction/savepoint rolled back due to error: [error message]
`This helps with debugging while ensuring the original error is always re-thrown to the caller.
$3
All plugins registered on the executor automatically propagate through transactions:
`typescript
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { rlsPlugin } from '@kysera/rls'
import { withTransaction } from '@kysera/dal'const executor = await createExecutor(db, [
softDeletePlugin(),
rlsPlugin({ schema: rlsSchema, getCurrentTenantId: () => tenantId })
])
await withTransaction(executor, async ctx => {
// All queries in transaction have soft-delete and RLS filters applied
const users = await ctx.db.selectFrom('users').selectAll().execute()
// Plugin filters still apply to inserts/updates
const newUser = await ctx.db
.insertInto('users')
.values({ name: 'Alice', tenant_id: tenantId })
.returningAll()
.executeTakeFirst()
return newUser
})
`See the Executor documentation for more details on how plugins work in transactions.
Examples
$3
`typescript
import { Kysely, PostgresDialect } from 'kysely'
import { Pool } from 'pg'
import { createExecutor } from '@kysera/executor'
import { softDeletePlugin } from '@kysera/soft-delete'
import { rlsPlugin } from '@kysera/rls'
import { createQuery, withTransaction, parallel } from '@kysera/dal'// Database schema
interface Database {
users: {
id: number
email: string
name: string
tenant_id: number
deleted_at: Date | null
}
posts: {
id: number
user_id: number
title: string
body: string
tenant_id: number
deleted_at: Date | null
}
}
// Initialize database
const db = new Kysely({
dialect: new PostgresDialect({
pool: new Pool({ connectionString: process.env.DATABASE_URL })
})
})
// Create executor with plugins
const executor = await createExecutor(db, [
softDeletePlugin(),
rlsPlugin({
schema: {
users: { tenantIdColumn: 'tenant_id' },
posts: { tenantIdColumn: 'tenant_id' }
},
getCurrentTenantId: () => currentTenantId
})
])
// Define query functions
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirst()
)
const getPostsByUserId = createQuery((ctx, userId: number) =>
ctx.db.selectFrom('posts').selectAll().where('user_id', '=', userId).execute()
)
const createUser = createQuery((ctx, data: { email: string; name: string; tenant_id: number }) =>
ctx.db.insertInto('users').values(data).returningAll().executeTakeFirstOrThrow()
)
const createPost = createQuery(
(ctx, data: { user_id: number; title: string; body: string; tenant_id: number }) =>
ctx.db.insertInto('posts').values(data).returningAll().executeTakeFirstOrThrow()
)
// Use queries - plugins apply automatically
const user = await getUserById(executor, 1)
// Only returns if:
// - deleted_at IS NULL (soft-delete plugin)
// - tenant_id = currentTenantId (RLS plugin)
// Atomic operations with transaction
const result = await withTransaction(executor, async ctx => {
const newUser = await createUser(ctx, {
email: 'test@example.com',
name: 'Test User',
tenant_id: currentTenantId
})
const newPost = await createPost(ctx, {
user_id: newUser.id,
title: 'First Post',
body: 'Hello World',
tenant_id: currentTenantId
})
return { user: newUser, post: newPost }
})
// Parallel queries
const getUserData = parallel({
user: getUserById,
posts: getPostsByUserId
})
const userData = await getUserData(executor, userId)
// { user: {...}, posts: [...] }
`$3
`typescript
import { createQuery, withTransaction, parallel } from '@kysera/dal'// Queries
const createUser = createQuery((ctx, data: { email: string; name: string }) =>
ctx.db.insertInto('users').values(data).returningAll().executeTakeFirstOrThrow()
)
const createUserProfile = createQuery((ctx, data: { user_id: number; bio: string }) =>
ctx.db.insertInto('profiles').values(data).returningAll().executeTakeFirstOrThrow()
)
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirst()
)
const getProfileByUserId = createQuery((ctx, userId: number) =>
ctx.db.selectFrom('profiles').selectAll().where('user_id', '=', userId).executeTakeFirst()
)
// Service function using transaction
async function registerUser(executor: KyseraExecutor, data: RegisterData) {
return withTransaction(executor, async ctx => {
const user = await createUser(ctx, {
email: data.email,
name: data.name
})
const profile = await createUserProfile(ctx, {
user_id: user.id,
bio: data.bio
})
return { user, profile }
})
}
// Fetch user data in parallel
const getUserData = parallel({
user: getUserById,
profile: getProfileByUserId
})
const userData = await getUserData(executor, userId)
`$3
`typescript
import { createQuery, compose } from '@kysera/dal'const getPostById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('posts').selectAll().where('id', '=', id).executeTakeFirstOrThrow()
)
const getUserById = createQuery((ctx, id: number) =>
ctx.db.selectFrom('users').selectAll().where('id', '=', id).executeTakeFirstOrThrow()
)
const getPostWithAuthor = compose(getPostById, async (ctx, post) => ({
...post,
author: await getUserById(ctx, post.user_id)
}))
const post = await getPostWithAuthor(executor, postId)
// { id, title, body, user_id, author: { id, name, email } }
``- Node.js: >=20.0.0
- Bun: >=1.0.0
- Kysely: >=0.28.8 (peer dependency)
- @kysera/executor: >=0.7.0 (dependency)
MIT