Skip to content

Instantly share code, notes, and snippets.

@aludwiko
Last active November 17, 2017 11:24
Show Gist options
  • Save aludwiko/4f69fdc6b2f05d919f409eb9727cbab3 to your computer and use it in GitHub Desktop.
Save aludwiko/4f69fdc6b2f05d919f409eb9727cbab3 to your computer and use it in GitHub Desktop.
// Runs a small stream in which one element makes a stage throw, and relies on the
// resuming supervision decider to drop that element and keep the stream alive.
//
// The supervision attribute only applies to the operators that are already part of
// the graph when `withAttributes` is called, so it must come AFTER the operator that
// may throw. When it was placed before the throwing `map`, that `map` still ran with
// the default (stopping) decider and the whole stream failed on the first element.
Source(List(1, 2, 3, 4))
  .map(_ * 2)
  .map(i => if (i == 2) throw new IllegalStateException("error"))
  .withAttributes(supervisionStrategy(Supervision.resumingDecider))
  .runWith(Sink.ignore)
// Endless, lazily evaluated feed of fake transaction descriptions.
// `${...}` is needed to interpolate the result of a call: a bare `$randomUser()`
// splices in `randomUser` and then appends a literal "()" to the text.
// NOTE(review): assumes randomUser / randomValue are zero-argument methods defined
// elsewhere — confirm against their declarations.
val infiniteTransactionsStream =
  Stream.continually(s"${randomUser()} sent ${randomValue()} BTC to ${randomUser()}")

Source(infiniteTransactionsStream)
  .grouped(5) // group transactions into 5 element blocks
  // Calculate a hash for each block: the hash of the pair
  // (hash of the block's transactions, hash of the previous block).
  // The seed `0.hashCode.toString` is the string "0".
  .scan(0.hashCode.toString) { (previousBlockHash, transactions) =>
    (transactions.hashCode(), previousBlockHash).hashCode.toString
  }
  .runForeach(savedHash => println(s"saved $savedHash"))
// Transaction source with a log stage attached; every element passing through is
// reported under the label "got transaction".
Source(infiniteTransactionsStream).log(name = "got transaction")
// The block-hashing pipeline with a log stage after every step, so each element can
// be followed through grouping, hashing and saving.
Source(infiniteTransactionsStream)
  .log("got transaction")
  .grouped(5)
  .log("grouped transactions")
  // Hash of a block = hash of (hash of its transactions, hash of the previous block).
  .scan(0.hashCode.toString) { (previousBlockHash, transactions) =>
    (transactions.hashCode(), previousBlockHash).hashCode.toString
  }
  .log("hashed block")
  .mapAsync(1)(saveHashAndReturnIt)
  .log("saved block")
  // Fixed typo: `BedugLevel` does not exist; the intended level is `DebugLevel`.
  // NOTE(review): assumes akka.event.Logging.DebugLevel is imported unqualified,
  // as the original bare identifier implied — confirm against the file's imports.
  .withAttributes(logLevels(onElement = DebugLevel))
/** Pretends to persist `hash` and asynchronously hands the very same value back.
  *
  * Roughly one call in five (when `Random.nextInt(5)` yields 1) stalls for
  * 100000 ms before completing, imitating an unavailable database.
  *
  * @param hash the block hash to "save"
  * @return a future completed with the unchanged `hash`
  */
def saveHashAndReturnIt(hash: String): Future[String] =
  Future {
    val databaseUnavailable = Random.nextInt(5) == 1
    if (databaseUnavailable) Thread.sleep(100000) // simulate database unavailability
    hash
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment