This gist contains all code snippets from the article.
Files: snippet-3.java, snippet-7.java, snippet-10.java, snippet-2.java, for.java, snippet-8.java, snippet-1.java, Statistics.java, snippet-5.java, snippet-9.java
| public static <T, R extends Comparable<R>> Gatherer<T, ?, T> | |
| takeWhileIncreasing(Function<T, R> valueExtractor) { | |
| return Gatherer.of( | |
| () -> new Object() { R lastValue = null; }, // ① Anonymous class for state | |
| (state, element, downstream) -> { | |
| R value = valueExtractor.apply(element); // ② Extract comparable value | |
| if (state.lastValue == null || // ③ First element OR | |
| value.compareTo(state.lastValue) > 0) { // value is increasing | |
| state.lastValue = value; // ④ Update state | |
| return downstream.push(element); // ⑤ Pass element along | |
| } | |
| return false; // ⑥ Stop gathering - sequence broken! | |
| } | |
| ); | |
| } | |
| List<String> increasingLength = words.stream() | |
| .gather(takeWhileIncreasing(String::length)) | |
| .toList(); | |
| System.out.println("Increasing length sequence: " + increasingLength); | |
| // Output: [hello] | |
| // Why? "hello"(5) → "world"(5) - not increasing, so we stop! |
| List<Integer> values = List.of(1, 5, 3, 8, 2, 9, 4, 7, 6); | |
| System.out.println("Input values: " + values); |
| record PeakValley(String type, double value, int index) {} | |
| public static Gatherer<Double, ?, PeakValley> peakValleyDetection() { | |
| return Gatherer.of( | |
| () -> new Object() { | |
| Double prev = null; // ① Previous value | |
| Double current = null; // ② Current value | |
| int index = 0; // ③ Track position | |
| }, | |
| (state, next, downstream) -> { | |
| if (state.prev != null && state.current != null) { // ④ Have 3 points? | |
| // Check for peak: current > both neighbors | |
| if (state.current > state.prev && state.current > next) { | |
| downstream.push(new PeakValley("PEAK", state.current, state.index)); | |
| } | |
| // Check for valley: current < both neighbors | |
| else if (state.current < state.prev && state.current < next) { | |
| downstream.push(new PeakValley("VALLEY", state.current, state.index)); | |
| } | |
| } | |
| // ⑤ Slide the window forward | |
| state.prev = state.current; | |
| state.current = next; | |
| state.index++; | |
| return true; | |
| } | |
| ); | |
| } | |
// Visual example (input: 101, 103, 99, 102):
//
//        103  ← Peak
//       /   \      102
//    101     \    /
//             99  ← Valley
| List<Integer> runningMax = values.stream() | |
| .gather(Gatherers.scan( | |
| () -> Integer.MIN_VALUE, // ① Start with smallest possible value | |
| Integer::max // ② Keep the maximum at each step | |
| )) | |
| .toList(); | |
| System.out.println("Running maximum: " + runningMax); | |
| // Output: [-2147483648, 1, 5, 5, 8, 8, 9, 9, 9, 9] | |
| // ↑ initial ↑ 5>1 ↑ 8>5 ↑ 9>8 |
| List<Long> runningProduct = values.stream() | |
| .gather(Gatherers.scan( | |
| () -> 1L, // ① Start with identity for multiplication | |
| (acc, val) -> acc * val // ② Multiply accumulated value by current | |
| )) | |
| .toList(); | |
| System.out.println("Running product: " + runningProduct); | |
| // Output: [1, 1, 5, 15, 120, 240, 2160, 8640, 60480, 362880] | |
| // ↑ ↑ ↑ ↑ ↑ | |
| // init 1×1 1×5 5×3 15×8 (and so on...) |
| public static <T, K> Gatherer<T, ?, T> distinctByKey(Function<T, K> keyExtractor) { | |
| return Gatherer.of( | |
| HashSet::new, // ① State: Set to track seen keys | |
| (state, element, downstream) -> { | |
| K key = keyExtractor.apply(element); // ② Extract the key | |
| if (state.add(key)) { // ③ If key is new (add returns true) | |
| return downstream.push(element); // ④ Pass element downstream | |
| } | |
| return true; // ⑤ Continue processing | |
| } | |
| ); | |
| } | |
| List<String> words = List.of("hello", "world", "java", "gatherers", "stream", "api"); | |
| List<String> distinctByLength = words.stream() | |
| .gather(distinctByKey(String::length)) // Using length as the key | |
| .toList(); | |
| System.out.println("Distinct by length: " + distinctByLength); | |
| // Output: [hello, java, gatherers, api] | |
| // ↑ ↑ ↑ ↑ | |
| // len=5 len=4 len=9 len=3 | |
| // Note: "world"(5) and "stream"(6) are filtered out as duplicates |
| public static <T> Gatherer<T, ?, List<T>> | |
| batchByCondition(Predicate<List<T>> batchComplete) { | |
| return Gatherer.of( | |
| ArrayList::new, // ① Current batch being built | |
| (batch, element, downstream) -> { | |
| batch.add(element); // ② Add to current batch | |
| if (batchComplete.test(batch)) { // ③ Check if batch is "full" | |
| downstream.push(new ArrayList<>(batch)); // ④ Send copy downstream | |
| batch.clear(); // ⑤ Start fresh batch | |
| } | |
| return true; // ⑥ Keep processing | |
| }, | |
| (batch, downstream) -> { // ⑦ Finisher: handle remaining elements | |
| if (!batch.isEmpty()) { | |
| downstream.push(new ArrayList<>(batch)); | |
| } | |
| } | |
| ); | |
| } | |
| List<Integer> numbers = List.of(1, 4, 2, 8, 5, 7, 3, 9, 6); | |
| List<List<Integer>> batches = numbers.stream() | |
| .gather(batchByCondition( | |
| batch -> batch.stream().mapToInt(i -> i).sum() <= 10 // Sum threshold | |
| )) | |
| .toList(); | |
| // Output visualization: | |
| // [1, 4, 2] → Sum: 7 ✓ (7 ≤ 10, continue) | |
| // [1, 4, 2, 8] → Sum: 15 ✗ (15 > 10, emit [1,4,2], start new with [8]) | |
| // [8] → Sum: 8 ✓ (8 ≤ 10, continue) | |
| // [8, 5] → Sum: 13 ✗ (13 > 10, emit [8], start new with [5]) | |
| // ... and so on |
| public static Gatherer<Double, ?, Double> movingAverage(int windowSize) { | |
| return Gatherer.of( | |
| LinkedList::new, // ① Use LinkedList for efficient add/remove | |
| (window, price, downstream) -> { | |
| window.add(price); // ② Add new price to window | |
| if (window.size() > windowSize) { // ③ Window too big? | |
| window.removeFirst(); // ④ Remove oldest price | |
| } | |
| if (window.size() == windowSize) { // ⑤ Window full? | |
| double avg = window.stream() | |
| .mapToDouble(Double::doubleValue) | |
| .average() | |
| .orElse(0.0); | |
| return downstream.push(avg); // ⑥ Emit moving average | |
| } | |
| return true; // ⑦ Keep gathering (building up window) | |
| } | |
| ); | |
| } | |
| // Usage example: | |
| // Prices: [100, 102, 101, 105, 103, 107, 104] | |
| // Window size 3: | |
| // [100, 102, 101] → avg: 101.0 | |
| // [102, 101, 105] → avg: 102.7 | |
| // [101, 105, 103] → avg: 103.0 | |
| // ... sliding window continues |
| record TrendSignal(double price, String trend) {} | |
| public static Gatherer<Double, ?, TrendSignal> trendDetection(int lookback) { | |
| return Gatherer.of( | |
| () -> new Object() { | |
| List<Double> history = new ArrayList<>(); // ① Price history buffer | |
| }, | |
| (state, price, downstream) -> { | |
| state.history.add(price); // ② Add to history | |
| if (state.history.size() >= lookback) { // ③ Enough data? | |
| String trend = detectTrend(state.history); // ④ Analyze trend | |
| downstream.push(new TrendSignal(price, trend)); // ⑤ Emit signal | |
| if (state.history.size() > lookback) { // ⑥ Maintain window size | |
| state.history.remove(0); | |
| } | |
| } | |
| return true; | |
| } | |
| ); | |
| } | |
| private static String detectTrend(List<Double> prices) { | |
| double first = prices.get(0); | |
| double last = prices.get(prices.size() - 1); | |
| double change = (last - first) / first * 100; // Percentage change | |
| if (change > 2) return "UPTREND ↑"; // ① Significant rise | |
| else if (change < -2) return "DOWNTREND ↓"; // ② Significant fall | |
| else return "SIDEWAYS →"; // ③ Relatively flat | |
| } | |
| // Example output: | |
| // Price: $100.50 → SIDEWAYS → | |
| // Price: $102.30 → UPTREND ↑ | |
| // Price: $98.75 → DOWNTREND ↓ |
/**
 * Mutable accumulator for simple integer statistics: element count, sum, minimum and
 * maximum. Before any value is added, {@code min} is {@code Integer.MAX_VALUE} and
 * {@code max} is {@code Integer.MIN_VALUE}.
 */
class Statistics {
    int count = 0;
    int sum = 0;
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;

    /**
     * Folds one value into the statistics.
     *
     * @param value the value to add
     * @return this instance, so calls can be chained or used as a fold function
     */
    Statistics add(Integer value) {
        count += 1;
        int v = value;
        sum += v;
        if (v < min) {
            min = v;
        }
        if (v > max) {
            max = v;
        }
        return this;
    }

    /**
     * Returns the arithmetic mean of the added values.
     *
     * @return {@code sum / count} as a double, or 0 when nothing has been added
     */
    double getAverage() {
        if (count == 0) {
            return 0;
        }
        return (double) sum / count;
    }
}
| Statistics stats = values.stream() | |
| .gather(Gatherers.fold( | |
| Statistics::new, // ① Create fresh Statistics object | |
| Statistics::add // ② Add each element to our statistics | |
| )) | |
| .findFirst() // ③ Fold returns a single-element stream | |
| .orElse(new Statistics()); | |
| System.out.printf("Count: %d, Sum: %d, Avg: %.2f, Min: %d, Max: %d%n", | |
| stats.count, stats.sum, stats.getAverage(), stats.min, stats.max); | |
| // Output: Count: 9, Sum: 45, Avg: 5.00, Min: 1, Max: 9 |