Last active
April 8, 2025 18:25
-
-
Save alirezanet/3806b47c089ee3b63d832d63f588018d to your computer and use it in GitHub Desktop.
CSV Batch Processor Template with Checkpointing and Concurrency Control. This reusable template processes very large CSV files in batches. It runs records concurrently up to a configurable limit, writes a checkpoint after each batch so an interrupted run can resume where it left off, and records every row whose processing fails in a separate error CSV.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// USAGE EXAMPLE
// Replace the placeholder paths below and the body of HandleRecordAsync with real values/logic.
CsvBatchProcessor<Record> processor = new(
    csvFile: @"PATH TO THE MAIN CSV FILE.csv",
    errorFile: @"PATH TO THE LOCATION OF ERROR TRACKER.csv",
    checkpointFile: @"CHECKPOINT FILE PATH",
    batchSize: 100,
    maxDegreeOfParallelism: 50,
    processRecordAsync: HandleRecordAsync);

await processor.ProcessFileAsync();

// Per-record work invoked by the processor; this placeholder only simulates a 1-second async call.
static async Task HandleRecordAsync(Record record)
{
    // THE PROCESSING LOGIC
    await Task.Delay(1000);
}
// ----------------------------------------------------
// CSV DTO
/// <summary>
/// One row of the input CSV. The header columns are expected to map onto these members
/// (by name under CsvHelper's default configuration — confirm against the real file's header).
/// </summary>
/// <param name="Id">Identifier column of the row.</param>
/// <param name="UpdateDate">Date column of the row, read as a <see cref="DateOnly"/>.</param>
public record class Record(string Id, DateOnly UpdateDate);
// ---- THE CUSTOM REUSABLE CSV BATCH PROCESSOR -----

/// <summary>
/// Streams a CSV file row by row, runs an async callback on each record with bounded
/// concurrency, and periodically writes a checkpoint so an interrupted run can resume.
/// </summary>
/// <remarks>
/// <para>
/// The checkpoint file stores the number of data rows (after the header) that have been fully
/// handled: processed successfully, written to the error file, or skipped because they could not
/// be parsed. It is only advanced after every record of a batch has finished, so an interrupted
/// run re-processes at most one batch (at-least-once delivery; callbacks should be idempotent).
/// </para>
/// <para>
/// Records whose callback throws are appended to the error file, written with the same CSV
/// configuration as the input. Rows that cannot be parsed into <typeparamref name="TRecord"/>
/// are only reported on the console, since they cannot be represented in the typed error file.
/// </para>
/// </remarks>
/// <typeparam name="TRecord">The type each CSV row is read into.</typeparam>
public class CsvBatchProcessor<TRecord>
{
    private readonly string _csvFile;
    private readonly string _errorFile;
    private readonly string _checkpointFile;
    private readonly int _batchSize;
    private readonly int _maxDegreeOfParallelism;
    private readonly Func<TRecord, Task> _processRecordAsync;
    private readonly CsvConfiguration _csvConfig;

    /// <summary>Creates a processor. No files are touched until <see cref="ProcessFileAsync"/> runs.</summary>
    /// <param name="csvFile">Path of the input CSV; its first row is read as the header.</param>
    /// <param name="errorFile">Path of the CSV that receives records whose processing failed.</param>
    /// <param name="checkpointFile">Path of the file that stores how many data rows have been handled.</param>
    /// <param name="batchSize">Number of rows between checkpoints; must be positive.</param>
    /// <param name="maxDegreeOfParallelism">Maximum number of callbacks running at once; must be positive.</param>
    /// <param name="processRecordAsync">Callback invoked once per successfully parsed record.</param>
    /// <param name="csvConfig">
    /// Configuration used for reading the input and writing the error file. Defaults to the
    /// invariant culture with <c>MissingFieldFound</c> disabled.
    /// </param>
    /// <exception cref="ArgumentNullException">A path or <paramref name="processRecordAsync"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="batchSize"/> or <paramref name="maxDegreeOfParallelism"/> is not positive.
    /// </exception>
    public CsvBatchProcessor(
        string csvFile,
        string errorFile,
        string checkpointFile,
        int batchSize,
        int maxDegreeOfParallelism,
        Func<TRecord, Task> processRecordAsync,
        CsvConfiguration? csvConfig = null)
    {
        ArgumentNullException.ThrowIfNull(csvFile);
        ArgumentNullException.ThrowIfNull(errorFile);
        ArgumentNullException.ThrowIfNull(checkpointFile);
        ArgumentNullException.ThrowIfNull(processRecordAsync);
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be positive.");
        }
        if (maxDegreeOfParallelism <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxDegreeOfParallelism), maxDegreeOfParallelism, "Max degree of parallelism must be positive.");
        }

        _csvFile = csvFile;
        _errorFile = errorFile;
        _checkpointFile = checkpointFile;
        _batchSize = batchSize;
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
        _processRecordAsync = processRecordAsync;
        _csvConfig = csvConfig ?? new CsvConfiguration(CultureInfo.InvariantCulture)
        {
            MissingFieldFound = null
        };
    }

    /// <summary>
    /// Processes every data row after the last checkpoint and records the final position.
    /// </summary>
    /// <param name="cancellationToken">
    /// Stops dispatching new records. Records already started are awaited before the method
    /// throws <see cref="OperationCanceledException"/>; the checkpoint stays at the last completed batch.
    /// </param>
    public async Task ProcessFileAsync(CancellationToken cancellationToken = default)
    {
        int processedRecords = ReadCheckpoint();

        using var reader = new StreamReader(_csvFile);
        using var csv = new CsvReader(reader, _csvConfig);

        // An empty file has no header row to read, so there is nothing to do.
        if (!csv.Read())
        {
            Console.WriteLine("Input CSV is empty; nothing to process.");
            return;
        }
        csv.ReadHeader();

        // Skip the rows a previous run already handled.
        for (int i = 0; i < processedRecords; i++)
        {
            if (!csv.Read())
            {
                break;
            }
        }

        // When resuming, keep the failures recorded by earlier runs: those rows lie before the
        // checkpoint and will not be retried, so truncating the file would lose them for good.
        bool appendErrors = processedRecords > 0
            && File.Exists(_errorFile)
            && new FileInfo(_errorFile).Length > 0;

        using var errorWriter = new StreamWriter(_errorFile, append: appendErrors);
        // Same configuration as the input so the error file can be fed back in as a new input.
        using var errorCsv = new CsvWriter(errorWriter, _csvConfig);
        if (!appendErrors)
        {
            errorCsv.WriteHeader<TRecord>();
            errorCsv.NextRecord();
        }

        object errorLock = new();
        using var semaphore = new SemaphoreSlim(_maxDegreeOfParallelism);
        var inFlight = new List<Task>();
        int currentIndex = processedRecords;
        int lastCheckpoint = processedRecords;

        // Once _batchSize rows have been consumed since the last checkpoint, waits for the batch
        // and persists the position. A ">=" test (rather than an exact modulo) cannot miss a
        // boundary when rows are skipped because they failed to parse.
        async Task CheckpointIfDueAsync()
        {
            if (currentIndex - lastCheckpoint < _batchSize)
            {
                return;
            }

            await Task.WhenAll(inFlight);
            inFlight.Clear();
            WriteCheckpoint(currentIndex);
            lastCheckpoint = currentIndex;
            Console.WriteLine($"Checkpoint updated: {currentIndex} records processed.");
        }

        try
        {
            while (csv.Read())
            {
                cancellationToken.ThrowIfCancellationRequested();
                int recordIndex = currentIndex;

                TRecord record;
                try
                {
                    record = csv.GetRecord<TRecord>();
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error reading record at index {recordIndex}: {ex.Message}");
                    currentIndex++;
                    await CheckpointIfDueAsync();
                    continue;
                }

                // Blocks the reader once _maxDegreeOfParallelism callbacks are running.
                await semaphore.WaitAsync(cancellationToken);
                inFlight.Add(Task.Run(() => ProcessOneAsync(record, recordIndex, errorCsv, errorLock, semaphore)));
                currentIndex++;
                await CheckpointIfDueAsync();
            }

            // Wait for the trailing partial batch; this also covers trailing unparseable rows.
            await Task.WhenAll(inFlight);
            inFlight.Clear();
            if (currentIndex != lastCheckpoint)
            {
                WriteCheckpoint(currentIndex);
                Console.WriteLine($"Final checkpoint updated: {currentIndex} records processed.");
            }
        }
        catch
        {
            // Workers must not outlive the error writer and semaphore, which are disposed as soon
            // as this method exits. The checkpoint is deliberately not advanced here, so the next
            // run restarts at the last completed batch.
            try
            {
                await Task.WhenAll(inFlight);
            }
            catch (Exception drainEx)
            {
                Console.WriteLine($"Error while waiting for in-flight records: {drainEx.Message}");
            }
            throw;
        }
    }

    /// <summary>
    /// Runs the callback for one record. If it throws, logs the failure and appends the record to
    /// the error file. Always releases one slot of <paramref name="semaphore"/>.
    /// </summary>
    private async Task ProcessOneAsync(
        TRecord record, int recordIndex, CsvWriter errorCsv, object errorLock, SemaphoreSlim semaphore)
    {
        try
        {
            await _processRecordAsync(record);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing record at index {recordIndex}: {ex.Message}");
            // Workers run concurrently; the lock serializes access to the shared error writer.
            lock (errorLock)
            {
                errorCsv.WriteRecord(record);
                errorCsv.NextRecord();
                // Flush both the CSV writer and the stream so the failure survives a later crash.
                errorCsv.Flush();
            }
        }
        finally
        {
            semaphore.Release();
        }
    }

    /// <summary>
    /// Returns the number of data rows handled by a previous run, or 0 when there is no usable
    /// checkpoint. An unreadable checkpoint is reported and treated as "start from the beginning".
    /// </summary>
    private int ReadCheckpoint()
    {
        if (!File.Exists(_checkpointFile))
        {
            return 0;
        }

        string text = File.ReadAllText(_checkpointFile);
        if (int.TryParse(text, NumberStyles.Integer, CultureInfo.InvariantCulture, out int processed) && processed >= 0)
        {
            Console.WriteLine($"Resuming from record #{processed}");
            return processed;
        }

        Console.WriteLine($"Checkpoint file '{_checkpointFile}' is unreadable; starting from the first record.");
        return 0;
    }

    /// <summary>
    /// Persists <paramref name="processedRecords"/> to the checkpoint file. The value is written to
    /// a sibling ".tmp" file and then moved over the checkpoint, so a crash mid-write cannot leave
    /// a truncated checkpoint behind.
    /// </summary>
    private void WriteCheckpoint(int processedRecords)
    {
        string tempFile = _checkpointFile + ".tmp";
        File.WriteAllText(tempFile, processedRecords.ToString(CultureInfo.InvariantCulture));
        File.Move(tempFile, _checkpointFile, overwrite: true);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment