Last active
February 7, 2018 07:58
-
-
Save RoryKelly/e34d07b1eb34b289f0f6d00f628a7c47 to your computer and use it in GitHub Desktop.
Mirrored Source for OKIO.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Takes a bytes source and streams it to parallel sources. I.E Streaming bytes to two server in parallel. | |
*/ | |
public final class MirroredSource { | |
private final Buffer buffer = new Buffer(); | |
private final Source source; | |
private final AtomicBoolean sourceExhausted = new AtomicBoolean(); | |
private boolean closed = false; | |
/** | |
* Takes a bytes source and streams it to parallel sources, I.E Streaming bytes to two server in parallel. | |
* | |
* @param source The Bytes source you want to stream in parallel. | |
*/ | |
public MirroredSource(final Source source) { | |
this.source = source; | |
} | |
/** | |
* As soon as you read from the returned source, its output is copied and buffered. This buffer can then be read from | |
* mirror(). | |
* | |
* @return a byte source. | |
*/ | |
public final Source original() { | |
return new okio.Source() { | |
@Override public long read(final Buffer sink, final long byteCount) throws IOException { | |
final long bytesRead = source.read(sink, byteCount); | |
if (bytesRead > 0) { | |
synchronized (buffer) { | |
sink.copyTo(buffer, sink.size() - bytesRead, bytesRead); | |
// Notfiy the mirror to continue | |
buffer.notifyAll(); | |
} | |
} else { | |
synchronized (buffer) { | |
buffer.notifyAll(); | |
} | |
sourceExhausted.set(true); | |
} | |
return bytesRead; | |
} | |
@Override public Timeout timeout() { | |
return source.timeout(); | |
} | |
@Override public void close() throws IOException { | |
source.close(); | |
sourceExhausted.set(true); | |
synchronized (buffer) { | |
buffer.notifyAll(); | |
} | |
} | |
}; | |
} | |
/** | |
* A byte source. Will emit all bytes emitted from original(). Will end when original() is exhausted. | |
* | |
* @return A bytes source. | |
*/ | |
public final Source mirror() { | |
return new okio.Source() { | |
@Override public long read(final Buffer sink, final long byteCount) throws IOException { | |
if (closed) new IllegalStateException("reading closed source"); | |
while (!sourceExhausted.get()) { | |
// only need to synchronise on reads when the source is not exhausted. | |
synchronized (buffer) { | |
if (buffer.request(byteCount)) { | |
return buffer.read(sink, byteCount); | |
} else { | |
try { | |
buffer.wait(200); | |
} catch (final InterruptedException e) { | |
return -1; | |
} | |
} | |
} | |
} | |
return buffer.read(sink, byteCount); | |
} | |
@Override public Timeout timeout() { | |
return new Timeout(); | |
} | |
@Override public void close() throws IOException { | |
buffer.clear(); | |
closed = true; | |
} | |
}; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public final class OkioTools { | |
public static Source tee(final Source source, final BufferedSink cacheBody) { | |
return new Source() { | |
@Override public long read(final Buffer sink, final long byteCount) throws IOException { | |
final long bytesRead; | |
bytesRead = source.read(sink, byteCount); | |
if (bytesRead == -1) { | |
return -1; | |
} | |
sink.copyTo(cacheBody.buffer(), sink.size() - bytesRead, bytesRead); | |
cacheBody.emitCompleteSegments(); | |
cacheBody.flush(); | |
return bytesRead; | |
} | |
@Override public Timeout timeout() { | |
return source.timeout(); | |
} | |
@Override public void close() throws IOException { | |
cacheBody.flush(); | |
cacheBody.close(); | |
source.close(); | |
} | |
}; | |
} | |
public static MirroredSource mirror(final Source source) { | |
return new MirroredSource(source); | |
} | |
/** | |
* Takes a source and emits the cumulative bytes at I.E byteInterval, byteInterval* 2, byteInterval * N. | |
* | |
* @param source Recording Source. | |
* @param byteInterval The byte interval to emit each byte array at. | |
* @return Emits a byte array at byteInterval length, i.e byteInterval, byteInterval* 2, byteInterval * N. | |
*/ | |
public static Observable<byte[]> cumulativeEmit(final Source source, final int byteInterval) { | |
Preconditions.checkArgument(byteInterval > 0, "byte interval must be greater that zero."); | |
final Buffer allAudioData = new Buffer(); | |
return Observable.create(subscriber -> { | |
try (BufferedSource bufferedSource = Okio.buffer(source)) { | |
int nextEmission = byteInterval; | |
while (!subscriber.isUnsubscribed() && !bufferedSource.exhausted()) { | |
// read into allAudioData | |
bufferedSource.read(allAudioData, byteInterval); | |
if (nextEmission < allAudioData.size()){ | |
// copy the allAudioData to a temp allAudioData so i can get a byte array. | |
final Buffer tempBuffer = new Buffer(); | |
allAudioData.copyTo(tempBuffer, 0, nextEmission); | |
subscriber.onNext(tempBuffer.readByteArray()); | |
tempBuffer.clear(); | |
nextEmission += byteInterval; | |
} | |
} | |
allAudioData.clear(); | |
} catch (final IOException e) { | |
subscriber.onError(e); | |
} | |
subscriber.onCompleted(); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated with Documentation and a few useful static helpers.