Skip to content

Instantly share code, notes, and snippets.

@cloudmark
Last active July 12, 2019 00:46
Show Gist options
  • Save cloudmark/a90ec023fc471531f6869959746f8ab0 to your computer and use it in GitHub Desktop.
Save cloudmark/a90ec023fc471531f6869959746f8ab0 to your computer and use it in GitHub Desktop.
ZIO POC
package com.acme.poc
import zio.{Ref, Runtime, UIO, ZIO}
import cats.implicits._
import zio.internal.PlatformLive
import com.acme.poc.App.InteractionLog._
import scala.util.Try
object App {
/** Error channel used throughout this POC: a plain textual description of what went wrong. */
type Error = String
/** A fallible conversion from `A` to `B`: `Left` carries the [[Error]], `Right` the mapped value. */
type Mapper[A, B] = A => Either[Error, B]
/**
* InteractionLog is the environment capability that exposes a reference to the [[InteractionData]]
* of the current interaction.
*
* InteractionData represents a piece of data which we can ship somewhere so that it can be analysed later.
* It is aggregated bit by bit as execution flows through the business logic.
*/
trait InteractionLog {
def interactionData: Ref[InteractionData]
}
/**
 * Companion object with helpers that record information on the [[InteractionData]]
 * held by the [[InteractionLog]] found in the environment.
 */
object InteractionLog {
  /** Applies `change` to the InteractionData stored in the environment's Ref and discards the updated value. */
  private def amend(change: InteractionData => InteractionData): ZIO[InteractionLog, Nothing, Unit] =
    ZIO.accessM[InteractionLog](_.interactionData.update(change)).unit

  /** Appends `line` to the logged values of the interaction. */
  def log(line: String): ZIO[InteractionLog, Nothing, Unit] =
    amend(_.log(line))

  /** Stores the mapped request on the interaction. */
  def request(request: Request): ZIO[InteractionLog, Nothing, Unit] =
    amend(_.request(request))

  /** Stores the mapped response on the interaction. */
  def response(response: Response): ZIO[InteractionLog, Nothing, Unit] =
    amend(_.response(response))
}
/**
 * Interaction data which represents all the data that we want to keep when someone interacts with our system.
 *
 * Instances are immutable: every helper below returns an updated copy and leaves the receiver untouched.
 *
 * @param correlationId unique correlation id to identify this interaction
 * @param raw the raw request coming from the upstream
 * @param request the mapped request (empty until one has been recorded)
 * @param response the mapped response (empty until one has been recorded)
 * @param loggedValues a transaction script describing what happened, in the order the lines were logged
 */
final case class InteractionData(
  correlationId: String,
  raw: String,
  request: Option[Request],
  response: Option[Response],
  loggedValues: Vector[String]
) {
  /** Returns a copy with `line` appended to the end of the transaction script. */
  def log(line: String): InteractionData = copy(loggedValues = loggedValues :+ line)

  /** Returns a copy with the mapped request set to `r` (replacing any previous one). */
  def request(r: Request): InteractionData = copy(request = Some(r))

  /** Returns a copy with the mapped response set to `r` (replacing any previous one). */
  def response(r: Response): InteractionData = copy(response = Some(r))
}
/**
 * A mapped request.
 *
 * @param num the number extracted from the raw request
 */
final case class Request(num: Int)

/**
 * The response produced for a [[Request]].
 *
 * @param num the number the response refers to
 * @param mappedNum the value computed for `num` by the processing step
 */
final case class Response(num: Int, mappedNum: Int)
/**
* The environments needed by our system. There are two of them:
* ParseEnvironment - the environment needed while we parse the raw request into a Request
* ProcessEnvironment - the environment needed to process a mapped Request into a Response.
*/
trait ParseEnvironment {
// Converts the raw request string into a Request, or an Error describing why it could not be mapped.
def mapper: Mapper[String, Request]
}
trait ProcessEnvironment {
// Converts a mapped Request into a Response, or an Error describing why it could not be processed.
def process: Mapper[Request, Response]
}
/**
* =============================================
* Business logic
* =============================================
*/
/**
 * Maps a raw string to a Request using the mapper supplied by the environment.
 * On success the mapped request is recorded on the interaction and a log line is added;
 * on failure the mapper's error is returned in the error channel.
 *
 * @param s the string to map
 * @return the ZIO of the mapped Request or Error
 */
def getRequest(s: String): ZIO[ParseEnvironment with InteractionLog, Error, Request] =
  for {
    _ <- log("Attempting to retrieve mapping environment")
    // Look up the mapper and lift its Either result into the effect in a single step.
    parsed <- ZIO.accessM[ParseEnvironment](env => ZIO.fromEither(env.mapper(s)))
    _ <- request(parsed)
    _ <- log("Mapped result")
  } yield parsed
/**
 * Processes the request to generate a response using the process function supplied by the
 * environment. On success the response is recorded on the interaction; otherwise the
 * underlying error is returned in the error channel.
 *
 * @param request the request
 * @return the ZIO of the response or error
 */
def processRequest(request: Request): ZIO[ProcessEnvironment with InteractionLog, Error, Response] =
  ZIO.access[ProcessEnvironment](_.process).flatMap { handle =>
    for {
      _ <- log("Processing request")
      produced <- ZIO.fromEither(handle(request))
      _ <- response(produced)
      _ <- log("Processed request")
    } yield produced
  }
/**
 * Post process the data [Success Case]: adds a final log line and prints the logged values
 * of the interaction to standard output (a stand-in for shipping the data to a remote server).
 */
val postProcessRequestResponse: ZIO[InteractionLog, Nothing, Unit] =
  log("Retrieving interaction data") *>
    ZIO.accessM[InteractionLog](_.interactionData.get)
      .flatMap(snapshot => UIO(println(snapshot.loggedValues)))
/**
 * Post process the data [Error Case]: logs the error on the interaction and prints the logged
 * values to standard output (a stand-in for shipping the data to a remote server).
 *
 * @param underlyingError the error that interrupted the pipeline
 */
def processError(underlyingError: Error): ZIO[InteractionLog, Nothing, Unit] = {
  val recordFailure: ZIO[InteractionLog, Nothing, Unit] =
    log(s"An error has occurred $underlyingError")
  val printTranscript: ZIO[InteractionLog, Nothing, Unit] =
    ZIO.accessM[InteractionLog](_.interactionData.get)
      .flatMap(snapshot => UIO(println(snapshot.loggedValues)))
  recordFailure *> printTranscript
}
def main(args: Array[String]): Unit = {
/**
 * Common Environment is shared amongst a number of controllers. It supplies default
 * implementations of the parsing and processing functions, which may be overridden.
 */
trait CommonEnvironment extends ParseEnvironment with ProcessEnvironment { self =>
  /** Parses the raw string as an Int; any failure to do so is reported as the fixed error "Error". */
  override def mapper: Mapper[String, Request] =
    value => Try(Request(value.toInt)).toEither.leftMap(_ => "Error")

  /** Always succeeds, pairing the request number with its double. */
  override def process: Mapper[Request, Response] =
    request => Response(request.num, request.num * 2).asRight[Error]

  /**
   * Augments this environment with the given interaction data reference.
   *
   * The returned environment delegates `mapper` and `process` to this instance, so any
   * overrides made on the receiver are preserved instead of silently reverting to the
   * trait defaults.
   *
   * @param interactionDataRef the reference holding the data of the current interaction
   * @return an environment exposing this instance's behaviour plus the interaction log
   */
  def withLogger(interactionDataRef: Ref[InteractionData]): LoggedEnvironment =
    new LoggedEnvironment {
      override def mapper: Mapper[String, Request] = self.mapper
      override def process: Mapper[Request, Response] = self.process
      override def interactionData: Ref[InteractionData] = interactionDataRef
    }
}
/**
 * Logged Environment augments the common environment with the interaction data.
 * The abstract `interactionData` member is inherited from InteractionLog.
 */
trait LoggedEnvironment extends CommonEnvironment with InteractionLog
/**
 * The common runtime, used by all controllers. It runs on the default live platform with a
 * CommonEnvironment that relies on the trait's default `mapper` and `process`, so those
 * implementations are defined in exactly one place.
 */
implicit val commonRuntime: Runtime[CommonEnvironment] =
  Runtime(new CommonEnvironment {}, PlatformLive.Default)
/**
 * Given a correlationId and rawRequest, describes one run of the pipeline: parse the raw
 * request, process it and post-process the outcome. Any Error raised along the way is
 * handled by processError, so the returned effect cannot fail.
 *
 * The pipeline is given its environment with `provide` rather than being executed through a
 * nested `unsafeRun`, so nothing runs until the caller runs the returned effect.
 *
 * @param correlationId unique id of the interaction
 * @param rawRequest the raw request coming from the upstream
 * @param commonRuntime the runtime whose environment is augmented with the interaction log
 * @return an effect that runs the whole pipeline for this interaction
 */
def resolve(correlationId: String, rawRequest: String)(implicit commonRuntime: Runtime[CommonEnvironment]): UIO[Unit] = {
  val pipeline: ZIO[ParseEnvironment with ProcessEnvironment with InteractionLog, Nothing, Unit] =
    (for {
      interactionData <- ZIO.accessM[InteractionLog](_.interactionData.get)
      request <- getRequest(interactionData.raw)
      _ <- processRequest(request)
      _ <- postProcessRequestResponse
    } yield ()).catchAll(processError)

  // Each interaction gets its own Ref, seeded with the raw request and an empty transcript.
  Ref
    .make(InteractionData(correlationId, rawRequest, Option.empty, Option.empty, Vector.empty))
    .flatMap(ref => pipeline.provide(commonRuntime.Environment.withLogger(ref)))
}
/**
 * An example controller: builds the pipeline for the given interaction and runs it
 * synchronously on the common runtime.
 */
def someExampleController(correlationId: String, rawRequest: String): Unit = {
  val program: UIO[Unit] = resolve(correlationId, rawRequest)
  commonRuntime.unsafeRun(program)
}
// Success Case: "1" can be converted to an Int, so the request is mapped and processed.
someExampleController("correlationId", "1")
// Error Case: "asdasdad" cannot be converted to an Int, so the mapper returns an error.
someExampleController("correlationId", "asdasdad")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment