Created
June 1, 2015 22:12
-
-
Save benjchristensen/48f17210a674bf38329d to your computer and use it in GitHub Desktop.
Playground for RxJava/ReactiveStreams + Aeron with Backpressure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pubsub; | |
import java.nio.ByteBuffer; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Observable.Operator; | |
import rx.Scheduler.Worker; | |
import rx.Subscriber; | |
import rx.functions.Func1; | |
import rx.functions.Func3; | |
import rx.schedulers.Schedulers; | |
import rx.subscriptions.Subscriptions; | |
import uk.co.real_logic.aeron.Aeron; | |
import uk.co.real_logic.aeron.FragmentAssemblyAdapter; | |
import uk.co.real_logic.aeron.Publication; | |
import uk.co.real_logic.aeron.Subscription; | |
import uk.co.real_logic.aeron.driver.MediaDriver; | |
import uk.co.real_logic.agrona.DirectBuffer; | |
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer; | |
public class RxAeronExample { | |
private static final String TOMBSTONE = "TOMBSTONE"; | |
public static final String RESPONSE_CHANNEL = "aeron:udp?remote=localhost:43450"; | |
public static final int SUBMISSION_STREAM_ID = 1; | |
public static final String SERVER_CHANNEL = "aeron:udp?remote=localhost:43456"; | |
public static final int SERVER_STREAM_ID = 1; | |
private static final long IDLE_MAX_SPINS = 0; | |
private static final long IDLE_MAX_YIELDS = 0; | |
private static final long IDLE_MIN_PARK_NS = TimeUnit.NANOSECONDS.toNanos(1); | |
private static final long IDLE_MAX_PARK_NS = TimeUnit.MILLISECONDS.toNanos(1); | |
private static final int MAX_BUFFER_LENGTH = 1024; | |
public static void main(String... args) { | |
Aeron aeron = start(); | |
// /* async data stream with a tombstone to allow unsubscribing on the other end */ | |
// Observable<String> data = Observable.interval(100, TimeUnit.MILLISECONDS).map(i -> { | |
// return "data_" + (i + 1); | |
// }).take(5).concatWith(Observable.just(TOMBSTONE)); | |
/* cold, fast, synchronous data stream that supports backpressure */ | |
Observable<String> data = Observable.range(1, Integer.MAX_VALUE) | |
.doOnNext(i -> { | |
if(i % 100 == 0) { | |
System.out.println("Source Emitted => " + i); // this should not emit beyond consumption | |
} | |
}) | |
.map(i -> "data_"+i) | |
// .doOnRequest(r -> System.out.println("requested: " + r)) | |
; | |
publish(aeron, RESPONSE_CHANNEL, 1, data, RxAeronExample::stringToBuffer) | |
.doOnError(t -> t.printStackTrace()) | |
.subscribe(); | |
consume(aeron, RESPONSE_CHANNEL, 1, | |
(buffer, offset, length) -> buffer.getStringWithoutLengthUtf8(offset, length).trim()) | |
.takeWhile(s -> !s.equals(TOMBSTONE)) | |
.toBlocking().forEach(System.out::println); | |
System.out.println("done"); | |
} | |
public static <T> Observable<Void> publish(Aeron aeron, final String channel, final int streamId, Observable<T> data, Func1<T, DirectBuffer> map) { | |
/* backpressure aware operator for offering to Publication */ | |
return data.lift(new Operator<Void, T>() { | |
@Override | |
public Subscriber<? super T> call(Subscriber<? super Void> child) { | |
return new Subscriber<T>(child) { | |
private DirectBuffer last; | |
Publication serverPublication; | |
Worker w; | |
@Override | |
public void onStart() { | |
serverPublication = aeron.addPublication(channel, streamId); | |
add(Subscriptions.create(() -> serverPublication.close())); | |
w = Schedulers.computation().createWorker(); | |
add(w); | |
// TODO make this do more than 1 | |
request(1); | |
} | |
@Override | |
public void onCompleted() { | |
child.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
child.onError(e); | |
} | |
@Override | |
public void onNext(T t) { | |
DirectBuffer v = map.call(t); | |
tryOffer(v); | |
} | |
private void tryOffer(DirectBuffer v) { | |
long sent = serverPublication.offer(v); | |
if (sent == Publication.NOT_CONNECTED) { | |
onError(new RuntimeException("Not connected")); | |
} else if (sent == Publication.BACK_PRESSURE) { | |
last = v; | |
w.schedule(() -> tryOffer(v)); | |
} else { | |
// TODO make this do more than 1 | |
request(1); | |
} | |
} | |
}; | |
} | |
}); | |
} | |
public static <T> Observable<T> consume(Aeron aeron, final String channel, final int streamId, Func3<DirectBuffer, Integer, Integer, T> map) { | |
return Observable.create(s -> { | |
Subscription subscription = aeron.addSubscription(channel, streamId, new FragmentAssemblyAdapter((buffer, offset, length, header) -> { | |
T value = map.call(buffer, offset, length); | |
try { | |
// make it behave slowly | |
Thread.sleep(100); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
s.onNext(value); | |
})); | |
s.add(Subscriptions.create(() -> subscription.close())); | |
// use existing event loops | |
Worker w = Schedulers.computation().createWorker(); | |
// limit fragments so it doesn't starve the eventloop | |
w.schedulePeriodically(() -> subscription.poll(100), 0, 0, TimeUnit.NANOSECONDS); | |
s.add(w); | |
}); | |
} | |
public static Aeron start() { | |
final MediaDriver.Context ctx = new MediaDriver.Context(); | |
final MediaDriver mediaDriver = MediaDriver.launch(ctx.dirsDeleteOnExit(true)); | |
final Aeron.Context context = new Aeron.Context() | |
.newConnectionHandler((String channel, int streamId, int sessionId, long joiningPosition, String sourceInformation) -> { | |
System.out.println("New Connection => channel: " + channel + " stream: " + streamId + " session: " + sessionId + " position: " + joiningPosition + " source: " + sourceInformation); | |
}) | |
.errorHandler(t -> t.printStackTrace()); | |
final Aeron aeron = Aeron.connect(context); | |
return aeron; | |
} | |
// TODO not thread-safe at all for this experiment since it is statically defined | |
final static UnsafeBuffer unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(MAX_BUFFER_LENGTH)); | |
public static DirectBuffer stringToBuffer(String s) { | |
byte[] bytes = s.getBytes(); | |
unsafeBuffer.putBytes(0, bytes); | |
return unsafeBuffer; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Please note this is just a playground and is NOT production ready. It can be far more efficient with batching, hooking into Aeron metrics for high/low watermarks, etc.