Last active
October 15, 2021 21:02
-
-
Save caprica/73b0200103bea437b45c841014b2ee57 to your computer and use it in GitHub Desktop.
Some experiments using Reactor to parse XML/JSON from a Flux of DataBuffer instances
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
/** | |
* Some experiments using Reactor in WebFlux to parse an XML/JSON file given a {@link Flux} of {@link DataBuffer}. | |
* <p> | |
* Use for whatever purpose, feel free to feedback better solutions. | |
* <p> | |
* Note that the operations are intentionally written out long-hand rather than in a more compact form so it is clearer | |
* what is going on, and what the various return values types are. | |
* <p> | |
* Required dependencies are: | |
* <ul> | |
* <li>org.springframework:spring-core:5.2.9.RELEASE for {@link DataBufferUtils} etc</li> | |
* <li>io.projectreactor:reactor-core:3.3.10.RELEASE for {{@link Flux}}, {@link Mono} etc</li> | |
* <li>com.fasterxml.jackson.core:jackson-databind:2.11.2 for {@link ObjectMapper} etc</li> | |
* </ul> | |
* Yes, I know Jackson has a reactive JSON parser, that is not the point! | |
*/ | |
public class FluxDataBufferParseExperiments { | |
private static final String SAMPLE_INPUT_FILE = "input.json"; | |
private static final String SAMPLE_OUTPUT_FILE = "output.json"; | |
/** | |
* With {@link #naiveTest()} the buffer size is significant with regards to the approach working or not, see the | |
* comments on that method. | |
*/ | |
private static final int BUFFER_SIZE = 1024; | |
/** | |
* This implementation simply uses {@link DataBufferUtils} to write the data buffers to standard output. | |
* <p> | |
* Not particularly useful, but you have to start somewhere. | |
*/ | |
private static void basicTest() { | |
System.out.println("Running basic test..."); | |
Flux<DataBuffer> buffers = DataBufferUtils.write(getBuffers(), System.out); | |
Mono.when(buffers).block(); | |
System.out.println(); | |
System.out.println("Basic test complete."); | |
System.out.println(); | |
} | |
/** | |
* This implementation will process multiple data buffers in turn, each data buffer will have a maximum size of | |
* {@link #BUFFER_SIZE}. | |
* <p> | |
* This means that when invoking {@link DataBuffer#asInputStream()} you will get an {@link InputStream} for that | |
* buffer only. So trying to use this input stream as an input to e.g. a JSON or XML parser will fail, as it will | |
* only see the data from that buffer before failing. | |
* <p> | |
* If the entire input fits inside a single buffer, it will work. | |
*/ | |
private static void naiveTest() { | |
System.out.println("Running naive test..."); | |
Flux<JsonNode> result = getBuffers().flatMap(dataBuffer -> { | |
try { | |
InputStream in = dataBuffer.asInputStream(); | |
ObjectMapper om = new ObjectMapper(); | |
JsonNode jsonNode = om.readTree(in); | |
return Flux.just(jsonNode); | |
} catch (Exception e) { | |
return Flux.error(e); | |
} | |
}); | |
try { | |
Mono.when(result).block(); | |
} catch (Exception e) { | |
System.out.printf("Expected error parsing buffer: %s%n", e.getMessage()); | |
} | |
System.out.println("Naive test complete."); | |
System.out.println(); | |
} | |
/** | |
* This implementation uses the Spring WebFlux {@link DataBufferUtils} to join the individual data buffers into a | |
* single buffer. | |
* <p> | |
* Feeding this single data buffer to a JSON/XML parser will then work as expected. | |
* <p> | |
* Clearly with the various blocking operations this is not a fully reactive asynchronous approach. It does have the | |
* advantage of actually working, and being quite simple to understand. | |
* <p> | |
* For more confidence that this version is working, delete the sample output file before running each test. | |
*/ | |
private static void blockingTest() { | |
System.out.println("Running blocking test..."); | |
Mono<DataBuffer> monoDb = DataBufferUtils.join(getBuffers()); | |
System.out.println("Prepare JSON..."); | |
Mono<JsonNode> result = monoDb.flatMap(dataBuffer -> { | |
try { | |
InputStream in = dataBuffer.asInputStream(); | |
ObjectMapper om = new ObjectMapper(); | |
return Mono.just(om.readTree(in)); | |
} catch (Exception e) { | |
return Mono.error(e); | |
} | |
}); | |
System.out.println("Adding consumer for next()"); | |
result = result.doOnNext(jsonNode -> { | |
System.out.println("Writing JSON..."); | |
try (BufferedWriter w = new BufferedWriter(new FileWriter(SAMPLE_OUTPUT_FILE))) { | |
w.write(jsonNode.toPrettyString()); | |
w.flush(); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
System.out.println("Finished writing JSON."); | |
}); | |
result.block(); | |
System.out.println("After block."); | |
System.out.printf("Blocking test complete, check output file: %s%n", SAMPLE_OUTPUT_FILE); | |
System.out.println(); | |
} | |
/** | |
* Get a publisher of a stream of {@link DataBuffer} instances for a local file. | |
* | |
* @return stream of data buffers | |
*/ | |
private static Flux<DataBuffer> getBuffers() { | |
return DataBufferUtils.read( | |
Path.of(SAMPLE_INPUT_FILE), | |
new DefaultDataBufferFactory(), | |
BUFFER_SIZE, | |
StandardOpenOption.READ); | |
} | |
public static void main(String[] args) throws Exception { | |
basicTest(); | |
naiveTest(); | |
blockingTest(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment