Created
February 27, 2010 09:08
-
-
Save nkallen/316595 to your computer and use it in GitHub Desktop.
This is a prototype of a technique for testing distributed race conditions.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
This is a prototype of a technique for testing distributed race conditions.

Here is an example:
Suppose we have a command that performs an operation across two or more components,
and suppose more than one of these commands is run in parallel (across processes).
Race conditions will occur as each command process independently invokes methods
on these components in various orders.

The testing technique proposed here is to create barriers around method invocations
so that you can explicitly specify the order. Thus, you can enumerate all possible
orders of execution in a unit-test-like fashion and assert that the system ends up
in a coherent state.

  val rawShard1 = new Shard
  val rawShard2 = new Shard
  val shard1 = barrier(rawShard1)
  val shard2 = barrier(rawShard2)
  val from = 1
  val to = 2
  val at = Time.now

  "with simultaneous 'archives' and 'adds', the 'archive' always wins" in {
    "when the archive executes in the middle of the add" >> {
      one(shard1).add(1, 2, at) then
      one(shard1).archive(2, at) then
      one(shard2).archive(2, at) then
      one(shard2).add(1, 2, at)

      val process1 = future(AddEdge(from, to, List(shard1, shard2), at))
      val process2 = future(Archive(from, List(shard1, shard2), at))
      List(process1, process2).foreach(_.get(1.second))

      rawShard1.isArchived(1, 2) mustBe true
      rawShard2.isArchived(1, 2) mustBe true
    }

    "when the archive executes before the add" >> {
      one(shard1).archive(2, at) then
      one(shard2).archive(2, at) then
      one(shard1).add(1, 2, at) then
      one(shard2).add(1, 2, at)

      val process1 = future(AddEdge(from, to, List(shard1, shard2), at))
      val process2 = future(Archive(from, List(shard1, shard2), at))
      List(process1, process2).foreach(_.get(1.second))

      rawShard1.isArchived(1, 2) mustBe true
      rawShard2.isArchived(1, 2) mustBe true
    }
  }
*/
import scala.collection.mutable.Map | |
import scala.collection.mutable | |
import java.util.NoSuchElementException | |
import java.lang.reflect.{InvocationHandler, InvocationTargetException, Method, Proxy => JProxy, UndeclaredThrowableException} | |
import scala.reflect.Manifest | |
/**
 * Factory for JDK dynamic proxies whose every method call is routed to a Scala function.
 *
 * The proxy implements the single interface obtained from the erasure of `T` (taken from
 * the implicit manifest), so `T` has to erase to an interface for
 * `java.lang.reflect.Proxy.newProxyInstance` to accept it.
 */
object Proxy {
  /** Zero-length stand-in for the `null` array the JDK passes for zero-argument methods. */
  private val NoArgs = new Array[Object](0)

  /**
   * The JDK hands `InvocationHandler.invoke` a `null` argument array when the interface
   * method takes no parameters. Map that to an empty array so callbacks can always use
   * `args.length`, `args.toList`, etc.; non-null arrays are passed through untouched.
   */
  private def normalize(args: Array[Object]): Array[Object] =
    if (args == null) NoArgs else args

  /**
   * Builds a proxy that stands in front of an existing object.
   *
   * @param obj the real object; it is passed as the first argument of every `f` call
   * @param f   callback receiving (`obj`, the invoked method, the call arguments — never
   *            null); its result becomes the result of the proxied call
   * @return a proxy implementing the erasure of `T`, loaded by `obj`'s class loader
   */
  def apply[T <: AnyRef](obj: T)(f: (T, MethodWrapper, Array[Object]) => Object)(implicit manifest: Manifest[T]): T = {
    val invocationHandler = new InvocationHandler {
      // The proxy instance itself (first parameter) is deliberately ignored: callers get `obj`.
      def invoke(unused: Object, method: Method, args: Array[Object]) =
        f(obj, new MethodWrapper(method), normalize(args))
    }
    JProxy.newProxyInstance(obj.getClass.getClassLoader, Array(manifest.erasure), invocationHandler).asInstanceOf[T]
  }

  /**
   * Builds a proxy with no backing object.
   *
   * @param cls class whose loader defines the proxy
   * @param f   callback receiving (the proxy instance itself, the invoked method, the call
   *            arguments — never null); its result becomes the result of the proxied call
   * @return a proxy implementing the erasure of `T`
   */
  def apply[T <: AnyRef](cls: Class[T])(f: (T, MethodWrapper, Array[Object]) => Object)(implicit manifest: Manifest[T]): T = {
    val invocationHandler = new InvocationHandler {
      def invoke(proxy: Object, method: Method, args: Array[Object]) =
        f(proxy.asInstanceOf[T], new MethodWrapper(method), normalize(args))
    }
    JProxy.newProxyInstance(cls.getClassLoader, Array(manifest.erasure), invocationHandler).asInstanceOf[T]
  }

  /** Thin wrapper over a reflective `Method` that surfaces the target's own exception. */
  class MethodWrapper(val method: Method) {
    /**
     * Reflectively invokes the wrapped method on `obj`.
     *
     * @param obj  receiver of the call
     * @param args call arguments; `null` is treated as "no arguments"
     * @return whatever the underlying method returns (boxed by reflection)
     * @throws Exception the exception thrown by the target method, unwrapped from
     *                   reflection's `InvocationTargetException`
     */
    @throws(classOf[Exception])
    def invoke(obj: Object, args: Array[Object]) = {
      try {
        method.invoke(obj, normalize(args): _*)
      } catch {
        // Rethrow the real failure rather than the reflection wrapper.
        case e: InvocationTargetException => throw e.getTargetException
      }
    }

    /** Simple name of the wrapped method. */
    def name = method.getName
  }
}
/** Thrown when a guarded method is invoked without a matching, unspent permission; the message is the method name. */
class NotAllowedException(name: String) extends Exception(name)

/**
 * Ledger of how many more times a method may be invoked with a given argument list.
 *
 * Argument arrays are compared by content (converted to a `List`), so two calls with
 * equal arguments share one counter. All access goes through `synchronized` on this
 * instance.
 *
 * @param name method name, used only as the message of `NotAllowedException`
 */
case class Allowance(name: String) {
  /** Remaining permitted invocations per argument list. */
  val allowedInvocations = Map[List[Object], Int]()

  /** Map key for an argument array; a `null` array (how the JDK reports "no arguments") counts as empty. */
  private def keyOf(args: Array[Object]): List[Object] =
    if (args == null) Nil else args.toList

  /**
   * Sets the number of permitted invocations for `args` to `times`, replacing any
   * previous count for the same argument list.
   *
   * @return the underlying map (result of `+=`)
   */
  def permit(args: Array[Object], times: Int) = synchronized {
    allowedInvocations += keyOf(args) -> times
  }

  /**
   * Evaluates `f` if an unspent permission exists for `args`, consuming one permission.
   * `f` is evaluated while this instance's lock is held.
   *
   * @param args argument list of the attempted call (`null` means no arguments)
   * @param f    the guarded computation, evaluated at most once
   * @return the value of `f`
   * @throws NotAllowedException if `args` was never permitted or its count is used up
   */
  def doIfPermitted[T](args: Array[Object], f: => T) = synchronized {
    val key = keyOf(args)
    val remaining = allowedInvocations.getOrElse(key, throw new NotAllowedException(name))
    if (remaining <= 0) {
      throw new NotAllowedException(name)
    }
    // Spend the permission before running `f`, exactly as a failed `f` would still count.
    allowedInvocations(key) = remaining - 1
    f
  }
}
// Permission ledgers keyed by (wrapped object, method name): `one` fills them in,
// `barrier` consults them before letting a call through.
val allowedMethods =
  new mutable.HashMap[(AnyRef, String), Allowance] with mutable.SynchronizedMap[(AnyRef, String), Allowance]

// Reverse lookup from a barrier proxy to the object it wraps, so that `one(proxy)`
// can register permissions against the wrapped object.
val proxy2Object =
  new mutable.HashMap[JProxy, AnyRef] with mutable.SynchronizedMap[JProxy, AnyRef]
/**
 * Wraps `t` in a proxy that only lets a method call through when a matching permission
 * has been registered in `allowedMethods` (see `one`).
 *
 * `toString` and `hashCode` are always forwarded to `t`; `equals` compares the argument
 * against `t`. Every other method is looked up by (`t`, method name) and executed via
 * `Allowance.doIfPermitted`.
 *
 * @param t the real object to guard
 * @return a proxy implementing the erasure of `T`; it is also recorded in `proxy2Object`
 * @throws NotAllowedException from proxied calls when no permission exists for the
 *         method or for its arguments (the JDK proxy may deliver it wrapped in
 *         `UndeclaredThrowableException` because it is a checked exception)
 */
def barrier[T <: AnyRef](t: T)(implicit manifest: Manifest[T]): T = {
  val p = Proxy(t) { (obj, method, args) =>
    method.name match {
      case "toString" | "hashCode" =>
        method.invoke(obj, args)
      case "equals" =>
        // `proxy.equals(null)` must answer false instead of throwing.
        val other = args(0)
        java.lang.Boolean.valueOf(other != null && other.equals(obj))
      case gated =>
        // Explicit lookup instead of catching NoSuchElementException, so that a
        // NoSuchElementException thrown by the real method is not misreported
        // as "not allowed".
        val allowance = allowedMethods.getOrElse((obj, gated), throw new NotAllowedException(gated))
        allowance.doIfPermitted(args, method.invoke(obj, args))
    }
  }
  proxy2Object += p.asInstanceOf[JProxy] -> t
  p
}
/**
 * Returns a recording proxy for a barrier'd object: calling a method on the result does
 * NOT run the method; it registers permission for exactly one invocation of that method
 * with those arguments on the object behind `t`.
 *
 * @param t a proxy previously returned by `barrier`; anything else fails the
 *          `proxy2Object` lookup (or the cast to a JDK proxy) when a method is recorded
 * @return a proxy implementing the erasure of `T` whose methods record permissions and
 *         answer with the default value of their declared return type
 */
def one[T <: AnyRef](t: T)(implicit manifest: Manifest[T]): T = {
  Proxy(t) { (obj, method, args) =>
    val target = proxy2Object(obj.asInstanceOf[JProxy])
    allowedMethods.getOrElseUpdate((target, method.name), Allowance(method.name)).permit(args, 1)

    // The handler's result must fit the interface method's return type, otherwise the
    // JDK proxy throws ClassCastException (or NullPointerException for a null returned
    // from a primitive-typed method). Answer with that type's default value instead of
    // leaking the permission map.
    val returnType = method.method.getReturnType
    if (returnType.isPrimitive && returnType != java.lang.Void.TYPE)
      // Element 0 of a fresh one-element primitive array is the boxed zero/false default.
      java.lang.reflect.Array.get(java.lang.reflect.Array.newInstance(returnType, 1), 0)
    else
      null
  }
}
/** Minimal interface used by the demo below to exercise `barrier` and `one`. */
trait Foo {
  /** Demo operation taking an integer and a string; returns nothing. */
  def bar(a: Int, b: String): Unit
}
/** Demo implementation of `Foo` that prints `b` immediately followed by `a`. */
class Bar extends Foo {
  def bar(a: Int, b: String): Unit = println(b + a)
}
// Demo: a barrier'd Foo rejects every call until that exact call has been permitted.
val foo = barrier[Foo](new Bar)

// Nothing has been permitted yet, so this call is expected to be rejected.
// NotAllowedException is a checked exception that Foo.bar does not declare, so the JDK
// proxy delivers it wrapped in UndeclaredThrowableException; anything else is a real
// failure and is allowed to propagate instead of being silently swallowed.
try {
  foo.bar(1, "haay")
} catch {
  case _: NotAllowedException =>
  case e: UndeclaredThrowableException if e.getCause.isInstanceOf[NotAllowedException] =>
}

// Permit exactly one invocation of bar(1, "haay").
one(foo).bar(1, "haay")

// Same method, different arguments: still expected to be rejected.
try {
  foo.bar(1, "nuh uh")
} catch {
  case _: NotAllowedException =>
  case e: UndeclaredThrowableException if e.getCause.isInstanceOf[NotAllowedException] =>
}

// The permitted call goes through and prints "haay1".
foo.bar(1, "haay")
// yay!!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment