Last active
January 3, 2017 19:43
-
-
Save Andrei-Pozolotin/4dd4f86ae90efd648ca9708bc12be393 to your computer and use it in GitHub Desktop.
RioArt.java - expose aeron sink/source https://github.com/akka/akka/issues/22096
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.remote.artery.rio; | |
import akka.Done; | |
import akka.actor.ActorSystem; | |
import akka.actor.ExtendedActorSystem; | |
import akka.remote.RemoteActorRefProvider; | |
import akka.remote.artery.AeronSink; | |
import akka.remote.artery.AeronSource; | |
import akka.remote.artery.AeronSource.ResourceLifecycle; | |
import akka.remote.artery.ArteryTransport; | |
import akka.remote.artery.EnvelopeBuffer; | |
import akka.remote.artery.EnvelopeBufferPool; | |
import akka.remote.artery.EventSink; | |
import akka.remote.artery.TaskRunner; | |
import akka.stream.scaladsl.Sink; | |
import akka.stream.scaladsl.Source; | |
import io.aeron.Aeron; | |
import scala.concurrent.Future; | |
import scala.concurrent.duration.Duration; | |
import akka.actor.Address; | |
public class RioArt { | |
final ExtendedActorSystem system; | |
final TaskRunner taskRunner; | |
final EnvelopeBufferPool bufferPool; | |
public RioArt(ActorSystem system) { | |
this.system = (ExtendedActorSystem) system; | |
this.taskRunner = new TaskRunner(this.system, 1); // TODO config | |
this.bufferPool = new EnvelopeBufferPool(64 * 1024, 256); // TODO config | |
} | |
public String channel(Address address) { | |
return channel(address.host().get(), (Integer) address.port().get()); | |
} | |
public String channel(String host, int port) { | |
return "aeron:udp?endpoint=" + host + ":" + port; | |
} | |
public ArteryTransport transport() { | |
RemoteActorRefProvider provider = (RemoteActorRefProvider) system.provider(); | |
return (ArteryTransport) provider.transport(); | |
} | |
public Aeron aeron() { | |
return transport().akka$remote$artery$ArteryTransport$$aeron; | |
} | |
public EventSink debugSink() { | |
return transport().createFlightRecorderEventSink(false); | |
} | |
public AeronSink aeronSink(Address address, int streamId, Duration timeLimit) { | |
return new AeronSink(channel(address), streamId, aeron(), taskRunner, bufferPool, timeLimit, debugSink()); | |
} | |
public AeronSource aeronSource(Address address, int streamId) { | |
return new AeronSource(channel(address), streamId, aeron(), taskRunner, bufferPool, debugSink()); | |
} | |
public Sink<EnvelopeBuffer, Future<Done>> streamSink(Address address, int streamId, Duration timeLimit) { | |
return Sink.fromGraph(aeronSink(address, streamId, timeLimit)); | |
} | |
public Source<EnvelopeBuffer, ResourceLifecycle> streamSource(Address address, int streamId) { | |
return Source.fromGraph(aeronSource(address, streamId)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment