Skip to content

Instantly share code, notes, and snippets.

@atamborrino
atamborrino / sequentialTraverse.scala
Created August 23, 2016 14:09
Sequential traverse
def sequentialTraverse[A, B](as: Seq[A])(f: A => Future[B]): Future[Seq[B]] = {
as.foldLeft(Future.sucessful(Vector.empty[B])) { (futAcc, a) =>
for {
acc <- futAcc
b <- f(a)
} yield acc :+ b
}
}
@atamborrino
atamborrino / test.scala
Last active July 20, 2016 12:49
Akka Stream GroupBy: we want to achieve ordered processing per substream + paralell processing between substreams
/// Working version
val fixedThreadPool = ExecutionContext.fromExecutorService(
Executors.newFixedThreadPool(10)
)
case class KafkaRecord(partition: Int, value: Int)
val records = (1 until 1000).map(i => KafkaRecord(i % 10, i))
val flow = Flow[KafkaRecord].mapAsync(1) { i =>
@atamborrino
atamborrino / sparkEc.scala
Last active December 10, 2020 16:01
Serializable Scala ExecutionContext for Spark. Allows to automatically re-create a thread-pool per Spark worker.
class ECProvider()(implicit conf: Config) extends Serializable {
@transient implicit lazy val ec: ExecutionContext = {
ExecutionContext.fromExecutorService(
Executors.newWorkStealingPool(conf.getForkJoinPoolMaxParallelism())
)
}
}
@atamborrino
atamborrino / main.scala
Last active October 11, 2023 23:21
Spark Streaming Kafka at-least-once with manual offset commit in Zookeeper (i.e not using Spark Streaming checkpoints that may be not recoverable after code changes)
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import com.samsung.sami.common.Curator
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{Logging, SparkConf}
import org.scalactic.{One, Bad, Good}
import scala.util.control.NonFatal
@atamborrino
atamborrino / cassStream.scala
Last active March 2, 2020 12:02
Cassandra Akka Stream on top of Java client with no other deps
import akka.stream.scaladsl.Source
import com.datastax.driver.core.{Row, Session, Statement}
import com.google.common.util.concurrent.{Futures, FutureCallback, ListenableFuture}
import scala.concurrent.{Promise, Future, ExecutionContext}
object StreamCassandraHelper {
import scala.collection.JavaConversions._
def executeAsStream(stmt: Statement)(implicit ec: ExecutionContext, session: Session): Source[Row, Unit] = {
@atamborrino
atamborrino / ErrorFailure.scala
Last active April 5, 2019 08:40
Monad transformer for Future[A Or Every[Error]]
package models.error
import org.scalactic._
import scala.concurrent.{ExecutionContext, Future}
import org.scalactic.Accumulation._
import scala.util.Success
trait Error
@atamborrino
atamborrino / ReactiveResultSet.scala
Last active January 25, 2019 11:42
Get the result of a JDBC query as a reactive stream with proper resource cleaning
import java.sql.{PreparedStatement, Statement, Connection, ResultSet}
import akka.stream.scaladsl._
import org.slf4j.Logger
import scala.concurrent.{Future, ExecutionContext}
import scala.util.{Success, Try}
import scala.util.control.NonFatal
// ExecutionContext for blocking ops

Ingénieur Senior DevOps

L’internet des objets est une véritable révolution numérique. Les prédictions annoncent 20, 50 ou 100 milliards d’objets connectés en 2020, un changement radical pour notre société et un défi pour notre industrie. Les développements technologiques autour des objets connectés commencent aujourd’hui. Samsung, à travers de nombreuses initiatives et à différents niveaux (matériel, logiciels, services), est un acteur majeur de cette révolution.

SSIC (Samsung Strategy and Innovation Center), créé en 2012 pour promouvoir l’innovation au sein de Samsung, monte une équipe dédiée au développement de logiciels et services autour des objets connectés. Cette équipe, réduite, agile et extrêmement motivée, construit aujourd’hui une plateforme ouverte, dans le “cloud”, pour permettre le stockage des données venant de n’importe quel objet connecté et l’hébergement de services transverses. Le but de cette plateforme est d’améliorer la vie et la santé des gens et d’apporter un peu de magie dans notr

@atamborrino
atamborrino / firstSuccessfulOf.scala
Last active January 27, 2016 13:53
Future.firstSuccessfulOf
import scala.concurrent._
import scala.util.{Success, Failure}
import scala.collection.immutable.Seq
// Returns first successful future result, or last failure if all futures failed
def firstSuccessfulOf[A](futures: Seq[Future[A]])(implicit executor: ExecutionContext): Future[A] = {
val total = futures.length
val counter = new java.util.concurrent.atomic.AtomicInteger(total)
val p = Promise[A]()
package com.samsung.sami.kafka.utils
import akka.actor.Scheduler
import org.slf4j.Logger
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.control.NonFatal
case class TimeoutException(msg: String) extends RuntimeException(msg)