Skip to content

Instantly share code, notes, and snippets.

@tindzk
Created August 4, 2016 14:24
Show Gist options
  • Save tindzk/f4cb72e9bc6f58eb66654a57c0dbbb03 to your computer and use it in GitHub Desktop.
Save tindzk/f4cb72e9bc6f58eb66654a57c0dbbb03 to your computer and use it in GitHub Desktop.
Service
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
/**
* A Service is an abstraction for a small processing unit that takes a request
* and responds asynchronously. The communication is therefore entirely
* message-driven. The messages are epxected to be immutable. A service can
* have an internal state that will not be exposed. Services can be composed
* and requests may be forwarded to other services. The concept is inspired by
* Akka's actors.
*/
trait Service[Req, Resp] extends reactive.propagate.Produce[Req] {
val process: PartialFunction[Req, Future[Resp]]
def produce(req: Req): Unit = process(req)
def request(req: Req): Future[Resp] = process(req)
def ?(req: Req): Future[Resp] = request(req)
def compose(service: Service[Req, Resp]): Service[Req, Resp] =
Service(process.orElse(service.process))
def map(f: PartialFunction[Req, Req]): Service[Req, Resp] =
Service {
Function.unlift { req: Req =>
process.lift(f.lift(req).getOrElse(req))
}
}
def mapResponse(f: Resp => Resp): Service[Req, Resp] =
Service {
Function.unlift { req: Req =>
process.lift(req).map(_.map(f))
}
}
def filter(f: Req => Boolean): Service[Req, Resp] =
Service {
Function.unlift { req: Req =>
if (f(req)) process.lift(req)
else None
}
}
}
object Service {
def apply[Req, Resp](f: PartialFunction[Req, Future[Resp]]): Service[Req, Resp] =
new Service[Req, Resp] {
val process = f
}
}
import org.scalatest.AsyncFunSuite
import scala.concurrent.Future
class ServiceSpec extends AsyncFunSuite {
sealed trait Request
case class RequestA(i: Int = 0) extends Request
case class RequestB() extends Request
sealed trait Response
case class ResponseA(i: Int = 0) extends Response
case class ResponseB() extends Response
test("Querying service") {
var triggered = false
val service = Service[Request, Response] {
case _: RequestA =>
triggered = true
Future.successful(ResponseA())
case RequestB() => Future.successful(ResponseB())
}
service ! RequestA()
assert(triggered)
}
test("Sending request to service") {
val service = Service[Request, Response] {
case _: RequestA => Future.successful(ResponseA())
case RequestB() => Future.successful(ResponseB())
}
val response = service ? RequestA()
response.map(x => assert(x == ResponseA()))
}
test("Composing services") {
val serviceA = Service[Request, Response] {
case _: RequestA => Future.successful(ResponseA())
}
val serviceB = Service[Request, Response] {
case RequestB() => Future.successful(ResponseB())
}
val service = serviceA.compose(serviceB)
val response = service ? RequestB()
response.map(x => assert(x == ResponseB()))
}
test("Mapping requests") {
val service = Service[Request, Response] {
case r: RequestA => Future.successful(ResponseA(r.i))
case RequestB() => Future.successful(ResponseB())
}.map {
case r: RequestA => RequestA(r.i + 1)
}
for {
r <- service ? RequestA()
r2 <- service ? RequestB()
} yield {
assert(r == ResponseA(1) && r2 == ResponseB())
}
}
test("Forwarding requests") {
val service = Service[Request, Response] {
case r: RequestA => Future.successful(ResponseA())
}
val service2 = Service[Request, Response] {
case RequestB() => Future.successful(ResponseB())
case r => service ? r
}
for {
r <- service2 ? RequestA()
r2 <- service2 ? RequestB()
} yield {
assert(r == ResponseA() && r2 == ResponseB())
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment