package org.apache.iceberg.parquet;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
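/**
 * Rewrites the unpartitioned TPC-H (sf=100) Parquet files into a series of
 * numbered output files, each capped at roughly 1 GiB, using 512 MiB row
 * groups and ZSTD compression. Output sizes are estimated by spilling each
 * buffered batch of records to a temporary Parquet file and measuring its
 * length on disk.
 */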
public class ParquetStuffTest {

  // 512 MiB per row group.
  private static final long ROW_GROUP_SIZE = 512L * 1024 * 1024;
  // 1 GiB per output file.
  private static final long MAX_FILE_SIZE = 1024L * 1024 * 1024;
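  /**
   * Streams records out of {@code <tableName>.parquet} and rewrites them into
   * {@code <tableName>_0.parquet}, {@code <tableName>_1.parquet}, ..., rolling
   * over to a new file whenever the next buffered batch would push the current
   * file past {@link #MAX_FILE_SIZE}.
   */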
  @ParameterizedTest
  @ValueSource(
      strings = {
        "customer",
        "lineitem",
        "nation",
        "orders",
        "partsupp",
        "part",
        "region",
        "supplier"
      })
  public void rewriteMyFiles(String tableName) throws Exception {
    Configuration conf = new Configuration();
    Path inputPath =
        new Path("/Volumes/Datasets/TPCH/sf=100/unpartitioned/" + tableName + ".parquet");
    Path outputDir = new Path("/Volumes/Datasets/TPCH/sf=100/partitioned/");
    Path tmpDir = new Path("/tmp");

    // Create an AvroParquetReader to read GenericRecords.
    ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(inputPath).withConf(conf).build();

    // Read one record up front so we can grab the Avro schema for the writers.
    GenericRecord firstRecord = reader.read();
    if (firstRecord == null) {
      System.err.println("Input file is empty.");
      reader.close();
      return;
    }
    Schema schema = firstRecord.getSchema();

    // Buffer that accumulates records for one flush. (Note: each flush is not
    // literally one Parquet row group; the writer decides row-group boundaries
    // based on ROW_GROUP_SIZE.)
    List<GenericRecord> buffer = new ArrayList<>();
    buffer.add(firstRecord);

    int fileIndex = 0;
    long currentFileSize = 0;
    Path currentOutPath = new Path(outputDir, tableName + "_" + fileIndex + ".parquet");
    ParquetWriter<GenericRecord> writer =
        AvroParquetWriter.<GenericRecord>builder(currentOutPath)
            .withSchema(schema)
            .withConf(conf)
            .withCompressionCodec(CompressionCodecName.ZSTD)
            .withRowGroupSize(ROW_GROUP_SIZE)
            .build();

    // Flush the buffer every BUFFER_THRESHOLD records (adjust as needed).
    final int BUFFER_THRESHOLD = 10_000;
    GenericRecord record;
    while ((record = reader.read()) != null) {
      buffer.add(record);
      if (buffer.size() >= BUFFER_THRESHOLD) {
        // Estimate the on-disk size of the buffered batch.
        long groupSize = estimateRowGroupSize(buffer, schema, conf, ROW_GROUP_SIZE, tmpDir);
        // If adding this batch would exceed the max file size, roll to a new file.
        if (currentFileSize + groupSize > MAX_FILE_SIZE) {
          writer.close();
          fileIndex++;
          currentOutPath = new Path(outputDir, tableName + "_" + fileIndex + ".parquet");
          writer =
              AvroParquetWriter.<GenericRecord>builder(currentOutPath)
                  .withSchema(schema)
                  .withConf(conf)
                  .withCompressionCodec(CompressionCodecName.ZSTD)
                  .withRowGroupSize(ROW_GROUP_SIZE)
                  .build();
          currentFileSize = 0;
        }
        // Write the buffered records.
        for (GenericRecord rec : buffer) {
          writer.write(rec);
        }
        currentFileSize += groupSize;
        buffer.clear();
      }
    }

    // Write any remaining records, rolling one last time if necessary.
    if (!buffer.isEmpty()) {
      long groupSize = estimateRowGroupSize(buffer, schema, conf, ROW_GROUP_SIZE, tmpDir);
      if (currentFileSize + groupSize > MAX_FILE_SIZE) {
        writer.close();
        fileIndex++;
        currentOutPath = new Path(outputDir, tableName + "_" + fileIndex + ".parquet");
        writer =
            AvroParquetWriter.<GenericRecord>builder(currentOutPath)
                .withSchema(schema)
                .withConf(conf)
                .withCompressionCodec(CompressionCodecName.ZSTD)
                .withRowGroupSize(ROW_GROUP_SIZE)
                .build();
      }
      for (GenericRecord rec : buffer) {
        writer.write(rec);
      }
    }

    writer.close();
    reader.close();
  }
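  /**
   * Estimates the on-disk size of {@code records} by writing them to a
   * throwaway Parquet file with the same schema, codec, and row-group settings,
   * reading the resulting file length, and deleting the file. Accurate but
   * costly: every batch ends up compressed and written twice overall.
   */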
  private static long estimateRowGroupSize(
      List<GenericRecord> records,
      Schema schema,
      Configuration conf,
      long rowGroupSizeBytes,
      Path tempDir)
      throws IOException {
    // Use a nanoTime-based name to avoid collisions between concurrent estimates.
    String tempFileName = "temp_" + System.nanoTime() + ".parquet";
    Path tempPath = new Path(tempDir, tempFileName);
    ParquetWriter<GenericRecord> tempWriter =
        AvroParquetWriter.<GenericRecord>builder(tempPath)
            .withSchema(schema)
            .withConf(conf)
            .withCompressionCodec(CompressionCodecName.ZSTD)
            .withRowGroupSize(rowGroupSizeBytes)
            .build();
    for (GenericRecord rec : records) {
      tempWriter.write(rec);
    }
    tempWriter.close();
    FileSystem fs = FileSystem.get(conf);
    long size = fs.getFileStatus(tempPath).getLen();
    fs.delete(tempPath, false);
    return size;
  }
}
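To try this out (a sketch, assuming a standard Maven Surefire setup; adjust the hard-coded /Volumes/Datasets/TPCH paths for your machine first), run the parameterized test directly: mvn test -Dtest=ParquetStuffTest. Because every 10,000-record batch is first spilled to /tmp to measure its compressed size, the rewrite compresses all data twice, which is simple but roughly doubles the runtime.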