Skip to content

Instantly share code, notes, and snippets.

@maxjustus
Last active December 13, 2024 21:19
Show Gist options
  • Save maxjustus/80d8c50fe352e091e5e74532fced8935 to your computer and use it in GitHub Desktop.
Save maxjustus/80d8c50fe352e091e5e74532fced8935 to your computer and use it in GitHub Desktop.
Generate and stream Parquet to ClickHouse in Node.js using clickhouse-js and parquetjs
// npm install @clickhouse/client parquetjs
const { createClient } = require('@clickhouse/client');
const { ParquetSchema, ParquetTransformer } = require('parquetjs');
const { Readable, Transform, pipeline } = require('stream');
/**
 * Pass-through stream that runs in binary (non-object) mode.
 *
 * It is placed at the end of the Parquet pipeline so the ClickHouse JS client
 * is handed a stream of Buffers rather than an object-mode stream.
 */
class BinaryModeTransform extends Transform {
  constructor() {
    // Binary mode on both sides; the ClickHouse JS client reads raw bytes.
    const streamOptions = { objectMode: false };
    super(streamOptions);
  }

  /**
   * Forwards each chunk unchanged, coercing anything that is not already a
   * Buffer with `Buffer.from`.
   *
   * @param {Buffer|string|Uint8Array} chunk - Incoming data.
   * @param {string} encoding - Encoding reported by the stream machinery (unused).
   * @param {Function} callback - Signals that the chunk has been handled.
   */
  _transform(chunk, encoding, callback) {
    const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    // Passing the data as the second argument pushes it downstream.
    callback(null, bytes);
  }
}
/**
 * Encodes `data` as Parquet on the fly and streams it into a ClickHouse table.
 *
 * A fresh ClickHouse client is created for the call and is always closed
 * before the returned promise settles.
 *
 * @param {object} params
 * @param {string} params.tableName - Target table for the insert.
 * @param {object} params.parquetSchema - Schema handed to `ParquetTransformer`.
 * @param {Iterable|AsyncIterable} params.data - Rows to insert; wrapped with
 *   `Readable.from`.
 * @param {object} [params.clickhouseConfig={}] - Options passed to `createClient`.
 * @param {object} [params.parquetOptions={}] - Options passed to `ParquetTransformer`.
 * @returns {Promise<void>} Resolves once the insert has completed and the
 *   client has been closed.
 */
async function streamParquetToClickhouse({
  tableName,
  parquetSchema,
  data,
  clickhouseConfig = {},
  parquetOptions = {},
}) {
  const client = createClient(clickhouseConfig);
  try {
    // `pipeline` (unlike chained `.pipe()` calls) forwards errors and tears
    // down every stage: a failure while iterating `data` or while encoding
    // Parquet destroys the final stream with that error instead of leaving the
    // insert waiting on a stream that never ends, and an aborted insert
    // destroys the upstream stages instead of leaking them.
    const parquetStream = pipeline(
      Readable.from(data),
      new ParquetTransformer(parquetSchema, parquetOptions),
      new BinaryModeTransform(),
      () => {
        // Intentionally empty: failures reach the caller through the
        // destroyed `parquetStream` that `client.insert` is consuming.
      }
    );

    // Stream the encoded bytes directly to ClickHouse.
    await client.insert({
      table: tableName,
      values: parquetStream,
      format: 'Parquet',
    });
  } finally {
    await client.close();
  }
}
/**
 * Example usage: creates `example_table` if needed, streams three sample rows
 * into it as Parquet, then reads the table back and prints every row.
 *
 * Every client opened here is closed in a `finally` block, so a failing
 * command or query does not leave a connection open.
 *
 * @returns {Promise<void>}
 * @throws Re-throws any error after logging it.
 */
async function example() {
  // One config shared by every client in this example.
  const clickhouseConfig = {
    // Add any ClickHouse client config options
    host: 'http://localhost:8123' // Adjust this to your ClickHouse host
  };

  // Define schema matching your ClickHouse table
  const parquetSchema = new ParquetSchema({
    id: { type: 'INT64' },
    name: { type: 'UTF8' },
  });

  // Sample data matching schema
  const data = [
    { id: 0, name: 'a' },
    { id: 1, name: 'b' },
    { id: 2, name: 'c' }
  ];

  try {
    // Create table if needed.
    // NOTE(review): the table has a `sku` column that the Parquet schema does
    // not provide; presumably ClickHouse fills it with defaults - confirm.
    const setupClient = createClient(clickhouseConfig);
    try {
      await setupClient.command({
        query: `
          CREATE TABLE IF NOT EXISTS example_table
          (id UInt64, name String, sku Array(UInt8))
          ENGINE MergeTree()
          ORDER BY (id)
        `
      });
    } finally {
      await setupClient.close();
    }

    // Stream data to ClickHouse
    await streamParquetToClickhouse({
      tableName: 'example_table',
      parquetSchema,
      data,
      clickhouseConfig,
      parquetOptions: {
        // Add any ParquetWriter options
        rowGroupSize: 100000,
        pageSize: 8192
      }
    });

    // Verify the data was inserted
    const verificationClient = createClient(clickhouseConfig);
    try {
      const rs = await verificationClient.query({
        query: 'SELECT * FROM example_table',
        format: 'JSONEachRow',
      });
      console.log('Inserted data:');
      // Each item yielded by the result stream is a batch of rows.
      for await (const rows of rs.stream()) {
        rows.forEach((row) => {
          console.log(row.json());
        });
      }
    } finally {
      await verificationClient.close();
    }
  } catch (error) {
    console.error('Error:', error);
    throw error;
  }
}
// Run the example. The promise is given a rejection handler so a failure does
// not surface as an unhandled rejection.
(async () => {
  await example();
  console.log("ran!");
})().catch(() => {
  // example() has already logged the error; only signal failure to the shell.
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment