Skip to content

Instantly share code, notes, and snippets.

@ktoso
Last active January 2, 2016 10:49
Show Gist options
  • Save ktoso/8292668 to your computer and use it in GitHub Desktop.
Save ktoso/8292668 to your computer and use it in GitHub Desktop.
def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit]
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit]
// and "replay":
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)
(replayCallback: (PersistentRepr) => Unit): Future[Long]
override def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = {
log.debug(s"Write async for ${persistentBatch.size} presistent messages")
val futures = persistentBatch map { p =>
import p._
executePut(
RowKey(processorId, sequenceNr).toBytes,
Array(ProcessorId, SequenceNr, Marker, Message),
Array(toBytes(processorId), toBytes(sequenceNr), toBytes(AcceptedMarker), persistentToBytes(p))
)
}
Future.sequence(futures)
}
protected def executePut(key: Array[Byte], qualifiers: Array[Array[Byte]], values: Array[Array[Byte]]): Future[Unit] = {
val request = new PutRequest(TableBytes, key, Family, qualifiers, values)
client.put(request) // implicitly converted from Deferred[A, B] => Future[Unit]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment