Skip to content

Instantly share code, notes, and snippets.

@recalde
Created November 9, 2024 02:34
Show Gist options
  • Save recalde/291622fbd70aa374e374d9d4c3ac38fa to your computer and use it in GitHub Desktop.
Save recalde/291622fbd70aa374e374d9d4c3ac38fa to your computer and use it in GitHub Desktop.
Parquet DirectoryFileMerger
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ParquetSharp;
/// <summary>
/// Merges the Parquet files in a directory whose file names contain a given
/// filter string into a single output Parquet file.
/// </summary>
/// <remarks>
/// Input files are processed in ascending order of their last-write timestamp,
/// and the output schema is taken from the first file in that order.
/// NOTE(review): several ParquetSharp calls used below (the single-argument
/// <c>ParquetFileWriter</c> constructor, <c>SetSchema</c>, <c>Schema</c>,
/// <c>RowGroupCount</c>, and the non-generic <c>ReadAll</c>/<c>RowCount</c>)
/// do not appear to match the public ParquetSharp API — confirm against the
/// library version this project actually references.
/// </remarks>
public class DirectoryFileMerger
{
    private readonly string _directoryPath;
    private readonly string _outputFilePath;
    private readonly string _dateFilter;

    /// <summary>
    /// Creates a merger for the given directory, output file and file-name filter.
    /// </summary>
    /// <param name="directoryPath">Directory whose files are candidates for merging.</param>
    /// <param name="outputFilePath">Path of the merged Parquet file to write.</param>
    /// <param name="dateFilter">Substring a file name must contain to be included.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public DirectoryFileMerger(string directoryPath, string outputFilePath, string dateFilter)
    {
        // Fail at construction time rather than deep inside MergeFiles.
        _directoryPath = directoryPath ?? throw new ArgumentNullException(nameof(directoryPath));
        _outputFilePath = outputFilePath ?? throw new ArgumentNullException(nameof(outputFilePath));
        _dateFilter = dateFilter ?? throw new ArgumentNullException(nameof(dateFilter));
    }

    /// <summary>
    /// Copies every column of every row group of each matching input file into
    /// one row group of the output file, then closes the output file.
    /// Prints a message and returns without writing anything when no file matches.
    /// </summary>
    public void MergeFiles()
    {
        var outputFullPath = Path.GetFullPath(_outputFilePath);

        var files = Directory.GetFiles(_directoryPath)
            .Where(file => Path.GetFileName(file).Contains(_dateFilter))
            // A previous run's output may live in the same directory and match the
            // filter; opening the writer below truncates it, so it must never be
            // treated as an input. Compared case-insensitively because common
            // Windows/macOS file systems are case-insensitive.
            .Where(file => !string.Equals(Path.GetFullPath(file), outputFullPath, StringComparison.OrdinalIgnoreCase))
            .Select(file => new FileInfo(file))
            .OrderBy(file => file.LastWriteTimeUtc) // Sort by file system timestamp
            .ToList();

        if (files.Count == 0)
        {
            Console.WriteLine("No files found for the specified date.");
            return;
        }

        using var outputWriter = new ParquetFileWriter(_outputFilePath);

        // Assume schema based on first file
        using (var sampleReader = new ParquetFileReader(files[0].FullName))
        {
            var sampleSchema = sampleReader.Schema;
            outputWriter.SetSchema(sampleSchema);
        }

        // Columns are addressed by index and revisited for every input row group,
        // which requires a buffered row group; a sequential row group only allows
        // each column to be written once, in order. The writer is disposed with
        // the method scope so its native resources are released.
        using var outputGroupWriter = outputWriter.AppendBufferedRowGroup();

        foreach (var fileInfo in files)
        {
            Console.WriteLine($"Processing file: {fileInfo.Name}");
            using var inputReader = new ParquetFileReader(fileInfo.FullName);

            for (int i = 0; i < inputReader.RowGroupCount; i++)
            {
                using var rowGroupReader = inputReader.RowGroup(i);

                for (int col = 0; col < rowGroupReader.ColumnCount; col++)
                {
                    using var columnReader = rowGroupReader.Column(col).LogicalReader();
                    // Dispose the logical writer after each batch so its buffers are released.
                    using var columnWriter = outputGroupWriter.Column(col).LogicalWriter();
                    columnWriter.WriteBatch(columnReader.ReadAll(columnReader.RowCount));
                }
            }
        }

        // Explicit Close surfaces write errors here instead of during disposal.
        outputWriter.Close();
        Console.WriteLine($"Merged file saved at: {_outputFilePath}");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment