Created
October 3, 2022 08:36
-
-
Save lbialy/69d582fa623288f96625cbfb7876b51a to your computer and use it in GitHub Desktop.
An extremely looming Future for Scala; parallel iterators are a thing now, too.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//> using scala "3.2" | |
import scala.concurrent.*, duration.* | |
import scala.collection.* | |
import java.util.concurrent.ExecutorService | |
import scala.language.implicitConversions | |
import java.util.concurrent.locks.ReentrantLock | |
/** Proof-of-concept helpers for writing blocking-style code on top of `Future`s that run on
  * virtual threads: an execution context, blocking `get` extensions, a one-slot `MVar`,
  * parallel `Iterator` mapping and a small application base class.
  */
object Looming:

  /** Marker type requested (via `using`) by the blocking helpers in this object. */
  sealed trait LoomingExecutionContext

  /** The single instance of the marker; an `ExecutionContext` that hands every task to a
    * virtual-thread-per-task executor.
    */
  implicit object LoomingExecutionContext extends LoomingExecutionContext with ExecutionContext:
    import java.util.concurrent.Executors

    // Lazy: the executor is only created when the first task is submitted.
    lazy val loom: ExecutorService = Executors.newVirtualThreadPerTaskExecutor()

    override def execute(runnable: Runnable): Unit = loom.execute(runnable)

    override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()

  /** Prints a warning when the calling thread is not a virtual thread. */
  private def warnIfBlockingOsThread(): Unit =
    if !Thread.currentThread.isVirtual then println("Oh no, you're blocking on an OS thread!")

  extension [C[X] <: IterableOnceOps[X, C, C[X]], A](c: C[Future[A]])
    /** Awaits every future in the collection (without timeout) and returns the results in a
      * collection of the same shape. A failed future rethrows its exception.
      */
    def get(using LoomingExecutionContext): C[A] =
      warnIfBlockingOsThread()
      c.map(futOfa => Await.result(futOfa, Duration.Inf))

  extension [A](future: Future[A])
    /** Blocks for at most `duration` and returns the future's value (or rethrows its failure). */
    def get(duration: Duration)(using LoomingExecutionContext): A =
      warnIfBlockingOsThread()
      Await.result(future, duration)

    /** Blocks without timeout and returns the future's value (or rethrows its failure). */
    def get(using LoomingExecutionContext): A =
      warnIfBlockingOsThread()
      Await.result(future, Duration.Inf)

  /** Signalled a corrupted slot in an earlier `parMap` implementation. `parMap` no longer
    * throws it; the class is kept so existing references keep compiling.
    */
  class WTF(index: Int) extends Exception(s"Found Future at index $index, expected null!")

  import java.util.concurrent.locks.ReentrantLock

  /** A one-slot blocking hand-off cell. `null` marks the empty state, so `null` must not be
    * used as a payload.
    *
    * A single lock guards the slot, with one condition per direction. Every successful `take`
    * wakes a waiting producer and every successful `put` wakes a waiting consumer.
    */
  class MVar[A >: Null <: AnyRef]():
    private var value: A = null
    private val lock = ReentrantLock()
    private val notEmpty = lock.newCondition()
    private val notFull = lock.newCondition()

    /** Stores `a`, blocking while the slot is occupied. */
    def put(a: A): Unit =
      lock.lock()
      try
        // Loop rather than `if`: a condition wait may return spuriously.
        while value ne null do notFull.await()
        value = a
        notEmpty.signal()
      finally lock.unlock()

    /** Removes and returns the stored value, blocking while the slot is empty. */
    def take: A =
      lock.lock()
      try
        while value eq null do notEmpty.await()
        val result = value
        value = null
        // Without this wake-up a producer parked in `put` would sleep forever.
        notFull.signal()
        result
      finally lock.unlock()
  end MVar

  extension [A](it: Iterator[A])
    /** Maps `f` over the iterator, keeping up to `parallelism` applications in flight while
      * yielding results in source order.
      *
      * No work starts before the first call to `next()`. The source iterator is only touched
      * by the thread that consumes the returned iterator. A failing `f` makes the matching
      * `next()` rethrow; later elements stay available.
      *
      * @param f           function applied to each element inside a `Future`
      * @param parallelism maximum number of concurrent applications; asserted to be positive
      * @throws IllegalStateException from `next()` once the iterator is exhausted
      */
    def parMap[B](f: A => B)(parallelism: Int)(using LoomingExecutionContext): Iterator[B] =
      assert(parallelism > 0)
      if parallelism == 1 then it.map(f)
      else
        new Iterator[B]:
          // FIFO window of started futures; the head is always the next result in source order.
          private val inFlight = scala.collection.mutable.Queue.empty[Future[B]]

          def hasNext: Boolean = inFlight.nonEmpty || it.hasNext

          def next(): B =
            // Top the window up before blocking, so `parallelism` tasks run while we wait.
            while inFlight.size < parallelism && it.hasNext do
              val a = it.next()
              inFlight.enqueue(Future(f(a)))
            if inFlight.isEmpty then
              throw IllegalStateException("Called next after hasNext returned false!")
            Await.result(inFlight.dequeue(), Duration.Inf)

    /** Maps `f` over the iterator, keeping up to `parallelism` applications in flight and
      * yielding each result as soon as it completes, regardless of source order.
      *
      * No work starts before the first call to `next()`. A failing `f` makes one `next()`
      * rethrow; the remaining results stay available.
      *
      * @param f           function applied to each element inside a `Future`
      * @param parallelism maximum number of concurrent applications; asserted to be positive
      * @throws NoSuchElementException from `next()` once the iterator is exhausted
      */
    def parMapUnordered[B](f: A => B)(parallelism: Int)(using LoomingExecutionContext): Iterator[B] =
      import scala.util.Try
      assert(parallelism > 0)
      if parallelism == 1 then it.map(f)
      else
        new Iterator[B]:
          // Completion callbacks hand their outcome to the consumer through this cell.
          private val completed = MVar[Try[B]]()
          // Tasks started whose outcome has not been consumed yet; consumer-thread only.
          private var inFlight = 0

          def hasNext: Boolean = inFlight > 0 || it.hasNext

          def next(): B =
            while inFlight < parallelism && it.hasNext do
              val a = it.next()
              Future(f(a)).onComplete(outcome => completed.put(outcome))
              inFlight += 1
            if inFlight == 0 then
              throw java.util.NoSuchElementException("next on exhausted parMapUnordered iterator")
            val outcome = completed.take
            inFlight -= 1
            outcome.get

  // WARNING, HERE BE STRONK MAGICKS, DO NOT UNCOMMENT THIS UNLESS YOU WANT ABSOLUTE POWER AND UNDEBUGGABILITY
  // given futureToAWithDuration[A](using LoomingExecutionContext, Duration): Conversion[Future[A], A] with
  //   def apply(future: Future[A]): A =
  //     if (!Thread.currentThread.isVirtual) then println("Oh no, you're blocking on an OS thread!")
  //     Await.result(future, summon[Duration])
  // given futureToA[A](using LoomingExecutionContext): Conversion[Future[A], A] with
  //   def apply(future: Future[A]): A =
  //     if (!Thread.currentThread.isVirtual) then println("Oh no, you're blocking on an OS thread!")
  //     Await.result(future, Duration.Inf)
  // given iterableOfFutureToIterableOfA[C[X] <: IterableOnceOps[X, C, C[X]], A](using LoomingExecutionContext): Conversion[C[Future[A]], C[A]] with
  //   def apply(c: C[Future[A]]): C[A] = c.map(futOfa => Await.result(futOfa, Duration.Inf))
  // NO MORE MAGICKS

  /** Base class for applications whose body should run on a virtual thread: `main` runs
    * `run` inside a `Future` on [[LoomingExecutionContext]], waits for it without timeout
    * and exits the JVM with the returned status code.
    */
  abstract class LoomyApp:
    def main(args: Array[String]): Unit =
      val mainFut = Future(run(args.toVector))(using LoomingExecutionContext)
      val res = Await.result(mainFut, Duration.Inf)
      sys.exit(res)

    /** Application body; the returned value becomes the process exit code. */
    def run(args: Vector[String]): Int
import Looming.{given, *} | |
/** Demo program: blocks on single futures and on a list of futures, then prints the first
  * twenty results of an ordered and of an unordered parallel map over an endless counter.
  * The value of the awaited demo future is returned as the exit code.
  */
object Main extends LoomyApp:
  def run(args: Vector[String]): Int =
    println(s"hello from Scala 3.2 on ${sys.props("java.vm.name")} ${sys.props("java.version")}")

    // A `def` on purpose: every reference starts a brand-new Future.
    def futOfInt = Future {
      Thread.sleep(500)
      0
    }

    // Started and left running, never awaited.
    val background: Future[Int] = futOfInt
    // Started and awaited via the blocking `get` extension.
    val exitCode: Int = futOfInt.get

    val inputs = 1 to 23
    val squareFutures = inputs.toList.map(num => Future(math.pow(num, 2)))
    val numbersSquaredInParallel: List[Double] = squareFutures.get

    // Successor function with a fixed one-second delay.
    def slowSuccessor(i: Int): Int =
      Thread.sleep(1000)
      i + 1

    // Successor function with a random delay of 100 to 1000 ms, in 100 ms steps.
    def jitterySuccessor(i: Int): Int =
      Thread.sleep(scala.util.Random.between(1, 11) * 100)
      i + 1

    println("=== ORDERED ===")
    val ordered = Iterator.iterate(0)(_ + 1).parMap(slowSuccessor)(4)
    ordered.take(20).foreach(println)
    println("===============")

    println("== UNORDERED ==")
    val unordered = Iterator.iterate(0)(_ + 1).parMapUnordered(jitterySuccessor)(4)
    unordered.take(20).foreach(println)
    println("===============")

    exitCode
You can run this example with: `scala-cli --jvm 19 -J --enable-preview https://gist.github.com/lbialy/69d582fa623288f96625cbfb7876b51a`
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Disclaimer: this code is probably severely broken, as it is just an API proof of concept (PoC).