Skip to content

Instantly share code, notes, and snippets.

@nraychaudhuri
Created May 11, 2014 23:56
Show Gist options
  • Select an option

  • Save nraychaudhuri/b03d693b3d1ba6676f50 to your computer and use it in GitHub Desktop.

Select an option

Save nraychaudhuri/b03d693b3d1ba6676f50 to your computer and use it in GitHub Desktop.
One producer, multiple subscribers
package samples
import akka.stream.scaladsl.Flow
import akka.stream.{MaterializerSettings, FlowMaterializer}
import org.reactivestreams.api.{Consumer, Producer}
import org.reactivestreams.spi.{Subscription, Subscriber}
import akka.actor.ActorSystem
/** Demo: a single `Producer[Int]` feeding several consumers, each of which
  * requests elements in batches of its own size.
  */
object OneProducerManySubscribers {

  /** Builds a producer over the unbounded stream 0, 1, 2, ..., attaches four
    * [[ConsoleConsumer]]s with different batch sizes, waits for a line on
    * stdin and then shuts the actor system down.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("test")
    // try/finally so the actor system is shut down even if wiring up the
    // stream or reading from stdin throws.
    try {
      val producer: Producer[Int] =
        Flow.apply(Stream.from(0, 1)).toProducer(FlowMaterializer(MaterializerSettings()))
      producer.produceTo(new ConsoleConsumer(0, 5))
      producer.produceTo(new ConsoleConsumer(1, 50))
      producer.produceTo(new ConsoleConsumer(2, 20))
      producer.produceTo(new ConsoleConsumer(3, 20000))
      Console.readLine()
    } finally {
      system.shutdown()
    }
  }

  /** Consumer that prints every element it receives and asks its subscription
    * for `bufferSize` more elements once on subscribe and then every 2 seconds.
    *
    * @param id         label included in the printed messages
    * @param bufferSize number of elements requested on each `requestMore` call
    * @param sys        actor system whose scheduler and dispatcher drive the periodic requests
    */
  class ConsoleConsumer(id: Int, bufferSize: Int)(implicit sys: ActorSystem) extends Consumer[Int] {
    import scala.concurrent.duration._
    import sys.dispatcher

    // Written from the subscriber callbacks and read from the scheduler's
    // task, so it must be @volatile for the write to be visible to the tick.
    // Declared before `tick` so it is initialised before the first tick can run.
    @volatile private[this] var sub: Option[Subscription] = None

    /** Requests `bufferSize` more elements; a no-op while there is no subscription. */
    private def requestMore(): Unit = sub.foreach(_.requestMore(bufferSize))

    // Keep the handle so the periodic task can be stopped once the stream terminates.
    private[this] val tick = sys.scheduler.schedule(2.seconds, 2.seconds)(requestMore())

    /** Stops the periodic requests and forgets the subscription. */
    private def stop(): Unit = {
      tick.cancel()
      sub = None
    }

    /** Returns a new subscriber that records the subscription, prints each
      * element, and stops the periodic requests when the stream completes or fails.
      */
    override def getSubscriber: Subscriber[Int] = new Subscriber[Int] {
      override def onError(cause: Throwable): Unit = {
        stop()
        // Still surfaces as a RuntimeException carrying the original message,
        // but now keeps the original cause attached for diagnosis.
        throw new RuntimeException(cause.getMessage, cause)
      }

      override def onSubscribe(subscription: Subscription): Unit = {
        println(s"Received subscription object for $id")
        sub = Some(subscription)
        requestMore()
      }

      override def onComplete(): Unit = {
        stop()
        println("I am done with the stream")
      }

      override def onNext(element: Int): Unit = {
        println(s"Printing the next element for consumer $id - ${element}")
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment