Skip to content

Instantly share code, notes, and snippets.

@atamborrino
atamborrino / speculativeRetries.scala
Last active June 6, 2018 14:28
Speculative retries for Scala's Futures, inspired from Cassandra Java driver http://docs.datastax.com/en/developer/java-driver/3.1/manual/speculative_execution/
import akka.actor.Scheduler
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
object FutureHelper {
private val log = ScalaLogger.get(this.getClass)
/**
* Retry concurrently the async execution ''f'' every ''delay'' while no execution completes.
* Number of retries is bounded by ''maxSpeculativeExecutions''.
@atamborrino
atamborrino / redisStream.scala
Last active April 25, 2017 14:14
Parallel SCAN per node on a Redis Cluster as an Akka Stream Source
import akka.NotUsed
import akka.stream.scaladsl.Source
import redis.clients.jedis.{Jedis, JedisCluster, ScanParams}
import scala.collection.JavaConverters._
import scala.concurrent.Future
class RedisStream(maxNodeParallelism: Int)
(implicit jedisCluster: JedisCluster,
blockingEC: RedisBlockingEC) {
@atamborrino
atamborrino / or.scala
Last active February 23, 2017 15:52
Validation "or" operator with error accumulation for Scalactic
import org.scalactic._
import org.scalactic.Accumulation._
trait Error
type Errors = Every[Error]
case object AnError extends Error
object ValidationImplicits {
implicit class ValidationOps[E](val validLeft: Validation[Every[E]]) extends AnyVal {
@atamborrino
atamborrino / TestCats.scala
Last active August 27, 2019 22:14
Using Cats vs Scalactic for error validation with error accumulation and async flow
object TestCats {
import cats._
import cats.data._
import cats.instances.future._
import cats.syntax.either._
import cats.syntax.cartesian._
import cats.instances.list._
import cats.syntax.traverse._
@atamborrino
atamborrino / UnfailableFuture.scala
Last active January 31, 2017 09:20
UnfailableFuture is a subset type of Future that can not result in a failure.
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
// private constructor guarantees that we can not build a semantically incorrect UnfailableFuture
class UnfailableFuture[A] private (val value: Future[A]) extends AnyVal
object UnfailableFuture {
def apply[A](fut: Future[A], fallback: Throwable => A)(implicit ec: ExecutionContext): UnfailableFuture[A] = {
val futWithFallback = fut.recover {
case NonFatal(failure) => fallback(failure)
@atamborrino
atamborrino / FutureHelper.scala
Last active November 17, 2016 14:55
Helpers for Scala's Futures
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import akka.actor.Scheduler
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
object FutureHelper {
@atamborrino
atamborrino / a.scala
Last active November 16, 2016 10:38
Simple parallel composition of Future with max parallelism in a sliding window
import akka.stream.Materializer
import akka.stream.scaladsl._
import scala.concurrent.{Future, ExecutionContext}
// This will execute futures in parallel with a max parallelism
def traverse[A, B](as: Seq[A], parallelism: Int)(f: A => Future[B])
(implicit ec: ExecutionContext, mat: Materializer): Future[Seq[B]] = {
Source(as.toList)
.mapAsync(parallelism)(f)
.runWith(Sink.seq)
@atamborrino
atamborrino / a.scala
Last active November 3, 2016 14:21
mapPartition
def mapPartition[A, B, C](as: Seq[A])(f: A => Either[B, C]): (Seq[B], Seq[C]) = {
as.foldLeft((Vector.empty[B], Vector.empty[C])) { (acc, a) =>
f(a) match {
case Left(b) => (acc._1 :+ b, acc._2)
case Right(c) => (acc._1, acc._2 :+ c)
}
}
}
val seq: Seq[Either[String, Int]] = Seq(Left(1), Right("a"))
@atamborrino
atamborrino / a.scala
Last active November 2, 2016 17:52
Play Json vs Circe simple validation with error accumulation and no auto-derivation
case class Data(s: String, i: Int)
object Circe {
import io.circe._
import cats.implicits._
implicit val decoder: AccumulatingDecoder[Data] = (
Decoder[String].prepare(_.downField("s")) |@|
Decoder[Int].prepare(_.downField("i"))
).map(Data.apply)
.accumulating
@atamborrino
atamborrino / a.scala
Last active October 13, 2016 14:14
No warning =(
scala> sealed trait Test
defined trait Test
scala> case class Test1(a: Boolean) extends Test
defined class Test1
scala> case class Test2(a: Boolean) extends Test
defined class Test2
scala> :paste