Skip to content

Instantly share code, notes, and snippets.

@Krever
Created December 2, 2017 12:35
Show Gist options
  • Save Krever/005f617d9da54c514152a07f865a8dc7 to your computer and use it in GitHub Desktop.
Save Krever/005f617d9da54c514152a07f865a8dc7 to your computer and use it in GitHub Desktop.
import java.io._
import io.circe._
import scala.collection.mutable.ListBuffer
import scala.io.Source
import scala.language.higherKinds
import scala.util.{Failure, Success, Try}
/** Storage abstraction used by [[CachedStep]] to save and restore step results.
  *
  * @tparam T type of the value being stored
  */
trait Codec[T] {

  /** Stores `x` under the given `label`.
    *
    * @param x     value to store
    * @param label key identifying the stored value
    * @return `Success(())` when the value was stored, `Failure` otherwise
    */
  def persist(x: T, label: String): Try[Unit]

  /** Loads the value stored under `label`.
    *
    * @param label key identifying the stored value
    * @return the stored value, or a `Failure` when it cannot be loaded
    */
  def read(label: String): Try[T]
}
/** Sink for progress messages, optionally accompanied by a value of type `T`.
  *
  * Contravariant in `T`, so a logger for a general type (e.g. `Iterable[X]`)
  * can be used wherever a logger for a more specific type (e.g. `Vector[X]`)
  * is required.
  *
  * @tparam T type of the value that may accompany a message
  */
trait Logger[-T] {

  /** Logs a message.
    *
    * The result type is spelled out explicitly: procedure syntax (a `def`
    * without a result type) is deprecated in Scala 2.13 and removed in Scala 3.
    *
    * @param str message to log
    * @param cc  optional value the message refers to
    */
  def log(str: String, cc: Option[T]): Unit
}
object CachedStep {

  /** Creates the first step of a pipeline.
    *
    * The step has index 0 and no predecessor: its `prev` thunk yields `()`.
    *
    * @param label  name of the step; also passed to the codec as the cache key
    * @param x      by-name computation of the step's result; it is evaluated
    *               only when the step actually runs, not when it is constructed
    * @param cached whether the step should consult / populate the cache
    * @param codec  storage used for the step's result
    * @param logger sink for progress messages
    * @tparam T result type of the step
    */
  def start[T](label: String, x: => T, cached: Boolean)(
      implicit codec: Codec[T],
      logger: Logger[T]
  ): CachedStep[Unit, T] = {
    val noInput: () => Unit = () => ()
    // `x` is by-name, so it is re-evaluated on every invocation of `produce`.
    val produce: Unit => T = _ => x
    CachedStep[Unit, T](label, produce, cached, noInput, 0)
  }
}
/** A single step of a lazily executed pipeline whose result may be cached.
  *
  * @param label  name of the step; also passed to the codec as the cache key
  * @param f      transformation applied to the previous step's result
  * @param cached when `true`, the result is first looked up through `codec`
  *               and, on a miss, computed and persisted
  * @param prev   thunk producing this step's input (runs the preceding steps)
  * @param idx    position of the step in the pipeline, used in log messages
  * @param codec  storage used for this step's result
  * @param logger sink for progress messages
  * @tparam In  type of the step's input
  * @tparam Out type of the step's result
  */
case class CachedStep[In, Out](
    label: String,
    f: In => Out,
    cached: Boolean,
    prev: () => In,
    idx: Int
)(
    implicit codec: Codec[Out],
    logger: Logger[Out]
) {

  /** Logs `str` prefixed with this step's index and label. */
  def log(str: String, x: Option[Out]): Unit =
    logger.log(s"[Step $idx: $label] $str", x)

  /** Runs the step and returns its result.
    *
    * For a cached step the codec is consulted first; on a successful read the
    * predecessor is never executed. When the read fails, the result is
    * computed and persisted. A failing persist is logged but does not fail
    * the step. The cause of a failed read or persist is included in the log
    * message so that cache problems can be diagnosed.
    */
  def execute(): Out =
    if (cached) {
      codec.read(label) match {
        case Success(hit) =>
          log("Successfully read from cache", Some(hit))
          hit
        case Failure(readError) =>
          log(s"Reading from cache failed: $readError", None)
          val result = compute()
          codec.persist(result, label) match {
            case Success(_)            => log("Persisting successful", None)
            case Failure(persistError) => log(s"Persisting failed: $persistError", None)
          }
          result
      }
    } else {
      log("Skipping cache", None)
      compute()
    }

  /** Obtains the input from the previous step and applies `f` to it. */
  private def compute(): Out = {
    val dep = prev()
    log("Executing", None)
    f(dep)
  }

  /** Appends a step that transforms this step's result with `f`.
    *
    * The new step's input thunk calls [[execute]] on this step, so this step
    * only runs if the new step needs its input.
    *
    * @param label  name (and cache key) of the new step
    * @param f      transformation from this step's result to the new result
    * @param cached whether the new step uses the cache
    * @tparam T result type of the new step
    */
  def map[T](label: String, f: Out => T, cached: Boolean)(
      implicit codec: Codec[T],
      logger: Logger[T]
  ): CachedStep[Out, T] =
    new CachedStep[Out, T](label, f, cached, () => this.execute(), this.idx + 1)
}
/** Factory for a file-backed [[Codec]] that stores vectors as JSON lines. */
object FileJsonCodec {

  /** Creates a codec that stores a `Vector[T]` in a file inside `cacheDir`.
    *
    * The file is named after the label and holds one compact JSON document
    * per element, one element per line.
    *
    * @param cacheDir directory holding the cache files
    * @param encoder  circe encoder for the elements
    * @param decoder  circe decoder for the elements
    * @tparam T element type
    */
  def get[T](cacheDir: File)(implicit encoder: Encoder[T], decoder: Decoder[T]): Codec[Vector[T]] =
    new Codec[Vector[T]] {

      /** Location of the cache file for `label`. */
      def cacheFile(label: String): File = new File(cacheDir, label)

      /** Overwrites the cache file for `label` with the elements of `cc`. */
      override def persist(cc: Vector[T], label: String): Try[Unit] = Try {
        import io.circe.syntax._
        val target = cacheFile(label)
        val writer = new PrintWriter(new FileOutputStream(target, false))
        // try/finally so the file handle is released even if encoding throws.
        try {
          cc.foreach(item => writer.write(item.asJson.noSpaces + "\n"))
          // PrintWriter never throws on I/O errors; it only records them.
          // checkError() flushes and reports them, so a failed write is not
          // mistaken for a successful persist.
          if (writer.checkError())
            throw new IOException(s"Failed to write cache file $target")
        } finally writer.close()
      }

      /** Reads the cache file for `label`, decoding one element per line.
        *
        * The result is a `Failure` when the file cannot be read or when any
        * line fails to decode; in the latter case the failure carries the
        * circe error describing the problem.
        */
      override def read(label: String): Try[Vector[T]] = Try {
        import io.circe.parser.decode
        val source = Source.fromFile(cacheFile(label))
        // try/finally so the file handle is released on success and failure.
        try {
          source
            .getLines()
            .map { line =>
              decode[T](line) match {
                case Right(value) => value
                // io.circe.Error is an Exception; rethrowing keeps the cause.
                case Left(error) => throw error
              }
            }
            .toVector // forces the lazy iterator before the source is closed
        } finally source.close()
      }
    }
}
/** Sample element type produced by the first pipeline stage (`Service.getAs`). */
final case class A(name: String)
/** Sample element type produced by the second pipeline stage (`Service.getBs`). */
final case class B(name: String)
/** Sample element type produced by the third pipeline stage (`Service.getCs`). */
final case class C(name: String)
/** Stub data source returning fixed sample values for each pipeline stage. */
class Service {

  /** Returns the two fixed `A` values named "1" and "2". */
  def getAs(): Vector[A] = {
    // println("getAs")
    Vector("1", "2").map(name => A(name))
  }

  /** Returns the two fixed `B` values named "1" and "2".
    *
    * @param a currently unused; the result does not depend on it
    */
  def getBs(a: A): Vector[B] = {
    // println("getBs")
    Vector("1", "2").map(name => B(name))
  }

  /** Returns the two fixed `C` values named "1" and "2".
    *
    * @param b currently unused; the result does not depend on it
    */
  def getCs(b: B): Vector[C] = {
    // println("getCs")
    Vector("1", "2").map(name => C(name))
  }
}
/** Demo entry point: builds a three-step pipeline and prints its result. */
object Main {

  /** Directory holding the cache files; created when this object is initialised. */
  val file = new File("./workdir")
  file.mkdirs()

  // Earlier variant of the codec instance, kept for reference:
  // implicit def codec[T: Encoder : Decoder]: Codec[T, Iterable] = FileJsonCodec.get[T](file)
  import io.circe.generic.auto._

  /** File-backed codec for any vector whose elements have circe instances. */
  implicit def codecV[T: Encoder : Decoder]: Codec[Vector[T]] = FileJsonCodec.get[T](file)

  /** Logger printing the message and, when a collection is given, its size. */
  implicit def logger[T]: Logger[Iterable[T]] = new Logger[Iterable[T]] {
    override def log(str: String, cc: Option[Iterable[T]]): Unit = {
      val sizeSuffix = cc.fold("")(items => s". Size: ${items.size}")
      println(s"$str$sizeSuffix")
    }
  }

  /** Assembles the As -> Bs -> Cs pipeline; nothing runs until `execute()`.
    *
    * The "As" and "Cs" steps use the cache, the "Bs" step does not.
    *
    * @param service source of the data for each step
    */
  def pipeline(service: Service): CachedStep[_, Vector[C]] = {
    val as = CachedStep.start("As", service.getAs(), cached = true)
    val bs = as.map("Bs", _.flatMap(a => service.getBs(a)), cached = false)
    bs.map("Cs", _.flatMap(b => service.getCs(b)), cached = true)
  }

  /** Runs the pipeline against a fresh [[Service]] and prints every result. */
  def main(args: Array[String]): Unit = {
    val results = pipeline(new Service).execute()
    results.foreach(println)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment