Last active
December 13, 2024 21:19
-
-
Save maxjustus/80d8c50fe352e091e5e74532fced8935 to your computer and use it in GitHub Desktop.
generate and stream parquet to clickhouse in nodejs using clickhouse-js and parquetjs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm install @clickhouse/client parquetjs
const { createClient } = require('@clickhouse/client');
const { ParquetSchema, ParquetTransformer } = require('parquetjs');
const { Readable, Transform, pipeline } = require('stream');
/**
 * Transform stream that re-emits every incoming chunk as a Buffer.
 *
 * It is constructed in binary (non-object) mode so that the readable side
 * hands plain bytes to the ClickHouse JS client rather than objects.
 */
class BinaryModeTransform extends Transform {
  constructor() {
    super({
      objectMode: false, // Output in binary mode for ClickHouse js client
    });
  }

  /**
   * Forward one chunk downstream, coercing it to a Buffer first if needed.
   *
   * @param {Buffer|string|Uint8Array} chunk - Data written to the stream.
   * @param {string} encoding - Encoding reported by the stream machinery (unused).
   * @param {(error?: Error | null, data?: Buffer) => void} callback - Completion callback.
   */
  _transform(chunk, encoding, callback) {
    // Ensure chunk is a Buffer
    const payload = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    callback(null, payload);
  }
}
/**
 * Encode `data` as Parquet and stream it into a ClickHouse table.
 *
 * The rows are turned into a readable stream, passed through a
 * ParquetTransformer and a BinaryModeTransform, and the resulting byte stream
 * is given to `client.insert` with format 'Parquet'. The client created here
 * is always closed, whether or not the insert succeeds.
 *
 * @param {object} params
 * @param {string} params.tableName - Target ClickHouse table.
 * @param {ParquetSchema} params.parquetSchema - Schema used to encode the rows.
 * @param {Iterable<object>|AsyncIterable<object>} params.data - Rows to insert
 *   (anything accepted by `Readable.from`).
 * @param {object} [params.clickhouseConfig={}] - Options passed to `createClient`.
 * @param {object} [params.parquetOptions={}] - Options passed to `ParquetTransformer`.
 * @returns {Promise<void>} Resolves once the insert has finished and the client is closed.
 * @throws Rejects with the insert error, or with a stream error from the
 *   encoding pipeline.
 */
async function streamParquetToClickhouse({
  tableName,
  parquetSchema,
  data,
  clickhouseConfig = {},
  parquetOptions = {},
}) {
  // Create ClickHouse client
  const client = createClient(clickhouseConfig);
  try {
    // `.pipe()` does not forward errors between stages; `pipeline` destroys
    // every stage (including the returned output stream) when any stage
    // fails, so a bad row or schema mismatch is not left as an unhandled
    // 'error' event on an intermediate stream.
    let pipelineError;
    const parquetStream = pipeline(
      Readable.from(data),
      new ParquetTransformer(parquetSchema, parquetOptions),
      new BinaryModeTransform(),
      (err) => {
        pipelineError = err;
      },
    );

    // Stream directly to ClickHouse
    await client.insert({
      table: tableName,
      values: parquetStream,
      format: 'Parquet',
    });

    // NOTE(review): the insert is expected to reject when its input stream is
    // destroyed with an error; this guard covers the case where it does not.
    if (pipelineError) {
      throw pipelineError;
    }
  } finally {
    await client.close();
  }
}
// Example usage:
/**
 * End-to-end demo: create `example_table` if it does not exist, stream three
 * sample rows into it as Parquet, then read the table back and print each row.
 *
 * Every ClickHouse client opened here is closed in a `finally`, so a failing
 * query does not leave a client open.
 *
 * @returns {Promise<void>}
 * @throws Re-throws any error after logging it with `console.error`.
 */
async function example() {
  // Single place to configure the connection used by every step below.
  const clickhouseConfig = {
    // Add any ClickHouse client config options
    host: 'http://localhost:8123', // Adjust this to your ClickHouse host
  };

  // Define schema matching your ClickHouse table
  const parquetSchema = new ParquetSchema({
    id: { type: 'INT64' },
    name: { type: 'UTF8' },
  });

  // Sample data matching schema
  const data = [
    { id: 0, name: 'a' },
    { id: 1, name: 'b' },
    { id: 2, name: 'c' },
  ];

  try {
    // Create table if needed
    const setupClient = createClient(clickhouseConfig);
    try {
      await setupClient.command({
        query: `
          CREATE TABLE IF NOT EXISTS example_table
          (id UInt64, name String, sku Array(UInt8))
          ENGINE MergeTree()
          ORDER BY (id)
        `,
      });
    } finally {
      await setupClient.close();
    }

    // Stream data to ClickHouse
    await streamParquetToClickhouse({
      tableName: 'example_table',
      parquetSchema,
      data,
      clickhouseConfig,
      parquetOptions: {
        // Add any ParquetWriter options
        rowGroupSize: 100000,
        pageSize: 8192,
      },
    });

    // Verify the data was inserted
    const verificationClient = createClient(clickhouseConfig);
    try {
      const rs = await verificationClient.query({
        query: 'SELECT * FROM example_table',
        format: 'JSONEachRow',
      });
      console.log('Inserted data:');
      for await (const rows of rs.stream()) {
        for (const row of rows) {
          console.log(row.json());
        }
      }
    } finally {
      await verificationClient.close();
    }
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}
// Run the example
(async () => {
  await example();
  console.log("ran!");
})().catch(() => {
  // example() logs the error before re-throwing it, so it is not printed a
  // second time here; the rejection is handled explicitly and the process is
  // marked as failed instead of leaving the promise floating.
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment