Last active
August 15, 2024 15:15
-
-
Save dfa1/65ae63ca3f10a72e57885add52c48b4f to your computer and use it in GitHub Desktop.
Trying to improve https://boyter.org/posts/file-read-challange/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.charset.StandardCharsets; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.TreeMap; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.regex.Pattern; | |
import java.util.stream.Collectors; | |
/**
 * Aggregates statistics over the pipe-delimited text file at {@link #FILE_NAME}
 * using one reader task and {@link #WORKERS} parser tasks.
 *
 * <p>The reader ({@link Producer}) pushes batches of raw lines onto a bounded
 * queue; each parser ({@link Worker}) drains batches into its own private
 * {@link Result}, so no aggregation state is shared between threads. The
 * partial results are merged and printed by {@link #totals(List)}.
 *
 * <p>Each instance owns its own queue and is meant for a single {@link #run()}.
 */
public class improved {

    private static final String FILE_NAME = "/tmp/itcont.txt";
    private static final int WORKERS = 2;
    private static final int BATCH_SIZE = 50_000;

    /**
     * Captures the first run of non-comma, non-space characters that follows
     * a comma and a space in the name column. Compiled once and shared:
     * {@code Pattern} instances are immutable, and each use creates its own matcher.
     */
    private static final Pattern FIRST_NAME = Pattern.compile(", \\s*([^, ]+)");

    /** Bounded hand-off between the reader and the parsers (at most 10 pending batches). */
    private final ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(10);

    /** End-of-input marker; workers recognise it by reference identity, not by content. */
    private final List<String> poison = List.of("poison");

    /**
     * Runs the whole pipeline 20 times, printing the elapsed wall-clock time
     * of each run in nanoseconds.
     *
     * @param args ignored
     * @throws Exception if any run fails
     */
    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 20; i++) {
            long start = System.nanoTime();
            var m = new improved();
            m.run();
            long end = System.nanoTime();
            System.out.printf("run #%s completed in %sns%n", i, (end - start));
        }
    }

    /**
     * Reads and parses the input file once and prints the aggregated totals.
     *
     * <p>The executor is always shut down, even on failure, so a failed run
     * cannot leave pool threads behind that keep the JVM alive.
     *
     * @throws Exception if the reader or a parser task fails (wrapped by
     *         {@code Future.get()}), or if the calling thread is interrupted
     */
    public void run() throws Exception {
        var executor = Executors.newCachedThreadPool();
        try {
            var producer = executor.submit(new Producer());

            var futures = new ArrayList<Future<Result>>();
            for (int i = 0; i < WORKERS; i++) {
                futures.add(executor.submit(new Worker()));
            }
            // Await the workers BEFORE the producer: if the workers die, nobody
            // drains the queue and the producer may be blocked in put(). Seeing
            // the worker failure first lets the finally block interrupt it.
            futures.add(producer);

            List<Result> results = new ArrayList<>();
            for (var future : futures) {
                results.add(future.get());
            }
            totals(results);
        } finally {
            // shutdownNow() interrupts tasks still blocked on the queue; after a
            // successful run every task has already finished, so it only
            // releases the idle pool threads.
            executor.shutdownNow();
        }
    }

    /**
     * Merges the partial results and prints three lines to standard output:
     * the total number of parsed lines, the per-key donation counts in
     * ascending key order, and the three most frequent first names.
     *
     * @param results partial results, one per task (the producer's is empty)
     */
    private void totals(List<Result> results) {
        long total = results.stream().mapToLong(r -> r.count).sum();

        Map<String, Integer> names = results
                .stream()
                .flatMap(r -> r.countByFirstName.entrySet().stream())
                .collect(Collectors.groupingBy(Entry::getKey, Collectors.summingInt(Entry::getValue)));

        // TreeMap so the donations are printed sorted by key.
        Map<Integer, Integer> donationsByMonthYear = results
                .stream()
                .flatMap(r -> r.donations.entrySet().stream())
                .collect(Collectors.groupingBy(Entry::getKey, TreeMap::new, Collectors.summingInt(Entry::getValue)));

        List<Entry<String, Integer>> top3 = names.entrySet()
                .stream()
                .sorted(Entry.<String, Integer>comparingByValue().reversed())
                .limit(3)
                .collect(Collectors.toList());

        System.out.printf("total %s%n", total);
        System.out.printf("donations %s%n", donationsByMonthYear);
        System.out.printf("top 3 first names %s%n", top3);
    }

    /**
     * Mutable per-task accumulator. Not thread-safe: each instance is filled
     * by exactly one task and only read after that task's future completes.
     */
    private static final class Result {
        private long count = 0;
        private final Map<String, Integer> countByFirstName = new HashMap<>();
        private final Map<Integer, Integer> donations = new HashMap<>();

        /** Records that one more line has been parsed. */
        public void incrementLineNumber() {
            count++;
        }

        /** Adds one occurrence of {@code firstName}. */
        public void addFirstName(String firstName) {
            countByFirstName.merge(firstName, 1, Integer::sum);
        }

        /**
         * Adds one donation under the given key.
         *
         * @param yearmonth integer parsed from the first six characters of the
         *        date column (presumably YYYYMM, going by the name - confirm
         *        against the data format)
         */
        public void addDonation(Integer yearmonth) {
            donations.merge(yearmonth, 1, Integer::sum);
        }
    }

    /**
     * Reads the input file line by line and enqueues the lines in batches of
     * up to {@link #BATCH_SIZE}, followed by one poison pill per worker.
     */
    class Producer implements Callable<Result> {

        /**
         * @return an empty {@link Result}, so the producer can share a future
         *         list with the workers
         * @throws Exception if the file cannot be opened, read, or decoded as
         *         US-ASCII, or if the thread is interrupted while enqueueing
         */
        @Override
        public Result call() throws Exception {
            try (var reader = Files.newBufferedReader(Path.of(FILE_NAME), StandardCharsets.US_ASCII)) {
                var lines = new ArrayList<String>(BATCH_SIZE);
                String readLine;
                while ((readLine = reader.readLine()) != null) {
                    lines.add(readLine);
                    if (lines.size() == BATCH_SIZE) {
                        queue.put(lines);
                        lines = new ArrayList<>(BATCH_SIZE);
                    }
                }
                // Trailing partial batch; an empty one carries no work.
                if (!lines.isEmpty()) {
                    queue.put(lines);
                }
            } finally {
                // Always release the workers, even when reading failed;
                // otherwise they would block in take() forever.
                for (int i = 0; i < WORKERS; i++) {
                    queue.put(poison);
                }
            }
            return new Result();
        }
    }

    /**
     * Takes batches from the queue until it receives the poison pill and
     * accumulates their statistics into a private {@link Result}.
     */
    class Worker implements Callable<Result> {

        /**
         * @return the statistics of every line this worker parsed
         * @throws Exception if a line has fewer than eight {@code |}-separated
         *         columns, if column 4 does not start with six digits, or if
         *         the thread is interrupted while waiting for a batch
         */
        @Override
        public Result call() throws Exception {
            Result result = new Result();
            // Identity comparison is deliberate: only the shared poison instance
            // ends the loop, never a batch that happens to equal it.
            for (var lines = queue.take(); lines != poison; lines = queue.take()) {
                for (var line : lines) {
                    // Limit 9: everything after the 8th separator stays in one unused tail column.
                    var columns = line.split("\\|", 9);
                    var dateTime = columns[4];
                    var ym = Integer.parseInt(dateTime.substring(0, 6));
                    result.addDonation(ym);

                    String name = columns[7];
                    var matcher = FIRST_NAME.matcher(name);
                    if (matcher.find()) {
                        result.addFirstName(matcher.group(1));
                    }
                    result.incrementLineNumber();
                }
            }
            return result;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment