Last active
October 17, 2018 06:39
-
-
Save ahmadmo/a1bfa865a6e5ab22d7faca8e2f117989 to your computer and use it in GitHub Desktop.
Go Channels in Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit

import scala.annotation.tailrec

import Channel.{Core, Message}
/** Factory methods for channels, plus the private plumbing every channel handle shares. */
object Channel {

  /**
   * Envelope placed on the underlying queue.
   *
   * @param value the payload; `null` for the end-of-stream marker
   * @param last  `true` only for the end-of-stream marker
   */
  private case class Message[A](value: A, last: Boolean)

  private object Message {
    // A single shared marker. It carries no payload, so the same instance can be
    // viewed as a Message[A] for any A.
    private val Terminator: Message[Null] = Message[Null](null, last = true)

    /** The end-of-stream marker, typed for the requested element type. */
    def last[A]: Message[A] = Terminator.asInstanceOf[Message[A]]
  }

  /**
   * Mutable state behind a channel: the backing queue and the closed flag.
   *
   * @param capacity `0` selects a `SynchronousQueue`; a positive value selects an
   *                 `ArrayBlockingQueue` of that size
   * @throws IllegalArgumentException if `capacity` is negative
   */
  private class Core[A](capacity: Int) {
    import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, SynchronousQueue}

    val queue: BlockingQueue[Message[A]] =
      if (capacity == 0) new SynchronousQueue[Message[A]]()
      else if (capacity > 0) new ArrayBlockingQueue[Message[A]](capacity)
      else throw new IllegalArgumentException("invalid channel capacity.")

    // Volatile so that a write from one thread is visible to readers on other threads.
    @volatile var closed = false
  }

  /**
   * Creates a channel backed by a queue of the given capacity.
   *
   * @param capacity `0` for a synchronous channel, `> 0` for a bounded buffer
   * @throws IllegalArgumentException if `capacity` is negative
   */
  def make[A](capacity: Int): Channel[A] =
    new Channel[A](new Core[A](capacity))

  /** Creates a zero-capacity channel; equivalent to `make(0)`. */
  def synchronous[A]: Channel[A] =
    make[A](0)
}
/**
 * A handle onto a blocking channel of `A` values, in the style of Go channels.
 *
 * Several handles may share one underlying queue: [[withFilter]] returns a new
 * handle over the same queue with an additional predicate applied on receive.
 *
 *  - `ch ! v` sends, `!ch` receives as an `Option`, `~ch` receives or throws.
 *  - `foreach` / `withFilter` allow `for (v <- ch if p(v)) ...` loops that run
 *    until the channel has been closed and drained.
 */
final class Channel[A] private(private val core: Core[A],
                               private val filter: A => Boolean = (_: A) => true) {

  // Upper bound on how long a blocked receiver waits before re-checking the closed
  // flag. This is what lets receivers notice a close() they were not woken up for.
  private val PollIntervalMillis = 50L

  /**
   * Enqueues `message`, blocking until the queue accepts it.
   *
   * @throws IllegalStateException if the channel is already closed
   */
  private def send(message: Message[A]): Unit = {
    if (isClosed) {
      throw new IllegalStateException("channel is closed.")
    }
    core.queue.put(message)
  }

  /**
   * Waits for the next message.
   *
   * @return `Some(value)` for a payload accepted by this handle's filter; `None` if the
   *         payload was rejected by the filter (it is still consumed), if the
   *         end-of-stream marker was received, or if the channel is closed and empty
   */
  private def receive: Option[A] = {
    @tailrec
    def await(): Option[A] =
      if (isClosed && isEmpty) {
        None
      } else {
        // A timed poll instead of take(): a receiver that passed the check above just
        // before close(), or one that did not get the single end marker, must still
        // wake up and observe the closed flag rather than block forever.
        core.queue.poll(PollIntervalMillis, TimeUnit.MILLISECONDS) match {
          case null                  => await() // timed out; re-check the closed flag
          case marker if marker.last => None
          case message               => if (filter(message.value)) Some(message.value) else None
        }
      }

    await()
  }

  /**
   * Sends `value`, blocking until the channel accepts it.
   *
   * @throws IllegalStateException if the channel is closed
   */
  def !(value: A): Unit = send(Message(value, last = false))

  /** Receives one message; see the `None` cases described on `receive`. */
  def unary_! : Option[A] = receive

  /**
   * Receives one message and unwraps it.
   *
   * @throws NoSuchElementException if nothing was delivered (channel closed, or the
   *                                consumed value was rejected by the filter)
   */
  def unary_~ : A =
    receive.getOrElse(
      throw new NoSuchElementException("no value received: channel is closed or the value was filtered out."))

  /**
   * Returns a handle over the same underlying queue that only yields values accepted
   * by this handle's filter AND by `fn`.
   *
   * The predicates are composed rather than replaced, so chained guards such as
   * `for (v <- ch if p1(v) if p2(v))` apply every guard.
   */
  def withFilter(fn: A => Boolean): Channel[A] = {
    val current = filter
    new Channel[A](core, filter = (value: A) => current(value) && fn(value))
  }

  /** Applies `fn` to every accepted value until the channel is closed and drained. */
  def foreach[U](fn: A => U): Unit = {
    while (isOpen || nonEmpty) {
      receive.foreach(fn)
    }
  }

  /**
   * Tests `fn` against accepted values until one fails or the channel is closed and
   * drained.
   *
   * @return `false` as soon as `fn` rejects a value, `true` otherwise
   */
  def forall(fn: A => Boolean): Boolean = {
    var holds = true
    while (holds && (isOpen || nonEmpty)) {
      // Option.forall is true for None, so filtered/empty receives keep the loop going.
      holds = receive.forall(fn)
    }
    holds
  }

  /** Collects every accepted value until the channel is closed and drained. */
  def collect: Seq[A] =
    collect[A](identity[A])

  /** Like `collect`, mapping each accepted value through `fn`. */
  def collect[U](fn: A => U): Seq[U] = {
    // Builder instead of repeated `:+`, which is linear per append on the default Seq.
    val out = Vector.newBuilder[U]
    foreach { value =>
      out += fn(value)
    }
    out.result()
  }

  /**
   * Marks the channel closed. Subsequent sends throw; receivers drain what is left
   * and then get `None`. Calling it again is a no-op.
   *
   * Never blocks: the end marker is handed over with `offer`, which succeeds only if
   * the queue can take it immediately (a waiting receiver, or free buffer space).
   * When it cannot, receivers still notice the closed flag on their next poll.
   */
  def close(): Unit = {
    if (isOpen) {
      core.closed = true
      core.queue.offer(Message.last[A])
      ()
    }
  }

  def isClosed: Boolean = core.closed

  def isOpen: Boolean = !isClosed

  /**
   * Whether the backing queue currently reports no elements. For a zero-capacity
   * channel the backing `SynchronousQueue` always reports empty.
   */
  def isEmpty: Boolean = core.queue.isEmpty

  def nonEmpty: Boolean = !isEmpty
}
/**
 * Demo: one task sends the integers 1 to 10 over a synchronous channel, another
 * prints the even ones, and `main` waits on a second channel until the consumer
 * has finished.
 */
object ProducerConsumerExample {
  val messages: Channel[Int] = Channel.synchronous[Int]
  val done: Channel[Boolean] = Channel.synchronous[Boolean]

  /** Sends 1 to 10, pausing 100 ms after each send, then closes `messages`. */
  def produce(): Unit = {
    for (i <- 1 to 10) {
      messages ! i
      Thread.sleep(100)
    }
    messages.close()
  }

  /**
   * Prints every even value received from `messages` until it is closed, then
   * signals `done`.
   *
   * The completion signal is sent from here, not from the producer, so that `main`
   * cannot return while values are still being processed. It sits in `finally` so a
   * failure in the loop does not leave `main` waiting forever.
   */
  def consume(): Unit = {
    try {
      for (msg <- messages if msg % 2 == 0) {
        println(msg)
      }
    } finally {
      done ! true
    }
  }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits._
    import scala.concurrent.{Future => go}
    go(produce())
    go(consume())
    // Block until the consumer reports completion; the received value is irrelevant.
    !done
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment