// Parquet DirectoryFileMerger
// Source: GitHub gist recalde/291622fbd70aa374e374d9d4c3ac38fa, created November 9, 2024 02:34.
// Note: GitHub flagged this file as possibly containing bidirectional Unicode text, which can be
// interpreted or compiled differently than it appears. Review it in an editor that reveals hidden
// Unicode characters before building.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ParquetSharp;
using ParquetSharp.Schema;
/// <summary>
/// Concatenates the Parquet files in one directory whose names contain a given
/// substring into a single Parquet output file.
/// </summary>
/// <remarks>
/// Inputs are processed oldest-first by last-write time (UTC), with ties broken by
/// file name. The output takes its schema from the first input; every other input
/// must have the same columns in the same order. Each input row group is copied to
/// its own row group in the output. The output is opened before the inputs are
/// validated, so a failed merge may leave a partially written output file behind.
/// </remarks>
public class DirectoryFileMerger
{
    private readonly string _directoryPath;
    private readonly string _outputFilePath;
    private readonly string _dateFilter;

    /// <summary>Configures a merge of one directory into one output file.</summary>
    /// <param name="directoryPath">Directory whose top-level files are candidate inputs.</param>
    /// <param name="outputFilePath">Path of the merged Parquet file to write.</param>
    /// <param name="dateFilter">
    /// Substring a file name must contain (ordinal, case-sensitive match) to be merged.
    /// </param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public DirectoryFileMerger(string directoryPath, string outputFilePath, string dateFilter)
    {
        _directoryPath = directoryPath ?? throw new ArgumentNullException(nameof(directoryPath));
        _outputFilePath = outputFilePath ?? throw new ArgumentNullException(nameof(outputFilePath));
        _dateFilter = dateFilter ?? throw new ArgumentNullException(nameof(dateFilter));
    }

    /// <summary>
    /// Writes all matching input files into the output file. When no file matches,
    /// prints a message and returns without creating the output.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// An input's column layout differs from the first input's.
    /// </exception>
    /// <exception cref="OverflowException">
    /// An input row group has more than <see cref="int.MaxValue"/> rows.
    /// </exception>
    public void MergeFiles()
    {
        List<FileInfo> files = FindInputFiles();
        if (files.Count == 0)
        {
            Console.WriteLine("No files found for the specified date.");
            return;
        }

        // ParquetFileWriter receives its schema at construction time, so take it from the
        // first input. Keep that reader open for the writer's whole lifetime in case the
        // native schema node handed to the writer is owned by the reader's metadata.
        using var schemaReader = new ParquetFileReader(files[0].FullName);
        var schema = schemaReader.FileMetaData.Schema;
        using var writerProperties = WriterProperties.GetDefaultWriterProperties();
        using var outputWriter = new ParquetFileWriter(_outputFilePath, (GroupNode)schema.SchemaRoot, writerProperties);

        foreach (var fileInfo in files)
        {
            Console.WriteLine($"Processing file: {fileInfo.Name}");
            using var inputReader = new ParquetFileReader(fileInfo.FullName);
            EnsureSameColumns(schema, inputReader.FileMetaData.Schema, fileInfo.Name);
            CopyRowGroups(inputReader, outputWriter);
        }

        // Close explicitly so write errors surface here rather than being hidden in Dispose.
        outputWriter.Close();
        Console.WriteLine($"Merged file saved at: {_outputFilePath}");
    }

    /// <summary>
    /// Lists the top-level files of the configured directory whose names contain the
    /// filter, excluding the output file itself, ordered oldest-first.
    /// </summary>
    private List<FileInfo> FindInputFiles()
    {
        string outputFullPath = Path.GetFullPath(_outputFilePath);

        return Directory.GetFiles(_directoryPath)
            .Where(path => Path.GetFileName(path).Contains(_dateFilter))
            .Select(path => new FileInfo(path))
            // A previous merge result written into the same directory would otherwise be
            // read back as an input (case-insensitive to be safe on Windows paths).
            .Where(file => !string.Equals(file.FullName, outputFullPath, StringComparison.OrdinalIgnoreCase))
            .OrderBy(file => file.LastWriteTimeUtc)
            // Tie-break on name so the merge order does not depend on enumeration order.
            .ThenBy(file => file.Name, StringComparer.Ordinal)
            .ToList();
    }

    /// <summary>
    /// Throws unless <paramref name="actual"/> has the same column paths, in the same
    /// order, as <paramref name="expected"/>.
    /// </summary>
    private static void EnsureSameColumns(SchemaDescriptor expected, SchemaDescriptor actual, string fileName)
    {
        if (actual.NumColumns != expected.NumColumns)
        {
            throw new InvalidOperationException(
                $"File '{fileName}' has {actual.NumColumns} columns; expected {expected.NumColumns}.");
        }

        for (int col = 0; col < expected.NumColumns; col++)
        {
            string expectedPath = expected.Column(col).Path.ToDotString();
            string actualPath = actual.Column(col).Path.ToDotString();
            if (!string.Equals(expectedPath, actualPath, StringComparison.Ordinal))
            {
                throw new InvalidOperationException(
                    $"File '{fileName}' column {col} is '{actualPath}'; expected '{expectedPath}'.");
            }
        }
    }

    /// <summary>
    /// Appends every row group of <paramref name="input"/> to <paramref name="output"/>,
    /// one output row group per input row group.
    /// </summary>
    private static void CopyRowGroups(ParquetFileReader input, ParquetFileWriter output)
    {
        int rowGroupCount = input.FileMetaData.NumRowGroups;
        int columnCount = input.FileMetaData.NumColumns;

        for (int rowGroup = 0; rowGroup < rowGroupCount; rowGroup++)
        {
            using var rowGroupReader = input.RowGroup(rowGroup);
            // ReadAll takes an int row count; fail loudly instead of truncating.
            int rowCount = checked((int)rowGroupReader.MetaData.NumRows);

            using var rowGroupWriter = output.AppendRowGroup();
            for (int col = 0; col < columnCount; col++)
            {
                // A serial row group must receive its columns in schema order, once each.
                var columnWriter = rowGroupWriter.NextColumn();
                using var logicalReader = rowGroupReader.Column(col).LogicalReader();
                logicalReader.Apply(new ColumnCopier(columnWriter, rowCount));
            }
        }
    }

    /// <summary>
    /// Copies one column chunk. The element type is discovered from the typed logical
    /// reader at runtime and reused for the matching logical writer.
    /// </summary>
    private sealed class ColumnCopier : ILogicalColumnReaderVisitor<bool>
    {
        private readonly ColumnWriter _target;
        private readonly int _rowCount;

        public ColumnCopier(ColumnWriter target, int rowCount)
        {
            _target = target;
            _rowCount = rowCount;
        }

        public bool OnLogicalColumnReader<TValue>(LogicalColumnReader<TValue> columnReader)
        {
            TValue[] values = columnReader.ReadAll(_rowCount);
            using var writer = _target.LogicalWriter<TValue>();
            writer.WriteBatch(values);
            return true;
        }
    }
}
// End of gist. Sign up for GitHub, or sign in, to comment on this gist.