-
-
Save maxandersen/474d9229802ed148bd13f7cc1398cc25 to your computer and use it in GitHub Desktop.
Stream and process Wikipedia change events (run with `jbang https://gist.github.com/maxandersen/474d9229802ed148bd13f7cc1398cc25`)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //JAVA 25+ | |
| //DEPS com.fasterxml.jackson.core:jackson-databind:2.19.0 | |
| // TODO for setup: | |
| // * run with your contact email as the first argument: `jbang WikiStream.java "you@example.com"` | |
| import module java.net.http; | |
| import com.fasterxml.jackson.databind.*; | |
| static final ObjectMapper MAPPER = new ObjectMapper(); | |
| static final String DATA_LINE_PREFIX = "data: "; | |
| static final URI WIKI_URL = URI.create("https://stream.wikimedia.org/v2/stream/mediawiki.recentchange"); | |
| static String EMAIL = ""; | |
| final Set<String> ignoredEventTypes = new HashSet<>(); | |
| long addedLines; | |
| void main(String[] args) throws Exception { | |
| EMAIL =args.length > 0 ? args[0] : ""; | |
| wikiStream(this::process); | |
| Thread.sleep(3_000); | |
| IO.println(""" | |
| Lines: %+d | |
| Ignored event types: %s | |
| """.formatted(addedLines, ignoredEventTypes)); | |
| } | |
| static void wikiStream(Consumer<String> processor) { | |
| var client = HttpClient.newHttpClient(); | |
| var request = HttpRequest.newBuilder() | |
| .uri(WIKI_URL) | |
| .header("User-Agent", "JavaXDemo/0.0 " + EMAIL) | |
| .header("Accept", "text/event-stream") | |
| .build(); | |
| client.sendAsync(request, HttpResponse.BodyHandlers.ofLines()) | |
| .thenAccept(response -> response.body().forEach(processor)); | |
| } | |
| void process(String line) { | |
| if (!line.startsWith(DATA_LINE_PREFIX)) { | |
| return; | |
| } | |
| var event = line.substring(DATA_LINE_PREFIX.length()); | |
| switch (parse(event)) { | |
| case Event.New(var length) -> { | |
| addedLines += length; | |
| IO.println("New : %+d".formatted(length)); | |
| } | |
| case Event.Edit(var oldLength, var newLength) -> { | |
| addedLines += (newLength - oldLength); | |
| IO.println("Edit: %+d (%s -> %s)".formatted(newLength - oldLength, oldLength, newLength)); | |
| } | |
| case Event.Unknown(var type) -> | |
| ignoredEventTypes.add(type); | |
| case Event.Error(var ex) -> | |
| IO.println(""" | |
| Error while processing event | |
| JSON: %s | |
| ERROR: %s (%s) | |
| """.formatted(event, ex.getClass().getSimpleName(), ex.getMessage())); | |
| } | |
| } | |
| static Event parse(String event) { | |
| try { | |
| var root = MAPPER.readTree(event); | |
| return switch (root.get("type").asText()) { | |
| case "new" -> new Event.New(root.get("length").get("new").asLong()); | |
| case "edit" -> new Event.Edit(root.get("length").get("old").asLong(), root.get("length").get("new").asLong()); | |
| case String s -> new Event.Unknown(s); | |
| }; | |
| } catch (Exception ex) { | |
| return new Event.Error(ex); | |
| } | |
| } | |
/**
 * Result of parsing one stream event. The hierarchy is closed, so a
 * {@code switch} over an {@code Event} can be exhaustive without a default branch.
 */
sealed interface Event permits Event.New, Event.Edit, Event.Unknown, Event.Error {

    /** A newly created entry and its length. */
    record New(long length) implements Event {}

    /** A modified entry with its length before and after the change. */
    record Edit(long oldLength, long newLength) implements Event {}

    /** An event whose type is neither {@code "new"} nor {@code "edit"}; {@code name} is that type. */
    record Unknown(String name) implements Event {}

    /** An event that could not be parsed, with the exception that was raised. */
    record Error(Exception exception) implements Event {}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment