Skip to content

Instantly share code, notes, and snippets.

View alexandru's full-sized avatar
😺
Having fun

Alexandru Nedelcu alexandru

😺
Having fun
View GitHub Profile
import org.joda.time._
import monix.execution._
import monix.execution.Scheduler.Implicits.global
import java.util.concurrent.TimeUnit
import scala.util.control.NonFatal
def scheduleOncePerDay(time: LocalTime, now: DateTime = DateTime.now())(cb: () => Unit)
(implicit s: Scheduler): Cancelable = {
val nextTick = {
import scala.concurrent.duration._
def scheduleTaskAtFixedRate[A](initialDelay: FiniteDuration, period: FiniteDuration, task: Task[A]): Task[Unit] =
Task.create { (s, callback) =>
def loop(delay: Long): Task[Unit] =
Task.defer {
val startedAt = s.currentTimeMillis()
val t = task.flatMap { _ =>
val duration = s.currentTimeMillis() - startedAt
val nextDelay = {

There are two problems with this piece of code:

  1. it blocks the current thread - Monix is always defaulting to process things synchronously and you might argue that this timeoutOnSlowDownstream operator violates the principle of least surprise, but if you want the timeout to work, it must not block the thread that executes onNext - consider that on top of Javascript it's not even possible

So it is better doing this and the timeout will get triggered:

def onNext(elem: Int) = Future {
  sum += elem
  Thread.sleep(20000)
import java.io.{File, FileInputStream, FileOutputStream}
import monix.eval.Task
import monix.execution.Ack
import monix.execution.Ack.{Continue, Stop}
import monix.reactive.{Consumer, Observable, Observer}
import scala.util.control.NonFatal
def copyFile(input: File, destination: File, chunkSize: Int): Task[Unit] =
Task.defer {
val in = new FileInputStream(input)
import monix.eval.TaskApp
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._
// Variant 1 (classic)
object Playground {
def main(args: Array[String]): Unit = {
val completedF = Observable
.intervalAtFixedRate(5.seconds)
@alexandru
alexandru / 1-problematic-macro.scala
Last active October 25, 2016 13:21
New untypecheck problem in Scala 2.12 macros
import scala.concurrent.ExecutionContext
import scala.reflect.macros.whitebox
object ExecutionUtils {
implicit class Extensions(val source: ExecutionContext)
extends AnyVal {
/** Sample extension method. */
def executeAsync(cb: => Unit): Unit =
macro ExecutionMacros.executeAsync
(defvar ocaml-packages '(tuareg merlin ocp-indent))
;; Is OPAM installed?
(if (equal "" (shell-command-to-string "which opam"))
(user-error "WARNING: OPAM is not installed, OCaml packages are not supported!")
;; Else
(progn
(setq opam-share (substring (shell-command-to-string "opam config var share 2> /dev/null") 0 -1))
(add-to-list 'load-path (concat opam-share "/emacs/site-lisp"))
;; Load and install packages if needed...
import monix.execution.Scheduler
// Executing blocking stuff on a special I/O scheduler is wise
val io = Scheduler.io()
val acquire = Task(fileChannel.lock()).executeOn(io)
def release(f: FileLock) = Task.eval(f.release())
def withLock[A](fa: Task[A]): Task[A] =
acquire.flatMap { lock =>
import monix.execution.Cancelable
import monix.execution.atomic.Atomic
import monix.execution.atomic.PaddingStrategy.LeftRight128
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.{Future, Promise}
/** The `TaskSemaphore` is an asynchronous semaphore implementation that
* limits the parallelism on task execution.
*
import java.util.concurrent.TimeUnit
import monix.execution.cancelables.MultiAssignmentCancelable
import monix.execution.{Cancelable, Scheduler}
import monix.execution.schedulers.{ExecutionModel, LocalBatchingExecutor}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
class AkkaToMonixScheduler(
akkaScheduler: akka.actor.Scheduler,
context: ExecutionContext,