-
-
Save Andrei-Pozolotin/3744fb276de4ccd9ae810cc3a51f632c to your computer and use it in GitHub Desktop.
Clojure atoms, in Scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*------------------------------------------------------------------------------ | |
* MIT License | |
* | |
* Copyright (c) 2017 Doug Kirk | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy | |
* of this software and associated documentation files (the "Software"), to deal | |
* in the Software without restriction, including without limitation the rights | |
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
* copies of the Software, and to permit persons to whom the Software is | |
* furnished to do so, subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in all | |
* copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
* SOFTWARE. | |
*----------------------------------------------------------------------------*/ | |
sealed trait Atom[A] { | |
def apply(): A | |
def reset(a: A): A | |
def swap(f: A => A): A | |
def addWatch(watcher: Atom.Watcher[A]): Unit | |
def removeWatch(watcher: Atom.Watcher[A]): Unit | |
} | |
object Atom { | |
import java.util.concurrent.atomic._ | |
import scala.language.implicitConversions | |
def apply[A <: AnyRef](initial: A): Atom[A] = new AtomImpl[A](new AtomicReference[A](initial)) | |
def apply(initial: Boolean): Atom[Boolean] = new BooleanAtom(new AtomicBoolean(initial)) | |
def apply(initial: Int): Atom[Int] = new IntAtom(new AtomicInteger(initial)) | |
def apply(initial: Long): Atom[Long] = new LongAtom(new AtomicLong(initial)) | |
implicit def atom2value[A](atom: Atom[A]): A = atom.apply | |
object BooleanOps { | |
def not(b: Boolean) = !b | |
def and(left: Boolean)(right: Boolean) = left && right | |
def or(left: Boolean)(right: Boolean) = left || right | |
} | |
object IntOps { | |
def inc(i: Int) = i + 1 | |
def dec(i: Int) = i - 1 | |
def inc(l: Long) = l + 1 | |
def dec(l: Long) = l - 1 | |
} | |
object SeqOps { | |
def concat[A](tail: Seq[A])(as: Seq[A]) = as ++ tail | |
def conj[A](a: A)(as: Seq[A]) = as :+ a | |
def disj[A](a: A)(as: Seq[A]) = { | |
val (left, right) = as.span(_ != a) | |
left ++ right.drop(1) | |
} | |
def select[A](f: A => Boolean)(as: Seq[A]) = as filter f | |
def drop[A](n: Int)(as: Seq[A]) = as drop n | |
def take[A](n: Int)(as: Seq[A]) = as take n | |
} | |
object SetOps { | |
def conj[A](a: A)(as: Set[A]) = as + a | |
def disj[A](a: A)(as: Set[A]) = as - a | |
def union[A](others: Set[A])(as: Set[A]) = as union others | |
def difference[A](others: Set[A])(as: Set[A]) = as diff others | |
def intersection[A](others: Set[A])(as: Set[A]) = as intersect others | |
def select[A](f: A => Boolean)(as: Set[A]) = as filter f | |
} | |
object MapOps { | |
def assoc[K, V](k: K, v: V)(m: Map[K, V]) = m + (k -> v) | |
def dissoc[K, V](k: K)(m: Map[K, V]) = m - k | |
def select[K, V](f: ((K, V)) => Boolean)(m: Map[K, V]) = m filter f | |
def selectKeys[K, V](f: K => Boolean)(m: Map[K, V]) = m filter { case (k, _) => f(k) } | |
} | |
type Watcher[A] = (Atom[A], /* old */A, /* new */A) => Unit | |
//------------------------------------------ | |
// private implementation | |
private class AtomImpl[A](ref: AtomicReference[A]) extends Atom[A] with WatchSupport[A] { | |
def apply() = ref.get | |
def reset(a: A): A = { ref.set(a); a } | |
def swap(f: A => A): A = { | |
while (true) { | |
val oldvalue = ref.get | |
val newvalue = f(oldvalue) | |
if (ref.compareAndSet(oldvalue, newvalue)) { | |
notifyWatchers(oldvalue, newvalue) | |
return newvalue | |
} | |
} | |
// never reached...but we must satisfy the type checker | |
throw new IllegalStateException("exiting swap without matching old value") | |
} | |
override def toString() = s"Atom<$ref>" | |
} | |
private class BooleanAtom(ref: AtomicBoolean) extends Atom[Boolean] with WatchSupport[Boolean] { | |
def apply(): Boolean = ref.get | |
def reset(b: Boolean): Boolean = { ref.set(b); b } | |
def swap(f: Boolean => Boolean): Boolean = { | |
while (true) { | |
val oldvalue = ref.get | |
val newvalue = f(oldvalue) | |
if (ref.compareAndSet(oldvalue, newvalue)) { | |
notifyWatchers(oldvalue, newvalue) | |
return newvalue | |
} | |
} | |
// never reached...but we must satisfy the type checker | |
throw new IllegalStateException("exiting swap without matching old value") | |
} | |
override def toString() = s"BooleanAtom<$ref>" | |
} | |
private class IntAtom(ref: AtomicInteger) extends Atom[Int] with WatchSupport[Int] { | |
def apply(): Int = ref.get | |
def reset(i: Int): Int = { ref.set(i); i } | |
def swap(f: Int => Int): Int = { | |
while (true) { | |
val oldvalue = ref.get | |
val newvalue = f(oldvalue) | |
if (ref.compareAndSet(oldvalue, newvalue)) { | |
notifyWatchers(oldvalue, newvalue) | |
return newvalue | |
} | |
} | |
// never reached...but we must satisfy the type checker | |
throw new IllegalStateException("exiting swap without matching old value") | |
} | |
override def toString() = s"IntAtom<$ref>" | |
} | |
private class LongAtom(ref: AtomicLong) extends Atom[Long] with WatchSupport[Long] { | |
def apply(): Long = ref.get | |
def reset(l: Long): Long = { ref.set(l); l } | |
def swap(f: Long => Long): Long = { | |
while (true) { | |
val oldvalue = ref.get | |
val newvalue = f(oldvalue) | |
if (ref.compareAndSet(oldvalue, newvalue)) { | |
notifyWatchers(oldvalue, newvalue) | |
return newvalue | |
} | |
} | |
// never reached...but we must satisfy the type checker | |
throw new IllegalStateException("exiting swap without matching old value") | |
} | |
override def toString() = s"LongAtom<$ref>" | |
} | |
sealed trait WatchSupport[A] { self: Atom[A] => | |
private[this] val watchers = new AtomicReference[Set[Watcher[A]]](Set.empty[Watcher[A]]) | |
final def addWatch(watcher: Watcher[A]): Unit = swapWatchers(_ + watcher) | |
final def removeWatch(watcher: Watcher[A]): Unit = swapWatchers(_ - watcher) | |
protected def notifyWatchers(oldvalue: A, newvalue: A): Unit = | |
watchers.get.foreach { _(self, oldvalue, newvalue) } | |
private[this] def swapWatchers(f: Set[Watcher[A]] => Set[Watcher[A]]): Unit = { | |
while (true) { | |
val oldvalue = watchers.get | |
val newvalue = f(oldvalue) | |
if (watchers.compareAndSet(oldvalue, newvalue)) { | |
return () | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment