Created
February 3, 2013 11:07
-
-
Save krasserm/4701315 to your computer and use it in GitHub Desktop.
Journal parallel writes prototype
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.pattern.ask | |
import akka.util.Timeout | |
/**
 * Demo driver for the parallel-write journal prototype.
 *
 * Starts an actor system, creates a [[Journal]] and a simple destination
 * actor, then asks the journal to write the numbers 1 to 100 addressed to
 * that destination.
 */
object Parwrite extends App {
  val system = ActorSystem("example")
  implicit val timeout = Timeout(5.seconds)

  import system.dispatcher

  val journal = system.actorOf(Props[Journal])

  // Destination actor: prints whatever it receives and answers the sender
  // with the same message prefixed by "re: ".
  val target = system.actorOf(Props(new Actor {
    def receive = {
      case msg =>
        println("received message " + msg)
        sender ! ("re: " + msg)
    }
  }))

  for (i <- 1 to 100) {
    val reply = journal ? Write(i, target)
    reply.onSuccess {
      // Replies are intentionally ignored; uncomment to trace them:
      // println("received reply " + msg)
      case _ => ()
    }
  }
}
/**
 * Request to journal `msg` and afterwards deliver it to `target`.
 *
 * @param msg        payload to be written
 * @param target     actor that should receive `msg` once it has been written
 * @param sequenceNr sequence number of the write; defaults to -1 and is
 *                   replaced by the journal (via `copy`) before the request
 *                   is handed to a writer
 */
case class Write(
  msg: Any,
  target: ActorRef,
  sequenceNr: Long = -1
)

/**
 * Acknowledgement sent by a writer after handling a [[Write]].
 *
 * @param msg    payload that was written
 * @param target actor that should receive `msg`
 */
case class Written(
  msg: Any,
  target: ActorRef
)
/**
 * Front-end of the journal prototype.
 *
 * Every incoming [[Write]] is stamped with the next sequence number and
 * handed to one of several child [[Writer]] actors, chosen by
 * `sequenceNr % writersCount`, so that writes can proceed in parallel.
 * Each writer's [[Written]] acknowledgement is forwarded, together with its
 * sequence number, to the [[Resequencer]] child, which restores the original
 * order before delivering to the target.
 */
class Journal extends Actor {
  import context.dispatcher

  implicit val timeout = Timeout(5.seconds)

  // Fully qualified so that no additional file-level import is required.
  private val log = akka.event.Logging(context.system, this)

  val resequencer = context.actorOf(Props(new Resequencer))
  val writers = List.fill(5)(context.actorOf(Props(new Writer)))
  val writersCount = writers.length

  // Last assigned sequence number; only mutated inside `receive`.
  var counter = 0L

  def receive = {
    case write: Write =>
      counter += 1
      // Capture everything the asynchronous callback needs as local vals:
      // `counter` and `sender` may have changed by the time it runs.
      val seqNr = counter
      val replyTo = sender
      val writer = writers((seqNr % writersCount).toInt)

      (writer ? write.copy(sequenceNr = seqNr)).onComplete {
        case scala.util.Success(written: Written) =>
          resequencer.tell((seqNr, written), replyTo)
        case scala.util.Success(other) =>
          // Previously dropped silently by the partial onSuccess handler.
          log.warning("unexpected reply for write {}: {}", seqNr, other)
        case scala.util.Failure(cause) =>
          // Previously swallowed. The resequencer only delivers consecutive
          // sequence numbers, so a lost write blocks delivery of every later
          // message; at least make that visible.
          log.error(cause, "write {} failed; later messages will not be delivered", seqNr)
      }
  }
}
/**
 * Worker that handles a single [[Write]] at a time and acknowledges it to
 * the sender with a [[Written]] carrying the same payload and target.
 * The actual storage access is left as a placeholder in this prototype.
 */
class Writer extends Actor {
  def receive = {
    case request: Write =>
      // write to storage backend
      // ...

      // reply successful write
      val ack = Written(request.msg, request.target)
      sender ! ack
  }
}
/**
 * Restores the original order of [[Written]] acknowledgements.
 *
 * Expects messages of the form `(sequenceNr: Long, written: Written)`.
 * A message whose sequence number is exactly `delivered + 1` is passed on to
 * `written.target` (keeping the sender of the incoming message as sender);
 * any other message is buffered until the gap before it has been filled.
 * `delivered` starts at 0, so the first sequence number delivered is 1.
 */
class Resequencer extends Actor {
  import scala.collection.mutable

  // Out-of-order messages keyed by sequence number, each with the sender
  // that has to be used when the message is finally delivered.
  private val delayed = mutable.Map.empty[Long, (Written, ActorRef)]

  // Highest sequence number delivered so far.
  private var delivered = 0L

  def receive = {
    case (seqnr: Long, written: Written) => resequence(seqnr, written, sender)
  }

  /**
   * Delivers `written` if it is next in sequence, otherwise buffers it, and
   * then drains every buffered message that has become deliverable.
   *
   * @param seqnr   sequence number of `written`
   * @param written acknowledgement to deliver to `written.target`
   * @param sdr     sender to use for the delivery
   */
  @scala.annotation.tailrec
  private def resequence(seqnr: Long, written: Written, sdr: ActorRef): Unit = {
    if (seqnr == delivered + 1) {
      delivered = seqnr
      written.target.tell(written.msg, sdr)
    } else {
      // Buffer with the sender passed in rather than the actor's current
      // `sender`, so the method does not depend on where it is called from.
      delayed += (seqnr -> ((written, sdr)))
    }

    delayed.remove(delivered + 1) match {
      case Some((nextWritten, nextSender)) =>
        resequence(delivered + 1, nextWritten, nextSender)
      case None => ()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment