Created
June 14, 2023 12:05
-
-
Save diesalbla/5fadcd75d075617564b4aad403fbf810 to your computer and use it in GitHub Desktop.
Scala Coroutines - Basic definitions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
abstract class Starter[A]: | |
def invoke(completion: Continuation[A]): A | Any | Null | |
trait ContinuationLib: | |
extension [A](continuation: Continuation[A]) | |
def intercepted(ec: ExecutionContext): Continuation[A] = | |
continuation match | |
case x: ContinuationImpl => | |
x.intercepted(ec).asInstanceOf[Continuation[A]] | |
case _ => continuation | |
trait StarterLib extends ContinuationLib: | |
extension [A](starter: Starter[A]) | |
def start(completion: Continuation[A]): Unit = | |
create(completion).intercepted(completion.executionContext).resume(()) | |
def create(completion: Continuation[A]): Continuation[Unit] = | |
if (starter.isInstanceOf[ContinuationImpl]) | |
starter.create(completion) | |
else | |
starter.createFromCompletion(completion) | |
def createFromCompletion(completion: Continuation[A]): Continuation[Unit] = | |
val context = completion.context() | |
if (context == EmptyTuple) | |
new RestrictedContinuation(completion.asInstanceOf): | |
private var label = 0 | |
override protected def invokeSuspend( | |
result: Either[Throwable, Any | Null | Continuation.State.Suspended.type]): Any | | |
Null = | |
label match | |
case 0 => | |
label = 1 | |
result match | |
case Left(exception) => | |
throw exception | |
case _ => () | |
starter.invoke(this) | |
case 1 => | |
label = 2 | |
result match | |
case Left(exception) => | |
throw exception | |
case Right(result) => | |
result | |
case _ => throw new IllegalStateException("already completed") | |
else | |
new ContinuationImpl(completion.asInstanceOf, context): | |
private var label = 0 | |
override def invokeSuspend( | |
result: Either[Throwable, Any | Null | Continuation.State.Suspended.type]) | |
: Any | Null | Continuation.State.Suspended.type = | |
label match | |
case 0 => | |
label = 1 | |
result match | |
case Left(exception) => | |
throw exception | |
case _ => () | |
starter.invoke(this) | |
case 1 => | |
label = 2 | |
result match | |
case Left(exception) => | |
throw exception | |
case Right(result) => | |
result | |
case _ => throw new IllegalStateException("already completed") | |
abstract class BaseContinuationImpl( | |
val completion: Continuation[Any | Null] | Null, | |
icontext: Tuple | |
) extends Continuation[Any | Null], | |
ContinuationStackFrame, | |
Serializable: | |
override type Ctx = Tuple | |
override def context() = icontext | |
override val executionContext: ExecutionContext = | |
if completion == null then throw RuntimeException("resume called with no completion") | |
else completion.executionContext | |
final override def resume(result: Any | Null): Unit = resumeAux(Right(result)) | |
final override def raise(error: Throwable): Unit = | |
resumeAux(Left(error)) | |
private def resumeAux(result: Either[Throwable, Any | Null]): Unit = { | |
var current = this | |
var param = result | |
while true do | |
if (completion == null) throw RuntimeException("resume called with no completion") | |
val outcome: Either[Throwable, Any | Null] = | |
try | |
val outcome = current.invokeSuspend(param) | |
if (outcome == Continuation.State.Suspended) return | |
Right(outcome) | |
catch | |
case exception: Throwable => | |
Left(exception) | |
releaseIntercepted() | |
completion match | |
case base: BaseContinuationImpl => | |
current = base | |
param = outcome | |
println(s"base raise/resume") | |
outcome.fold(current.raise, current.resume) | |
return | |
case _ => | |
println(s"completion raise/resume") | |
outcome.fold(completion.raise, completion.resume) | |
return | |
} | |
protected def invokeSuspend( | |
result: Either[Throwable, Any | Null | Continuation.State.Suspended.type]): Any | Null | |
protected def releaseIntercepted(): Unit = () | |
def create(completion: Continuation[?]): Continuation[Unit] = | |
throw UnsupportedOperationException("create(Continuation) has not been overridden") | |
def create(value: Any | Null, completion: Continuation[Any | Null]): Continuation[Unit] = | |
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden") | |
override def callerFrame: ContinuationStackFrame | Null = | |
if (completion != null && completion.isInstanceOf[ContinuationStackFrame]) | |
completion.asInstanceOf | |
else null | |
override def getStackTraceElement(): StackTraceElement | Null = | |
null | |
abstract class ContinuationImpl( | |
completion: Continuation[Any | Null], | |
icontext: Tuple | |
) extends BaseContinuationImpl(completion, icontext): | |
override type Ctx = Tuple | |
override def context() = icontext | |
override val executionContext: ExecutionContext = completion.executionContext | |
private var _intercepted: Continuation[Any | Null] = null | |
def intercepted(ec: ExecutionContext): Continuation[Any | Null] = | |
if (_intercepted != null) _intercepted | |
else | |
val interceptor = contextService[ContinuationInterceptor]() | |
val intercepted = | |
if (interceptor != null) | |
interceptor.interceptContinuation( | |
this | |
) | |
else this | |
_intercepted = intercepted | |
intercepted | |
override def releaseIntercepted(): Unit = | |
val intercepted = _intercepted | |
if (intercepted != null && intercepted != this) | |
val interceptor = contextService[ContinuationInterceptor]() | |
if (interceptor != null) | |
interceptor.releaseInterceptedContinuation(intercepted) | |
_intercepted = CompletedContinuation | |
object CompletedContinuation extends Continuation[Any | Null]: | |
override type Ctx = Nothing | |
override val executionContext: ExecutionContext = ??? | |
override def context: CompletedContinuation.Ctx = | |
throw IllegalStateException("Already completed") | |
override def resume(result: Any | Null): Unit = | |
throw IllegalStateException("Already completed") | |
override def raise(error: Throwable): Unit = | |
throw IllegalStateException("Already completed") | |
abstract class RestrictedContinuation( | |
completion: Continuation[Any | Null] | Null, | |
) extends BaseContinuationImpl(completion, EmptyTuple): | |
if (completion != null) | |
require(completion.context() == EmptyTuple) | |
trait ContinuationStackFrame: | |
def callerFrame: ContinuationStackFrame | Null | |
def getStackTraceElement(): StackTraceElement | Null | |
type CancellationException = java.util.concurrent.CancellationException | |
class SafeContinuation[T] private ( | |
val delegate: Continuation[T], | |
initialResult: T | Continuation.State) | |
extends SafeContinuationBase, | |
Continuation[T], | |
ContinuationStackFrame: | |
override type Ctx = delegate.Ctx | |
override val executionContext: ExecutionContext = delegate.executionContext | |
override def context(): Ctx = delegate.context() | |
result = initialResult | |
var errored: Boolean = false | |
override def resume(value: T): Unit = | |
while true do | |
this.result match { | |
case Continuation.State.Undecided => | |
if (CAS_RESULT(Continuation.State.Undecided, value)) | |
return () | |
case Continuation.State.Suspended => | |
if (CAS_RESULT(Continuation.State.Suspended, Continuation.State.Resumed)) { | |
delegate.resume(value) | |
return () | |
} | |
case _ => | |
throw IllegalStateException("Already resumed") | |
} | |
override def raise(error: Throwable): Unit = | |
while true do | |
val cur = this.result | |
if (cur == Continuation.State.Undecided) { | |
if (CAS_RESULT(Continuation.State.Undecided, error)) | |
errored = true | |
return delegate.raise(error) | |
} else if (cur == Continuation.State.Suspended) { | |
if (CAS_RESULT(Continuation.State.Suspended, Continuation.State.Resumed)) { | |
errored = true | |
delegate.raise(error) | |
return () | |
} | |
} else throw IllegalStateException("Already resumed") | |
def getOrThrow(): T | Null | Continuation.State.Suspended.type = | |
var result = this.result | |
if (result == Continuation.State.Undecided) { | |
if (CAS_RESULT(Continuation.State.Undecided, Continuation.State.Suspended)) { | |
return Continuation.State.Suspended | |
} | |
result = this.result | |
} | |
if (result == Continuation.State.Resumed) { | |
Continuation.State.Suspended | |
} else if ((result ne null) && errored) { | |
throw result.asInstanceOf[Throwable] | |
} else | |
result.asInstanceOf[T] | |
override def callerFrame: ContinuationStackFrame | Null = | |
if (delegate != null && delegate.isInstanceOf[ContinuationStackFrame]) delegate.asInstanceOf | |
else null | |
override def getStackTraceElement(): StackTraceElement | Null = | |
null | |
object SafeContinuation: | |
def init[A](cont: Continuation[A]): SafeContinuation[A] = | |
val intrinsic = cont.intercepted(cont.executionContext) | |
new SafeContinuation[A](intrinsic, Continuation.State.Undecided) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment