Last active
May 4, 2016 13:15
-
-
Save jeantil/424e4ebd268998280696c4f2b16b28e5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// There are two distinct flows to handle:
/** Internal protocol: handles the chat messages this actor has to publish. */
def receiveChat: Receive = {
  // The fields of the message are not needed here, only the message itself.
  case incoming @ ChatMessage(_, _, _) if isActive =>
    // Always append the message to the end of the pending queue first;
    // skipping the queue could emit chat messages out of order
    // (i.e. a race condition).
    buffer.enqueue(incoming)
    // Then always try to emit up to totalDemand messages from the queue.
    deliverPending()
}
/** The akka-stream protocol: reacts to the `ActorPublisherMessage` signals. */
def handleStreamProtocol: Receive = {
  // Demand was signalled: flush whatever the queue is now allowed to emit.
  case ActorPublisherMessage.Request(_) =>
    deliverPending()
  // The stream was cancelled: nothing left to publish, so stop this actor.
  case ActorPublisherMessage.Cancel =>
    context.stop(self)
}
/**
 * Emits buffered messages while there is outstanding demand.
 *
 * Takes at most `totalDemand` elements from the front of `buffer`, keeps the
 * remainder as the new buffer and passes each taken element to `onNext`, in
 * queue order. Does nothing when `totalDemand` is not positive.
 */
final def deliverPending(): Unit =
  if (totalDemand > 0) {
    // totalDemand can exceed Int.MaxValue; a bare `.toInt` would then wrap to a
    // negative or too-small count and deliver nothing (or too little). Clamp
    // before narrowing so that a huge demand simply means "send everything".
    val batchSize = math.min(totalDemand, Int.MaxValue.toLong).toInt
    // Take the first batchSize elements to send; the rest stays queued.
    val (use, keep) = buffer.splitAt(batchSize)
    buffer = keep
    use.foreach(onNext)
  }
// With both protocols split out, receive has a single responsibility:
// try the chat handler first, then fall back to the stream-protocol handler.
override def receive: Receive =
  receiveChat.orElse(handleStreamProtocol)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment