ClickHouse HTTP/TCP client with native compression (LZ4/ZSTD) and Native format support.

```bash
npm install @maxjustus/chttp
```
| Feature | HTTP | TCP |
|---------|------|-----|
| Real-time progress | No | Yes (rows read/written, bytes, memory, CPU) |
| Server logs | No | Yes (via send_logs_level setting) |
| Profile events | No | Yes (detailed execution metrics) |
| Long-running queries | Timeout-prone | Robust (persistent connection) |
| Browser support | Yes | No (Node/Bun/Deno only) |
Use HTTP for browser apps or simple queries. Use TCP for observability, long-running queries, and lower latency.
```ts
import { insert, query, streamEncodeJsonEachRow, collectText } from "@maxjustus/chttp";

const config = {
  baseUrl: "http://localhost:8123/",
  auth: { username: "default", password: "" },
};

// Insert
const { summary } = await insert(
  "INSERT INTO table FORMAT JSONEachRow",
  streamEncodeJsonEachRow([{ id: 1, name: "test" }]),
  "session123",
  config,
);
console.log(`Wrote ${summary.written_rows} rows`);

// Query
const json = await collectText(query("SELECT * FROM table FORMAT JSON", "session123", config));

// DDL
for await (const _ of query("CREATE TABLE ...", "session123", config)) {}
```
```ts
import { TcpClient } from "@maxjustus/chttp/tcp";

const client = new TcpClient({ host: "localhost", port: 9000 });
await client.connect();

// Query
for await (const packet of client.query("SELECT * FROM table")) {
  if (packet.type === "Data") {
    for (const row of packet.batch) console.log(row.id, row.name);
  }
}

// Insert
await client.insert("INSERT INTO table", [{ id: 1, name: "alice" }]);

client.close();
```
Both HTTP and TCP clients use ClickHouse's native query parameters with identical `{name: Type}` syntax:
```ts
// HTTP
const result = await collectText(
  query("SELECT {id: UInt64} as id, {name: String} as name FORMAT JSON", sessionId, {
    ...config,
    params: { id: 42, name: "Alice" },
  }),
);

// TCP (same syntax)
for await (const packet of client.query(
  "SELECT * FROM users WHERE age > {min_age: UInt32}",
  { params: { min_age: 18 } },
)) { /* ... */ }
```
Parameters are type-safe and prevent SQL injection. The type annotation (e.g., `{name: String}`) tells ClickHouse how to parse the value.
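For example, a minimal sketch mixing several parameter types in one HTTP query (the `events` table and its columns are made up for illustration; `config` matches the quick start). Values are forwarded as-is and ClickHouse parses each one according to its declared type:

```ts
import { query, collectJsonEachRow } from "@maxjustus/chttp";

const config = {
  baseUrl: "http://localhost:8123/",
  auth: { username: "default", password: "" },
};

// Hypothetical filter: each placeholder declares how ClickHouse should parse its value.
const rows = await collectJsonEachRow(
  query(
    "SELECT * FROM events WHERE user_id = {uid: UInt64} AND kind = {kind: String} AND ts >= {since: DateTime} FORMAT JSONEachRow",
    "session123",
    {
      ...config,
      params: {
        uid: 42,                      // numbers are fine for integer types
        kind: "it's safe",            // quoting/escaping is handled server-side
        since: "2024-01-01 00:00:00", // DateTime supplied as a string literal
      },
    },
  ),
);
```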
The `insert` function accepts `Uint8Array`, `Uint8Array[]`, or `AsyncIterable`. Use `streamEncodeJsonEachRow` for JSON data:
```ts
// Streaming JSON objects
async function* generateRows() {
  for (let i = 0; i < 1000000; i++) {
    yield { id: i, value: `data_${i}` };
  }
}

await insert(
  "INSERT INTO large_table FORMAT JSONEachRow",
  streamEncodeJsonEachRow(generateRows()),
  "session123",
  {
    compression: "zstd",
    onProgress: (p) => console.log(`${p.bytesUncompressed} bytes`),
  },
);

// Streaming raw bytes (any format)
async function* generateCsvChunks() {
  const encoder = new TextEncoder();
  for (let batch = 0; batch < 1000; batch++) {
    let chunk = "";
    for (let i = 0; i < 1000; i++) {
      chunk += `${batch * 1000 + i},value_${i}\n`;
    }
    yield encoder.encode(chunk);
  }
}

await insert(
  "INSERT INTO large_table FORMAT CSV",
  generateCsvChunks(),
  "session123",
  { compression: "lz4" },
);
```
The `query()` function yields raw `Uint8Array` chunks aligned to compression blocks, not rows. Use helpers to parse:
```ts
import {
  query,
  streamText,
  streamLines,
  streamDecodeJsonEachRow,
  collectJsonEachRow,
  collectText,
  collectBytes,
} from "@maxjustus/chttp";

// JSONEachRow - streaming parsed objects
for await (const row of streamDecodeJsonEachRow(
  query("SELECT * FROM t FORMAT JSONEachRow", session, config),
)) {
  console.log(row.id, row.name);
}

// JSONEachRow - collect all parsed rows at once
const res = await collectJsonEachRow(
  query("SELECT * FROM t FORMAT JSONEachRow", session, config),
);

// CSV/TSV - streaming raw lines
for await (const line of streamLines(
  query("SELECT * FROM t FORMAT CSV", session, config),
)) {
  const [id, name] = line.split(",");
}

// JSON format - buffer entire response
const json = await collectText(
  query("SELECT * FROM t FORMAT JSON", session, config),
);
const data = JSON.parse(json);
```
Native is ClickHouse's internal wire format. Queries return columnar data (`RecordBatch`) rather than materializing all rows upfront.
```ts
import {
  insert,
  query,
  encodeNative,
  streamDecodeNative,
  rows,
  collectRows,
  batchFromRows,
  batchFromCols,
  getCodec,
} from "@maxjustus/chttp";

const schema = [
  { name: "id", type: "UInt32" },
  { name: "name", type: "String" },
];

// From row arrays
const batch = batchFromRows(schema, [
  [1, "alice"],
  [2, "bob"],
  [3, "charlie"],
]);

// From pre-built columns (zero-copy for TypedArrays)
const batch2 = batchFromCols({
  id: getCodec("UInt32").fromValues(new Uint32Array([1, 2, 3])),
  name: getCodec("String").fromValues(["alice", "bob", "charlie"]),
});

// From generators (streaming row construction)
function* generateRows() {
  yield [1, "alice"];
  yield [2, "bob"];
  yield [3, "charlie"];
}
const batch3 = batchFromRows(schema, generateRows());

// Encode and insert
await insert(
  "INSERT INTO t FORMAT Native",
  encodeNative(batch),
  "session",
  config,
);

// Query returns columnar data as RecordBatch - stream rows directly
for await (const row of rows(
  streamDecodeNative(query("SELECT * FROM t FORMAT Native", "session", config)),
)) {
  console.log(row.id, row.name);
}

// Or collect all rows at once (materialized to plain objects)
const allRows = await collectRows(
  streamDecodeNative(query("SELECT * FROM t FORMAT Native", "session", config)),
);

// Work with batches directly for columnar access
for await (const batch of streamDecodeNative(
  query("SELECT * FROM t FORMAT Native", "session", config),
)) {
  const ids = batch.getColumn("id")!;
  for (let i = 0; i < ids.length; i++) {
    console.log(ids.get(i));
  }
}
```
Build columns independently with `getCodec().fromValues()`:
```ts
const idCol = getCodec("UInt32").fromValues([1, 2, 3]);
const nameCol = getCodec("String").fromValues(["alice", "bob", "charlie"]);

// Columns carry their type - schema is derived automatically
const batch = batchFromCols({ id: idCol, name: nameCol });
// batch.schema = [{ name: "id", type: "UInt32" }, { name: "name", type: "String" }]
```
For numeric columns, pass TypedArrays (e.g., `Uint32Array`, `Float64Array`) for zero-copy construction.
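As a quick sketch of that zero-copy path (the column names and sizes are illustrative), numeric columns can be built straight from TypedArrays:

```ts
import { batchFromCols, getCodec } from "@maxjustus/chttp";

const n = 100_000;
const ids = new Uint32Array(n);
const temps = new Float64Array(n);
for (let i = 0; i < n; i++) {
  ids[i] = i;
  temps[i] = 20 + Math.random() * 5;
}

// The TypedArrays back the columns directly instead of being copied element by element.
const batch = batchFromCols({
  id: getCodec("UInt32").fromValues(ids),
  temp_c: getCodec("Float64").fromValues(temps),
});

console.log(batch.schema); // [{ name: "id", type: "UInt32" }, { name: "temp_c", type: "Float64" }]
```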
```ts
// Array(Int32)
batchFromCols({
  tags: getCodec("Array(Int32)").fromValues([[1, 2], [3, 4, 5], [6]]),
});

// Tuple(Float64, Float64) - positional
batchFromCols({
  point: getCodec("Tuple(Float64, Float64)").fromValues([[1.0, 2.0], [3.0, 4.0]]),
});

// Tuple(x Float64, y Float64) - named tuples use objects
batchFromCols({
  point: getCodec("Tuple(x Float64, y Float64)").fromValues([
    { x: 1.0, y: 2.0 },
    { x: 3.0, y: 4.0 },
  ]),
});

// Map(String, Int32)
batchFromCols({
  meta: getCodec("Map(String, Int32)").fromValues([{ a: 1, b: 2 }, new Map([["c", 3]])]),
});

// Nullable(String)
batchFromCols({
  note: getCodec("Nullable(String)").fromValues(["hello", null, "world"]),
});

// Variant(String, Int64, Bool) - type inferred from values
batchFromCols({
  val: getCodec("Variant(String, Int64, Bool)").fromValues(["hello", 42n, true, null]),
});

// Variant with explicit discriminators (for ambiguous cases)
batchFromCols({
  val: getCodec("Variant(String, Int64, Bool)").fromValues([
    [0, "hello"], [1, 42n], [2, true], null,
  ]),
});

// Dynamic - types inferred automatically
batchFromCols({
  dyn: getCodec("Dynamic").fromValues(["hello", 42, true, [1, 2, 3], null]),
});

// JSON - plain objects
batchFromCols({
  data: getCodec("JSON").fromValues([{ a: 1, b: "x" }, { a: 2, c: true }]),
});
```
```ts
import { insert, streamEncodeNative, batchFromCols, getCodec } from "@maxjustus/chttp";

async function* generateBatches() {
  const batchSize = 10000;
  for (let i = 0; i < 100; i++) {
    const ids = new Uint32Array(batchSize);
    const values = new Float64Array(batchSize);
    for (let j = 0; j < batchSize; j++) {
      ids[j] = i * batchSize + j;
      values[j] = Math.random();
    }
    yield batchFromCols({
      id: getCodec("UInt32").fromValues(ids),
      value: getCodec("Float64").fromValues(values),
    });
  }
}

await insert(
  "INSERT INTO t FORMAT Native",
  streamEncodeNative(generateBatches()),
  "session",
  config,
);
```
Supports all ClickHouse types.
Limitation: `Dynamic` and `JSON` types require the V3 flattened format. On ClickHouse 25.6+, set `output_format_native_use_flattened_dynamic_and_json_serialization=1`.
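For example, a sketch of enabling that setting for a single query; the `SETTINGS` clause is plain ClickHouse SQL, so it works regardless of how the client exposes settings. The `events` table and its `payload` column are illustrative, and `config` matches the quick start:

```ts
import { query, streamDecodeNative, collectRows } from "@maxjustus/chttp";

const config = {
  baseUrl: "http://localhost:8123/",
  auth: { username: "default", password: "" },
};

// Opt in to the V3 flattened serialization when reading Dynamic/JSON columns over FORMAT Native.
const rows = await collectRows(
  streamDecodeNative(
    query(
      "SELECT payload FROM events SETTINGS output_format_native_use_flattened_dynamic_and_json_serialization = 1 FORMAT Native",
      "session",
      config,
    ),
  ),
);
```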
ClickHouse 64-bit+ integers (`Int64`, `UInt64`, `Int128`, etc.) are returned as JavaScript `BigInt`. Pass `{ bigIntAsString: true }` to convert them to strings for JSON serialization:
```ts
const row = batch.get(0, { bigIntAsString: true });
const obj = row.toObject({ bigIntAsString: true });
const allRows = batch.toArray({ bigIntAsString: true });
```

> Global alternative: add `BigInt.prototype.toJSON = function() { return this.toString(); };` at startup. See MDN.
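A minimal version of that global approach (plain JavaScript, no library involvement; note it affects every `JSON.stringify` call in the process):

```ts
// Serialize all BigInts as strings whenever JSON.stringify runs.
// Declared once at startup; the cast is needed because TypeScript's BigInt type has no toJSON.
(BigInt.prototype as any).toJSON = function () {
  return this.toString();
};

JSON.stringify({ id: 9007199254740993n }); // '{"id":"9007199254740993"}'
```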
Direct TCP protocol. Single connection per client - use separate clients for concurrent operations.
```ts
import { TcpClient } from "@maxjustus/chttp/tcp";

const client = new TcpClient({
  host: "localhost",
  port: 9000,
  database: "default",
  user: "default",
  password: "",
});
await client.connect();

for await (const packet of client.query("SELECT * FROM table")) {
  if (packet.type === "Data") {
    for (const row of packet.batch) {
      console.log(row.id, row.name);
    }
  }
}

// DDL statements
await client.query("CREATE TABLE ...");

// Insert - returns Packet[] (TCP streams progress during insert)
await client.insert("INSERT INTO table", [{ id: 1, name: "alice" }]);

client.close();
```
```ts
const client = new TcpClient({
  host: "localhost",
  port: 9000,
  database: "default",
  user: "default",
  password: "",
  compression: "lz4",    // 'lz4' | 'zstd' | false
  connectTimeout: 10000, // ms
  queryTimeout: 30000,   // ms
  tls: true,             // or tls.ConnectionOptions
});
```
Query yields packets - handle by type:
```ts
for await (const packet of client.query(sql, { settings: { send_logs_level: "trace" } })) {
  switch (packet.type) {
    case "Data":
      console.log(`${packet.batch.rowCount} rows`);
      break;
    case "Progress":
      console.log(`${packet.progress.readRows} rows read`);
      break;
    case "Log":
      for (const entry of packet.entries) {
        console.log(`[${entry.source}] ${entry.text}`);
      }
      break;
    case "ProfileInfo":
      console.log(`${packet.info.rows} total rows`);
      break;
    case "EndOfStream":
      break;
  }
}
```
Progress packets contain delta values (increments since the last packet). The client accumulates these into running totals available via `packet.accumulated`:
```ts
for await (const packet of client.query(sql)) {
  if (packet.type === "Progress") {
    const { accumulated } = packet;
    console.log(`${accumulated.percent}% complete`);
    console.log(`Read: ${accumulated.readRows} rows, ${accumulated.readBytes} bytes`);
    console.log(`Elapsed: ${Number(accumulated.elapsedNs) / 1e9}s`);
  }
}
```
ProfileEvents provide execution metrics. Memory and CPU stats are merged into accumulated progress:
```ts
for await (const packet of client.query(sql)) {
  if (packet.type === "Progress") {
    const { accumulated } = packet;
    console.log(`Memory: ${accumulated.memoryUsage} bytes`);
    console.log(`Peak memory: ${accumulated.peakMemoryUsage} bytes`);
    console.log(`CPU time: ${accumulated.cpuTimeMicroseconds}µs`);
    console.log(`CPU cores utilized: ${accumulated.cpuUsage.toFixed(1)}`);
  }
  if (packet.type === "ProfileEvents") {
    // Raw accumulated event counters
    console.log(`Selected rows: ${packet.accumulated.get("SelectedRows")}`);
    console.log(`Read bytes: ${packet.accumulated.get("ReadCompressedBytes")}`);
  }
}
```
`memoryUsage` is the latest reported value; `peakMemoryUsage` is the maximum seen. `cpuUsage` is the equivalent number of CPU cores utilized.
The `insert()` method accepts RecordBatches or row objects:
```ts
// Single batch
await client.insert("INSERT INTO t", batch);

// Multiple batches
await client.insert("INSERT INTO t", [batch1, batch2]);

// Row objects with auto-coercion (types inferred from server schema; unknown keys ignored; omitted keys use defaults)
await client.insert("INSERT INTO t", [
  { id: 1, name: "alice" },
  { id: 2, name: "bob" },
]);

// Streaming rows with generator
async function* generateRows() {
  for (let i = 0; i < 1000000; i++) {
    yield { id: i, name: `user${i}` };
  }
}

// batchSize dictates number of rows per RecordBatch (native insert block) sent
await client.insert("INSERT INTO t", generateRows(), { batchSize: 10000 });

// Schema validation (fail fast if types don't match the schema the server sends for the insert table)
await client.insert("INSERT INTO t", rows, {
  schema: [
    { name: "id", type: "UInt32" },
    { name: "name", type: "String" },
  ],
});
```
#### Insert Progress Tracking
Both `query()` and `insert()` return a `CollectableAsyncGenerator`:
- `await` - collects all packets into an array
- `for await` - streams packets one at a time
```ts
// Collect all packets
const packets = await client.insert("INSERT INTO t", rows);
const progress = packets.findLast(p => p.type === "Progress");
if (progress?.type === "Progress") {
  console.log(`Wrote ${progress.accumulated.writtenRows} rows`);
}

// Stream packets (useful for real-time progress on large inserts)
for await (const packet of client.insert("INSERT INTO t", generateRows())) {
  if (packet.type === "Progress") {
    console.log(`Written: ${packet.accumulated.writtenRows} rows`);
  }
}
```
Use separate connections for concurrent read/write:
```ts
import { TcpClient, recordBatches } from "@maxjustus/chttp/tcp";

const readClient = new TcpClient(options);
const writeClient = new TcpClient(options);
await readClient.connect();
await writeClient.connect();

// Stream RecordBatches from one table to another
await writeClient.insert(
  "INSERT INTO dst",
  recordBatches(readClient.query("SELECT * FROM src")),
);
```
```ts
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);

await client.connect({ signal: controller.signal });

for await (const p of client.query(sql, {}, { signal: controller.signal })) {
  // ...
}
```
```ts
await using client = await TcpClient.connect(options);
// automatically closed when scope exits
```
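For instance, a sketch of scoping a client to a single helper function (the query is illustrative); the connection is disposed when the function returns or throws:

```ts
import { TcpClient } from "@maxjustus/chttp/tcp";

async function countTables(): Promise<number | undefined> {
  // Disposed automatically when this scope exits, even if an error is thrown.
  await using client = await TcpClient.connect({ host: "localhost", port: 9000 });
  for await (const packet of client.query("SELECT count() AS n FROM system.tables")) {
    if (packet.type === "Data") {
      for (const row of packet.batch) return Number(row.n); // count() is UInt64 -> BigInt
    }
  }
}
```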
Send temporary in-memory tables with your query. Schema is auto-extracted from RecordBatch.
Pass RecordBatches directly to either client:
```ts
import { batchFromCols, getCodec, query, collectText } from "@maxjustus/chttp";

const users = batchFromCols({
  id: getCodec("UInt32").fromValues(new Uint32Array([1, 2, 3])),
  name: getCodec("String").fromValues(["Alice", "Bob", "Charlie"]),
});

// TCP
for await (const packet of client.query(
  "SELECT * FROM users WHERE id > 1",
  { externalTables: { users } },
)) {
  if (packet.type === "Data") {
    for (const row of packet.batch) console.log(row.name);
  }
}

// HTTP - same API
const result = await collectText(query(
  "SELECT * FROM users WHERE id > 1 FORMAT JSON",
  sessionId,
  { baseUrl, auth, externalTables: { users } },
));
```
Supports streaming via iterables/async iterables of RecordBatch:
```ts
async function* generateBatches() {
  for (let i = 0; i < 10; i++) {
    yield batchFromCols({ id: getCodec("UInt32").fromValues([i]) });
  }
}

// Works with both TCP and HTTP
await client.query("SELECT sum(id) FROM data", {
  externalTables: { data: generateBatches() },
});
```
For raw TSV/CSV/JSON data, use the explicit structure form:
```ts
const result = await collectText(query(
  "SELECT * FROM mydata ORDER BY id FORMAT JSON",
  sessionId,
  {
    baseUrl, auth,
    externalTables: {
      mydata: {
        structure: "id UInt32, name String",
        format: "TabSeparated", // or JSONEachRow, CSV, etc.
        data: "1\tAlice\n2\tBob\n",
      },
    },
  },
));
```
Configure with timeout (ms) or provide an AbortSignal for manual cancellation:
```ts
// Custom timeout
await insert(query, data, sessionId, { timeout: 60_000 });

// Manual cancellation
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
await insert(query, data, sessionId, { signal: controller.signal });

// Both (whichever triggers first)
await insert(query, data, sessionId, {
  signal: controller.signal,
  timeout: 60_000,
});
```
Requires Node.js 20+, Bun, Deno, or modern browsers (Chrome 116+, Firefox 124+, Safari 17.4+) for `AbortSignal.any()`.
The HTTP client throws standard Error objects with the HTTP status and response body:
```ts
try {
  for await (const _ of query("SELECT * FROM nonexistent", session, config)) {}
} catch (err) {
  // err.message: "Query failed: 404 - Code: 60. DB::Exception: Table ... doesn't exist..."
}
```
Insert errors follow the same pattern:
```ts
try {
  await insert("INSERT INTO t FORMAT JSONEachRow", data, session, config);
} catch (err) {
  // err.message: "Insert failed: 400 - Code: 27. DB::Exception: Cannot parse..."
}
```
The TCP client throws `ClickHouseException` for server errors, which includes structured details:
```ts
import { TcpClient, ClickHouseException } from "@maxjustus/chttp/tcp";

try {
  for await (const _ of client.query("SELECT * FROM nonexistent")) {}
} catch (err) {
  if (err instanceof ClickHouseException) {
    console.log(err.code);             // 60 (UNKNOWN_TABLE)
    console.log(err.exceptionName);    // "DB::Exception"
    console.log(err.message);          // "Table ... doesn't exist"
    console.log(err.serverStackTrace); // Full server-side stack trace
    console.log(err.nested);           // Nested exception if present
  }
}
```
Connection and protocol errors throw standard Error:
```ts
try {
  await client.connect();
} catch (err) {
  // err.message: "Connection timeout after 10000ms"
  // err.message: "Not connected"
  // err.message: "Connection busy - cannot run concurrent operations..."
}
```
Set compression in options:
- `"lz4"` - fast, uses native bindings when available with WASM fallback (default)
- `"zstd"` - ~2x better compression, uses native bindings when available with WASM fallback
- `false` - no compression
ZSTD and LZ4 use native bindings in Node.js/Bun when available, falling back to WASM in browsers and Deno.
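To make the option concrete, a short sketch of where it plugs in for each client (same shapes as the earlier examples; the table name is illustrative):

```ts
import { insert, streamEncodeJsonEachRow } from "@maxjustus/chttp";
import { TcpClient } from "@maxjustus/chttp/tcp";

const config = {
  baseUrl: "http://localhost:8123/",
  auth: { username: "default", password: "" },
};

// HTTP: choose compression per call, alongside the usual config
await insert(
  "INSERT INTO t FORMAT JSONEachRow",
  streamEncodeJsonEachRow([{ id: 1 }]),
  "session123",
  { ...config, compression: "zstd" },
);

// TCP: set once when constructing the client
const client = new TcpClient({ host: "localhost", port: 9000, compression: "lz4" });
```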
Benchmarks on Apple M4 Max, 10k rows. Native format is ClickHouse's columnar wire format.
| Scenario | JSON+LZ4 | Native+LZ4 | JSON+ZSTD | Native+ZSTD | JSON+gzip | Native+gzip |
|----------|----------|------------|-----------|-------------|-----------|-------------|
| Simple (6 cols) | 12.2ms | 2.2ms | 12.6ms | 2.4ms | 19.7ms | 8.0ms |
| Escape-heavy strings | 3.5ms | 2.7ms | 3.4ms | 2.7ms | 6.2ms | 6.5ms |
| Arrays (50 floats/row) | 31ms | 8.3ms | 69ms | 12ms | 301ms | 113ms |
| Variant | 1.1ms | 0.8ms | 1.3ms | 0.9ms | 6.5ms | 5.4ms |
| Dynamic | 1.0ms | 0.8ms | 1.2ms | 0.9ms | 4.2ms | 4.5ms |
| JSON column | 2.7ms | 3.0ms | 3.1ms | 3.2ms | 11.2ms | 9.6ms |

Compressed payload size, Native as a percentage of JSON (lower is better):

| Scenario | LZ4 | ZSTD | gzip |
|----------|-----|------|------|
| Simple (6 cols) | 65% | 68% | 71% |
| Escape-heavy strings | 92% | 140%* | 84% |
| Arrays (50 floats/row) | 48% | 82% | 85% |
| Variant | 70% | 96% | 73% |
| Dynamic | 72% | 98% | 65% |
| JSON column | 56% | 67% | 67% |
*Escape-heavy strings: JSON's escaping creates repetitive patterns that ZSTD compresses exceptionally well.
Summary: LZ4 is fastest, ZSTD compresses best. Native format wins on both speed and size for most data shapes. Exception: highly repetitive escaped strings where JSON's redundancy helps ZSTD.
Run `node --experimental-strip-types bench/formats.ts` to reproduce.
```bash
npm test       # runs integration tests against ClickHouse via testcontainers
make test-tcp  # TCP client tests (requires local ClickHouse on port 9000)
make fuzz-tcp  # TCP fuzz tests (FUZZ_ITERATIONS=10 FUZZ_ROWS=20000)
```
Requires Node.js 20+, Bun, or Deno.
Run queries directly from the command line via the bundled TCP client:
```bash
# Single query (outputs NDJSON packets)
npx @maxjustus/chttp 'SELECT version()'
bunx @maxjustus/chttp 'SELECT 1 + 1'
```
Configure via environment variables:
```bash
CH_HOST=clickhouse.example.com CH_PORT=9000 npx @maxjustus/chttp 'SELECT 1'
```

| Variable | Default | Description |
|----------|---------|-------------|
| CH_HOST | localhost | ClickHouse host |
| CH_PORT | 9000 | TCP native port |
| CH_USER | default | Username |
| CH_PASSWORD | "" | Password |
The REPL supports `\load file.jsonl INTO table` for bulk inserts from NDJSON files.