Skip to content

Instantly share code, notes, and snippets.

View afsalthaj's full-sized avatar

Afsal Thaj afsalthaj

View GitHub Profile
import com.cronutils.model._
import com.cronutils.model.definition._
import java.time._
import com.cronutils.parser.CronParser
import com.cronutils.model.time.ExecutionTime
import CronOps._
import StepDirection._
import java.util.Optional
import scala.util.Try

My note on scalaz.Task approaches. This is mainly for old code!!!!

I have found wrong usages of scalaz.Task in old programs.

Why this is important?

When you bring in FP ornaments into your code, it comes with certain cost. The first and formost is additional learning curve for new developers to come in and understand the concepts of total, side effect free, lazy descriptions of computations. This has its advantage of being able to ship robust code with equational reasoning. But what if the usage of FP constructs itself is wrong?

Most of the old scala-FP code has the usages of scalaz.Task which is one of the most popular FP datastructure then. It's good to use it even today although there are typelevel libraries and ZIO providing better choices - cats.IO, ZIO, Monix.Task

object Repeat {
def apply(f: Task[Unit], duration: Duration): Task[List[Any]] =
Nondeterminism[Task].gatherUnordered(List(Task.fork(f), Task.fork { apply(f, duration).after(duration) }))
}
// This is a demo how functional and recursive thinking could help us write unbreakable, stack safe code.
// **This is not any IP, but a general pattern of code solving a usual simple usecase**
// and that also show how safe it is to play with lazy structures (although I did that with scala.Stream as my project didnt have dependencies to fs2)
// Do note that, the entire logic can be just tested with `scala.Double, or scala.Int` and needn't get into the details of `java.time.Instant`
/**
* Given the data freshness (eg: data always arrives by 2 days late: hence offset = 2.days,
* and the schedule be once in 10 days (schedule = 10.days),
* find out all the time periods (start time and end time) for which the spark job should run
* such that the last result was updated at `lastUpdatedTime`. It should also handle cron schedules.
import scala.util.Random
sealed trait Streamss[A, B]
object Streamss extends App {
case class NeedInput[A, B](f: A => Streamss[A, B]) extends Streamss[A, B]
case class HasOutput[A, B](b: B, s: Streamss[A, B]) extends Streamss[A, B]
// http://degoes.net/articles/zio-challenge
// This is a quick try with Ref and I will keep trying!
import scalaz.zio.{UIO, ZIO, _}
final case class Percentage(value: Double) extends AnyVal
/**
* A `Tap` adjusts the flow of tasks through
* an external service in response to observed
//
// main.c
// Explanation of pointer to pointer by deleting a node from linked list.
//
// Created by Afsal Thaj on 10/3/19.
// Copyright © 2019 Afsal Thaj. All rights reserved.
//
//
// The concept discuss about pointers to pointers: https://www.eskimo.com/~scs/cclass/int/sx8.html
import fs2.Stream // 0.10.6, a few things changed in 1.0.0
import cats.effect.IO
import scalaz.syntax.monad._
import scalaz.{\/, -\/, \/-}
import scalaz.Monad
// Resource acquisition with fs2.Stream.bracket, `rethrow` in fs2, along with `catchable` in scalaz (or `MonadError` in cats) just makes life easy.
// Forming a stream where one of the data fetch resulted in error.
@ def formStream[F[_]]: Stream[F, Throwable \/ String] = Stream(\/-("1"), -\/(new RuntimeException("an exception during the streaming process"))).covary[F]
import cats.effect.{ Effect, Sync }
import org.http4s.MediaType.`application/json`
import org.http4s.{ EntityDecoder, EntityEncoder, Headers, Method, Request, Status, Uri }
import org.http4s.client.Client
import org.http4s.headers.{ Accept, `Content-Type` }
import scalaz.{ EitherT, \/ }
import cats.syntax.applicative._
import cats.syntax.functor._
import org.http4s.client.blaze.Http1Client
import scalaz.syntax.either._
/**
* Retrying using Fs2, where if the result is `right` of disjunction stream stops, else retries for ever
* with optional `FinitDuration` of delay between each retry, with logging as `Left \/ Right => IO[Unit]`.
*/
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments", "org.wartremover.warts.Recursion"))
trait Fs2StreamRetrySupport {
def delayed[F[_], A](b: => F[A], delay: Option[FiniteDuration])(implicit F: Effect[F]): Stream[F, A] = {
delay.fold(
Stream.eval[F, A](b)
)(d => Stream.eval[F, A] {