Last active
August 29, 2015 14:06
-
-
Save EECOLOR/b1a8856608553c83acb4 to your computer and use it in GitHub Desktop.
An alternative version of typed actors. An actor can only receive messages of a certain type; the type parameter of each message determines the type of the reply.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package experiment | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.Future | |
import scala.language.higherKinds | |
import scala.language.implicitConversions | |
import scala.reflect.ClassTag | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import akka.actor.ActorRefFactory | |
import akka.actor.Props | |
import akka.util.Timeout | |
/**
 * Mixin that restricts an actor to messages of the shape `MessageType[ResultType]`
 * and sends the outcome of the typed handler back to the sender.
 *
 * Implementors provide [[typedReceive]]; the untyped `receive` is derived from it
 * and is final.
 *
 * @tparam MessageType message family; its type argument is the type of the reply
 */
trait TypedActor[MessageType[_]] { outer: Actor =>

  /** A typed handler: a partial function from a message to a `TypedActor.Result`. */
  type TypedReceive[ResultType] = TypedActor.TypedReceive[MessageType, ResultType]

  /**
   * Adapts a typed handler into an untyped `Receive` that pipes the handler's
   * future result to the sender of the message.
   */
  private def receiveFor[ResultType](typedReceive: TypedReceive[ResultType]): Receive = {
    // `MessageType` is an abstract type constructor, so this type test cannot be
    // checked at runtime. `@unchecked` states that this is intentional; the
    // `isDefinedAt` guard is what actually decides whether a message is handled.
    case message: (MessageType[ResultType] @unchecked) if typedReceive.isDefinedAt(message) =>
      import context.dispatcher
      import akka.pattern.pipe
      // `sender()` is evaluated here, while the message is being processed,
      // not inside a future callback.
      typedReceive(message).asFuture.pipeTo(sender())
  }

  // This should be in the context, for this example that was too complex

  /** Replaces the current behavior with the given typed handler. */
  def typedBecome[ResultType](behavior: TypedReceive[ResultType]): Unit =
    context.become(receiveFor(behavior))

  /**
   * Switches to the given typed handler.
   *
   * @param discardOld forwarded unchanged to `context.become`
   */
  def typedBecome[ResultType](behavior: TypedReceive[ResultType], discardOld: Boolean): Unit =
    context.become(receiveFor(behavior), discardOld)

  /** Initial untyped behavior, derived from [[typedReceive]]. */
  final def receive: Receive = receiveFor(typedReceive)

  /** The initial typed behavior of the actor. */
  def typedReceive[ResultType]: TypedReceive[ResultType]
}
/** Factory entry point and supporting types for [[TypedActor]]. */
object TypedActor {

  /** Wraps an `ActorRefFactory` so that typed actors can be created from it. */
  def apply(system: ActorRefFactory): TypedActorFactory = new TypedActorFactory(system)

  /** A typed handler: maps a `MessageType[ResultType]` to a [[Result]] of the same `ResultType`. */
  type TypedReceive[MessageType[_], ResultType] =
    PartialFunction[MessageType[ResultType], Result[ResultType]]

  /** The outcome of a typed handler, always viewable as a `Future`. */
  sealed trait Result[ResultType] {
    def asFuture: Future[ResultType]
  }

  object Result {

    // Implicit conversions get explicit result types so that their signature
    // does not depend on what the compiler infers for the anonymous class.

    /** Lifts a plain value into an already-completed result. */
    implicit def any[ResultType](value: ResultType): Result[ResultType] =
      new Result[ResultType] {
        val asFuture: Future[ResultType] = Future.successful(value)
      }

    /** Uses an existing future as the result, unchanged. */
    implicit def forFuture[ResultType](value: Future[ResultType]): Result[ResultType] =
      new Result[ResultType] {
        val asFuture: Future[ResultType] = value
      }
  }
}
/**
 * Typed facade over an untyped `ActorRef`: only messages of the family
 * `MessageType` can be sent, and the reply is typed by the message's type argument.
 */
class TypedActorRef[MessageType[_]](actorRef: ActorRef) {

  /**
   * Asks the wrapped actor and casts the reply to `ReturnType`.
   *
   * The cast is not verified at this point; it relies on the receiving actor
   * replying with a value of the promised type.
   */
  def ?[ReturnType](message: MessageType[ReturnType])(implicit timeout: Timeout, ec: ExecutionContext): Future[ReturnType] = {
    import akka.pattern.ask
    val untypedReply = actorRef ? message
    untypedReply.map(reply => reply.asInstanceOf[ReturnType])
  }
}
/**
 * `Props` tagged with the message family of the actor they create.
 *
 * @tparam MessageType message family accepted by the created actor
 */
trait TypedProps[MessageType[_]] {

  /** The untyped `Props` used to instantiate the actor. */
  def actorProps: Props
}
object TypedProps {

  /**
   * Builds [[TypedProps]] for the actor class `T`.
   *
   * The message family is not passed explicitly: it is taken from the
   * `MessageType` member of the implicit `helper`, which is used only at the
   * type level and is never dereferenced here.
   *
   * @tparam T actor class to instantiate
   */
  def apply[T <: Actor: ClassTag](implicit helper: TypeInferenceHelper[T]): TypedProps[helper.MessageType] =
    new TypedProps[helper.MessageType] {
      def actorProps: Props = Props[T]
    }
}
/** Creates typed actor references on top of an untyped `ActorRefFactory`. */
class TypedActorFactory(context: ActorRefFactory) {

  /** Starts the actor described by `props` and wraps its reference in a [[TypedActorRef]]. */
  def typedActorOf[MessageType[_]](props: TypedProps[MessageType]) = {
    val untypedRef = context.actorOf(props.actorProps)
    new TypedActorRef[MessageType](untypedRef)
  }
}
/**
 * Type-level evidence that associates a type `T` with a message family.
 *
 * @tparam T the type the message family is looked up for
 */
trait TypeInferenceHelper[T] {

  /** The message family associated with `T`. */
  type MessageType[_]
}
object TypeInferenceHelper {

  /** A [[TypeInferenceHelper]] whose `MessageType` member is fixed to `M`. */
  type Aux[T, M[_]] = TypeInferenceHelper[T] {
    type MessageType[x] = M[x]
  }

  /**
   * Derives the helper for any `T` that can be viewed as a `TypedActor[M]`.
   *
   * The instance carries no data; only its type member matters. A real (empty)
   * instance is returned instead of `null` so that the evidence is safe to
   * touch, for example when it is logged or compared.
   */
  implicit def any[T, M[_]](implicit ev: T => TypedActor[M]): Aux[T, M] =
    new TypeInferenceHelper[T] {
      type MessageType[x] = M[x]
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package experiment | |
import scala.concurrent.Await | |
import scala.concurrent.Future | |
import scala.concurrent.duration.DurationInt | |
import scala.language.higherKinds | |
import scala.language.implicitConversions | |
import akka.actor.Actor | |
import akka.actor.ActorSystem | |
import akka.util.Timeout | |
/**
 * Messages understood by `Test1`.
 *
 * @tparam ResultType type of the reply expected for the message
 */
sealed trait Message1[ResultType]

/** Request carrying a `String`; declared to be answered with a `String`. */
case class Echo(input: String) extends Message1[String]

/** Request carrying an `Int`; declared to be answered with an `Int`. */
case class ComplexComputation(input: Int) extends Message1[Int]
/**
 * Example typed actor for the `Message1` family.
 *
 * It starts in `typedReceive`; the first `Echo` switches the behavior to
 * `strangeEcho`, which only handles `Echo`.
 */
class Test1 extends Actor with TypedActor[Message1] {

  // Child actor that performs the delegated part of a ComplexComputation.
  val test2 = TypedActor(context).typedActorOf(TypedProps[Test2])

  import context.dispatcher

  def typedReceive[ResultType] = {
    case Echo(text) =>
      // Reply with the text as-is, but change the behavior for later messages.
      typedBecome(strangeEcho)
      text
    case ComplexComputation(number) =>
      implicit val timeout = Timeout(5.seconds)
      // Ask the child, then add one to its answer.
      val delegated = test2 ? SimpleComputation(number)
      delegated.map(_ + 1)
  }

  /** Replies with the input repeated twice, separated by three dots. */
  def strangeEcho[ResultType]: TypedReceive[ResultType] = {
    case Echo(text) => s"$text...$text"
  }
}
/**
 * Messages understood by `Test2`.
 *
 * @tparam ResultType type of the reply expected for the message
 */
sealed trait Message2[ResultType]

/** Request carrying an `Int`; declared to be answered with an `Int`. */
case class SimpleComputation(input: Int) extends Message2[Int]
/** Example typed actor for the `Message2` family: answers with the input plus one. */
class Test2 extends Actor with TypedActor[Message2] {

  def typedReceive[ResultType] = {
    case SimpleComputation(number) => number + 1
  }
}
/**
 * Small driver: starts an actor system, sends three typed requests to a
 * `Test1` actor, waits for all replies and then stops the system.
 *
 * Everything runs in the object's initializer, so the work is done the first
 * time `Explore` is referenced.
 */
object Explore {
  val system: ActorSystem = ActorSystem("test")
  val typedSystem: TypedActorFactory = TypedActor(system)
  val typedRef = typedSystem.typedActorOf(TypedProps[Test1])

  // Timeout applied to each individual ask below.
  implicit val timeout: Timeout = Timeout(5.seconds)
  import system.dispatcher

  // The ComplexComputation is sent before the first Echo, which changes
  // Test1's behavior to one that handles only Echo.
  val result1: Future[Int] = typedRef ? ComplexComputation(1)
  val result2: Future[String] = typedRef ? Echo("test")
  val result3: Future[String] = typedRef ? Echo("test")

  // The overall wait (1 second) is shorter than the per-ask timeout, so
  // Await.result can throw before the asks themselves time out. The system is
  // stopped in `finally` so that such a failure does not leave it running.
  val result =
    try Await.result(Future.sequence(Seq(result1, result2, result3)), 1.second)
    finally {
      system.shutdown()
      system.awaitTermination()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment