Skip to content

Instantly share code, notes, and snippets.

@alirezapla
Last active May 23, 2025 10:55
Show Gist options
  • Save alirezapla/8cf9735df6a87271e4a4400b27a7cc61 to your computer and use it in GitHub Desktop.
Save alirezapla/8cf9735df6a87271e4a4400b27a7cc61 to your computer and use it in GitHub Desktop.
processing 100 csv file concurrently
import java.io.*;
import java.nio.file.*;
import java.util.concurrent.*;
public class CsvConcurrentProcessor {
static final int NUM_CONSUMERS = 4;
static final int QUEUE_CAPACITY = 1000;
static final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
static final String POISON_PILL = "__EOF__";
public static void main(String[] args) throws InterruptedException {
ExecutorService producers = Executors.newCachedThreadPool();
ExecutorService consumers = Executors.newFixedThreadPool(NUM_CONSUMERS);
Path csvDir = Paths.get("path/to/csv/files");
try (DirectoryStream<Path> stream = Files.newDirectoryStream(csvDir, "*.csv")) {
for (Path filePath : stream) {
producers.submit(() -> processFile(filePath));
}
} catch (IOException e) {
e.printStackTrace();
}
for (int i = 0; i < NUM_CONSUMERS; i++) {
consumers.submit(CsvConcurrentProcessor::consumeAndStore);
}
producers.shutdown();
producers.awaitTermination(1, TimeUnit.HOURS);
for (int i = 0; i < NUM_CONSUMERS; i++) {
queue.offer(new String[]{POISON_PILL});
}
consumers.shutdown();
consumers.awaitTermination(1, TimeUnit.HOURS);
}
private static void processFile(Path filePath) {
try (BufferedReader reader = Files.newBufferedReader(filePath)) {
String line;
while ((line = reader.readLine()) != null) {
String[] data = line.split(",");
queue.put(data);
}
} catch (IOException | InterruptedException e) {
System.err.println("Error processing file: " + filePath);
}
}
private static void consumeAndStore() {
try {
while (true) {
String[] data = queue.take();
if (data.length == 1 && POISON_PILL.equals(data[0])) break;
System.out.println("Saving: " + String.join(" | ", data));
Thread.sleep(10);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment