Skip to content

Instantly share code, notes, and snippets.

@banyan
Created May 6, 2010 17:03
Show Gist options
  • Select an option

  • Save banyan/392392 to your computer and use it in GitHub Desktop.

Select an option

Save banyan/392392 to your computer and use it in GitHub Desktop.
import java.util.concurrent.atomic.{AtomicReference => AtomR, AtomicLong}
import java.util.Random
import scala.collection.immutable.TreeHashMap
object Multics {
// MT 型の定義。type を定義することで、タイピングの量が減らせる
type MT = Map[String, Int]
// 変数 info は イミュータブルな Map を保持する AtomicReference
// AtomicReference は、同期化せずにどのスレッドからも更新ができる
val info: AtomR[MT] = new AtomR(TreeHashMap.empty)
// clashCnt は、AtomicLong で、スレッドが info への書き込みを行おうとして失敗した回数を保持する。
// clashCnt は、同期化なしで、インクリメントできる
val clashCnt = new AtomicLong
// 新しいスレッドを作成し、repeatEvery のブロックを実行する。
// repeatEvery(1000) は 1000 ミリ秒待ち、その後ブロックを実行する。
// ブロックでは、clashCnt と info が持つ値の合計を出力する。
// 同期処理なしで、info が持つ Map にアクセスできる。
// Map はイミュータブルデータ型であり、変更されないことが分かっているからです。
def main(arfv: Array[String]) {
runThread {
repeatEvery(1000) {
println("Clash Count: " + clashCnt + " Total: " + info.get.foldLeft(0)(_ + _._2))
}
}
// 次に、2000 スレッドを作成する
for (i <- 1 to 2000) runThread {
var cnt = 0
val ran = new Random
val name = "K" + i
doSet(info) {old => old + (name -> 0)}
// ループ処理を行う。それぞれのループ処理の中で、0から100ミリ秒間のランダムな待ち時間を設定する
repeatEvery(ran.nextInt(100)) {
doSet(info) {old => old + (name -> (old(name) + 1))}
// スレッドに紐つけられているカウントを、インクリメントする
cnt = cnt + 1
// スレッドローカルのカウントと info の値が一致しなければ、並行処理に問題が発生したことになるため、例外の送出
if (cnt != info.get()(name))
throw new Exception("Thread: " + name + " failed")
}
}
}
// 新しいスレッドを作り、run メソッドに関数 f を設定後、スレッドを開始する。
// スレッドの run メソッドが呼ばれると、関数 f が実行される。これはコードブロックを引数として渡す例。
def runThread(f: => Unit) = (new Thread(new Runnable {def run(): Unit = f})).start
// 同期処理なしで atom にアトミックな更新を行う。
// 古い値を読み取り、update メソッドへ引数として渡し、
// アトミックな更新を行います。
// アトミックな更新が成功すれば(古い値は処理中に更新されません)、
// 更新は最新バージョンのデータに対して行われ、更新は成功したことになる。
// compareAndSet メソッドが失敗したら、clashCnt をインクリメントし、再度更新を試みる
def doSet[T](atom: AtomR[T]) (update: T => T) {
val old = atom.get
if (atom.compareAndSet(old, update(old))) ()
else {
clashCnt.incrementAndGet
doSet(atom)(update)
}
}
// repeatEvery メソッドが制御フローの役割を果たす
// repeatEvery は len と body の2つの引数を取り、
// どちらも名前渡しです。repeatEvery のブロック内で参照されるたびに
// 名前渡しで受け取ったコードが実行される
def repeatEvery(len: => Int) (body: => Unit): Unit = {
try {
while(true) {
Thread.sleep(len)
body
}
} catch {
case e => e.printStackTrace; System.exit(1)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment