Skip to content

Instantly share code, notes, and snippets.

@bpholt
Last active November 7, 2018 23:10
Show Gist options
  • Save bpholt/7be1e1c49b0b93c4dd587025bbdbb4b0 to your computer and use it in GitHub Desktop.
Save bpholt/7be1e1c49b0b93c4dd587025bbdbb4b0 to your computer and use it in GitHub Desktop.
FS2 Pipe that calculates the SHA-256 hash of a Stream[F, Byte]
import java.security.MessageDigest
import cats.effect._
import fs2._
import fs2.async.Promise
import tsec.common._
/**
  * Pass-through pipe that computes the SHA-256 digest of the bytes flowing through it.
  *
  * Each chunk pulled from the input is fed to a `MessageDigest` and then re-emitted
  * unchanged. When the input is exhausted the digest is finalised, rendered with
  * `toHexString`, and used to complete the supplied promise.
  *
  * NOTE(review): the promise is completed only after the end of the input has been
  * reached. If the stream fails or is cut short before that point, the promise
  * presumably stays empty and anything waiting on it will keep waiting — confirm
  * before relying on it in such pipelines.
  */
object Sha256Pipe {

  /**
    * @param promisedHexString promise completed with the hex-encoded digest once the
    *                          input has been fully consumed
    * @return a pipe that re-emits every chunk it receives
    */
  def apply[F[_] : Sync](promisedHexString: Promise[F, String]): Pipe[F, Byte, Byte] = {

    // Drains `remaining` chunk by chunk: update the digest, re-emit the chunk, recurse.
    // The pull's result is the hex string of the final digest.
    def hashAndForward(md: MessageDigest)(remaining: Stream[F, Byte]): Pull[F, Byte, String] =
      remaining.pull.unconsChunk.flatMap {
        case Some((chunk, tail)) =>
          val raw = chunk.toBytes
          Pull.eval(Sync[F].delay(md.update(raw.values, raw.offset, raw.length))).flatMap { _ =>
            Pull.outputChunk(chunk).flatMap { _ =>
              hashAndForward(md)(tail)
            }
          }
        case None =>
          // End of input: finalise the digest inside F, since `digest()` mutates `md`.
          Pull.eval(Sync[F].delay(md.digest().toHexString))
      }

    // Allocates a fresh digest, hashes the whole input, then publishes the result.
    def hashThenPublish(input: Stream[F, Byte]): Pull[F, Byte, Unit] =
      Pull.eval(Sync[F].delay(MessageDigest.getInstance("SHA-256"))).flatMap { md =>
        hashAndForward(md)(input).flatMap { hex =>
          Pull.eval(promisedHexString.complete(hex))
        }
      }

    in => hashThenPublish(in).stream
  }
}
/**
  * Demo application for `Sha256Pipe`.
  *
  * Encodes a fixed string as UTF-8, runs the bytes through the hashing pipe, prints the
  * decoded text, and then prints the hash obtained from the promise next to one computed
  * directly with `MessageDigest`.
  */
object Example extends StreamApp[IO] {
  import scala.concurrent.ExecutionContext.Implicits.global

  val example = "Hello World!"
  val strings: Stream[IO, Byte] = Stream.emit(example).through(text.utf8Encode)

  // Reference value: SHA-256 of the example string computed in one shot.
  private def referenceHash: IO[String] =
    IO(MessageDigest.getInstance("SHA-256").digest(example.getBytes("UTF-8")).toHexString)

  // Runs the bytes through the hashing pipe, decodes them back to text, joins the decoded
  // pieces into a single string and prints it.
  private def printDecodedText(promisedHash: Promise[IO, String]): Stream[IO, Unit] =
    strings
      .through(Sha256Pipe(promisedHash))
      .through(text.utf8Decode)
      .fold1(_ + _)
      .to(Sink(s => IO(println(s"Text: $s"))))

  // Prints both hashes and whether they match.
  private def report(expectedHash: String, actualHash: String): IO[Unit] =
    IO(println(s"Expected hash: $expectedHash"))
      .flatMap(_ => IO(println(s"Calculated hash: $actualHash")))
      .flatMap(_ => IO(println(s"Are hashes equal? ${expectedHash == actualHash}")))

  override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, StreamApp.ExitCode] =
    for {
      expectedHash <- Stream.eval(referenceHash)
      promisedHash <- Stream.eval(Promise.empty[IO, String])
      _ <- printDecodedText(promisedHash)
      // Read the promise only after the hashing stream above has been run.
      actualHash <- Stream.eval(promisedHash.get)
      _ <- Stream.eval(report(expectedHash, actualHash))
    } yield StreamApp.ExitCode.Success
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment