Created April 23, 2020 14:25
-
-
Save HaloFour/ce3063d4e693b495e3c194cbb2f66686 to your computer and use it in GitHub Desktop.
Reactive JSON Array Extractor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.web.reactive.function.BodyExtractor;
import reactor.core.publisher.Flux;
public class StreamingJsonArrayExtractor<T> implements BodyExtractor<Flux<T>, ReactiveHttpInputMessage> { | |
private final Class<T> type; | |
private final ObjectMapper mapper; | |
private final JsonPointer pointer; | |
public StreamingJsonArrayExtractor(Class<T> type, ObjectMapper mapper, JsonPointer pointer) { | |
this.type = type; | |
this.mapper = mapper; | |
this.pointer = pointer; | |
} | |
@Override | |
public Flux<T> extract(ReactiveHttpInputMessage inputMessage, Context context) { | |
try { | |
var factory = mapper.getFactory(); | |
var deserializationContext = mapper.getDeserializationContext(); | |
var parser = factory.createNonBlockingByteArrayParser(); | |
var feeder = (ByteArrayFeeder) parser.getNonBlockingInputFeeder(); | |
Supplier<TokenBuffer> tokenBufferFactory = () -> new TokenBuffer(parser, deserializationContext); | |
var tokenizer = new StreamingElementTokenizer(parser, feeder, tokenBufferFactory, pointer); | |
return inputMessage.getBody() | |
.concatMapIterable(tokenizer::tokenize) | |
.concatWith(tokenizer.endOfInput()) | |
.map(deserializeBuffer(mapper, type)); | |
} catch (Exception exception) { | |
return Flux.error(exception); | |
} | |
} | |
private static class StreamingElementTokenizer { | |
private final JsonParser parser; | |
private final ByteArrayFeeder feeder; | |
private final Supplier<TokenBuffer> tokenBufferFactory; | |
private final JsonPointer pointer; | |
private boolean found; | |
private int nestLevel; | |
private TokenBuffer currentTokenBuffer; | |
public StreamingElementTokenizer(JsonParser parser, ByteArrayFeeder feeder, Supplier<TokenBuffer> tokenBufferFactory, JsonPointer pointer) { | |
this.parser = parser; | |
this.feeder = feeder; | |
this.tokenBufferFactory = tokenBufferFactory; | |
this.pointer = pointer; | |
} | |
public List<TokenBuffer> tokenize(DataBuffer buffer) { | |
int bufferSize = buffer.readableByteCount(); | |
var bytes = new byte[bufferSize]; | |
buffer.read(bytes); | |
DataBufferUtils.release(buffer); | |
try { | |
feeder.feedInput(bytes, 0, bufferSize); | |
return parseTokens(); | |
} catch (JsonProcessingException exception) { | |
throw new DecodingException("JSON decoding error: " + exception.getOriginalMessage(), exception); | |
} catch (IOException exception) { | |
throw new UncheckedIOException(exception); | |
} | |
} | |
public Flux<TokenBuffer> endOfInput() { | |
return Flux.defer(() -> { | |
feeder.endOfInput(); | |
try { | |
return Flux.fromIterable(parseTokens()); | |
} catch (JsonProcessingException exception) { | |
throw new DecodingException("JSON decoding error: " + exception.getOriginalMessage(), exception); | |
} catch (IOException exception) { | |
throw new UncheckedIOException(exception); | |
} | |
}); | |
} | |
private List<TokenBuffer> parseTokens() throws IOException { | |
List<TokenBuffer> tokenBuffers = Collections.emptyList(); | |
for (var token = parser.nextToken(); token != null && token != JsonToken.NOT_AVAILABLE; token = parser.nextToken()) { | |
if (found) { | |
if (nestLevel == 0 && token == JsonToken.END_ARRAY) { | |
found = false; | |
} else { | |
if (currentTokenBuffer == null) { | |
currentTokenBuffer = tokenBufferFactory.get(); | |
} | |
currentTokenBuffer.copyCurrentEvent(parser); | |
if (token.isStructStart()) { | |
nestLevel += 1; | |
} else if (token.isStructEnd()) { | |
nestLevel -= 1; | |
if (nestLevel == 0) { | |
if (tokenBuffers.isEmpty()) { | |
tokenBuffers = new LinkedList<>(); | |
} | |
tokenBuffers.add(currentTokenBuffer); | |
currentTokenBuffer = null; | |
} | |
} | |
} | |
} else { | |
if (parser.isExpectedStartArrayToken()) { | |
var context = parser.getParsingContext(); | |
found = context.pathAsPointer(false).toString().equals(pointer.toString()); | |
nestLevel = 0; | |
} | |
} | |
} | |
return tokenBuffers; | |
} | |
} | |
private Function<TokenBuffer, T> deserializeBuffer(ObjectMapper mapper, Class<T> type) { | |
var reader = mapper.readerFor(type); | |
return tokenBuffer -> { | |
try { | |
return reader.readValue(tokenBuffer.asParser(mapper)); | |
} catch (IOException exception) { | |
throw new UncheckedIOException(exception); | |
} | |
}; | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.