Skip to content

Instantly share code, notes, and snippets.

@lbialy
Created October 3, 2022 08:36
Show Gist options
  • Save lbialy/69d582fa623288f96625cbfb7876b51a to your computer and use it in GitHub Desktop.
Save lbialy/69d582fa623288f96625cbfb7876b51a to your computer and use it in GitHub Desktop.
An extremely looming Future for Scala, also parallel iterators are a thing now.
//> using scala "3.2"
import scala.concurrent.*, duration.*
import scala.collection.*
import java.util.concurrent.ExecutorService
import scala.language.implicitConversions
import java.util.concurrent.locks.ReentrantLock
object Looming:
sealed trait LoomingExecutionContext
implicit object LoomingExecutionContext extends LoomingExecutionContext with ExecutionContext:
import java.util.concurrent.Executors
lazy val loom: ExecutorService = Executors.newVirtualThreadPerTaskExecutor()
override def execute(runnable: Runnable): Unit = loom.execute(runnable)
override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
extension [C[X] <: IterableOnceOps[X, C, C[X]], A](c: C[Future[A]])
def get(using LoomingExecutionContext): C[A] =
if (!Thread.currentThread.isVirtual) then println("Oh no, you're blocking on an OS thread!")
c.map(futOfa => Await.result(futOfa, Duration.Inf))
extension [A](future: Future[A])
def get(duration: Duration)(using LoomingExecutionContext): A =
if (!Thread.currentThread.isVirtual) then println("Oh no, you're blocking on an OS thread!")
Await.result(future, duration)
def get(using LoomingExecutionContext): A =
if (!Thread.currentThread.isVirtual) then println("Oh no, you're blocking on an OS thread!")
Await.result(future, Duration.Inf)
class WTF(index: Int) extends Exception(s"Found Future at index $index, expected null!")
import java.util.concurrent.locks.ReentrantLock
class MVar[A >: Null <: AnyRef]():
import java.util.Objects.isNull
private var value: A = null
private val writeLock = ReentrantLock()
private val writeCond = writeLock.newCondition()
private val readLock = ReentrantLock()
private val readCond = readLock.newCondition()
def put(a: A): Unit =
writeLock.lock()
try
while value ne null do
writeCond.await()
readLock.lock()
try
value = a
readCond.signal()
finally
readLock.unlock()
finally
writeLock.unlock()
def take: A =
readLock.lock()
try
while value eq null do
readCond.await()
writeLock.lock()
try
val result = value
value = null
result
finally
writeLock.unlock()
finally
readLock.unlock()
end MVar
extension [A](it: Iterator[A])
def parMap[B](f: A => B)(parallelism: Int)(using LoomingExecutionContext): Iterator[B] =
assert(parallelism > 0)
if parallelism == 1 then it.map(f) else new Iterator[B]:
var initialized = false
val pending = Array.ofDim[Future[B]](parallelism)
var index = 0
def hasNext(): Boolean = if !initialized then it.hasNext else pending.exists(_ ne null)
def next(): B =
if !initialized then
var full = false
while it.hasNext && !full do
val a = it.next
pending(index) = Future(f(a))
if (index + 1 == parallelism) then
index = 0
full = true
else
index += 1
initialized = true
val res: B = Await.result(pending(index), Duration.Inf)
pending(index) = null
res
else
if (pending(index) ne null) throw WTF(index)
if it.hasNext then
val a = it.next
pending(index) = Future(f(a))
if (index + 1 == parallelism) then
index = 0
else
index += 1
val res = Await.result(pending(index), Duration.Inf)
pending(index) = null
res
else
if (index + 1 == parallelism) then
index = 0
else
index += 1
if pending(index) eq null then throw IllegalStateException("Called next after hasNext returned false!")
else Await.result(pending(index), Duration.Inf)
def parMapUnordered[B](f: A => B)(parallelism: Int)(using LoomingExecutionContext): Iterator[B] =
import scala.util.Try
assert(parallelism > 0)
val mvar = MVar[(Int, Try[B])]()
if parallelism == 1 then it.map(f) else new Iterator[B]:
var initialized = false
val pending = Array.ofDim[Future[B]](parallelism)
var index = 0
def hasNext(): Boolean = if !initialized then it.hasNext else pending.exists(_ ne null)
def next(): B =
if !initialized then
var full = false
while it.hasNext && !full do
val a = it.next
val futOfB = Future(f(a))
pending(index) = futOfB
futOfB.onComplete(tryB => mvar.put(index -> tryB))
if index + 1 == parallelism then
index = 0
full = true
else
index += 1
initialized = true
val (indexToReset, tryB) = mvar.take
pending(indexToReset) = null
tryB.get
else
var x = 0
var done = false
while x < parallelism && !done do
if pending(x) eq null then
val a = it.next
val futOfB = Future(f(a))
pending(x) = futOfB
futOfB.onComplete(tryB => mvar.put(x -> tryB))
if x + 1 == parallelism then
done = true
else
x += 1
val (indexToReset, tryB) = mvar.take
pending(indexToReset) = null
tryB.get
// WARNING, HERE BE STRONK MAGICKS, DO NOT UNCOMMENT THIS UNLESS YOU WANT ABSOLUTE POWER AND UNDEBUGGABILITY
// given futureToAWithDuration[A](using LoomingExecutionContext, Duration): Conversion[Future[A], A] with
// def apply(future: Future[A]): A =
// if (!Thread.currentThread.isVirtual) then println("Oh no, you're blocking on an OS thread!")
// Await.result(future, summon[Duration])
// given futureToA[A](using LoomingExecutionContext): Conversion[Future[A], A] with
// def apply(future: Future[A]): A =
// if (!Thread.currentThread.isVirtual) then println("Oh no, you're blocking on an OS thread!")
// Await.result(future, Duration.Inf)
// given iterableOfFutureToIterableOfA[C[X] <: IterableOnceOps[X, C, C[X]], A](using LoomingExecutionContext): Conversion[C[Future[A]], C[A]] with
// def apply(c: C[Future[A]]): C[A] = c.map(futOfa => Await.result(futOfa, Duration.Inf))
// NO MORE MAGICKS
abstract class LoomyApp:
def main(args: Array[String]): Unit =
implicit val lec = LoomingExecutionContext
val mainFut = Future(run(args.toVector))
val res = Await.result(mainFut, Duration.Inf)
sys.exit(res)
def run(args: Vector[String]): Int
import Looming.{given, *}
object Main extends LoomyApp:
def run(args: Vector[String]): Int =
println(s"hello from Scala 3.2 on ${sys.props("java.vm.name")} ${sys.props("java.version")}")
def futOfInt = Future {
Thread.sleep(500)
0
}
val fut: Future[Int] = futOfInt
val int: Int = futOfInt.get
val numbers = 1 to 23
val numbersSquaredInParallel: List[Double] = numbers.toList.map(num => Future(math.pow(num, 2))).get
println("=== ORDERED ===")
Iterator.iterate(0)(_ + 1)
.parMap(i => {
Thread.sleep(1000)
i + 1
})(4)
.take(20)
.foreach(println)
println("===============")
println("== UNORDERED ==")
Iterator.iterate(0)(_ + 1)
.parMapUnordered(i => {
Thread.sleep(util.Random.between(1,11) * 100)
i + 1
})(4)
.take(20)
.foreach(println)
println("===============")
int
@lbialy
Copy link
Author

lbialy commented Oct 3, 2022

Disclaimer: that stuff is probably severely broken as it's just an API PoC.

@liosedhel
Copy link

One can run this example with $ scala-cli --jvm 19 -J --enable-preview https://gist.github.com/lbialy/69d582fa623288f96625cbfb7876b51a

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment