Created
August 19, 2019 22:49
-
-
Save romain-grecourt/e75d9996ee76398456ba4bdd54809a06 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.helidon.media.common; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.nio.ByteBuffer; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.logging.Logger; | |
import io.helidon.common.http.DataChunk; | |
import io.helidon.common.reactive.Flow; | |
/** | |
* A {@link Flow.Subscriber subscriber} that can subscribe to a {@link Flow.Publisher publisher} of {@link DataChunk data chunk} | |
* and make the data available for consumption via standard blocking {@link InputStream} API. | |
* | |
 * This {@link InputStream} is not thread-safe: concurrent access must not be allowed, and invocations of
 * {@code read()} must be synchronized by the consumer so that state updates are visible across threads.
* | |
* The following assumptions are made about the publisher: | |
* <ul> | |
* <li>{@code request} is invoked only after one chunk has been consumed</li> | |
* <li>The number of chunks requested is always 1</li> | |
* <li>The source {@link Flow.Publisher} fully conforms to the reactive-streams specification with respect to: | |
* <ul> | |
* <li>Total order of {@code onNext}, {@code onComplete}, {@code onError} calls</li> | |
* <li>Follows back pressure: {@code onNext} is not called until more chunks are requested</li> | |
* <li>Relaxed ordering of calls to {@code request}, allows to request even after onComplete/onError</li> | |
* </ul> | |
* </li> | |
* </ul> | |
*/ | |
public class PublisherInputStream extends InputStream implements Flow.Publisher<DataChunk> { | |
private static final Logger LOGGER = Logger.getLogger(PublisherInputStream.class.getName()); | |
private final Flow.Publisher<DataChunk> originalPublisher; | |
/* | |
* Given the assumptions that the number of chunks requested is at most 1, the requests are totally | |
* ordered with request / onNext by construction. This affords the following safety guarantees: | |
* | |
* 1. The only place where this.next is assigned is in onNext, before the next chunk is published | |
* 2. Initially this.next and this.current are identical; one request(1) is called on subscription | |
* 3. All subsequent calls to request(1) happen after the publishing of the chunk is observed by read(...) | |
* | |
* 4. It follows from 3 and 1 that one and only one assignment to this.next happens before observing the | |
* chunk by read(...). (provided the Publisher observes backpressure) | |
* | |
* 5. Such this.next is never lost, because it is copied into this.current before request(1), therefore | |
* a new assignment of this.next in onNext never loses the reference to a Future with an unobserved chunk | |
* (provided the Publisher observes backpressure) | |
* | |
* 6. The publishing of the chunk by onNext synchronizes-with the observation of the chunk by a read(...): | |
* (1) and (5) ensure this.current observed by read(...) is the same as this.next at the time onNext | |
* is invoked, so onNext completes the same Future as accessed by read(...). Moreover, the store to | |
* this.next by onNext and load of this.next by read(...) are in happens-before relationship due to this | |
* synchronizes-with edge, the program order in onNext, and program order in read(...) (and out-of-bands | |
* synchronization between multiple reads) | |
* | |
* A conforming Publisher establishes total order of onNext, therefore, a total order of assignments to | |
* this.next and Future.complete: | |
* | |
* # onSubscribe: assert: this.current == this.next | |
* - request(1) | |
* | |
* # onNext: assert: this.current == this.next | |
* - prev = next | |
* - next = new Future (A) | |
* - prev.complete(chunk) (B): assert: prev == this.current | |
* | |
* # read(...) | |
* - current.get(): (C): (C) synchronizes-with (B): any read is blocked until (B) | |
* ... | |
* # read(...) (same or subsequent read) | |
* - current.get(): synchronizes-with (B) | |
* - chunk is seen to be consumed entirely: release the chunk, and request next: | |
* - current = next: (D): (A) happens-before (D), no further onNext intervenes | |
* invariant: this.current never references a released chunk as seen by close(...), | |
* assuming read(...) and close(...) are totally ordered - either by | |
* program order, or through out-of-bands synchronization | |
* - request(1): assert: a conforming Publisher does not invoke onNext before this | |
* | |
* # onNext: assert: this.current == this.next: a conforming Publisher does not invoke onNext before request(1) | |
* - prev = next | |
* - next = new Future (E) | |
* - prev.complete(chunk) (F): assert: prev == this.current | |
* | |
* # read(...) | |
* - current.get(): (G): (G) synchronizes-with (F): any read after (D) is blocked until (F) | |
* ... | |
* | |
* # onComplete / onError: assert: this.next has not been completed: stream is either empty (no onNext will ever | |
* be called), or an onNext assigned a new uncompleted Future to this.next | |
* - next.complete(...): (H): assert: conforming Publisher ensures this.next assignments by onNext are visible here | |
* by totally ordering onNext / onComplete / onError | |
* | |
* # read(...): assert: eventually this.current == this.next: either initially, or after some read that consumed | |
* the chunk in its entirety and requested the new chunk | |
* - current.get(): (I): (I) synchronizes-with (H) | |
* - signal EOF | |
* | |
* # close(...): assert: this.current never references a released chunk; it either eventually references a chunk | |
* that has been produced by onNext and has not been consumed fully by read(...), or a null | |
* produced by onComplete / onError | |
* assert: if this.next != this.current, this.next will never produce a new chunk: this is the case | |
* if and only if onNext has occurred, but read(...) has not consumed the chunk in its entirety, | |
* hence has not requested any new chunks | |
* - current.whenComplete(release) | |
*/ | |
private CompletableFuture<DataChunk> current = new CompletableFuture<>(); | |
private CompletableFuture<DataChunk> next = current; | |
private volatile Flow.Subscription subscription; | |
private byte[] oneByte; | |
/** | |
* Wraps the supplied publisher and adds a blocking {@link InputStream} based nature. | |
* It is illegal to subscribe to the returned publisher. | |
* | |
* @param originalPublisher the original publisher to wrap | |
*/ | |
public PublisherInputStream(Flow.Publisher<DataChunk> originalPublisher) { | |
this.originalPublisher = originalPublisher; | |
} | |
// This really doesn't need to be AtomicBoolean - all accesses are not thread-safe anyway, so | |
// are meant to be single-threaded. This remains AtomicBoolean just in case there still is some | |
// use-case where the existence of the full memory fence on compareAndSet introduces the "out-of-bands | |
// synchronization" necessary for total ordering of read(...) and close(...) | |
private final AtomicBoolean subscribed = new AtomicBoolean(false); | |
@Override | |
public void close() { | |
// assert: if current != next, next cannot ever be resolved with a chunk that needs releasing | |
current.whenComplete(PublisherInputStream::releaseChunk); | |
current = null; // any future read() will fail | |
} | |
@Override | |
public int read() throws IOException { | |
if (oneByte == null) { | |
oneByte = new byte[1]; | |
} | |
// Chunks are always non-empty, so r is either 1 (at least one byte is produced) or | |
// negative (EOF) | |
int r = read(oneByte, 0, 1); | |
if (r < 0) { | |
return r; | |
} | |
return oneByte[0] & 0xFF; | |
} | |
@Override | |
public int read(byte[] buf, int off, int len) throws IOException { | |
if (subscribed.compareAndSet(false, true)) { | |
// subscribe just once | |
subscribe(); | |
} | |
if (current == null) { | |
throw new IOException("Already closed"); | |
} | |
try { | |
DataChunk chunk = current.get(); // block until a processing data are available | |
if (chunk == null) { | |
return -1; | |
} | |
ByteBuffer currentBuffer = chunk.data(); | |
if (currentBuffer.position() == 0) { | |
LOGGER.finest(() -> "Reading chunk ID: " + chunk.id()); | |
} | |
int rem = currentBuffer.remaining(); | |
// read as much as possible | |
if (len > rem) { | |
len = rem; | |
} | |
currentBuffer.get(buf, off, len); | |
// chunk is consumed entirely, release the chunk and prefetch a new chunk | |
if (len == rem) { | |
releaseChunk(chunk, null); | |
current = next; | |
subscription.request(1); | |
} | |
return len; | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
throw new IOException(e); | |
} catch (ExecutionException e) { | |
throw new IOException(e.getCause()); | |
} | |
} | |
@Override | |
public void subscribe(Flow.Subscriber<? super DataChunk> subscriber) { | |
subscriber.onError(new UnsupportedOperationException("Subscribing on this publisher is not allowed!")); | |
} | |
private void subscribe() { | |
originalPublisher.subscribe(new Flow.Subscriber<DataChunk>() { | |
@Override | |
public void onSubscribe(Flow.Subscription subscription) { | |
PublisherInputStream.this.subscription = subscription; | |
subscription.request(1); | |
} | |
@Override | |
public void onNext(DataChunk item) { | |
LOGGER.finest(() -> "Processing chunk: " + item.id()); | |
// set next to the next future before completing it | |
// since completing next will unblock read() which which may set current to next | |
// if all the data in current has been consumed | |
CompletableFuture<DataChunk> prev = next; | |
next = new CompletableFuture<>(); | |
// unblock read() | |
prev.complete(item); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
// unblock read() with an ExecutionException wrapping the throwable | |
// read() uses a try/catch and wraps the ExecutionException cause in an IOException | |
next.completeExceptionally(throwable); | |
} | |
@Override | |
public void onComplete() { | |
// read() returns EOF if the chunk is null | |
next.complete(null); | |
} | |
}); | |
} | |
private static void releaseChunk(DataChunk chunk, Throwable th) { | |
if (chunk != null && !chunk.isReleased()) { | |
LOGGER.finest(() -> "Releasing chunk: " + chunk.id()); | |
chunk.release(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment