Created
November 9, 2024 02:44
-
-
Save recalde/5644be3e1ea1b3313b55d036de287cd8 to your computer and use it in GitHub Desktop.
Merge logs by pod
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using ParquetSharp; | |
/// <summary>
/// Merges the Parquet files of one directory whose file names contain a given date
/// string into a single output file, processing them grouped by pod name and by the
/// hour of their last write time.
/// </summary>
public class DirectoryFileMerger
{
    private readonly string _directoryPath;
    private readonly string _outputFilePath;
    private readonly string _dateFilter;

    /// <summary>
    /// Creates a merger for the given input directory, output path and date filter.
    /// </summary>
    /// <param name="directoryPath">Directory whose files are candidates for merging.</param>
    /// <param name="outputFilePath">
    /// Desired output path; a numeric suffix is appended by
    /// <see cref="FileNameHelper.GetUniqueFileName"/> if the file already exists.
    /// </param>
    /// <param name="dateFilter">Substring a file name must contain to be included.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public DirectoryFileMerger(string directoryPath, string outputFilePath, string dateFilter)
    {
        // Fail fast: a null would otherwise only surface later, inside MergeFiles.
        _directoryPath = directoryPath ?? throw new ArgumentNullException(nameof(directoryPath));
        _outputFilePath = outputFilePath ?? throw new ArgumentNullException(nameof(outputFilePath));
        _dateFilter = dateFilter ?? throw new ArgumentNullException(nameof(dateFilter));
    }

    /// <summary>
    /// Collects the matching files, groups them by (pod name, UTC hour of last write),
    /// and copies every column of every input row group into the output file, one
    /// output row group per (pod, hour) group. Prints progress to the console and
    /// returns without creating an output file when nothing matches the date filter.
    /// </summary>
    public void MergeFiles()
    {
        // Group files by pod name and hour. The groups are sorted (pod name, then hour)
        // and kept in a list so that both the schema-source file and the order of the
        // output row groups are deterministic; dictionary enumeration order is not.
        var filesByPodAndHour = Directory.GetFiles(_directoryPath)
            .Where(file => Path.GetFileName(file).Contains(_dateFilter))
            .Select(file => new FileInfo(file))
            .GroupBy(file => (PodName: ExtractPodName(file.Name), Hour: file.LastWriteTimeUtc.Hour))
            .OrderBy(group => group.Key.PodName, StringComparer.Ordinal)
            .ThenBy(group => group.Key.Hour)
            .Select(group => (
                Key: group.Key,
                // Oldest first; the name breaks ties between identical timestamps.
                Files: group.OrderBy(file => file.LastWriteTimeUtc)
                            .ThenBy(file => file.Name, StringComparer.Ordinal)
                            .ToList()))
            .ToList();

        if (filesByPodAndHour.Count == 0)
        {
            Console.WriteLine("No files found for the specified date.");
            return;
        }

        string uniqueOutputPath = FileNameHelper.GetUniqueFileName(_outputFilePath);

        // NOTE(review): the ParquetSharp members used below (single-argument writer
        // constructor, SetSchema, RowGroupCount, ColumnCount, untyped ReadAll/WriteBatch)
        // are kept exactly as originally written - confirm them against the installed
        // ParquetSharp version. Appending several inputs to one row group through
        // Column(col) presumably needs a buffered row group - TODO confirm.
        using var outputWriter = new ParquetFileWriter(uniqueOutputPath);

        // Set schema based on the first file of the first (sorted) group.
        using (var sampleReader = new ParquetFileReader(filesByPodAndHour[0].Files[0].FullName))
        {
            outputWriter.SetSchema(sampleReader.Schema);
        }

        foreach (var (key, files) in filesByPodAndHour)
        {
            Console.WriteLine($"Merging logs for pod: {key.PodName}, hour: {key.Hour}");

            // The row group writer is a disposable resource; release it per group.
            using var outputGroupWriter = outputWriter.AppendRowGroup();

            foreach (var fileInfo in files)
            {
                using var inputReader = new ParquetFileReader(fileInfo.FullName);
                for (int i = 0; i < inputReader.RowGroupCount; i++)
                {
                    using var rowGroupReader = inputReader.RowGroup(i);
                    for (int col = 0; col < rowGroupReader.ColumnCount; col++)
                    {
                        using var columnReader = rowGroupReader.Column(col).LogicalReader();
                        outputGroupWriter.Column(col).LogicalWriter().WriteBatch(columnReader.ReadAll(columnReader.RowCount));
                    }
                }
            }
        }

        outputWriter.Close();
        Console.WriteLine($"Merged file saved at: {uniqueOutputPath}");
    }

    /// <summary>
    /// Returns the part of <paramref name="fileName"/> before the first underscore
    /// (the whole name when it contains none).
    /// </summary>
    private static string ExtractPodName(string fileName)
    {
        // Assuming the pod name is identifiable in the file name, e.g., "podname_timestamp.parquet"
        return fileName.Split('_').First(); // Adjust this logic based on your file naming convention
    }
}
/// <summary>
/// Helpers for choosing output file names.
/// </summary>
public static class FileNameHelper
{
    /// <summary>
    /// Returns <paramref name="outputPath"/> itself when no file exists at that path;
    /// otherwise returns the first of "name_1.ext", "name_2.ext", ... (in the same
    /// directory) for which no file exists.
    /// </summary>
    /// <param name="outputPath">The preferred output file path.</param>
    /// <returns>A path at which no file existed when it was checked.</returns>
    public static string GetUniqueFileName(string outputPath)
    {
        // Split the path once up front; only the numeric suffix varies per attempt.
        string folder = Path.GetDirectoryName(outputPath) ?? string.Empty;
        string stem = Path.GetFileNameWithoutExtension(outputPath);
        string suffix = Path.GetExtension(outputPath);

        string candidate = outputPath;
        for (int attempt = 1; File.Exists(candidate); attempt++)
        {
            candidate = Path.Combine(folder, $"{stem}_{attempt}{suffix}");
        }

        return candidate;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment