Forever Canada operational utilities and scripts.

Install: `npm install @botler/bim`

Production‑oriented pgvector service with:
- Ingestion of precomputed embeddings
- Stats‑driven driver selection (single vs union branches)
- Adaptive ivfflat.probes
- Partial index lifecycle automation (create/drop thresholds)
- Re‑embedding for new models (additive)
- Structured decision logging
All core features are implemented and test‑verified. Remaining roadmap items (auto‑tune scripts, metrics export, advanced pruning) are optional.
---
If migrations did not already create tables, run once (idempotent):
```sql
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS vector_documents (
id TEXT PRIMARY KEY DEFAULT (gen_random_uuid()::text),
source_type TEXT, source_ref TEXT, title TEXT, metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS vector_chunks (
id TEXT PRIMARY KEY,
document_id TEXT REFERENCES vector_documents(id) ON DELETE CASCADE,
text_plain TEXT NOT NULL,
markdown_text TEXT,
page_ids TEXT[],
chunk_index INT,
token_count INT,
overlap_with_previous BOOLEAN,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS vector_embeddings (
chunk_id TEXT REFERENCES vector_chunks(id) ON DELETE CASCADE,
model TEXT NOT NULL,
embedding_dim INT NOT NULL,
embedding VECTOR(3072) NOT NULL,
jurisdiction_code TEXT,
organization_id TEXT,
website_host TEXT,
category_a TEXT,
category_b TEXT,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (chunk_id, model)
);
-- Stats table powering driver & index decisions
CREATE TABLE IF NOT EXISTS vector_filter_value_stats (
id BIGSERIAL PRIMARY KEY,
dimension TEXT NOT NULL,
value TEXT NOT NULL,
row_count BIGINT DEFAULT 0,
last_query_at TIMESTAMPTZ,
has_index BOOLEAN DEFAULT FALSE,
UNIQUE (dimension, value)
);
```
Adjust VECTOR(3072) if using a different embedding dimension (must match VECTOR_DIM).
---
Minimum:
- POSTGRES_URI — connection string (required)
- VECTOR_DIM=3072 — required embedding length
Optional tuning (defaults shown):
- VECTOR_MAX_K=50 — upper bound on requested k
- VECTOR_OVERSAMPLE_FACTOR=4 — union oversampling multiplier
- VECTOR_MAX_UNION_VALUES=8 — cap on union branches
- VECTOR_LOG_DECISIONS=0|1 — emit JSON decision logs
- VECTOR_IVFFLAT_PROBES=0 — fixed probes (0 = let adaptive decide)
- VECTOR_PROBES_THRESHOLD=2000 — candidate budget threshold for adaptive probes
- VECTOR_MIN_PROBES=3, VECTOR_MAX_PROBES=10 — adaptive probes range
- VECTOR_PARTIAL_INDEX_CREATE_THRESHOLD=15000 — partial index create threshold
- VECTOR_PARTIAL_INDEX_DROP_THRESHOLD=2000 — partial index drop threshold
Set these in your .env; only the first two are mandatory to start.
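For reference, a minimal .env might look like this (the connection string and values below are purely illustrative):
```bash
# Required
POSTGRES_URI=postgres://user:password@localhost:5432/app_db
VECTOR_DIM=3072

# Optional tuning (uncomment to override defaults)
# VECTOR_MAX_K=50
# VECTOR_OVERSAMPLE_FACTOR=4
# VECTOR_MAX_UNION_VALUES=8
# VECTOR_LOG_DECISIONS=1
# VECTOR_IVFFLAT_PROBES=0
```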
---
```ts
import { parseDatabaseUri } from "./src/database/config/environment";
import { DatabaseConnection } from "./src/database/config/connection";
import { VectorStoreService } from "./src/database/services/vector-store.service";
async function initVectorStore() {
const uri = process.env.POSTGRES_URI!; // required
const conn = DatabaseConnection.getInstance(parseDatabaseUri(uri));
await conn.initialize();
const service = new VectorStoreService(conn); // default deterministic mock embedding provider used only for re-embedding tests
return { conn, service };
}
```
If you want automatic embedding generation during re‑embedding with a real model, use the built‑in OpenAI provider (or implement the EmbeddingProvider interface if you prefer another backend).
---
ingestPreprocessed (atomic) expects you to supply:
1. A document metadata object
2. An ordered array of chunks (each with an id and chunk_index)
3. A parallel array of embedding rows (same order, no chunk_id yet)
4. The model name
```ts
const dim = parseInt(process.env.VECTOR_DIM || "3072", 10);
const { service } = await initVectorStore();
const makeVec = (seed: number) =>
Array.from({ length: dim }, (_, i) => (i === 0 ? seed : 0));
await service.ingestPreprocessed({
document: {
source_type: "policy",
source_ref: "doc-42",
title: "Policy 42",
},
chunks: [
{
id: "doc42-c0",
document_id: "temp",
text_plain: "Opening paragraph",
chunk_index: 0,
},
{
id: "doc42-c1",
document_id: "temp",
text_plain: "Second paragraph",
chunk_index: 1,
},
] as any,
embeddings: [
{
embedding_dim: dim,
embedding: makeVec(1),
jurisdiction_code: "CA-AB",
organization_id: "ORG_X",
website_host: "example.org",
},
{
embedding_dim: dim,
embedding: makeVec(2),
jurisdiction_code: "CA-AB",
organization_id: "ORG_Y",
website_host: "example.org",
},
] as any,
model: "text-embedding-3-large",
});
```
This operation runs in a single database transaction. If any step fails, all inserts are rolled back—preventing partial state and orphan rows. Inserts are idempotent (ON CONFLICT DO NOTHING for chunks and embeddings), so re-running the same ingest converges without duplicates.
Stats are incrementally updated for any non-null jurisdiction_code, organization_id, and website_host values within the same transaction.
---
Driver selection rules:
1. If exactly one value among any of (jurisdiction, organization, host) is provided → single driver path (fastest subset)
2. Else choose a union dimension (one with >1 values) minimizing sum of row_count stats; run parallel branches with oversampling → global dedupe
3. Fallback to global scan if no filter values
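A hypothetical TypeScript sketch of these rules (the actual selection logic lives inside VectorStoreService and may differ in detail):
```ts
type Filters = {
  jurisdiction_codes?: string[];
  organization_ids?: string[];
  website_hosts?: string[];
};
// dimension -> value -> approximate row_count (from vector_filter_value_stats)
type FilterStats = Record<string, Record<string, number>>;

function pickDriver(filters: Filters, stats: FilterStats) {
  const dims: Array<[string, string[]]> = [
    ["jurisdiction_code", filters.jurisdiction_codes ?? []],
    ["organization_id", filters.organization_ids ?? []],
    ["website_host", filters.website_hosts ?? []],
  ];
  const provided = dims.filter(([, values]) => values.length > 0);
  if (provided.length === 0) return { mode: "global" as const }; // rule 3

  const rows = (dim: string, value: string) =>
    stats[dim]?.[value] ?? Number.MAX_SAFE_INTEGER;

  // Rule 1: a dimension supplied with exactly one value drives a single subset scan;
  // prefer the one with the smallest estimated row_count.
  const singles = provided.filter(([, values]) => values.length === 1);
  if (singles.length > 0) {
    const [dim, [value]] = singles.sort(
      (a, b) => rows(a[0], a[1][0]) - rows(b[0], b[1][0])
    )[0];
    return { mode: "single" as const, dim, value };
  }

  // Rule 2: otherwise pick the multi-value dimension with the smallest summed
  // row_count, run one oversampled branch per value, then dedupe globally.
  const sum = ([dim, values]: [string, string[]]) =>
    values.reduce((acc, v) => acc + (stats[dim]?.[v] ?? 0), 0);
  const [dim, values] = provided.sort((a, b) => sum(a) - sum(b))[0];
  return { mode: "union" as const, dim, values };
}
```
A query using these filters: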
```ts
const queryVec = Array.from({ length: dim }, (_, i) => (i === 0 ? 1 : 0));
const results = await service.query({
model: "text-embedding-3-large",
jurisdiction_codes: ["CA-AB"],
organization_ids: ["ORG_X", "ORG_Y"],
website_hosts: ["example.org"],
k: 5,
embedding: queryVec,
});
console.log(results[0]);
```
Each result includes: chunk_id, document_id, model, distance, text, and filter dimensions.
Note: Queries may open a short transaction internally to set ivfflat.probes when adaptive tuning is active; queries never modify data.
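For reference, the per-result shape looks roughly like this (field names follow the description above; the exported VectorRetrievalResult type is the canonical definition):
```ts
// Approximate per-result shape, inferred from the field list above.
interface VectorRetrievalResultSketch {
  chunk_id: string;
  document_id: string;
  model: string;
  distance: number; // smaller = more similar
  text: string; // text_plain of the matched chunk
  jurisdiction_code?: string | null;
  organization_id?: string | null;
  website_host?: string | null;
  category_a?: string | null;
  category_b?: string | null;
}
```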
queryByText, importable from the package root, provides a single call flow: it embeds your input text(s) with OpenAI by default, then queries the vector store and returns per-input results.
Env required:
- POSTGRES_URI
- VECTOR_DIM (e.g., 3072)
- OPENAI_API_KEY
Optional env:
- VECTOR_MODEL_DEFAULT (defaults to text-embedding-3-large)
Usage:
```ts
import { queryByText } from "@botler/bim";
const out = await queryByText(
[
"What are the rules for petition signatures?",
"Where can I submit forms in Edmonton?",
],
{
jurisdiction_codes: ["CA-AB"],
website_hosts: ["www.alberta.ca"],
// category filters are supported
category_a_values: ["policy"],
},
{
k: 5,
// model: "text-embedding-3-large", // optional override
// probes: 5, // optional ivfflat.probes
concurrency: 4, // parallel embedding+query per input
}
);
// Shape: [{ text, results }]
for (const item of out) {
console.log(item.text);
console.log(item.results.slice(0, 2));
}
```
Contract:
- Input text can be a string or string[]
- Filters mirror VectorStoreService.query (jurisdiction, org, host, category_a/b)
- Options: { k=5, model, probes, concurrency=4 }
- Output: Array<{ text: string, results: VectorRetrievalResult[] }> in the same order as inputs
Notes:
- Connection is initialized lazily on first call and reused (singleton).
- If VECTOR_MODEL_DEFAULT is set, it’s used when options.model isn’t provided.
- VECTOR_LOG_DECISIONS=1 will include planner logs for the underlying queries.
---
Add embeddings for a new model while preserving existing ones:
```ts
// docId obtained from earlier ingestion response or lookup by source_ref
await service.reembedDocument(docId, { model: "alt-embedding-mock" });
```
This reads existing chunks, generates vectors through the configured EmbeddingProvider, and inserts new (chunk_id, newModel) rows. No deletion or pruning yet.
---
Trigger an exact recount (use sparingly in large datasets):
```ts
await service.refreshFilterStats(); // all dimensions
await service.refreshFilterStats("website_host"); // single dimension
```
Stats table fields:
- row_count (approximate via incremental updates; exact after refresh)
- last_query_at (touched during queries)
- has_index (maintained by index lifecycle logic)
---
Use the built‑in OpenAI provider for production embeddings, or supply a custom EmbeddingProvider.
OpenAI provider usage:
```ts
import { VectorStoreService } from "./src/database/services/vector-store.service";
import { createOpenAIEmbeddingProviderFromEnv } from "./src/database/embedding/openai/openai-embedding-provider";
// Env required:
// - OPENAI_API_KEY
// - VECTOR_DIM (e.g., 3072 for text-embedding-3-large)
// Optional tuning: OPENAI_EMBED_BATCH_SIZE, OPENAI_EMBED_MAX_TOKENS_PER_BATCH, etc.
const provider = createOpenAIEmbeddingProviderFromEnv();
const service = new VectorStoreService(conn, provider);
```
Custom provider template:
```ts
import { EmbeddingProvider } from "./src/database/embedding/embedding-provider";
class MyEmbeddingProvider implements EmbeddingProvider {
constructor(private dim = parseInt(process.env.VECTOR_DIM || "3072", 10)) {}
async embed(batch: string[], model: string): Promise<number[][]> {
// Return an array of vectors aligned with batch inputs
return batch.map(() => new Array(this.dim).fill(0));
}
}
```
---
ensureIndexesForFilters (internal; invoked where appropriate in future automation) decides per (dimension, value) whether to create or drop a partial IVFFLAT index using thresholds:
- Create when row_count >= VECTOR_PARTIAL_INDEX_CREATE_THRESHOLD and the index is absent
- Drop when row_count < VECTOR_PARTIAL_INDEX_DROP_THRESHOLD and the index is present
Index names are sanitized with a vec_ prefix (truncated, safe characters). Creation attempts a fallback variant if WITH (lists=...) fails. An illustrative index statement follows the event list below.
Decision log events:
- index_create, index_drop, index_create_error
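As a rough illustration of the kind of statement this produces (the index name, operator class, and lists value are internal details and assumptions here; ivfflat dimension limits of your pgvector version also apply):
```sql
-- Illustrative only: a partial IVFFLAT index scoped to one hot filter value.
CREATE INDEX IF NOT EXISTS vec_website_host_example_org
  ON vector_embeddings USING ivfflat (embedding vector_cosine_ops)
  WITH (lists = 100)
  WHERE website_host = 'example.org';
```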
---
Enable with VECTOR_LOG_DECISIONS=1. Sample line:
```json
{
"ts": "2025-09-10T00:21:36.664Z",
"component": "vector-store",
"event": "decision",
"mode": "single",
"driver_dimension": "website_host",
"driver_value": "rf.example",
"driver_row_count": 12,
"k": 2,
"probes": null,
"filters": {
"jurisdiction_codes": 1,
"organization_ids": 2,
"website_hosts": 1
}
}
```
Modes: single, union, union_branch_exec, global, index_create, index_drop, index_create_error.
---
If no explicit probes is passed and VECTOR_IVFFLAT_PROBES=0, adaptive logic estimates a candidate budget (sum of per-branch limits) and scales ivfflat.probes between VECTOR_MIN_PROBES and VECTOR_MAX_PROBES once budget ≥ threshold.
Override per query: pass probes in query() input.
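A sketch of how such a heuristic might look (the service's exact scaling formula is not documented here and may differ):
```ts
// Hypothetical sketch of the adaptive probes heuristic described above.
function adaptiveProbes(candidateBudget: number, env = process.env): number | null {
  const fixed = parseInt(env.VECTOR_IVFFLAT_PROBES || "0", 10);
  if (fixed > 0) return fixed; // an explicit setting always wins

  const threshold = parseInt(env.VECTOR_PROBES_THRESHOLD || "2000", 10);
  if (candidateBudget < threshold) return null; // small budget: keep the server default

  const min = parseInt(env.VECTOR_MIN_PROBES || "3", 10);
  const max = parseInt(env.VECTOR_MAX_PROBES || "10", 10);
  // Scale with how far the budget exceeds the threshold, clamped to [min, max].
  const ratio = Math.min(1, (candidateBudget - threshold) / (threshold * 3));
  return Math.round(min + (max - min) * ratio);
}
```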
---
Common errors:
- Dimension mismatch → thrown if query embedding length != VECTOR_DIM
- Index creation failure → logged (index_create_error) but does not abort the query path
- Database query failure → rethrown as a generic AppError upstream (see connection wrapper)
Ingestion atomicity and idempotency:
- In case of process interruption or a mid-ingest error, the entire ingest is rolled back.
- Because inserts are idempotent, re-running the ingest after a failure will safely complete without creating duplicates.
Recommended: wrap external API embedding generation with retries + circuit breaker.
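For example, a minimal retry wrapper around the provider call (a full circuit breaker would additionally short-circuit after repeated failures):
```ts
// Minimal retry-with-backoff wrapper (sketch). A circuit breaker would also track
// consecutive failures and stop calling the embedding API for a cooldown period.
async function withRetries<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Exponential backoff between attempts: 500ms, 1s, 2s, ...
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
  throw lastError;
}

// Usage (provider from the embedding provider section above):
// const vectors = await withRetries(() => provider.embed(batch, model));
```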
---
Run full test suite:
```bash
npm test --silent
```
Integration tests cover ingestion, single/union retrieval, guardrails, re‑embedding, and index lifecycle decisions.
If you see a Jest forced-exit warning, ensure the DB pool closes (already handled in tests via afterAll).
---
| Objective | Knobs |
| ------------------------------------ | --------------------------------------------------------------------- |
| Lower latency small filtered queries | Ensure single driver filters (supply exactly one value) |
| Balance recall for many values | Increase VECTOR_OVERSAMPLE_FACTOR (watch latency) |
| Reduce wasted work on large unions | Lower VECTOR_MAX_UNION_VALUES or adapt thresholds |
| Probe depth tradeoff | Fix VECTOR_IVFFLAT_PROBES or rely on the adaptive range |
| Heavy hot value | Allow partial index creation by lowering create threshold temporarily |
Baseline before tuning: capture p95 latency & recall (if you have ground‑truth sets) before changing any knobs.
---
1. Connection Pool: If high concurrency is expected, size the pool at roughly CPU cores × 4 (rule of thumb), then measure.
2. Vacuum & Analyze: Rely on autovacuum; a manual ANALYZE after large bulk ingests can improve planner stats (see the example after this list).
3. Backups: Embeddings are reproducible but stats table + document metadata should be backed up normally.
4. Re‑Embedding Strategy: Keep old model for shadow comparisons; once confident, prune (future helper TBD).
5. Observability: Stream decision logs to log pipeline; add application metrics (latency, candidate counts) later.
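For example, after a large bulk ingest:
```sql
-- Refresh planner statistics for the vector store tables.
ANALYZE vector_documents;
ANALYZE vector_chunks;
ANALYZE vector_embeddings;
```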
---
```ts
async function example() {
const { service, conn } = await initVectorStore();
const dim = parseInt(process.env.VECTOR_DIM || "3072", 10);
const v = (x: number) =>
Array.from({ length: dim }, (_, i) => (i === 0 ? x : 0));
// Ingest
const ingest = await service.ingestPreprocessed({
document: { source_type: "manual", source_ref: "ex-1", title: "Example" },
chunks: [
{
id: "ex-c0",
document_id: "temp",
text_plain: "Example text",
chunk_index: 0,
},
] as any,
embeddings: [
{
embedding_dim: dim,
embedding: v(5),
jurisdiction_code: "CA-AB",
organization_id: "ORG1",
website_host: "ex.host",
},
] as any,
model: "text-embedding-3-large",
});
// Query
const res = await service.query({
model: "text-embedding-3-large",
jurisdiction_codes: ["CA-AB"],
organization_ids: ["ORG1"],
website_hosts: ["ex.host"],
k: 3,
embedding: v(5),
});
console.log(res);
// Re-embed in new model
await service.reembedDocument(ingest.document_id, {
model: "alt-embedding-mock",
});
await conn.close();
}
```
---
- Auto calibration script (collect latency/recall ⇒ suggest env)
- Metrics emitter (Prometheus / OTEL hooks)
- Old model pruning policy helper
- Hybrid lexical + vector phase
- Optional reranking with LLM or cross encoder
---
Q: Do I need partial indexes now?
A: The lifecycle logic is present; create thresholds are high to avoid churn. Manually lower thresholds in staging to validate.
Q: Can I store embeddings for multiple dimensions?
A: All embeddings share a single vector column (dimension fixed via VECTOR_DIM). Different models are distinguished by the model column.
Q: How do I delete a document?
A: Delete from vector_documents; cascades remove chunks + embeddings.
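For example (assuming source_ref uniquely identifies the document):
```sql
-- Removing the document row cascades to vector_chunks and vector_embeddings.
DELETE FROM vector_documents WHERE source_ref = 'doc-42';
```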
Q: How big can k be?
A: Capped by VECTOR_MAX_K (default 50) to guard latency & memory.
---
1. Set POSTGRES_URI & VECTOR_DIM.
2. Run schema bootstrap (or migrations).
3. Ingest at least one document with embeddings.
4. Run a query & verify results.
5. (Optional) Enable VECTOR_LOG_DECISIONS=1 briefly to observe planning.
6. Set production thresholds for index lifecycle if large skew in value cardinalities.
---
If you seeded synthetic or test data (e.g. via the recall harness) and want to remove it:
Dry run (synthetic only):
```bash
npx tsx scripts/database/cleanup-vector-store.ts
```
Apply deletion of synthetic rows (identified by source_type='recall'):
```bash
npx tsx scripts/database/cleanup-vector-store.ts --apply
```
Full wipe of all vector store tables (irreversible) plus dropping partial indexes:
```bash
npx tsx scripts/database/cleanup-vector-store.ts --mode=all --apply --drop-partial-indexes
```
Pass --verbose for before/after counts. The script requires POSTGRES_URI.
---
You can resume long-running ingestion without redoing preprocess:
- From preprocess (.pre.json):
```bash
npx tsx scripts/ingest-documents-into-vector-store/ingest-documents-into-vector-store.ts \
--input=scripts/ingest-documents-into-vector-store/catalog.alberta.json \
--resume=RAG \
--resume-input=scripts/ingest-documents-into-vector-store/processed/2023NBCAE-V1_National_Building_Code2023_Alberta_Edition.pdf.pre.json
```
- From embeddings (.embed.json) to store only:
```bash
npx tsx scripts/ingest-documents-into-vector-store/ingest-documents-into-vector-store.ts \
--input=scripts/ingest-documents-into-vector-store/catalog.alberta.json \
--resume=store \
--resume-input=scripts/ingest-documents-into-vector-store/processed/ab-001.embed.json
```
See `scripts/ingest-documents-into-vector-store/README.md` for full details.
---
End.