Skip to content

Instantly share code, notes, and snippets.

@DamianReeves
Created November 13, 2024 17:01
Show Gist options
  • Save DamianReeves/726ada5e887ec78a8854446dae7987bb to your computer and use it in GitHub Desktop.
Save DamianReeves/726ada5e887ec78a8854446dae7987bb to your computer and use it in GitHub Desktop.
Actors with Kyo
package kyo
import kyo.Actor.Service
import kyo.kernel.Boundary
import kyo.kernel.Reducible
/** An actor prototype not considering aspects like identity and supervision. I think we could initially go with a minimal approach similar
* to this one providing local isolated actors.
*
* Instead of leaving the decision to handle the result of an actor message to the caller, this prototype separates actors into two kinds:
* Services, where each message has a response, and fire-and-forget Actions.
*
* Something interesting about this design is that actors would have mailboxes with limited size and, in case they're full, new messages
* would asynchrously block the caller, providing backpressure between actors. I'm not sure if there are solutions that provide this
* behavior, we'd need to think through the implications of that. Another option is allowing users to define sliding and dropping
* mailboxes.
*/
object Actor:
// Actor services receive an input and return an output to be handled by the caller.
// E => possible error types
// A => input
// B => output
abstract class Service[E, A, B]:
def apply(input: A)(using Frame): B < (Async & Abort[E | Closed])
def close(using Frame): Maybe[Seq[A]] < Async
object Service:
// Note how the state is handled via Var. Ctx/boundary is Kyo's mechanism
// to allow context effects (Env, Local, Resource, etc) when forking fibers.
def init[State: Tag, E, A, B: Flat, Ctx](
mailboxSize: Int,
initialState: State
)(
f: A => B < (Var[State] & Abort[E] & Async & Ctx)
)(
using
boundary: Boundary[Ctx, IO],
initFrame: Frame
): Service[E, A, B] < (IO & Ctx) =
// Since a service needs to reply, the message saves the sender promise
// to fulfill it later
case class Message(value: A, sender: Promise[E | Closed, B])
for
mailbox <- Channel.init[Message](mailboxSize, Access.MultiProducerSingleConsumer)
consumer <-
// Initializes the consumer that keeps listening for new messages
Async.run {
Loop(initialState) { state =>
mailbox.take.map { message =>
// handle the message with the current state
Var.runTuple(state)(f(message.value)).map { (newState, b) =>
// complete the sender with the result and resume loop with the new state
message.sender.complete(Result.success(b)).andThen(Loop.continue(newState))
}
}
}
}
yield
new Service[E, A, B]:
override def apply(input: A)(using Frame) =
Promise.init[E | Closed, B].map { sender =>
// Just enqueue the message and return the promise
mailbox.put(Message(input, sender)).andThen(sender.get)
}
override def close(using frame: Frame) =
mailbox.close.map {
case Absent => Absent
case Present(backlog) =>
val closed = Closed("Actor service closed", initFrame, frame)
// Interrupt the consumer fiber
consumer.interrupt(Result.Panic(closed)).andThen {
// Complete all pending messages with a Closed failure and
// return the message backlog.
val fail = Result.fail(closed)
Kyo.foreach(backlog) { message =>
message.sender.complete(fail).andThen(message.value)
}.map(Maybe(_))
}
}
end new
end for
end init
end Service
// Actions are essentially fire-and-forget services. Note how it can't define a failure type
// since the caller won't wait for the processing.
abstract class Action[A] extends Service[Nothing, A, Unit]
object Action:
def init[State: Tag, A, E, Ctx](
mailboxSize: Int,
initialState: State
)(
f: A => Unit < (Var[State] & Async & Ctx)
)(
using
boundary: Boundary[Ctx, Async],
initFrame: Frame
): Action[A] < (IO & Ctx) =
for
mailbox <- Channel.init[A](mailboxSize, Access.MultiProducerSingleConsumer)
consumer <-
// Similar to the Service impl but simpler since it doesn't need to notify the sender
Async.run {
Loop(initialState) { state =>
mailbox.take.map { value =>
Var.run(state)(f(value).andThen(Var.get[State])).map(Loop.continue)
}
}
}
yield
new Action[A]:
override def apply(input: A)(using Frame) =
// Fire-and-forget
mailbox.put(input)
override def close(using frame: Frame) =
mailbox.close.map {
case Absent => Absent
case Present(backlog) =>
val failure = Closed("Actor action closed", initFrame, frame)
consumer.interrupt(Result.Panic(failure)).andThen(Maybe(backlog))
}
end new
end init
end Action
end Actor
@DamianReeves
Copy link
Author

Getting the following error:

[error] 58 | Async.run(x)
[error] | ^
[error] |The computation you're trying to fork with Async has pending effects that aren't supported:
[error] |
[error] | Ctx
[error] |
[error] |You need to handle these effects before using Async operations. For example:
[error] |
[error] |Instead of:
[error] | Async.run(computation) // where computation has pending MyEffect
[error] |
[error] |Handle the effect first:
[error] | Async.run(MyEffect.run(computation))
[error] |
[error] |Note: There's currently a limitation with nested Async operations (like Async.run(Async.run(v))).
[error] |As a workaround, you can break them into separate statements:
[error] |
[error] |Instead of:
[error] | Async.run(Async.run(v))
[error] |
[error] |Use:
[error] | val x = Async.run(v)
[error] | Async.run(x)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment