Skip to content

Instantly share code, notes, and snippets.

@drdozer
Last active August 29, 2015 14:20
Show Gist options
  • Save drdozer/f17a37ad0064b4dd8066 to your computer and use it in GitHub Desktop.
Save drdozer/f17a37ad0064b4dd8066 to your computer and use it in GitHub Desktop.
Fuse multiple requests into a single publisher
/** Message protocol of the [[RequestsManager]] actor. */
object RequestsManager {

  /**
   * Tells the actor to start streaming.
   *
   * @param requests factory for the publisher to subscribe to; the actor invokes it once
   *                 when this message is handled and again each time the publisher it
   *                 produced completes
   */
  final case class Start(requests: () => DatabasePublisher[Request])

  /**
   * Relays a failure reported through the upstream subscriber's `onError` callback.
   *
   * @param t the failure reported by the upstream publisher
   */
  final case class OnError(t: Throwable)

  /** Relays that the upstream publisher signalled `onComplete`. */
  case object OnComplete
}
/**
 * Actor-backed publisher that fuses a sequence of upstream publishers into one stream.
 *
 * After a `Start` message the actor subscribes to the publisher produced by the supplied
 * factory and forwards every element downstream. When that publisher completes, the
 * factory is invoked again and the new publisher takes over, so downstream sees a single
 * uninterrupted stream.
 *
 * Every Reactive Streams callback is turned into a message to `self`, so all actor state
 * (including `context`) is touched only while the actor is processing a message.
 *
 * Downstream demand is taken from `totalDemand`, which `ActorPublisher` maintains, so
 * demand signalled before `Start` or while a swap is in progress is not lost.
 */
class RequestsManager() extends Actor with ActorPublisher[Request] {
  import RequestsManager._

  /**
   * Internal signal: the publisher obtained from `requests` has accepted our subscriber.
   * Sent from the subscriber's `onSubscribe` callback instead of switching behaviour
   * there, because that callback is not guaranteed to run as part of this actor's
   * message processing.
   */
  private case class Subscribed(subscription: Subscription,
                                requests: () => DatabasePublisher[Request])

  /** Initial behaviour: wait to be told which source to stream from. */
  override def receive: Receive = {
    case RequestsManager.Start(requests) =>
      swapInNewRequests(requests)
  }

  /**
   * Subscribes to a fresh publisher obtained from `requests` and waits for its
   * subscription to arrive as a message.
   *
   * Must be called from within the actor's own message processing, as it changes the
   * actor's behaviour.
   *
   * @param requests    factory for the publisher to subscribe to
   * @param outstanding retained for source compatibility and not used: unmet downstream
   *                    demand is read from `totalDemand` once the subscription arrives
   */
  def swapInNewRequests(requests: () => DatabasePublisher[Request], outstanding: Long = 0L): Unit = {
    context become awaitingSubscription
    requests().subscribe(new Subscriber[Request] {
      // Only `self` is used in these callbacks; each signal is relayed as a message.
      override def onSubscribe(s: Subscription): Unit =
        self ! Subscribed(s, requests)
      override def onNext(r: Request): Unit =
        self ! r
      override def onError(t: Throwable): Unit =
        self ! OnError(t)
      override def onComplete(): Unit =
        self ! OnComplete
    })
  }

  /**
   * Behaviour between calling `subscribe` on a publisher and receiving its subscription.
   */
  private def awaitingSubscription: Receive = {
    case Subscribed(subscription, requests) =>
      if (isCanceled) {
        // Downstream went away while we were waiting: release upstream and shut down.
        subscription.cancel()
        context stop self
      } else {
        val unmet = totalDemand
        context become forwardWithSubscription(subscription, requests, unmet)
        // Demand that earlier sources left unsatisfied (or that arrived while no
        // subscription was active) has to be requested from the new source explicitly,
        // otherwise the stream stalls until downstream happens to ask for more.
        // NOTE(review): a source that keeps completing without satisfying the demand is
        // re-subscribed immediately with no back-off - confirm this is acceptable.
        if (unmet > 0) subscription.request(unmet)
      }
    case OnError(t) =>
      onError(t)
    case ActorPublisherMessage.Request(_) =>
      // Nothing to forward to yet; the demand is picked up from totalDemand on Subscribed.
    case ActorPublisherMessage.Cancel =>
      // Handled when the pending subscription arrives (see the isCanceled check above),
      // so that the subscription can still be cancelled.
  }

  /**
   * Behaviour while `subscription` is the active upstream: elements go downstream and
   * downstream demand goes upstream.
   *
   * @param subscription the active upstream subscription
   * @param requests     factory used to obtain the next publisher when this one completes
   * @param outstanding  retained for source compatibility and not used: demand is tracked
   *                     by `totalDemand`
   * @return the message handler for this state
   */
  def forwardWithSubscription(subscription: Subscription,
                              requests: () => DatabasePublisher[Request],
                              outstanding: Long): Receive = {
    case OnError(t) =>
      // NOTE(review): as in the original, the actor keeps running after the failure has
      // been signalled; consider stopping it once delivery of the failure is assured.
      onError(t)
    case OnComplete =>
      swapInNewRequests(requests, totalDemand)
    case r: Request =>
      onNext(r)
    case ActorPublisherMessage.Request(items) =>
      subscription.request(items)
    case ActorPublisherMessage.Cancel =>
      subscription.cancel()
      // Nothing is left to publish to, so do not keep the actor around.
      context stop self
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment