Skip to content

Instantly share code, notes, and snippets.

@HaloFour
Created April 23, 2020 14:25
Show Gist options
  • Save HaloFour/ce3063d4e693b495e3c194cbb2f66686 to your computer and use it in GitHub Desktop.
Save HaloFour/ce3063d4e693b495e3c194cbb2f66686 to your computer and use it in GitHub Desktop.
Reactive JSON Array Extractor
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.web.reactive.function.BodyExtractor;
import reactor.core.publisher.Flux;
/**
 * {@link BodyExtractor} that streams the elements of one JSON array, located by a {@link JsonPointer},
 * out of a reactive HTTP body without first buffering the whole document.
 *
 * <p>Body chunks are fed into Jackson's non-blocking parser. Tokens are skipped until a
 * {@code START_ARRAY} whose path equals {@code pointer} is reached. Each element of that array
 * (object, nested array or scalar) is then captured into its own {@link TokenBuffer} and
 * deserialized to {@code T} as soon as it is complete. Tokens after the array closes are ignored.
 * If no array matches the pointer in well-formed input, the resulting {@link Flux} completes empty.
 *
 * @param <T> the element type
 */
public class StreamingJsonArrayExtractor<T> implements BodyExtractor<Flux<T>, ReactiveHttpInputMessage> {

    private final Class<T> type;
    private final ObjectMapper mapper;
    private final JsonPointer pointer;

    /**
     * @param type    type each array element is deserialized to
     * @param mapper  mapper supplying the parser factory and deserialization configuration
     * @param pointer location of the array whose elements are emitted
     * @throws NullPointerException if any argument is {@code null}
     */
    public StreamingJsonArrayExtractor(Class<T> type, ObjectMapper mapper, JsonPointer pointer) {
        this.type = Objects.requireNonNull(type, "type");
        this.mapper = Objects.requireNonNull(mapper, "mapper");
        this.pointer = Objects.requireNonNull(pointer, "pointer");
    }

    /**
     * Returns the elements of the target array as they become available.
     *
     * <p>The parser and tokenizer are created per subscription, so a resubscription never sees
     * a half-consumed parser. Syntax errors are signalled as {@link DecodingException}, other I/O
     * failures as {@link UncheckedIOException} (or the raw {@link IOException} if the parser cannot
     * be created), and failures binding an element to {@code T} as {@link UncheckedIOException}.
     */
    @Override
    public Flux<T> extract(ReactiveHttpInputMessage inputMessage, Context context) {
        return Flux.defer(() -> {
            try {
                var parser = mapper.getFactory().createNonBlockingByteArrayParser();
                var feeder = (ByteArrayFeeder) parser.getNonBlockingInputFeeder();
                var deserializationContext = createDeserializationContext(parser);
                Supplier<TokenBuffer> tokenBufferFactory = () -> new TokenBuffer(parser, deserializationContext);
                var tokenizer = new StreamingElementTokenizer(parser, feeder, tokenBufferFactory, pointer);
                return inputMessage.getBody()
                        .concatMapIterable(tokenizer::tokenize)
                        .concatWith(tokenizer.endOfInput())
                        .map(deserializeBuffer(mapper, type));
            } catch (IOException exception) {
                return Flux.<T>error(exception);
            }
        });
    }

    /**
     * Creates a deserialization context bound to the mapper's configuration.
     *
     * <p>{@link ObjectMapper#getDeserializationContext()} returns a blueprint whose feature flags
     * are unset. A {@link TokenBuffer} built from it would ignore settings such as
     * {@code USE_BIG_DECIMAL_FOR_FLOATS}, so a configured instance is derived from it instead.
     */
    private DeserializationContext createDeserializationContext(JsonParser parser) {
        DeserializationContext blueprint = mapper.getDeserializationContext();
        if (blueprint instanceof DefaultDeserializationContext) {
            return ((DefaultDeserializationContext) blueprint)
                    .createInstance(mapper.getDeserializationConfig(), parser, mapper.getInjectableValues());
        }
        return blueprint;
    }

    /**
     * Stateful splitter that turns the bytes fed to a non-blocking parser into one
     * {@link TokenBuffer} per element of the array at {@code pointer}.
     *
     * <p>Holds mutable, unsynchronized state, so it must be driven serially by a single subscription.
     */
    private static class StreamingElementTokenizer {

        private final JsonParser parser;
        private final ByteArrayFeeder feeder;
        private final Supplier<TokenBuffer> tokenBufferFactory;
        private final JsonPointer pointer;

        // true while the parser is inside the target array
        private boolean found;
        // structural depth relative to the target array; 0 means "between elements"
        private int nestLevel;
        // element currently being captured, or null between elements
        private TokenBuffer currentTokenBuffer;

        StreamingElementTokenizer(JsonParser parser, ByteArrayFeeder feeder,
                                  Supplier<TokenBuffer> tokenBufferFactory, JsonPointer pointer) {
            this.parser = parser;
            this.feeder = feeder;
            this.tokenBufferFactory = tokenBufferFactory;
            this.pointer = pointer;
        }

        /**
         * Feeds one body chunk to the parser and returns the elements it completed, in order.
         * The data buffer is always released, even if copying its bytes fails.
         */
        public List<TokenBuffer> tokenize(DataBuffer buffer) {
            byte[] bytes;
            try {
                bytes = new byte[buffer.readableByteCount()];
                buffer.read(bytes);
            } finally {
                DataBufferUtils.release(buffer);
            }
            try {
                feeder.feedInput(bytes, 0, bytes.length);
                return parseTokens();
            } catch (IOException exception) {
                throw translate(exception);
            }
        }

        /**
         * Signals end of input lazily (on subscription) and emits any elements completed by it.
         * Truncated JSON makes the parser fail at this point.
         */
        public Flux<TokenBuffer> endOfInput() {
            return Flux.defer(() -> {
                feeder.endOfInput();
                try {
                    return Flux.fromIterable(parseTokens());
                } catch (IOException exception) {
                    throw translate(exception);
                }
            });
        }

        /** Drains every token currently available and returns the elements completed meanwhile. */
        private List<TokenBuffer> parseTokens() throws IOException {
            List<TokenBuffer> completed = new ArrayList<>();
            JsonToken token;
            while ((token = parser.nextToken()) != null && token != JsonToken.NOT_AVAILABLE) {
                if (!found) {
                    if (token == JsonToken.START_ARRAY && isAtTargetPointer()) {
                        found = true;
                        nestLevel = 0;
                    }
                    continue;
                }
                if (nestLevel == 0 && token == JsonToken.END_ARRAY) {
                    // Closing bracket of the target array itself: stop capturing.
                    found = false;
                    continue;
                }
                if (currentTokenBuffer == null) {
                    currentTokenBuffer = tokenBufferFactory.get();
                }
                currentTokenBuffer.copyCurrentEvent(parser);
                if (token.isStructStart()) {
                    nestLevel++;
                } else if (token.isStructEnd()) {
                    nestLevel--;
                }
                // Back at depth 0 means the element is complete: either a scalar value
                // or the closing token of an object/array element.
                if (nestLevel == 0) {
                    completed.add(currentTokenBuffer);
                    currentTokenBuffer = null;
                }
            }
            return completed;
        }

        /** Whether the array just opened sits at the configured pointer. */
        private boolean isAtTargetPointer() {
            var path = parser.getParsingContext().pathAsPointer(false);
            return path.toString().equals(pointer.toString());
        }

        /** Maps JSON syntax errors to {@link DecodingException} and other I/O errors to unchecked. */
        private static RuntimeException translate(IOException exception) {
            if (exception instanceof JsonProcessingException) {
                var jsonException = (JsonProcessingException) exception;
                return new DecodingException("JSON decoding error: " + jsonException.getOriginalMessage(), jsonException);
            }
            return new UncheckedIOException(exception);
        }
    }

    /**
     * Builds the function that binds one captured element to {@code type}.
     * Binding failures are rethrown as {@link UncheckedIOException}.
     */
    private Function<TokenBuffer, T> deserializeBuffer(ObjectMapper mapper, Class<T> type) {
        var reader = mapper.readerFor(type);
        return tokenBuffer -> {
            try (JsonParser elementParser = tokenBuffer.asParser(mapper)) {
                return reader.readValue(elementParser);
            } catch (IOException exception) {
                throw new UncheckedIOException(exception);
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment