Skip to content

Instantly share code, notes, and snippets.

@Andrei-Pozolotin
Last active January 3, 2017 19:43
Show Gist options
  • Save Andrei-Pozolotin/4dd4f86ae90efd648ca9708bc12be393 to your computer and use it in GitHub Desktop.
Save Andrei-Pozolotin/4dd4f86ae90efd648ca9708bc12be393 to your computer and use it in GitHub Desktop.
RioArt.java - expose aeron sink/source https://github.com/akka/akka/issues/22096
package akka.remote.artery.rio;
import akka.Done;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.remote.RemoteActorRefProvider;
import akka.remote.artery.AeronSink;
import akka.remote.artery.AeronSource;
import akka.remote.artery.AeronSource.ResourceLifecycle;
import akka.remote.artery.ArteryTransport;
import akka.remote.artery.EnvelopeBuffer;
import akka.remote.artery.EnvelopeBufferPool;
import akka.remote.artery.EventSink;
import akka.remote.artery.TaskRunner;
import akka.stream.scaladsl.Sink;
import akka.stream.scaladsl.Source;
import io.aeron.Aeron;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.Address;
public class RioArt {
final ExtendedActorSystem system;
final TaskRunner taskRunner;
final EnvelopeBufferPool bufferPool;
public RioArt(ActorSystem system) {
this.system = (ExtendedActorSystem) system;
this.taskRunner = new TaskRunner(this.system, 1); // TODO config
this.bufferPool = new EnvelopeBufferPool(64 * 1024, 256); // TODO config
}
public String channel(Address address) {
return channel(address.host().get(), (Integer) address.port().get());
}
public String channel(String host, int port) {
return "aeron:udp?endpoint=" + host + ":" + port;
}
public ArteryTransport transport() {
RemoteActorRefProvider provider = (RemoteActorRefProvider) system.provider();
return (ArteryTransport) provider.transport();
}
public Aeron aeron() {
return transport().akka$remote$artery$ArteryTransport$$aeron;
}
public EventSink debugSink() {
return transport().createFlightRecorderEventSink(false);
}
public AeronSink aeronSink(Address address, int streamId, Duration timeLimit) {
return new AeronSink(channel(address), streamId, aeron(), taskRunner, bufferPool, timeLimit, debugSink());
}
public AeronSource aeronSource(Address address, int streamId) {
return new AeronSource(channel(address), streamId, aeron(), taskRunner, bufferPool, debugSink());
}
public Sink<EnvelopeBuffer, Future<Done>> streamSink(Address address, int streamId, Duration timeLimit) {
return Sink.fromGraph(aeronSink(address, streamId, timeLimit));
}
public Source<EnvelopeBuffer, ResourceLifecycle> streamSource(Address address, int streamId) {
return Source.fromGraph(aeronSource(address, streamId));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment