@a10y · Created April 1, 2025 22:46
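
ParquetStuffTest rewrites unpartitioned TPC-H (sf=100) Parquet files into a series of ZSTD-compressed files, each capped at roughly 1 GiB with 512 MiB row groups. The size of each buffered batch is estimated by writing it to a temporary Parquet file and measuring the result.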
package org.apache.iceberg.parquet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class ParquetStuffTest {
  // Target row group size: 512 MiB.
  private static final long ROW_GROUP_SIZE = 512 * 1024 * 1024;
  // Maximum size of each output file: 1 GiB.
  private static final long MAX_FILE_SIZE = 1024 * 1024 * 1024;

  // Rewrites one unpartitioned TPC-H Parquet file per table into a series of
  // ZSTD-compressed files, each capped at MAX_FILE_SIZE and using ROW_GROUP_SIZE row groups.
  @ParameterizedTest
  @ValueSource(
      strings = {
        "customer",
        "lineitem",
        "nation",
        "orders",
        "partsupp",
        "part",
        "region",
        "supplier"
      })
  public void rewriteMyFiles(String tableName) throws Exception {
    Configuration conf = new Configuration();
    Path inputPath =
        new Path("/Volumes/Datasets/TPCH/sf=100/unpartitioned/" + tableName + ".parquet");
    Path outputDir = new Path("/Volumes/Datasets/TPCH/sf=100/partitioned/");
    Path tmpDir = new Path("/tmp");

    // Create an AvroParquetReader that yields GenericRecords.
    ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(inputPath).withConf(conf).build();

    // Read the first record eagerly so the Avro schema is available for the writers.
    GenericRecord firstRecord = reader.read();
    if (firstRecord == null) {
      System.err.println("Input file is empty.");
      reader.close();
      return;
    }
    Schema schema = firstRecord.getSchema();

    // Buffer that accumulates records for one candidate row group.
    List<GenericRecord> buffer = new ArrayList<>();
    buffer.add(firstRecord);

    int fileIndex = 0;
    long currentFileSize = 0;
    Path currentOutPath = new Path(outputDir, tableName + "_" + fileIndex + ".parquet");
    ParquetWriter<GenericRecord> writer =
        AvroParquetWriter.<GenericRecord>builder(currentOutPath)
            .withSchema(schema)
            .withConf(conf)
            .withCompressionCodec(CompressionCodecName.ZSTD)
            .withRowGroupSize(ROW_GROUP_SIZE)
            .build();

    // Flush the buffer every BUFFER_THRESHOLD records (adjust as needed).
    final int BUFFER_THRESHOLD = 10_000;

    GenericRecord record;
    while ((record = reader.read()) != null) {
      buffer.add(record);
      if (buffer.size() >= BUFFER_THRESHOLD) {
        // Estimate the compressed, on-disk size of the buffered batch.
        long groupSize = estimateRowGroupSize(buffer, schema, conf, ROW_GROUP_SIZE, tmpDir);
        // If adding this batch would exceed the max file size, roll over to a new file.
        if (currentFileSize + groupSize > MAX_FILE_SIZE) {
          writer.close();
          fileIndex++;
          currentOutPath = new Path(outputDir, tableName + "_" + fileIndex + ".parquet");
          writer =
              AvroParquetWriter.<GenericRecord>builder(currentOutPath)
                  .withSchema(schema)
                  .withConf(conf)
                  .withCompressionCodec(CompressionCodecName.ZSTD)
                  .withRowGroupSize(ROW_GROUP_SIZE)
                  .build();
          currentFileSize = 0;
        }
        // Write the buffered records to the current file.
        for (GenericRecord rec : buffer) {
          writer.write(rec);
        }
        currentFileSize += groupSize;
        buffer.clear();
      }
    }

    // Flush any records left in the buffer after the input is exhausted.
    if (!buffer.isEmpty()) {
      long groupSize = estimateRowGroupSize(buffer, schema, conf, ROW_GROUP_SIZE, tmpDir);
      if (currentFileSize + groupSize > MAX_FILE_SIZE) {
        writer.close();
        fileIndex++;
        currentOutPath = new Path(outputDir, tableName + "_" + fileIndex + ".parquet");
        writer =
            AvroParquetWriter.<GenericRecord>builder(currentOutPath)
                .withSchema(schema)
                .withConf(conf)
                .withCompressionCodec(CompressionCodecName.ZSTD)
                .withRowGroupSize(ROW_GROUP_SIZE)
                .build();
      }
      for (GenericRecord rec : buffer) {
        writer.write(rec);
      }
    }
    writer.close();
    reader.close();
  }

  // Estimates the compressed on-disk size of a batch of records by writing it to a
  // throwaway Parquet file with the same codec and row group settings, measuring the
  // file, and deleting it. Note this doubles the write work for every batch.
  private static long estimateRowGroupSize(
      List<GenericRecord> records,
      Schema schema,
      Configuration conf,
      long rowGroupSizeBytes,
      Path tempDir)
      throws IOException {
    String tempFileName = "temp_" + System.nanoTime() + ".parquet";
    Path tempPath = new Path(tempDir, tempFileName);
    ParquetWriter<GenericRecord> tempWriter =
        AvroParquetWriter.<GenericRecord>builder(tempPath)
            .withSchema(schema)
            .withConf(conf)
            .withCompressionCodec(CompressionCodecName.ZSTD)
            .withRowGroupSize(rowGroupSizeBytes)
            .build();
    for (GenericRecord rec : records) {
      tempWriter.write(rec);
    }
    tempWriter.close();
    FileSystem fs = FileSystem.get(conf);
    long size = fs.getFileStatus(tempPath).getLen();
    fs.delete(tempPath, false);
    return size;
  }
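
  // A lighter-weight alternative (a sketch, not part of the original gist): instead of
  // sizing each batch by writing it to a throwaway temp file, poll the live writer with
  // ParquetWriter#getDataSize(), which reports flushed bytes plus bytes still buffered in
  // memory. The rollover check stays approximate, but every record is written only once.
  // copyWithRollover and newWriter are hypothetical helpers, not part of the gist.
  private static void copyWithRollover(
      ParquetReader<GenericRecord> reader,
      Schema schema,
      Configuration conf,
      Path outputDir,
      String tableName)
      throws IOException {
    int fileIndex = 0;
    ParquetWriter<GenericRecord> writer =
        newWriter(new Path(outputDir, tableName + "_" + fileIndex + ".parquet"), schema, conf);
    GenericRecord rec;
    while ((rec = reader.read()) != null) {
      // Roll to a new file once the current one (including buffered data) reaches the cap.
      if (writer.getDataSize() >= MAX_FILE_SIZE) {
        writer.close();
        fileIndex++;
        writer =
            newWriter(new Path(outputDir, tableName + "_" + fileIndex + ".parquet"), schema, conf);
      }
      writer.write(rec);
    }
    writer.close();
  }

  // Hypothetical helper shared by the sketch above; mirrors the builder calls in rewriteMyFiles.
  private static ParquetWriter<GenericRecord> newWriter(
      Path path, Schema schema, Configuration conf) throws IOException {
    return AvroParquetWriter.<GenericRecord>builder(path)
        .withSchema(schema)
        .withConf(conf)
        .withCompressionCodec(CompressionCodecName.ZSTD)
        .withRowGroupSize(ROW_GROUP_SIZE)
        .build();
  }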
}