Skip to content

Instantly share code, notes, and snippets.

@tOverney
Last active May 31, 2016 00:09
Show Gist options
  • Save tOverney/0a9e481ad8fbaa9eb7cda93f908bf95f to your computer and use it in GitHub Desktop.
Save tOverney/0a9e481ad8fbaa9eb7cda93f908bf95f to your computer and use it in GitHub Desktop.
import java.util._
import scala.collection.mutable.MutableList
import scala.collection.mutable.Set
import scala.collection.mutable.Map
// IMPORTANT -- THIS IS INDIVIDUAL WORK. ABSOLUTELY NO COLLABORATION!!!
// - implement a (main-memory) data store with OMVCC.
// - objects are <int, int> key-value pairs.
// - if an operation is to be refused by the OMVCC protocol,
// undo its xact (what work does this take?) and throw an exception.
// - garbage collection of versions is optional.
// - throw exceptions when necessary, such as when we try to:
// + execute an operation in a transaction that is not running
// + read a nonexisting key
// + delete a nonexisting key
// + write into a key where it already has an uncommitted version
// - you may but do not need to create different exceptions for operations that
// are refused and for operations that are refused and cause the Xact to be
// aborted. Keep it simple!
// - keep the interface, we want to test automatically!
object OMVCC {
/* TODO -- your versioned key-value store data structure */
private val kvStore: Map[Int, MutableList[(Long, Int)]] = Map()
private val xactMap: Map[Long, Transaction] = Map()
sealed trait Predicates
case class Read(key: Int) extends Predicates
case class ModQuery(mod: Int, KVs: MutableList[Int]) extends Predicates
case class Write(key: Int) {
var value: Int = -1;
}
def fetchXact(id: Long, checkFinished: Boolean = true): Transaction = {
xactMap.get(id) match {
case Some(xact) if !checkFinished || (checkFinished && !xact.finished) =>
xact
case Some(xact) =>
throw new AlreadyFinishedTransaction(s"$xact is over!")
case None =>
throw new AlreadyFinishedTransaction(s"$id is not a known transaction")
}
}
class Transaction(val xactID: Long, val version: Long,
val committedVersion: Set[Long]) {
private var _finished = false
var predicates: Set[Predicates] = Set()
var writes: MutableList[Write] = MutableList()
def finished = _finished
def over() = _finished = true
def deleteWrite(w: Write): Unit = {
val Write(key) = w
val versionnedValue = kvStore.getOrElse(key,
keyError(s"trying to delete $key"))
val newValues = versionnedValue.filterNot(elem => elem._1 == xactID)
kvStore.update(key, newValues)
}
def updateWriteVersions(newVersion: Long): Unit = {
for (wr @ Write(key) <- writes.toSet) {
val versions = kvStore(key)
val idx = versions.indexWhere(_._1 == xactID)
val value: Int = versions(idx)._2
versions.update(idx, newVersion -> value)
kvStore.update(key, versions.sortBy(- _._1))
}
}
def revertXact(): Unit = {
(writes toSet) foreach deleteWrite
}
def isValid: Boolean = {
def checkPredicates: Boolean = {
val setToTest = previouslyCommittedVersion -- committedVersion
val writesToVerify = setToTest.flatMap{ tId =>
val trans = fetchXact(tId, checkFinished = false)
trans.writes
}
predicates.forall{
case Read(key) =>
!writesToVerify.contains(Write(key))
case ModQuery(mod, keys) =>
keys.forall(key => !writesToVerify.contains(Write(key)))
}
}
writes.isEmpty || checkPredicates
}
def keyError(message: String) = {
revertXact()
throw new NonExistingKey(message)
}
def getVersionedValue(key: Int): Int = {
val versions = kvStore.getOrElse(key, keyError(s"trying to read $key"))
val valid = versions.dropWhile( entry => !committedVersion(entry._1))
if(valid.isEmpty) keyError(s"trying to read $key")
val (version, value) = valid.head
predicates += Read(key)
value
}
def getModuloableValues(mod: Int): MutableList[Int] = {
val res = kvStore.toList.foldLeft(MutableList[(Int, Int)]()){
case (acc, (key, versions)) =>
val valid = versions.dropWhile( entry => !committedVersion(entry._1))
lazy val value = valid.head._2
if (valid.nonEmpty && value % mod == 0)
acc :+ (key, value)
else acc
}
val (keys, values) = res.unzip
predicates += ModQuery(mod, keys)
values
}
override def toString: String =
s"Transaction id:$xactID - predicates: $predicates - writes: $writes"
}
private var previouslyCommittedVersion: Set[Long] = Set()
private var startAndCommitTimestampGen: Long = 0L
private var transactionIdGen: Long = 1L << 62
class NonExistingKey(message: String) extends Exception {
override def toString: String = s"Non-existing key exception: $message"
}
class AlreadyFinishedTransaction(message: String) extends Exception {
override def toString: String =
s"Already Finished Transaction exception: $message"
}
// returns transaction id == logical start timestamp
def begin: Long = {
startAndCommitTimestampGen += 1
transactionIdGen += 1
val trans = new Transaction(transactionIdGen, startAndCommitTimestampGen,
previouslyCommittedVersion + transactionIdGen)
xactMap.put(transactionIdGen, trans)
transactionIdGen
}
// return value of object key in transaction xact
@throws(classOf[Exception])
def read(xact: Long, key: Int): Int = {
val transaction = fetchXact(xact)
transaction.getVersionedValue(key)
}
// return the list of values of objects whose values mod k are zero.
// this is our only kind of query / bulk read.
@throws(classOf[Exception])
def modquery(xact: Long, k: Int): java.util.List[Integer] = {
val transaction = fetchXact(xact)
val l = new java.util.ArrayList[Integer]
val values = transaction.getModuloableValues(k)
values.foreach(elem => l.add(elem))
l
}
// update the value of an existing object identified by key
// or insert <key,value> for a non-existing key in transaction xact
@throws(classOf[Exception])
def write(xact: Long, key: Int, value: Int) {
val transaction = fetchXact(xact)
val inbetweenCommit = previouslyCommittedVersion --
transaction.committedVersion
kvStore.get(key) match {
case Some(versions) if versions.contains(xact) =>
val idx = versions.indexWhere(_._1 == xact)
versions.update(idx, xact -> value)
kvStore.update(key, versions)
case Some(versions) if versions.exists(entry =>
inbetweenCommit(entry._1)) =>
transaction.keyError(
s"there's a more recent committed version for key: $key")
case Some(versions) if versions.exists(entry =>
!transaction.committedVersion(entry._1)) =>
transaction.keyError(
s"there's already an uncommitted version for key: $key")
case Some(versions) =>
kvStore.update(key,
((xact, value) +=: versions).sortBy(- _._1))
case None =>
kvStore.update(key, MutableList(xact -> value))
}
val wr = Write(key)
wr.value = value
transaction.writes += wr
}
// delete the object identified by key in transaction xact
@throws(classOf[Exception])
def delete(xact: Long, key: Int) {
val transaction = fetchXact(xact)
val wr = Write(key)
transaction.deleteWrite(wr)
}
@throws(classOf[Exception])
def commit(xact: Long) {
val transaction = fetchXact(xact)
if (transaction.isValid) {
startAndCommitTimestampGen += 1 //SHOULD BE USED
previouslyCommittedVersion += startAndCommitTimestampGen
xactMap.put(startAndCommitTimestampGen, transaction)
transaction.updateWriteVersions(startAndCommitTimestampGen)
}
else {
transaction.revertXact()
}
transaction.over()
}
@throws(classOf[Exception])
def rollback(xact: Long) {
val transaction = fetchXact(xact)
transaction.revertXact()
transaction.over()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment