Created
November 10, 2011 08:09
-
-
Save cooldaemon/1354402 to your computer and use it in GitHub Desktop.
Tried CommitBarrier in the Scala STM.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the CommitBarrier experiments.
// Settings are separated by blank lines, which older sbt releases require in .sbt files.
name := "Scala STM CommitBarrier Test"

version := "0.1"

organization := "com.github.cooldaemon"

scalaVersion := "2.9.1"

// NOTE(review): this snapshot repository may no longer be reachable — confirm before relying on it.
resolvers += "Scala Tools Snapshots" at "http://scala-tools.org/repo-snapshots/"

// ScalaSTM (provides CommitBarrier) and ScalaTest (used by the suites).
libraryDependencies ++= Seq(
  "org.scala-tools" %% "scala-stm" % "0.4-SNAPSHOT",
  "org.scalatest" %% "scalatest" % "1.6.1"
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.scalatest.fixture.FixtureFunSuite | |
import scala.concurrent.stm._ | |
import scala.concurrent.ops._ | |
/**
 * Exploratory tests for how cancelling a `CommitBarrier` member interacts with
 * nested atomic blocks, with adding further members, and with a second member
 * running on another thread.
 *
 * Every test receives a fresh fixture: a barrier built with `CommitBarrier(60000)`
 * (presumably a timeout in milliseconds — confirm against the ScalaSTM API) and
 * a `Ref` initialised to "start".
 */
class CancelSuite extends FixtureFunSuite {
  case class F(cb: CommitBarrier, ref: Ref[String])
  type FixtureParam = F

  /** Builds the per-test fixture and runs the test with it. */
  def withFixture(test: OneArgTest): Unit = test(F(
    CommitBarrier(60000),
    Ref("start")
  ))

  // Nested atomic blocks, with m.atomic on the inside.
  // The member is cancelled inside the inner block; the test expects only the
  // outer write to be visible afterwards.
  test("atomic のネスト(m.atomic が内側)") { f =>
    val m = f.cb.addMember()
    atomic { implicit txn =>
      f.ref() = "outer"
      m.atomic { implicit txn =>
        f.ref() = f.ref() + "-inner"
        m.cancel(CommitBarrier.UserCancel("cancel"))
      }
    }
    assert(f.ref.single() === "outer")
  }

  // Nested atomic blocks, with m.atomic on the outside.
  // The member is cancelled from the plain inner block; the test expects
  // neither write to be visible afterwards.
  test("atomic のネスト(m.atomic が外側)") { f =>
    val m = f.cb.addMember()
    m.atomic { implicit txn =>
      f.ref() = "outer"
      atomic { implicit txn =>
        f.ref() = f.ref() + "-inner"
        m.cancel(CommitBarrier.UserCancel("cancel"))
      }
    }
    assert(f.ref.single() === "start")
  }

  // Cancelled member, then a second member on the same barrier.
  test("m.atomic 再利用") { f =>
    val m = f.cb.addMember()
    val result1 = m.atomic { implicit txn =>
      f.ref() = "atomic1"
      m.cancel(CommitBarrier.UserCancel("cancel"))
      'finish
    }
    // The cancelled member reports its cancel cause and leaves the Ref untouched.
    assert(result1 === Left(CommitBarrier.UserCancel("cancel")))
    assert(f.ref.single() === "start")

    // A member added after the cancellation is expected to commit.
    val m2 = f.cb.addMember()
    val result2 = m2.atomic { implicit txn =>
      f.ref() = "atomic2"
      'finish
    }
    assert(result2 === Right('finish))
    assert(f.ref.single() === "atomic2")

    // After that commit the test expects addMember to be rejected.
    val thrown = intercept[IllegalStateException] {
      f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already committed")
  }

  // Two members on two threads; member 0 cancels itself, member 1 should commit.
  test("multiple thread で m.atomic") { f =>
    val startingGate = new java.util.concurrent.CountDownLatch(1)
    // Opened once member 0 has written the shared Ref, so the order in which the
    // two members touch the Ref no longer depends on thread scheduling.
    val firstWrote = new java.util.concurrent.CountDownLatch(1)
    val ms = Seq.fill(2)(f.cb.addMember())

    val fut1 = future {
      startingGate.await()
      ms(0).atomic { implicit txn =>
        f.ref() = f.ref() + "-f1"
        // countDown is idempotent, so a re-executed body does no harm.
        firstWrote.countDown()
        Thread.sleep(100)
        ms(0).cancel(CommitBarrier.UserCancel("cancel"))
      }
      'finish
    }

    val fut2 = future {
      startingGate.await()
      firstWrote.await()
      ms(1).atomic { implicit txn =>
        f.ref() = f.ref() + "-f2"
      } // has to wait for ms(0)
      'finish
    }

    startingGate.countDown()
    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }

    // NOTE(review): without the firstWrote latch this assertion was observed to
    // fail occasionally with "start". Likely cause (confirm against the ScalaSTM
    // documentation): when member 1 wrote the Ref first and then waited at the
    // barrier for member 0, member 0 could never acquire the Ref; the barrier
    // would treat that as a cycle between members and roll everything back.
    assert(f.ref.single() === "start-f2")
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.scalatest.fixture.FixtureFunSuite | |
import scala.concurrent.stm._ | |
import scala.concurrent.ops._ | |
import java.util.concurrent.CountDownLatch | |
/**
 * Exploratory tests for the basic outcomes of a `CommitBarrier` shared by two
 * members running on separate threads: joint commit, timeout, and conflicting
 * writes.
 *
 * Every test receives a fresh fixture: a barrier built with `CommitBarrier(300)`
 * (presumably a timeout in milliseconds — confirm against the ScalaSTM API), two
 * members already added to it, three `Ref`s initialised to "start", and a latch
 * used to release the worker threads together.
 */
class CommitBarrierSuite extends FixtureFunSuite {
  case class F(cb: CommitBarrier, ms: Seq[CommitBarrier.Member], rs: Seq[Ref[String]], gate: CountDownLatch)
  type FixtureParam = F

  /** Builds the per-test fixture and runs the test with it. */
  def withFixture(test: OneArgTest): Unit = {
    val cb = CommitBarrier(300)
    test(F(
      cb,
      Seq.fill(2)(cb.addMember()),
      Seq.fill(3)(Ref("start")),
      new CountDownLatch(1)
    ))
  }

  /**
   * Fails the test unless `result` is a `Left` holding a `CommitBarrier.MemberCycle`.
   * A `Right` produces a readable test failure rather than a NoSuchElementException.
   */
  private def assertMemberCycle(result: Either[Any, Any]): Unit = result match {
    case Left(_: CommitBarrier.MemberCycle) => // expected outcome
    case other => fail("expected Left(MemberCycle) but got " + other)
  }

  // No conflict: each member writes its own Ref and both commit.
  test("競合なし") { f =>
    // Opened when member 0's body has finished; used to check that member 1's
    // atomic call did not return early, without relying on wall-clock timing.
    val firstBodyDone = new CountDownLatch(1)

    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
        // Keep member 0 busy so that member 1 really has to wait for it.
        Thread.sleep(100)
        firstBodyDone.countDown()
        'finish
      }
      assert(result1 === Right('finish))
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(1)() = f.rs(1)() + "-f2"
        'finish
      } // blocks until fut1's f.ms(0).atomic has finished
      assert(result2 === Right('finish))
      assert(firstBodyDone.getCount === 0L)
      'finish
    }

    f.gate.countDown()
    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }
    assert(f.rs(0).single() === "start-f1")
    assert(f.rs(1).single() === "start-f2")

    // After the commit the test expects addMember to be rejected.
    val thrown = intercept[IllegalStateException] {
      f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already committed")
  }

  // Timeout: a third member is added but never runs, so the other two cannot commit.
  test("タイムアウト") { f =>
    // Deliberately never used: its only purpose is to keep the barrier waiting.
    val timeoutMember = f.cb.addMember()

    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
      } // waits for timeoutMember
      assert(result1 === Left(CommitBarrier.Timeout))
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(1)() = f.rs(1)() + "-f2"
      } // waits for timeoutMember
      assert(result2 === Left(CommitBarrier.Timeout))
      'finish
    }

    f.gate.countDown()
    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }
    // Neither write is expected to be visible.
    assert(f.rs(0).single() === "start")
    assert(f.rs(1).single() === "start")
  }

  // Conflict: both members write the same Ref.
  test("競合あり") { f =>
    val fut1 = future {
      f.gate.await()
      val result1 = f.ms(0).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f1"
      }
      assertMemberCycle(result1)
      'finish
    }

    val fut2 = future {
      f.gate.await()
      val result2 = f.ms(1).atomic { implicit txn =>
        f.rs(0)() = f.rs(0)() + "-f2"
      }
      assertMemberCycle(result2)
      'finish
    }

    f.gate.countDown()
    Seq(fut1, fut2).foreach { fut =>
      assert(fut() === 'finish)
    }
    assert(f.rs(0).single() === "start")

    // After the rollback the test expects addMember to be rejected.
    val thrown = intercept[IllegalStateException] {
      f.cb.addMember()
    }
    assert(thrown.getMessage === "commit barrier has already rolled back")
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.scalatest.fixture.FixtureFunSuite | |
import scala.concurrent.stm._ | |
import scala.concurrent.ops._ | |
/**
 * Exploratory tests for what happens when an atomic block (plain or run through
 * a `CommitBarrier` member) throws a RuntimeException.
 *
 * Every test receives a fresh fixture: a barrier built with `CommitBarrier(60000)`
 * and a `Ref` initialised to "start".
 */
class ExceptionSuite extends FixtureFunSuite {
  case class F(cb: CommitBarrier, ref: Ref[String])
  type FixtureParam = F

  /** Builds the per-test fixture and runs the test with it. */
  def withFixture(test: OneArgTest): Unit = {
    val fixture = F(CommitBarrier(60000), Ref("start"))
    test(fixture)
  }

  // Plain nested atomic blocks; the inner one throws.
  test("atomic のネスト") { f =>
    atomic { implicit txn =>
      f.ref.set("outer")
      // Original author's note: the inner block rolls back and rethrows.
      intercept[RuntimeException] {
        atomic { implicit txn =>
          f.ref.set(f.ref.get + "-inner")
          throw new RuntimeException
        }
      }
    }
    assert(f.ref.single.get === "outer")
  }

  // Nested atomic blocks, with m.atomic on the inside.
  test("atomic のネスト(m.atomic が内側)") { f =>
    // Original author's note: synchronized is used per commit barrier
    // (not verifiable from this file).
    val member = f.cb.addMember()
    atomic { implicit txn =>
      f.ref.set("outer")
      intercept[RuntimeException] {
        member.atomic { implicit txn =>
          f.ref.set(f.ref.get + "-inner")
          throw new RuntimeException
        }
      }
    }
    assert(f.ref.single.get === "outer")
  }

  // Nested atomic blocks, with m.atomic on the outside.
  test("atomic のネスト(m.atomic が外側)") { f =>
    val member = f.cb.addMember()
    member.atomic { implicit txn =>
      f.ref.set("outer")
      intercept[RuntimeException] {
        atomic { implicit txn =>
          f.ref.set(f.ref.get + "-inner")
          throw new RuntimeException
        }
      }
    }
    assert(f.ref.single.get === "outer")
  }

  // Running the same member again after its first atomic block threw.
  test("m.atomic 再利用") { f =>
    val member = f.cb.addMember()

    // First attempt throws; the write is expected not to be visible.
    intercept[RuntimeException] {
      member.atomic { implicit txn =>
        f.ref.set(f.ref.get + "-test")
        throw new RuntimeException
      }
    }
    assert(f.ref.single.get === "start")

    // Second attempt on the same member; the test expects it to have no effect.
    member.atomic { implicit txn =>
      f.ref.set(f.ref.get + "-test")
    }
    assert(f.ref.single.get === "start")

    // The test expects addMember to be rejected from here on.
    val rejected = intercept[IllegalStateException] {
      f.cb.addMember()
    }
    assert(rejected.getMessage === "commit barrier has already rolled back")
  }

  // Two members on two threads; member 0 throws inside its atomic block.
  test("multiple thread で m.atomic") { f =>
    val gate = new java.util.concurrent.CountDownLatch(1)
    val members = Seq.fill(2)(f.cb.addMember())

    val thrower = future {
      gate.await()
      intercept[RuntimeException] {
        members(0).atomic { implicit txn =>
          f.ref.set(f.ref.get + "-f1")
          throw new RuntimeException
        }
      }
      'finish
    }

    val sleeper = future {
      gate.await()
      members(1).atomic { implicit txn =>
        f.ref.set(f.ref.get + "-f2")
        Thread.sleep(100)
      }
      'finish
    }

    gate.countDown()
    Seq(thrower, sleeper).foreach { pending =>
      assert(pending() === 'finish)
    }
    // Neither member's write is expected to be visible.
    assert(f.ref.single.get === "start")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment