Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save afsalthaj/910e16cab83e88bf75a05208ce9a991e to your computer and use it in GitHub Desktop.
Save afsalthaj/910e16cab83e88bf75a05208ce9a991e to your computer and use it in GitHub Desktop.
import java.time.Instant
import java.util.concurrent.atomic.{AtomicReference, AtomicReferenceArray}
import monix.eval.Task
import monix.execution.atomic.AtomicLong
import monix.reactive.{Consumer, Observable}
import monix.reactive.observables.ObservableLike.Transformer
import scala.annotation.tailrec
import scala.util.Random
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
// Cleanup happens on unsubscribe()
/** In-memory message pipeline.
  *
  * A raw string taken from [[queue]] is stamped with the current time, tagged with a
  * random customer id and then handed to [[sendToEventHub]], which only bumps a counter.
  */
class MessageSource {

  /** Count of records that went through [[sendToEventHub]] successfully. */
  val numberOfRecords: java.util.concurrent.atomic.AtomicLong =
    new java.util.concurrent.atomic.AtomicLong(0)

  /** Count of records whose send step raised an error. */
  val numberOfErrorRecords: java.util.concurrent.atomic.AtomicLong =
    new java.util.concurrent.atomic.AtomicLong(0)

  /** A product name together with the instant attached to it by [[addTime]]. */
  case class ProductWithExpiryDate(s: String, time: Instant)

  /** A time-stamped product together with the id attached by [[tranformerForAddId]]. */
  case class ProductWithRetailCustomerId(s: ProductWithExpiryDate, id: Int)

  /** Mutable holder for a list of records; not referenced by the pipeline in this class. */
  case class Eventhub(var list: List[ProductWithRetailCustomerId])

  /** Stand-in for the actual send: increments [[numberOfRecords]], or
    * [[numberOfErrorRecords]] if the increment step fails.
    *
    * @param p the record being "sent"; it is currently not inspected
    * @return a task that always completes with `()`, since errors are counted, not propagated
    */
  def sendToEventHub(p: ProductWithRetailCustomerId): Task[Unit] =
    Task.delay {
      numberOfRecords.getAndIncrement()
      ()
    }.onErrorHandle { _ =>
      numberOfErrorRecords.getAndIncrement()
      ()
    }

  /** Input buffer read by [[run]].
    *
    * NOTE(review): `scala.collection.mutable.Queue` is not synchronized; presumably
    * producers enqueue before the task returned by [[run]] executes - confirm.
    */
  val queue: scala.collection.mutable.Queue[String] =
    scala.collection.mutable.Queue[String]()

  /** Pairs `s` with `Instant.now()`, read when the returned task is evaluated. */
  def addTime(s: String): Task[ProductWithExpiryDate] =
    Task.delay(Instant.now()).map(now => ProductWithExpiryDate(s, now))

  /** Stream stage that applies [[addTime]] to every element. */
  def transformerForTime: Transformer[String, ProductWithExpiryDate] =
    _.mapTask(addTime)

  /** Stream stage that attaches a `Random.nextInt()` id to every element. */
  def tranformerForAddId: Transformer[ProductWithExpiryDate, ProductWithRetailCustomerId] =
    _.mapTask { product =>
      Task.delay(Random.nextInt()).map(id => ProductWithRetailCustomerId(product, id))
    }

  /** [[transformerForTime]] followed by [[tranformerForAddId]]. */
  def pipeline: Transformer[String, ProductWithRetailCustomerId] =
    _.transform(transformerForTime).transform(tranformerForAddId)

  /** Consumer that feeds elements to [[sendToEventHub]] with a parallelism of 10. */
  def consumer: Consumer[ProductWithRetailCustomerId, Unit] =
    Consumer.foreachParallelAsync(10)(sendToEventHub)

  /** Builds a task that takes the message at the head of [[queue]] (if any), runs it
    * through [[pipeline]] and hands it to [[consumer]].
    *
    * The queue is read only when the task is executed, not when `run` is called.
    * An empty queue yields an empty stream, so the task completes without sending
    * anything. (Previously an empty queue made `run` recurse on itself forever on the
    * caller's thread, before any task was even returned.)
    *
    * @return a task that completes with `()`; stream errors are printed, not propagated
    */
  def run: Task[Unit] = {
    // `defer` postpones the queue check and the dequeue side effect until subscription.
    val pending: Observable[String] =
      Observable.defer {
        if (queue.isEmpty) Observable.empty[String]
        else Observable.now(queue.dequeue())
      }

    pending
      .transform(pipeline)
      .consumeWith(consumer)
      .onErrorHandle(e => println("Error occured" + e))
  }
}
// Example usage:
// val s = new MessageSource
// s.queue.enqueue("John")
// s.numberOfRecords.get   // 0 before the pipeline has run
// s.run.runAsync
// s.numberOfRecords.get   // 1 once the task has completed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment