Last active
July 5, 2024 12:03
-
-
Save djfdyuruiry/cf7abefb3a7dda0e2d2d5814bb5b6186 to your computer and use it in GitHub Desktop.
Multi-threaded CSV Reader/Writer for Java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Uses Apache Commons CSV, IO and Lang.
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Stream;

import static java.lang.String.format;
import static java.util.Collections.synchronizedList;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.Spliterator.CONCURRENT;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.concurrent.Executors.callable;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static java.util.stream.StreamSupport.stream;
import static org.apache.commons.lang3.time.StopWatch.createStarted;
public class App { | |
private static final int CSV_CHUNK_SIZE = 10000; | |
public static void main(String[] args) throws InterruptedException, IOException { | |
var outDir = new File("out"); | |
FileUtils.deleteQuietly(outDir); | |
FileUtils.forceMkdir(outDir); | |
runMultiThreadCsvWriteTest(); | |
runMultiThreadCsvReadTest(); | |
} | |
private static void runMultiThreadCsvWriteTest() throws InterruptedException { | |
var executor = newFixedThreadPool(10); | |
var files = synchronizedList(new ArrayList<File>()); | |
var latch = new CountDownLatch(10); | |
var allChunksStopwatch = createStarted(); | |
executor.invokeAll( | |
range(0, 10) | |
.mapToObj(i -> callable(() -> { | |
var filename = format("out/chunk_%d.csv", i); | |
try { | |
writeCsvFile(filename, i); | |
} catch (IOException ex) { | |
throw new RuntimeException(format("Error writing to file: %s", filename), ex); | |
} | |
files.add(new File(filename)); | |
latch.countDown(); | |
})) | |
.collect(toList()) | |
); | |
latch.await(); | |
executor.shutdownNow(); | |
allChunksStopwatch.stop(); | |
System.out.println(format("Writing 10 CSV chunks took %dms", allChunksStopwatch.getTime())); | |
try { | |
buildFinalCsvFile(files); | |
} catch (IOException ex) { | |
throw new RuntimeException("Failed to build file: final_file.csv", ex); | |
} | |
} | |
private static void writeCsvFile(String filename, int number) throws IOException { | |
System.out.println(format("Writing records to file: %s", filename)); | |
var csvFileStopwatch = createStarted(); | |
try ( | |
var fileWriter = new FileWriter(filename); | |
var csvFile = new CSVPrinter(fileWriter, CSVFormat.EXCEL) | |
) { | |
range(0, CSV_CHUNK_SIZE).forEach(i -> { | |
try { | |
csvFile.printRecord("name", number, "address", "number", "age"); | |
} catch (IOException ex) { | |
throw new RuntimeException(ex); | |
} | |
}); | |
} | |
csvFileStopwatch.stop(); | |
System.out.println( | |
format( | |
"Writing %d records to CSV file chunk took %sms", | |
CSV_CHUNK_SIZE, | |
csvFileStopwatch.getTime() | |
) | |
); | |
} | |
private static void buildFinalCsvFile(List<File> files) throws IOException { | |
System.out.println("Building final file: final_file.csv"); | |
var recordsFile = new File("out/final_file.csv"); | |
var buildStopwatch = createStarted(); | |
try(var fileOutputStream = FileUtils.openOutputStream(recordsFile)) { | |
for (var file : files) { | |
try (var fileInputStream = FileUtils.openInputStream(file)) { | |
IOUtils.copy(fileInputStream, fileOutputStream); | |
} | |
} | |
} | |
buildStopwatch.stop(); | |
System.out.println(format("Building final file took %sms", buildStopwatch.getTime())); | |
} | |
private static void runMultiThreadCsvReadTest() throws InterruptedException, IOException { | |
var readAllStopwatch = createStarted(); | |
var recordsFile = new File("out/final_file.csv"); | |
var csvChunkFiles = splitRecordsFileIntoChunks(recordsFile); | |
var executor = newFixedThreadPool(10); | |
var latch = new CountDownLatch(10); | |
var recordCounts = executor.invokeAll( | |
csvChunkFiles.map(f -> | |
(Callable<Integer>)() -> { | |
var count = readCsvFile(f); | |
latch.countDown(); | |
return count; | |
}).collect(toList()) | |
); | |
latch.await(); | |
executor.shutdownNow(); | |
var totalRecordsRead = recordCounts.stream() | |
.mapToInt(rc -> { | |
try { | |
return rc.get(); | |
} catch (Throwable ex) { | |
throw new RuntimeException(ex); | |
} | |
}).sum(); | |
readAllStopwatch.stop(); | |
System.out.println( | |
format( | |
"Reading %d records from CSV chunk files took %dms", | |
totalRecordsRead, | |
readAllStopwatch.getTime() | |
) | |
); | |
} | |
private static Stream<File> splitRecordsFileIntoChunks(File recordsFile) throws IOException { | |
var chunkFilesStopwatch = createStarted(); | |
var chunkFiles = new ArrayList<File>(); | |
try ( | |
var fileReader = new FileReader(recordsFile); | |
var bufferedReader = new BufferedReader(fileReader) | |
) { | |
// open initial file | |
var currentFileRecordCount = 0; | |
var currentFile = new File(format("out/in_chunk_%d.csv", chunkFiles.size())); | |
var currentFileWriter = new FileWriter(currentFile); | |
var currentBufferedWriter = new BufferedWriter(currentFileWriter); | |
var line = bufferedReader.readLine(); | |
while (nonNull(line)) { | |
// dump line into chunk file | |
currentBufferedWriter.write(line); | |
currentBufferedWriter.newLine(); | |
currentFileRecordCount++; | |
line = bufferedReader.readLine(); | |
if (nonNull(line) && currentFileRecordCount > 9999) { | |
// open next file if we are at chunk limit and still reading full file | |
currentBufferedWriter.close(); | |
currentFileWriter.close(); | |
currentBufferedWriter.close(); | |
currentFileWriter.close(); | |
chunkFiles.add(currentFile); | |
currentFileRecordCount = 0; | |
currentFile = new File(format("out/in_chunk_%d.csv", chunkFiles.size())); | |
currentFileWriter = new FileWriter(currentFile); | |
currentBufferedWriter = new BufferedWriter(currentFileWriter); | |
} else if (isNull(line)) { | |
// ensure we add the last file to chunks list | |
chunkFiles.add(currentFile); | |
} | |
} | |
} | |
chunkFilesStopwatch.stop(); | |
System.out.println( | |
format( | |
"Splitting file '%s' into '%d' chunks of (at most) 10K records each took %dms", | |
recordsFile.getName(), | |
chunkFiles.size(), | |
chunkFilesStopwatch.getTime() | |
) | |
); | |
return chunkFiles.stream(); | |
} | |
private static int readCsvFile(File csvFileHandle) { | |
var readStopwatch = createStarted(); | |
try ( | |
var fileReader = new FileReader(csvFileHandle); | |
var csvFile = new CSVParser(fileReader, CSVFormat.EXCEL) | |
) { | |
var numRecords = stream( | |
spliteratorUnknownSize(csvFile.iterator(), CONCURRENT), true | |
).mapToInt(r -> 1).sum(); | |
readStopwatch.stop(); | |
System.out.println( | |
format( | |
"Reading %d records from CSV file '%s' took %dms", | |
numRecords, | |
csvFileHandle.getName(), | |
readStopwatch.getTime() | |
) | |
); | |
return numRecords; | |
} catch (Throwable ex) { | |
throw new RuntimeException( | |
format("Error reading from chunk file: %s", csvFileHandle.getName()), | |
ex | |
); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment