Skip to content

Instantly share code, notes, and snippets.

@iravid
Created June 28, 2019 06:06
Show Gist options
Save iravid/9fff40b375cb1d15e6e39086b8093a17 to your computer and use it in GitHub Desktop.
Map-Reduce with ZIO
import $ivy.`dev.zio::zio:1.0.0-RC8-12`
import $ivy.`dev.zio::zio-streams:1.0.0-RC8-12`
import zio._, zio.stream._
object Step1 {
  import java.nio.charset.StandardCharsets
  import java.nio.file.{Files, Paths, Path}
  import scala.collection.JavaConverters._
  import zio.blocking._

  /** Lists the regular files directly inside `dir` (non-recursive).
    *
    * Sub-directories and other non-regular entries are skipped because
    * [[readContents]] cannot read them. The stream returned by `Files.list`
    * holds an open directory handle, so it is closed once it has been
    * materialized into a `List`.
    */
  def listFiles(dir: String): ZIO[Blocking, Throwable, List[Path]] =
    effectBlocking {
      val entries = Files.list(Paths.get(dir))
      try entries.iterator.asScala.filter(path => Files.isRegularFile(path)).toList
      finally entries.close()
    }

  /** Reads the whole file at `path` and decodes it as UTF-8. */
  def readContents(path: Path): ZIO[Blocking, Throwable, String] =
    effectBlocking {
      new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
    }

  /** Counts whitespace-separated words (spaces, tabs, newlines, ...).
    * Leading whitespace yields an empty first token, and blank input yields
    * a single empty token; `count(_.nonEmpty)` discards those, so an empty
    * file has zero words.
    */
  private def countWords(contents: String): Long =
    contents.split("\\s+").count(_.nonEmpty).toLong

  /** Sequentially reads every file listed by [[listFiles]] and returns the
    * total number of words across all of them.
    */
  def countAllWords(dir: String): ZIO[Blocking, Throwable, Long] =
    for {
      paths  <- listFiles(dir)
      counts <- ZIO.foreach(paths)(path => readContents(path).map(countWords))
    } yield counts.sum
}
object Step2 {
  import zio.blocking._
  import Step1.{listFiles, readContents}

  /** Map-reduce over the files in `dir`, one file at a time.
    *
    * Each file's contents are passed through `map`. The results are folded
    * left-to-right with `reduce`, starting from `z`. Files are read
    * sequentially via `ZIO.foreach`.
    */
  def mapReduce[A, B](dir: String)
                     (map: String => A)
                     (z: B)(reduce: (B, A) => B): ZIO[Blocking, Throwable, B] =
    for {
      paths  <- listFiles(dir)
      mapped <- ZIO.foreach(paths)(path => readContents(path).map(contents => map(contents)))
    } yield mapped.foldLeft(z)(reduce)

  /** Total number of whitespace-separated words across the files in `dir`.
    * Splitting on the `\s+` regex treats newlines and tabs as separators.
    * `count(_.nonEmpty)` drops the empty token produced by blank input or
    * leading whitespace.
    */
  def countAllWords(dir: String): ZIO[Blocking, Throwable, Long] =
    mapReduce(dir)(_.split("\\s+").count(_.nonEmpty).toLong)(0L)(_ + _)
}
object Step3 {
  import zio.blocking._
  import Step1.{listFiles, readContents}

  /** Map-reduce over the files in `dir`, reading and mapping at most
    * `workers` files concurrently via `ZIO.foreachParN`.
    *
    * The mapped results are folded left-to-right with `reduce`, starting
    * from `z`. The effect fails with `IllegalArgumentException` when
    * `workers < 1`; with zero permits `foreachParN` would otherwise wait
    * forever instead of failing.
    */
  def mapReduce[A, B](dir: String, workers: Int)
                     (map: String => A)
                     (z: B)(reduce: (B, A) => B): ZIO[Blocking, Throwable, B] =
    if (workers < 1)
      ZIO.fail(new IllegalArgumentException(s"workers must be positive, got $workers"))
    else
      for {
        paths  <- listFiles(dir)
        mapped <- ZIO.foreachParN(workers)(paths) { path =>
                    readContents(path).map(contents => map(contents))
                  }
      } yield mapped.foldLeft(z)(reduce)

  /** Total number of whitespace-separated words across the files in `dir`,
    * using up to `workers` concurrent readers. Newlines and tabs count as
    * separators, and blank files contribute zero words.
    */
  def countAllWords(dir: String, workers: Int): ZIO[Blocking, Throwable, Long] =
    mapReduce(dir, workers)(_.split("\\s+").count(_.nonEmpty).toLong)(0L)(_ + _)
}
object Step4 {
  import java.nio.file.Path
  import zio.blocking._
  import Step1.{listFiles, readContents}

  /** Fails with `IllegalArgumentException` unless at least one worker is
    * requested. With zero map workers nothing ever drains the input queue,
    * so a producer offering to it would hang instead of failing.
    */
  private def checkWorkers(workers: Int): IO[IllegalArgumentException, Unit] =
    if (workers < 1) ZIO.fail(new IllegalArgumentException(s"workers must be positive, got $workers"))
    else ZIO.unit

  /** Forks a map worker that loops forever. Each iteration takes a path from
    * `inputQueue`, reads the file, applies `map`, and offers the result to
    * `reduceQueue`.
    *
    * The loop has no exit condition. The fiber ends only by failing (e.g. a
    * read error from [[readContents]]) or by being interrupted.
    */
  def createMapWorker[A](inputQueue: Queue[Path], reduceQueue: Queue[A])(map: String => A) =
    (for {
      path     <- inputQueue.take
      contents <- readContents(path)
      _        <- reduceQueue.offer(map(contents))
    } yield ()).forever.fork

  /** Forks a reduce worker that loops forever. Each value taken from
    * `inputQueue` is folded into the accumulator held in `latest`, and every
    * new running total is offered to `outputQueue`.
    *
    * The get/set pair on `latest` is not atomic. [[mapReduce]] starts exactly
    * one reduce worker per `Ref`, so nothing else writes it concurrently there.
    */
  def createReduceWorker[A, B](inputQueue: Queue[A], outputQueue: Queue[B], latest: Ref[B])
                              (reduce: (B, A) => B) =
    (for {
      b    <- latest.get
      a    <- inputQueue.take
      newB  = reduce(b, a)
      _    <- latest.set(newB)
      _    <- outputQueue.offer(newB)
    } yield ()).forever.fork

  /** Streaming map-reduce over paths arriving on `inputQueue`.
    *
    * Starts `workers` map workers and one reduce worker. The reduce worker's
    * accumulator starts at `z`, and every intermediate result of `reduce` is
    * offered to `outputQueue`.
    *
    * Fails with `IllegalArgumentException` if `workers < 1`. Otherwise it
    * joins all worker fibers, which loop forever, so this effect does not
    * complete normally; callers are expected to `.fork` it.
    */
  def mapReduce[A, B](inputQueue: Queue[Path], reduceQueue: Queue[A], outputQueue: Queue[B], workers: Int)
                     (map: String => A)
                     (z: B)(reduce: (B, A) => B): ZIO[Blocking, Throwable, Unit] =
    for {
      _           <- checkWorkers(workers)
      mapFibers   <- ZIO.collectAll(List.fill(workers)(createMapWorker(inputQueue, reduceQueue)(map)))
      latest      <- Ref.make(z)
      reduceFiber <- createReduceWorker(reduceQueue, outputQueue, latest)(reduce)
      _           <- Fiber.joinAll(reduceFiber :: mapFibers)
    } yield ()

  /** Forks a fiber that prints every running count taken from `queue`, forever. */
  def printer(queue: Queue[Long]) =
    (for {
      count <- queue.take
      _     <- UIO(println(s"Current count: $count"))
    } yield ()).forever.fork

  /** Counts whitespace-separated words across the files in `dir` using the
    * streaming pipeline, printing each running total.
    *
    * The order matters. Every consumer (map/reduce workers and the printer)
    * is started before the paths are enqueued. All queues are `bounded(16)`,
    * so with more than 16 files an `offerAll` issued before any consumer
    * exists would suspend forever.
    *
    * The effect completes once every path has been enqueued. Mapping,
    * reducing and printing continue on background fibers that are never
    * joined or interrupted here, so the final counts may still be in flight
    * when it returns.
    */
  def countAllWords(dir: String, workers: Int): ZIO[Blocking, Throwable, Unit] =
    for {
      _           <- checkWorkers(workers)
      inputQueue  <- Queue.bounded[Path](16)
      reduceQueue <- Queue.bounded[Long](16)
      outputQueue <- Queue.bounded[Long](16)
      paths       <- listFiles(dir)
      _           <- mapReduce(inputQueue, reduceQueue, outputQueue, workers)(
                       _.split("\\s+").count(_.nonEmpty).toLong)(0L)(_ + _).fork
      _           <- printer(outputQueue)
      _           <- inputQueue.offerAll(paths)
    } yield ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment