Skip to content

Instantly share code, notes, and snippets.

@cooldaemon
Created November 10, 2011 08:09
Show Gist options
  • Save cooldaemon/1354402 to your computer and use it in GitHub Desktop.
Save cooldaemon/1354402 to your computer and use it in GitHub Desktop.
Tried CommitBarrier in the Scala STM.
name := "Scala STM CommitBarrier Test"
version := "0.1"
organization := "com.github.cooldaemon"
scalaVersion := "2.9.1"
resolvers += "Scala Tools Snapshots" at "http://scala-tools.org/repo-snapshots/"
libraryDependencies += "org.scala-tools" %% "scala-stm" % "0.4-SNAPSHOT"
libraryDependencies += "org.scalatest" %% "scalatest" % "1.6.1"
import org.scalatest.fixture.FixtureFunSuite
import scala.concurrent.stm._
import scala.concurrent.ops._
class CancelSuite extends FixtureFunSuite {
case class F(cb: CommitBarrier, ref: Ref[String])
type FixtureParam = F
def withFixture(test: OneArgTest): Unit = test(F(
CommitBarrier(60000),
Ref("start")
))
test("atomic のネスト(m.atomic が内側)") { f =>
val m = f.cb.addMember()
atomic { implicit txn =>
f.ref() = "outer"
m.atomic { implicit txn =>
f.ref() = f.ref() + "-inner"
m.cancel(CommitBarrier.UserCancel("cancel"))
}
}
assert(f.ref.single() === "outer")
}
test("atomic のネスト(m.atomic が外側)") { f =>
val m = f.cb.addMember()
m.atomic { implicit txn =>
f.ref() = "outer"
atomic { implicit txn =>
f.ref() = f.ref() + "-inner"
m.cancel(CommitBarrier.UserCancel("cancel"))
}
}
assert(f.ref.single() === "start")
}
test("m.atomic 再利用") { f =>
val m = f.cb.addMember()
val result1 = m.atomic { implicit txn =>
f.ref() = "atomic1"
m.cancel(CommitBarrier.UserCancel("cancel"))
'finish
}
assert(result1 === Left(CommitBarrier.UserCancel("cancel")))
assert(f.ref.single() === "start")
val m2 = f.cb.addMember()
val result2 = m2.atomic { implicit txn =>
f.ref() = "atomic2"
'finish
}
assert(result2 === Right('finish))
assert(f.ref.single() === "atomic2")
val thrown = intercept[IllegalStateException] {
val m3 = f.cb.addMember()
}
assert(thrown.getMessage === "commit barrier has already committed")
}
test("multiple thread で m.atomic") { f =>
val startingGate = new java.util.concurrent.CountDownLatch(1)
val ms = Seq.tabulate(2) {n => f.cb.addMember()}
val fut1 = future {
startingGate.await()
ms(0).atomic { implicit txn =>
f.ref() = f.ref() + "-f1"
Thread.sleep(100)
ms(0).cancel(CommitBarrier.UserCancel("cancel"))
}
'finish
}
val fut2 = future {
startingGate.await()
ms(1).atomic { implicit txn =>
f.ref() = f.ref() + "-f2"
} // ms(0) の待ち合わせ
'finish
}
startingGate.countDown()
Seq(fut1, fut2).foreach { fut =>
assert(fut() === 'finish)
}
assert(f.ref.single() === "start-f2") // 稀に "start" に…何故?
}
}
import org.scalatest.fixture.FixtureFunSuite
import scala.concurrent.stm._
import scala.concurrent.ops._
import java.util.concurrent.CountDownLatch
class CommitBarrierSuite extends FixtureFunSuite {
case class F(cb: CommitBarrier, ms: Seq[CommitBarrier.Member], rs: Seq[Ref[String]], gate: CountDownLatch)
type FixtureParam = F
def withFixture(test: OneArgTest): Unit = {
val cb = CommitBarrier(300)
test(F(
cb,
Seq.tabulate(2) {n => cb.addMember()},
Seq.tabulate(3) {n => Ref("start")},
new CountDownLatch(1)
))
}
test("競合なし") { f =>
val fut1 = future {
f.gate.await()
val result1 = f.ms(0).atomic { implicit txn =>
f.rs(0)() = f.rs(0)() + "-f1"
Thread.sleep(100)
'finish
}
assert(result1 === Right('finish))
'finish
}
val fut2 = future {
f.gate.await()
val startTime = System.currentTimeMillis
val result2 = f.ms(1).atomic { implicit txn =>
f.rs(1)() = f.rs(1)() + "-f2"
'finish
} // fut2 の f.ms(0) の atomic が終わるまで待つ
val elapseTime = System.currentTimeMillis - startTime;
assert(result2 === Right('finish))
assert(100 < elapseTime)
'finish
}
f.gate.countDown()
Seq(fut1, fut2).foreach { fut =>
assert(fut() === 'finish)
}
assert(f.rs(0).single() === "start-f1")
assert(f.rs(1).single() === "start-f2")
val thrown = intercept[IllegalStateException] {
f.cb.addMember()
}
assert(thrown.getMessage === "commit barrier has already committed")
}
test("タイムアウト") { f =>
val timeoutMember = f.cb.addMember()
val fut1 = future {
f.gate.await()
val result1 = f.ms(0).atomic { implicit txn =>
f.rs(0)() = f.rs(0)() + "-f1"
} // timeoutMember を待つ
assert(result1 === Left(CommitBarrier.Timeout))
'finish
}
val fut2 = future {
f.gate.await()
val result2 = f.ms(1).atomic { implicit txn =>
f.rs(1)() = f.rs(1)() + "-f2"
} // timeoutMember を待つ
assert(result2 === Left(CommitBarrier.Timeout))
'finish
}
f.gate.countDown()
Seq(fut1, fut2).foreach { fut =>
assert(fut() === 'finish)
}
assert(f.rs(0).single() === "start")
assert(f.rs(1).single() === "start")
}
test("競合あり") { f =>
val fut1 = future {
f.gate.await()
val result1 = f.ms(0).atomic { implicit txn =>
f.rs(0)() = f.rs(0)() + "-f1"
}
assert(result1.left.get.isInstanceOf[CommitBarrier.MemberCycle])
'finish
}
val fut2 = future {
f.gate.await()
val result2 = f.ms(1).atomic { implicit txn =>
f.rs(0)() = f.rs(0)() + "-f2"
}
assert(result2.left.get.isInstanceOf[CommitBarrier.MemberCycle])
'finish
}
f.gate.countDown()
Seq(fut1, fut2).foreach { fut =>
assert(fut() === 'finish)
}
assert(f.rs(0).single() === "start")
val thrown = intercept[IllegalStateException] {
f.cb.addMember()
}
assert(thrown.getMessage === "commit barrier has already rolled back")
}
}
import org.scalatest.fixture.FixtureFunSuite
import scala.concurrent.stm._
import scala.concurrent.ops._
class ExceptionSuite extends FixtureFunSuite {
case class F(cb: CommitBarrier, ref: Ref[String])
type FixtureParam = F
def withFixture(test: OneArgTest): Unit = test(F(
CommitBarrier(60000),
Ref("start")
))
test("atomic のネスト") { f =>
atomic { implicit txn =>
f.ref() = "outer"
// rollback + rethrow
intercept[RuntimeException] {
atomic { implicit txn =>
f.ref() = f.ref() + "-inner"
throw new RuntimeException
}
}
}
assert(f.ref.single() === "outer")
}
test("atomic のネスト(m.atomic が内側)") { f =>
val m = f.cb.addMember() // cb 毎に synchronized が利用される
atomic { implicit txn =>
f.ref() = "outer"
intercept[RuntimeException] {
m.atomic { implicit txn =>
f.ref() = f.ref() + "-inner"
throw new RuntimeException
}
}
}
assert(f.ref.single() === "outer")
}
test("atomic のネスト(m.atomic が外側)") { f =>
val m = f.cb.addMember()
m.atomic { implicit txn =>
f.ref() = "outer"
intercept[RuntimeException] {
atomic { implicit txn =>
f.ref() = f.ref() + "-inner"
throw new RuntimeException
}
}
}
assert(f.ref.single() === "outer")
}
test("m.atomic 再利用") { f =>
val m = f.cb.addMember()
intercept[RuntimeException] {
m.atomic { implicit txn =>
f.ref() = f.ref() + "-test"
throw new RuntimeException
}
}
assert(f.ref.single() === "start")
m.atomic { implicit txn =>
f.ref() = f.ref() + "-test"
}
assert(f.ref.single() === "start")
val thrown = intercept[IllegalStateException] {
val m2 = f.cb.addMember()
}
assert(thrown.getMessage === "commit barrier has already rolled back")
}
test("multiple thread で m.atomic") { f =>
val startingGate = new java.util.concurrent.CountDownLatch(1)
val ms = Seq.tabulate(2) {n => f.cb.addMember()}
val fut1 = future {
startingGate.await()
intercept[RuntimeException] {
ms(0).atomic { implicit txn =>
f.ref() = f.ref() + "-f1"
throw new RuntimeException
}
}
'finish
}
val fut2 = future {
startingGate.await()
ms(1).atomic { implicit txn =>
f.ref() = f.ref() + "-f2"
Thread.sleep(100)
}
'finish
}
startingGate.countDown()
Seq(fut1, fut2).foreach { fut =>
assert(fut() === 'finish)
}
assert(f.ref.single() === "start")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment