Last active
May 31, 2016 00:09
-
-
Save tOverney/0a9e481ad8fbaa9eb7cda93f908bf95f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util._ | |
import scala.collection.mutable.MutableList | |
import scala.collection.mutable.Set | |
import scala.collection.mutable.Map | |
// IMPORTANT -- THIS IS INDIVIDUAL WORK. ABSOLUTELY NO COLLABORATION!!!
// - implement a (main-memory) data store with OMVCC.
// - objects are <int, int> key-value pairs.
// - if an operation is to be refused by the OMVCC protocol,
//   undo its xact (what work does this take?) and throw an exception.
// - garbage collection of versions is optional.
// - throw exceptions when necessary, such as when we try to:
//   + execute an operation in a transaction that is not running
//   + read a non-existing key
//   + delete a non-existing key
//   + write into a key where it already has an uncommitted version
// - you may, but do not need to, create different exceptions for operations
//   that are refused and for operations that are refused and cause the Xact
//   to be aborted. Keep it simple!
// - keep the interface, we want to test automatically!
/**
 * Main-memory `Int -> Int` key-value store using optimistic multi-version
 * concurrency control (OMVCC).
 *
 * Every key maps to a list of `(version, value)` pairs sorted by descending
 * version. A pending (uncommitted) version is tagged with the id of the
 * transaction that wrote it; ids are drawn from a counter starting at 2^62,
 * so pending versions sort ahead of committed ones, whose tag is a commit
 * timestamp drawn from a counter starting at 0.
 *
 * A transaction sees a version only if its tag is in the snapshot captured
 * by [[begin]] (the commit timestamps known at that moment plus the
 * transaction's own id). Reads record predicates that [[commit]] validates
 * against the writes of every transaction committed since that snapshot.
 *
 * The object holds unsynchronized mutable state.
 */
object OMVCC {

  /** key -> versions as `(version tag, value)`, highest tag first. */
  private val kvStore: Map[Int, MutableList[(Long, Int)]] = Map()

  /** Transactions by transaction id and, once committed, also by commit timestamp. */
  private val xactMap: Map[Long, Transaction] = Map()

  /** Commit timestamps of every transaction committed so far. */
  private var previouslyCommittedVersion: Set[Long] = Set()

  /** Counter bumped by both `begin` and a successful `commit`. */
  private var startAndCommitTimestampGen: Long = 0L

  /** Transaction id counter; starts high so ids never collide with timestamps. */
  private var transactionIdGen: Long = 1L << 62

  /** What a transaction has observed; re-checked when it commits. */
  sealed trait Predicates

  /** The transaction read `key`. */
  case class Read(key: Int) extends Predicates

  /** The transaction ran a modulo query with divisor `mod`; `KVs` holds the keys that matched. */
  case class ModQuery(mod: Int, KVs: MutableList[Int]) extends Predicates

  /**
   * Record of a write to `key`. Equality is based on `key` only, because
   * `value` is not a constructor parameter.
   */
  case class Write(key: Int) {
    var value: Int = -1
  }

  /**
   * Looks up a transaction by id (or by commit timestamp).
   *
   * @param id            transaction id or commit timestamp
   * @param checkFinished when true, a finished transaction is rejected
   * @throws AlreadyFinishedTransaction if the id is unknown, or if the
   *                                    transaction is finished and
   *                                    `checkFinished` is set
   */
  def fetchXact(id: Long, checkFinished: Boolean = true): Transaction =
    xactMap.get(id) match {
      case Some(xact) if !checkFinished || !xact.finished =>
        xact
      case Some(xact) =>
        throw new AlreadyFinishedTransaction(s"$xact is over!")
      case None =>
        throw new AlreadyFinishedTransaction(s"$id is not a known transaction")
    }

  /**
   * State of one transaction.
   *
   * @param xactID           id handed out by `begin`; also the tag of this
   *                         transaction's pending versions
   * @param version          value of the shared timestamp counter at `begin`
   * @param committedVersion snapshot of visible version tags: the commit
   *                         timestamps known at `begin` plus `xactID`
   */
  class Transaction(val xactID: Long, val version: Long,
                    val committedVersion: Set[Long]) {
    private var _finished = false
    var predicates: Set[Predicates] = Set()
    var writes: MutableList[Write] = MutableList()

    /** True once the transaction has been committed or rolled back. */
    def finished: Boolean = _finished

    /** Marks the transaction as finished. */
    def over(): Unit = {
      _finished = true
    }

    /**
     * Removes this transaction's pending version of `w.key` from the store.
     * Versions written by other transactions are left untouched.
     *
     * @throws NonExistingKey if the key was never written by anyone
     */
    def deleteWrite(w: Write): Unit = {
      val key = w.key
      val versions = kvStore.getOrElse(key, keyError(s"trying to delete $key"))
      kvStore.update(key, versions.filterNot(_._1 == xactID))
    }

    /** Re-tags every pending version of this transaction with `newVersion`. */
    def updateWriteVersions(newVersion: Long): Unit =
      for (key <- writes.map(_.key).distinct) {
        val versions = kvStore(key)
        val idx = versions.indexWhere(_._1 == xactID)
        // A write record without a matching pending version has nothing to
        // publish; skipping it avoids indexing the list at -1.
        if (idx >= 0) {
          versions.update(idx, newVersion -> versions(idx)._2)
          kvStore.update(key, versions.sortBy(entry => -entry._1))
        }
      }

    /** Undoes every pending write of this transaction. */
    def revertXact(): Unit = {
      writes.distinct.foreach(deleteWrite)
      // The versions are gone, so the records must go too; otherwise a later
      // commit would try to publish versions that no longer exist.
      writes = MutableList()
    }

    /**
     * Validates the transaction for commit. A transaction without writes is
     * always valid. Otherwise every recorded predicate is checked against the
     * writes of the transactions committed after this one's snapshot.
     */
    def isValid: Boolean = {
      def checkPredicates: Boolean = {
        val concurrentCommits = previouslyCommittedVersion -- committedVersion
        // reverse + distinct keeps only the last write per key, i.e. the
        // value that was actually committed.
        val concurrentWrites = concurrentCommits.toList.flatMap { ts =>
          fetchXact(ts, checkFinished = false).writes.reverse.distinct
        }
        predicates.forall {
          case Read(key) =>
            !concurrentWrites.exists(_.key == key)
          case ModQuery(mod, keys) =>
            // Conflict if a key that matched was overwritten, or if a newly
            // committed value now matches the query (phantom).
            !concurrentWrites.exists { w =>
              keys.contains(w.key) || (mod != 0 && w.value % mod == 0)
            }
        }
      }
      writes.isEmpty || checkPredicates
    }

    /** Undoes this transaction's writes, then throws [[NonExistingKey]]. */
    def keyError(message: String): Nothing = {
      revertXact()
      throw new NonExistingKey(message)
    }

    /**
     * Returns the newest value of `key` visible in this transaction's
     * snapshot and records a `Read` predicate for it.
     *
     * @throws NonExistingKey if no visible version exists
     */
    def getVersionedValue(key: Int): Int = {
      val versions = kvStore.getOrElse(key, keyError(s"trying to read $key"))
      versions.find(entry => committedVersion(entry._1)) match {
        case Some((_, value)) =>
          predicates += Read(key)
          value
        case None =>
          keyError(s"trying to read $key")
      }
    }

    /**
     * Returns every visible value divisible by `mod` and records a
     * `ModQuery` predicate holding the matching keys.
     */
    def getModuloableValues(mod: Int): MutableList[Int] = {
      val keys = MutableList[Int]()
      val values = MutableList[Int]()
      for ((key, versions) <- kvStore.toList) {
        versions.find(entry => committedVersion(entry._1)).foreach {
          case (_, value) =>
            if (value % mod == 0) {
              keys += key
              values += value
            }
        }
      }
      predicates += ModQuery(mod, keys)
      values
    }

    override def toString: String =
      s"Transaction id:$xactID - predicates: $predicates - writes: $writes"
  }

  /** Thrown when a key cannot be read, deleted or written. */
  class NonExistingKey(message: String) extends Exception(message) {
    override def toString: String = s"Non-existing key exception: $message"
  }

  /** Thrown when an operation names an unknown or finished transaction. */
  class AlreadyFinishedTransaction(message: String) extends Exception(message) {
    override def toString: String =
      s"Already Finished Transaction exception: $message"
  }

  // returns transaction id == logical start timestamp
  def begin: Long = {
    startAndCommitTimestampGen += 1
    transactionIdGen += 1
    // `+` copies the set, so later commits do not leak into this snapshot.
    val trans = new Transaction(transactionIdGen, startAndCommitTimestampGen,
      previouslyCommittedVersion + transactionIdGen)
    xactMap.put(transactionIdGen, trans)
    transactionIdGen
  }

  // return value of object key in transaction xact
  @throws(classOf[Exception])
  def read(xact: Long, key: Int): Int =
    fetchXact(xact).getVersionedValue(key)

  // return the list of values of objects whose values mod k are zero.
  // this is our only kind of query / bulk read.
  @throws(classOf[Exception])
  def modquery(xact: Long, k: Int): java.util.List[Integer] = {
    val transaction = fetchXact(xact)
    val result = new java.util.ArrayList[Integer]
    transaction.getModuloableValues(k).foreach(value => result.add(value))
    result
  }

  // update the value of an existing object identified by key
  // or insert <key,value> for a non-existing key in transaction xact
  @throws(classOf[Exception])
  def write(xact: Long, key: Int, value: Int): Unit = {
    val transaction = fetchXact(xact)
    val inbetweenCommit =
      previouslyCommittedVersion -- transaction.committedVersion
    kvStore.get(key) match {
      // The transaction already owns a pending version: overwrite it in
      // place. The tag has to be compared explicitly, because the list
      // holds (tag, value) pairs.
      case Some(versions) if versions.exists(_._1 == xact) =>
        val idx = versions.indexWhere(_._1 == xact)
        versions.update(idx, xact -> value)
      case Some(versions) if versions.exists(entry =>
        inbetweenCommit(entry._1)) =>
        transaction.keyError(
          s"there's a more recent committed version for key: $key")
      case Some(versions) if versions.exists(entry =>
        !transaction.committedVersion(entry._1)) =>
        transaction.keyError(
          s"there's already an uncommitted version for key: $key")
      case Some(versions) =>
        kvStore.update(key,
          ((xact, value) +=: versions).sortBy(entry => -entry._1))
      case None =>
        kvStore.update(key, MutableList(xact -> value))
    }
    val wr = Write(key)
    wr.value = value
    transaction.writes += wr
  }

  // delete the object identified by key in transaction xact
  // NOTE(review): this only withdraws the transaction's own pending version
  // of the key; a committed version stays visible -- confirm whether a
  // tombstone version is expected here.
  @throws(classOf[Exception])
  def delete(xact: Long, key: Int): Unit = {
    val transaction = fetchXact(xact)
    transaction.deleteWrite(Write(key))
    // Forget the write records too, so commit does not look for a pending
    // version that no longer exists.
    transaction.writes = transaction.writes.filterNot(_.key == key)
  }

  @throws(classOf[Exception])
  def commit(xact: Long): Unit = {
    val transaction = fetchXact(xact)
    if (transaction.isValid) {
      startAndCommitTimestampGen += 1
      previouslyCommittedVersion += startAndCommitTimestampGen
      // Also register under the commit timestamp, so that later validations
      // can look up this transaction's writes by timestamp.
      xactMap.put(startAndCommitTimestampGen, transaction)
      transaction.updateWriteVersions(startAndCommitTimestampGen)
    } else {
      transaction.revertXact()
    }
    transaction.over()
  }

  @throws(classOf[Exception])
  def rollback(xact: Long): Unit = {
    val transaction = fetchXact(xact)
    transaction.revertXact()
    transaction.over()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment