Created
May 6, 2010 17:03
-
-
Save banyan/392392 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.concurrent.atomic.{AtomicReference => AtomR, AtomicLong} | |
| import java.util.Random | |
| import scala.collection.immutable.TreeHashMap | |
| object Multics { | |
| // MT 型の定義。type を定義することで、タイピングの量が減らせる | |
| type MT = Map[String, Int] | |
| // 変数 info は イミュータブルな Map を保持する AtomicReference | |
| // AtomicReference は、同期化せずにどのスレッドからも更新ができる | |
| val info: AtomR[MT] = new AtomR(TreeHashMap.empty) | |
| // clashCnt は、AtomicLong で、スレッドが info への書き込みを行おうとして失敗した回数を保持する。 | |
| // clashCnt は、同期化なしで、インクリメントできる | |
| val clashCnt = new AtomicLong | |
| // 新しいスレッドを作成し、repeatEvery のブロックを実行する。 | |
| // repeatEvery(1000) は 1000 ミリ秒待ち、その後ブロックを実行する。 | |
| // ブロックでは、clashCnt と info が持つ値の合計を出力する。 | |
| // 同期処理なしで、info が持つ Map にアクセスできる。 | |
| // Map はイミュータブルデータ型であり、変更されないことが分かっているからです。 | |
| def main(arfv: Array[String]) { | |
| runThread { | |
| repeatEvery(1000) { | |
| println("Clash Count: " + clashCnt + " Total: " + info.get.foldLeft(0)(_ + _._2)) | |
| } | |
| } | |
| // 次に、2000 スレッドを作成する | |
| for (i <- 1 to 2000) runThread { | |
| var cnt = 0 | |
| val ran = new Random | |
| val name = "K" + i | |
| doSet(info) {old => old + (name -> 0)} | |
| // ループ処理を行う。それぞれのループ処理の中で、0から100ミリ秒間のランダムな待ち時間を設定する | |
| repeatEvery(ran.nextInt(100)) { | |
| doSet(info) {old => old + (name -> (old(name) + 1))} | |
| // スレッドに紐つけられているカウントを、インクリメントする | |
| cnt = cnt + 1 | |
| // スレッドローカルのカウントと info の値が一致しなければ、並行処理に問題が発生したことになるため、例外の送出 | |
| if (cnt != info.get()(name)) | |
| throw new Exception("Thread: " + name + " failed") | |
| } | |
| } | |
| } | |
| // 新しいスレッドを作り、run メソッドに関数 f を設定後、スレッドを開始する。 | |
| // スレッドの run メソッドが呼ばれると、関数 f が実行される。これはコードブロックを引数として渡す例。 | |
| def runThread(f: => Unit) = (new Thread(new Runnable {def run(): Unit = f})).start | |
| // 同期処理なしで atom にアトミックな更新を行う。 | |
| // 古い値を読み取り、update メソッドへ引数として渡し、 | |
| // アトミックな更新を行います。 | |
| // アトミックな更新が成功すれば(古い値は処理中に更新されません)、 | |
| // 更新は最新バージョンのデータに対して行われ、更新は成功したことになる。 | |
| // compareAndSet メソッドが失敗したら、clashCnt をインクリメントし、再度更新を試みる | |
| def doSet[T](atom: AtomR[T]) (update: T => T) { | |
| val old = atom.get | |
| if (atom.compareAndSet(old, update(old))) () | |
| else { | |
| clashCnt.incrementAndGet | |
| doSet(atom)(update) | |
| } | |
| } | |
| // repeatEvery メソッドが制御フローの役割を果たす | |
| // repeatEvery は len と body の2つの引数を取り、 | |
| // どちらも名前渡しです。repeatEvery のブロック内で参照されるたびに | |
| // 名前渡しで受け取ったコードが実行される | |
| def repeatEvery(len: => Int) (body: => Unit): Unit = { | |
| try { | |
| while(true) { | |
| Thread.sleep(len) | |
| body | |
| } | |
| } catch { | |
| case e => e.printStackTrace; System.exit(1) | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment