Skip to content

Instantly share code, notes, and snippets.

@nkallen
Created February 27, 2010 09:08
Show Gist options
  • Save nkallen/316595 to your computer and use it in GitHub Desktop.
Save nkallen/316595 to your computer and use it in GitHub Desktop.
This is a prototype of a technique for testing distributed race conditions.
/*
This is a prototype of a technique for testing distributed race conditions.
Here is an example.
Suppose we have a command that performs an operation across two or more components.
And suppose more than one of these commands is run in parallel (across processes).
The race conditions will occur as each command process independently invokes methods
on these components in various orders.
The testing technique proposed here is to create barriers around method invocations
so that you can explicitly specify the order. Thus, you can enumerate all possible
orders of execution in a unit-test-like fashion and assert that the system ends up
in a coherent state.
val rawShard1 = new Shard
val rawShard2 = new Shard
val shard1 = barrier(rawShard1)
val shard2 = barrier(rawShard2)
val from = 1
val to = 2
val at = Time.now
"with simultaneous 'archives' and 'adds', the 'archive' always wins" in {
"when the archive executes in the middle of the add" >> {
one(shard1).add(1, 2, at) then
one(shard1).archive(2, at) then
one(shard2).archive(2, at) then
one(shard2).add(1, 2, at)
val process1 = future(AddEdge(from, to, List(shard1, shard2), at))
val process2 = future(Archive(from, List(shard1, shard2), at))
List(process1, process2).foreach(_.get(1.second))
rawShard1.isArchived(1, 2) mustBe true
rawShard2.isArchived(1, 2) mustBe true
}
"when the archive executes before the add" >> {
one(shard1).archive(2, at) then
one(shard2).archive(2, at) then
one(shard1).add(1, 2, at) then
one(shard2).add(1, 2, at)
val process1 = future(AddEdge(from, to, List(shard1, shard2), at))
val process2 = future(Archive(from, List(shard1, shard2), at))
List(process1, process2).foreach(_.get(1.second))
rawShard1.isArchived(1, 2) mustBe true
rawShard2.isArchived(1, 2) mustBe true
}
}
*/
import scala.collection.mutable.Map
import scala.collection.mutable
import java.util.NoSuchElementException
import java.lang.reflect.{InvocationHandler, InvocationTargetException, Method, Proxy => JProxy, UndeclaredThrowableException}
import scala.reflect.Manifest
/**
 * Factory for JDK dynamic proxies whose every method call is routed through a
 * Scala function instead of a hand-written `InvocationHandler`.
 */
object Proxy {
  /**
   * Creates a proxy that implements the erased class of `T` and forwards each call to `f`.
   *
   * @param obj      the object handed to `f` as the receiver of every intercepted call
   * @param f        invoked as `f(obj, wrappedMethod, args)`; its result becomes the call's result
   * @param manifest supplies the class the proxy implements
   * @return the proxy, typed as `T`
   */
  def apply[T <: AnyRef](obj: T)(f: (T, MethodWrapper, Array[Object]) => Object)(implicit manifest: Manifest[T]): T = {
    val handler = new InvocationHandler {
      // The proxy instance supplied by the JDK is ignored; `obj` is always the receiver.
      def invoke(proxy: Object, method: Method, args: Array[Object]) =
        f(obj, new MethodWrapper(method), args)
    }
    build[T](obj.getClass.getClassLoader, handler)
  }

  /**
   * Creates a proxy with no backing object: `f` receives the proxy instance itself
   * (cast to `T`) as the receiver of every intercepted call.
   *
   * @param cls      class whose class loader defines the proxy
   * @param f        invoked as `f(proxy, wrappedMethod, args)`
   * @param manifest supplies the class the proxy implements
   * @return the proxy, typed as `T`
   */
  def apply[T <: AnyRef](cls: Class[T])(f: (T, MethodWrapper, Array[Object]) => Object)(implicit manifest: Manifest[T]): T = {
    val handler = new InvocationHandler {
      def invoke(proxy: Object, method: Method, args: Array[Object]) =
        f(proxy.asInstanceOf[T], new MethodWrapper(method), args)
    }
    build[T](cls.getClassLoader, handler)
  }

  /** Instantiates the dynamic proxy for the erased class of `T` using the given loader and handler. */
  private def build[T <: AnyRef](loader: ClassLoader, handler: InvocationHandler)(implicit manifest: Manifest[T]): T =
    JProxy.newProxyInstance(loader, Array(manifest.erasure), handler).asInstanceOf[T]

  /**
   * Thin wrapper around a reflective `Method` that rethrows the target's own
   * exception rather than the `InvocationTargetException` reflection wraps it in.
   */
  class MethodWrapper(val method: Method) {
    /** Name of the wrapped method. */
    def name = method.getName

    /**
     * Reflectively invokes the wrapped method on `obj` with `args`.
     * Any exception raised by the target method is rethrown as-is.
     */
    @throws(classOf[Exception])
    def invoke(obj: Object, args: Array[Object]) =
      try method.invoke(obj, args: _*)
      catch {
        case wrapped: InvocationTargetException => throw wrapped.getTargetException
      }
  }
}
/**
 * Signals that a barrier-guarded method was invoked without a remaining permit.
 *
 * This is an unchecked exception on purpose: a checked exception thrown from a
 * dynamic-proxy handler is replaced by `UndeclaredThrowableException` whenever the
 * interface method does not declare it, which would hide it from callers.
 *
 * @param name name of the method that was not allowed to run
 */
class NotAllowedException(name: String) extends RuntimeException(name)

/**
 * Tracks, per argument list, how many more times one method may be invoked.
 *
 * @param name name of the guarded method; used as the message of [[NotAllowedException]]
 */
case class Allowance(name: String) {
  /** Remaining permitted invocations, keyed by the argument list of the call. */
  val allowedInvocations = Map[List[Object], Int]()

  /** Normalises an argument array into a map key; the JDK proxy passes `null` for zero-argument calls. */
  private def keyOf(args: Array[Object]): List[Object] =
    if (args eq null) Nil else args.toList

  /**
   * Allows the call with exactly these arguments to run `times` more times.
   * A previously recorded count for the same arguments is replaced, not added to.
   *
   * @return the underlying map (callers rely on this being an object reference)
   */
  def permit(args: Array[Object], times: Int) = synchronized {
    allowedInvocations += keyOf(args) -> times
  }

  /**
   * Consumes one permit for `args` and evaluates `f`.
   * `f` is evaluated while this allowance's lock is held.
   *
   * @throws NotAllowedException if no permit was recorded for `args` or none remain
   */
  def doIfPermitted[T](args: Array[Object], f: => T) = synchronized {
    val key = keyOf(args)
    val remaining = allowedInvocations.getOrElse(key, 0)
    if (remaining <= 0) {
      throw new NotAllowedException(name)
    }
    allowedInvocations(key) = remaining - 1
    f
  }
}
// Permits recorded so far, keyed by (underlying object, method name).
val allowedMethods =
  new mutable.HashMap[(AnyRef, String), Allowance] with mutable.SynchronizedMap[(AnyRef, String), Allowance]

// Maps each barrier proxy back to the object it wraps.
val proxy2Object =
  new mutable.HashMap[JProxy, AnyRef] with mutable.SynchronizedMap[JProxy, AnyRef]
/**
 * Wraps `t` in a proxy that only lets a method through when a matching permit
 * has been recorded in `allowedMethods`.
 *
 * `toString` and `hashCode` are always forwarded to `t`. `equals` is answered by
 * asking the argument whether it equals `t`, and is false for a null argument.
 * Every other method must have an [[Allowance]] registered under `(t, methodName)`
 * with a permit left for the exact arguments.
 *
 * Exceptions thrown by the underlying method itself propagate unchanged; only a
 * missing or exhausted permit produces a [[NotAllowedException]].
 *
 * @param t the object to guard
 * @return a proxy for `t`, also registered in `proxy2Object`
 * @throws NotAllowedException (from the proxy) when a guarded call has no permit
 */
def barrier[T <: AnyRef](t: T)(implicit manifest: Manifest[T]): T = {
  val p = Proxy(t) { (obj, method, args) =>
    method.name match {
      case "toString" | "hashCode" =>
        method.invoke(obj, args)
      case "equals" =>
        val other = args(0)
        // equals(null) must be false rather than a NullPointerException.
        java.lang.Boolean.valueOf(other != null && other.equals(obj))
      case guarded =>
        // Look the allowance up explicitly so that a NoSuchElementException raised
        // by the real method is not mistaken for a missing permit.
        allowedMethods.get((obj, guarded)) match {
          case Some(allowance) => allowance.doIfPermitted(args, method.invoke(obj, args))
          case None => throw new NotAllowedException(guarded)
        }
    }
  }
  proxy2Object += p.asInstanceOf[JProxy] -> t
  p
}
/**
 * Returns a recording proxy for a barrier proxy `t`: calling a method on the
 * result does not run the method, it records one permit for that method with
 * exactly those arguments on the object behind `t`.
 *
 * The recording call returns a neutral value matching the method's declared
 * return type (null for references and Unit, zero/false for primitives), so
 * permits can be recorded for methods of any return type.
 *
 * @param t a proxy previously returned by `barrier`; it is looked up in `proxy2Object`
 * @return the recording proxy, typed as `T`
 */
def one[T <: AnyRef](t: T)(implicit manifest: Manifest[T]): T = {
  // A dynamic proxy must hand back a value compatible with the declared return
  // type; returning an arbitrary object fails for anything but Unit/Object.
  def placeholderFor(returnType: Class[_]): Object =
    if (!returnType.isPrimitive || returnType == java.lang.Void.TYPE) null
    else if (returnType == java.lang.Boolean.TYPE) java.lang.Boolean.FALSE
    else if (returnType == java.lang.Character.TYPE) java.lang.Character.valueOf(0.toChar)
    else if (returnType == java.lang.Byte.TYPE) java.lang.Byte.valueOf(0.toByte)
    else if (returnType == java.lang.Short.TYPE) java.lang.Short.valueOf(0.toShort)
    else if (returnType == java.lang.Integer.TYPE) java.lang.Integer.valueOf(0)
    else if (returnType == java.lang.Long.TYPE) java.lang.Long.valueOf(0L)
    else if (returnType == java.lang.Float.TYPE) java.lang.Float.valueOf(0.0f)
    else java.lang.Double.valueOf(0.0d)

  Proxy(t) { (obj, method, args) =>
    val target = proxy2Object(obj.asInstanceOf[JProxy])
    allowedMethods.getOrElseUpdate((target, method.name), {
      Allowance(method.name)
    }).permit(args, 1)
    placeholderFor(method.method.getReturnType)
  }
}
/** Minimal interface used by the demo below; a dynamic proxy needs an interface to implement. */
trait Foo {
  /** Abstract operation taking a number and a label; returns nothing. */
  def bar(a: Int, b: String): Unit
}
/** Demo implementation of `Foo` that prints the label followed by the number. */
class Bar extends Foo {
  def bar(a: Int, b: String): Unit = println(b + a)
}
// Demo: guard a Bar behind a barrier and show which calls get through.
val foo = barrier[Foo](new Bar)

// No permit has been recorded yet, so this call is rejected; the failure is ignored.
try { foo.bar(1, "haay") } catch { case _: Throwable => }

// Record a single permit for bar(1, "haay").
one(foo).bar(1, "haay")

// Different arguments have no permit, so this call is rejected as well.
try { foo.bar(1, "nuh uh") } catch { case _: Throwable => }

// Matches the recorded permit: the real Bar runs and prints.
foo.bar(1, "haay")
// yay!!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment