Skip to content

Instantly share code, notes, and snippets.

@recalde
Created November 9, 2024 02:44
Show Gist options
  • Save recalde/5644be3e1ea1b3313b55d036de287cd8 to your computer and use it in GitHub Desktop.
Save recalde/5644be3e1ea1b3313b55d036de287cd8 to your computer and use it in GitHub Desktop.
Merge logs by pod
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ParquetSharp;
/// <summary>
/// Merges the Parquet log files found in a directory into a single output file,
/// writing one row group per (pod name, hour) pair.
/// </summary>
public class DirectoryFileMerger
{
    private readonly string _directoryPath;
    private readonly string _outputFilePath;
    private readonly string _dateFilter;

    /// <summary>
    /// Creates a merger for the files in <paramref name="directoryPath"/>.
    /// </summary>
    /// <param name="directoryPath">Directory whose files are scanned (top level only).</param>
    /// <param name="outputFilePath">
    /// Desired path of the merged file. If a file already exists there, a numbered
    /// variant is chosen by <see cref="FileNameHelper.GetUniqueFileName"/>.
    /// </param>
    /// <param name="dateFilter">Substring a file name must contain to be included in the merge.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public DirectoryFileMerger(string directoryPath, string outputFilePath, string dateFilter)
    {
        // Fail at construction with the offending parameter name instead of
        // deep inside the LINQ pipeline or the Parquet writer later on.
        _directoryPath = directoryPath ?? throw new ArgumentNullException(nameof(directoryPath));
        _outputFilePath = outputFilePath ?? throw new ArgumentNullException(nameof(outputFilePath));
        _dateFilter = dateFilter ?? throw new ArgumentNullException(nameof(dateFilter));
    }

    /// <summary>
    /// Collects the files whose name contains the date filter, groups them by pod name
    /// and by the UTC hour of their last write time, and appends each group to the
    /// output file as one row group. Within a group, files are processed from oldest
    /// to newest last write time.
    /// </summary>
    /// <remarks>
    /// Prints a message and returns without creating any file when nothing matches.
    /// Exceptions raised by <see cref="Directory.GetFiles(string)"/> (for example when
    /// the directory does not exist) and by the Parquet reader/writer are not caught.
    /// </remarks>
    public void MergeFiles()
    {
        // Group files by pod name and hour. Groups are sorted so that the row-group
        // order (and the file used as schema sample) does not depend on the order in
        // which the file system happens to enumerate the directory.
        var filesByPodAndHour = Directory.GetFiles(_directoryPath)
            .Where(file => Path.GetFileName(file).Contains(_dateFilter))
            .Select(file => new FileInfo(file))
            .GroupBy(file => (PodName: ExtractPodName(file.Name), Hour: file.LastWriteTimeUtc.Hour))
            .OrderBy(group => group.Key.PodName, StringComparer.Ordinal)
            .ThenBy(group => group.Key.Hour)
            .Select(group => (group.Key, Files: group.OrderBy(file => file.LastWriteTimeUtc).ToList()))
            .ToList();

        if (filesByPodAndHour.Count == 0)
        {
            Console.WriteLine("No files found for the specified date.");
            return;
        }

        string uniqueOutputPath = FileNameHelper.GetUniqueFileName(_outputFilePath);
        using var outputWriter = new ParquetFileWriter(uniqueOutputPath);

        // Set schema based on the first file.
        // NOTE(review): the published ParquetSharp API appears to take the columns/schema
        // in the ParquetFileWriter constructor rather than through SetSchema — confirm
        // this compiles against the ParquetSharp version in use.
        using (var sampleReader = new ParquetFileReader(filesByPodAndHour[0].Files[0].FullName))
        {
            var sampleSchema = sampleReader.Schema;
            outputWriter.SetSchema(sampleSchema);
        }

        foreach (var (key, files) in filesByPodAndHour)
        {
            Console.WriteLine($"Merging logs for pod: {key.PodName}, hour: {key.Hour}");

            // Dispose the row-group writer at the end of each iteration so the group is
            // released before the next one is appended (previously it was never disposed).
            // NOTE(review): random access through Column(col) presumably requires a
            // buffered row group (AppendBufferedRowGroup) in ParquetSharp — verify.
            using var outputGroupWriter = outputWriter.AppendRowGroup();

            foreach (var fileInfo in files)
            {
                using var inputReader = new ParquetFileReader(fileInfo.FullName);
                for (int i = 0; i < inputReader.RowGroupCount; i++)
                {
                    using var rowGroupReader = inputReader.RowGroup(i);
                    for (int col = 0; col < rowGroupReader.ColumnCount; col++)
                    {
                        // Copy the whole column of this input row group into the
                        // same-index column of the current output row group.
                        // NOTE(review): assumes every input file has the same column
                        // layout as the schema sample — nothing here checks it.
                        using var columnReader = rowGroupReader.Column(col).LogicalReader();
                        outputGroupWriter.Column(col).LogicalWriter().WriteBatch(columnReader.ReadAll(columnReader.RowCount));
                    }
                }
            }
        }

        // Close explicitly so the file is finalized before success is reported.
        outputWriter.Close();
        Console.WriteLine($"Merged file saved at: {uniqueOutputPath}");
    }

    /// <summary>
    /// Returns the part of <paramref name="fileName"/> before the first underscore
    /// (the whole name when it contains no underscore).
    /// </summary>
    private static string ExtractPodName(string fileName)
    {
        // Assuming the pod name is identifiable in the file name, e.g., "podname_timestamp.parquet"
        return fileName.Split('_').First(); // Adjust this logic based on your file naming convention
    }
}
/// <summary>
/// Helpers for picking output file names that do not collide with existing files.
/// </summary>
public static class FileNameHelper
{
    /// <summary>
    /// Returns <paramref name="outputPath"/> if no file exists there; otherwise returns
    /// the first path of the form <c>name_N.ext</c> (N = 1, 2, 3, ...) in the same
    /// directory for which no file exists.
    /// </summary>
    /// <param name="outputPath">The preferred output file path.</param>
    /// <returns>A path that did not refer to an existing file at the time of the check.</returns>
    public static string GetUniqueFileName(string outputPath)
    {
        // Split the requested path once; the pieces are reused for every candidate.
        string folder = Path.GetDirectoryName(outputPath) ?? string.Empty;
        string stem = Path.GetFileNameWithoutExtension(outputPath);
        string suffix = Path.GetExtension(outputPath);

        // Start with the requested path and move on to numbered variants while taken.
        string candidate = outputPath;
        for (int attempt = 1; File.Exists(candidate); attempt++)
        {
            candidate = Path.Combine(folder, $"{stem}_{attempt}{suffix}");
        }

        return candidate;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment