Skip to content

Instantly share code, notes, and snippets.

@alirezanet
Last active April 8, 2025 18:25
Show Gist options
  • Save alirezanet/3806b47c089ee3b63d832d63f588018d to your computer and use it in GitHub Desktop.
CSV Batch Processor Template with Checkpointing and Concurrency Control. This reusable template enables efficient processing of huge CSV files in batches. It leverages configurable concurrency limits and checkpointing to resume processing, while logging errors for any records that fail to process.
// USAGE EXAMPLE
// Construct the processor with file paths, batching/concurrency settings,
// and the per-record callback, then run it end to end.
var batchProcessor = new CsvBatchProcessor<Record>(
    csvFile: @"PATH TO THE MAIN CSV FILE.csv",
    errorFile: @"PATH TO THE LOCATION OF ERROR TRACKER.csv",
    checkpointFile: @"CHECKPOINT FILE PATH",
    batchSize: 100,
    maxDegreeOfParallelism: 50,
    processRecordAsync: async record =>
    {
        // THE PROCESSING LOGIC
        await Task.Delay(1000);
    });

await batchProcessor.ProcessFileAsync();

// ----------------------------------------------------
// CSV DTO — shape of one row in the input file.
public record Record(string Id, DateOnly UpdateDate);
// ---- THE CUSTOM REUSABLE CSV BATCH PROCESSOR -----
// ---- THE CUSTOM REUSABLE CSV BATCH PROCESSOR -----
/// <summary>
/// Processes a large CSV file in batches with bounded concurrency.
/// Progress is checkpointed after each batch so a crashed or interrupted run
/// can resume where it left off; records whose processing callback throws are
/// appended to a separate error CSV for later replay.
/// </summary>
/// <typeparam name="TRecord">The CSV row DTO understood by CsvHelper.</typeparam>
public class CsvBatchProcessor<TRecord>
{
    private readonly string _csvFile;
    private readonly string _errorFile;
    private readonly string _checkpointFile;
    private readonly int _batchSize;
    private readonly int _maxDegreeOfParallelism;
    private readonly Func<TRecord, Task> _processRecordAsync;
    private readonly CsvConfiguration _csvConfig;

    /// <summary>
    /// Creates the processor.
    /// </summary>
    /// <param name="csvFile">Path of the input CSV file.</param>
    /// <param name="errorFile">Path of the CSV file that receives failed records.</param>
    /// <param name="checkpointFile">Path of the file storing the resume position (record count).</param>
    /// <param name="batchSize">Number of records per checkpointed batch; must be positive.</param>
    /// <param name="maxDegreeOfParallelism">Maximum concurrently running callbacks; must be positive.</param>
    /// <param name="processRecordAsync">Async callback invoked once per record.</param>
    /// <param name="csvConfig">Optional CsvHelper configuration; defaults to invariant culture with missing fields ignored.</param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">A numeric argument is not positive.</exception>
    public CsvBatchProcessor(
        string csvFile,
        string errorFile,
        string checkpointFile,
        int batchSize,
        int maxDegreeOfParallelism,
        Func<TRecord, Task> processRecordAsync,
        CsvConfiguration? csvConfig = null)
    {
        // Fail fast on bad configuration instead of failing mid-run:
        // batchSize == 0 would make the "% _batchSize" checkpoint test throw,
        // and SemaphoreSlim rejects a non-positive initial count anyway.
        ArgumentNullException.ThrowIfNull(csvFile);
        ArgumentNullException.ThrowIfNull(errorFile);
        ArgumentNullException.ThrowIfNull(checkpointFile);
        ArgumentNullException.ThrowIfNull(processRecordAsync);
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize), "Batch size must be positive.");
        if (maxDegreeOfParallelism <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism), "Degree of parallelism must be positive.");

        _csvFile = csvFile;
        _errorFile = errorFile;
        _checkpointFile = checkpointFile;
        _batchSize = batchSize;
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _processRecordAsync = processRecordAsync;
        _csvConfig = csvConfig ?? new CsvConfiguration(CultureInfo.InvariantCulture)
        {
            // Tolerate rows with fewer fields than the DTO expects.
            MissingFieldFound = null
        };
    }

    /// <summary>
    /// Processes the whole input file, resuming from the last checkpoint if one exists.
    /// Failed records are written to the error CSV; the checkpoint is advanced only
    /// after every task in a batch has completed.
    /// </summary>
    public async Task ProcessFileAsync()
    {
        int processedRecords = ReadCheckpoint();
        bool resuming = processedRecords > 0;
        if (resuming)
        {
            Console.WriteLine($"Resuming from record #{processedRecords}");
        }

        using var reader = new StreamReader(_csvFile);
        using var csv = new CsvReader(reader, _csvConfig);
        csv.Read();
        csv.ReadHeader();

        // Fast-forward past rows already handled in a previous run.
        for (int i = 0; i < processedRecords; i++)
        {
            if (!csv.Read()) break;
        }

        // BUG FIX: the error file used to be truncated (append: false) on every
        // run, so resuming after a crash silently destroyed the errors logged by
        // the previous run. When resuming, append instead, and only write the
        // header if the file does not already contain one.
        bool errorFileHasContent =
            resuming && File.Exists(_errorFile) && new FileInfo(_errorFile).Length > 0;
        using var errorWriter = new StreamWriter(_errorFile, append: resuming);
        using var errorCsv = new CsvWriter(errorWriter, CultureInfo.InvariantCulture);
        if (!errorFileHasContent)
        {
            errorCsv.WriteHeader<TRecord>();
            errorCsv.NextRecord();
        }

        // CsvWriter is not thread-safe; serialize error writes across tasks.
        object errorLock = new();
        using var semaphore = new SemaphoreSlim(_maxDegreeOfParallelism);
        var tasks = new List<Task>();
        int currentIndex = processedRecords;

        while (csv.Read())
        {
            TRecord record;
            try
            {
                record = csv.GetRecord<TRecord>();
            }
            catch (Exception ex)
            {
                // Malformed row: log and skip, but still advance the index so
                // the checkpoint accounts for it and we never re-read it.
                Console.WriteLine($"Error reading record at index {currentIndex}: {ex.Message}");
                currentIndex++;
                continue;
            }

            // Bounded concurrency: block here once _maxDegreeOfParallelism
            // callbacks are in flight.
            await semaphore.WaitAsync().ConfigureAwait(false);
            var task = Task.Run(async () =>
            {
                try
                {
                    await _processRecordAsync(record).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // Per-record failures must not abort the run; persist the
                    // record for later replay and flush so a crash loses nothing.
                    Console.WriteLine($"Error processing record: {ex.Message}");
                    lock (errorLock)
                    {
                        errorCsv.WriteRecord(record);
                        errorCsv.NextRecord();
                        errorWriter.Flush();
                    }
                }
                finally
                {
                    semaphore.Release();
                }
            });
            tasks.Add(task);
            currentIndex++;

            // Checkpoint only after the whole batch has finished, so a crash
            // can never skip a record that was claimed but not processed.
            if ((currentIndex - processedRecords) % _batchSize == 0)
            {
                await Task.WhenAll(tasks).ConfigureAwait(false);
                tasks.Clear();
                WriteCheckpoint(currentIndex);
                Console.WriteLine($"Checkpoint updated: {currentIndex} records processed.");
            }
        }

        // Drain the final partial batch, then persist the final position.
        // BUG FIX: previously the final checkpoint was skipped when the tail of
        // the file yielded only read errors (no tasks), forcing those bad rows
        // to be re-scanned on every resume.
        if (tasks.Count > 0)
        {
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
        if (currentIndex > processedRecords)
        {
            WriteCheckpoint(currentIndex);
            Console.WriteLine($"Final checkpoint updated: {currentIndex} records processed.");
        }
    }

    // Returns the record count stored in the checkpoint file, or 0 when the
    // file is absent or unparsable (a fresh run).
    private int ReadCheckpoint()
    {
        if (File.Exists(_checkpointFile)
            && int.TryParse(File.ReadAllText(_checkpointFile), out int processed))
        {
            return processed;
        }
        return 0;
    }

    // Persists the resume position as plain text.
    private void WriteCheckpoint(int recordCount)
        => File.WriteAllText(_checkpointFile, recordCount.ToString());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment