Last active
October 9, 2017 07:55
-
-
Save LMnet/2d1b2a19ec686aa059bd3b4f03518092 to your computer and use it in GitHub Desktop.
CronSource.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ru.dgis.casino.sharpy | |
import java.util.Date | |
import java.util.concurrent.TimeUnit | |
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} | |
import akka.actor.Cancellable | |
import akka.stream.scaladsl.Source | |
import akka.stream.stage.{AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, OutHandler, StageLogging, TimerGraphStageLogic} | |
import akka.stream.{Attributes, Outlet, SourceShape} | |
import org.quartz.CronExpression | |
import scala.concurrent.duration.FiniteDuration | |
/** | |
* Based on `TickSource` | |
*/ | |
object CronSource { | |
def apply[T](cronExpression: CronExpression, tick: T): Source[T, Cancellable] = { | |
Source.fromGraph(new CronSource(cronExpression, tick)) | |
} | |
} | |
class CronSource[T](cronExpression: CronExpression, tick: T) | |
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { | |
val out: Outlet[T] = Outlet("CronSource") | |
override val shape = SourceShape(out) | |
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Cancellable) = { | |
val logic = new TimerGraphStageLogic(shape) with StageLogging with Cancellable { | |
val cancelled = new AtomicBoolean(false) | |
val cancelCallback: AtomicReference[Option[AsyncCallback[Unit]]] = new AtomicReference(None) | |
val isTickUndelivered = new AtomicBoolean(false) | |
override def preStart(): Unit = { | |
super.preStart() | |
cancelCallback.set(Some(getAsyncCallback[Unit](_ ⇒ completeStage()))) | |
if (cancelled.get) completeStage() | |
else scheduleNext() | |
} | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = { | |
if (isTickUndelivered.getAndSet(false)) { | |
push(out, tick) | |
} | |
} | |
}) | |
override protected def onTimer(timerKey: Any): Unit = { | |
if (!isClosed(out) && !isCancelled) { | |
if (isAvailable(out)) { | |
push(out, tick) | |
isTickUndelivered.set(false) | |
} else { | |
isTickUndelivered.set(true) | |
} | |
scheduleNext() | |
} | |
} | |
def scheduleNext(): Unit = { | |
val currentDate = new Date | |
val nextDate = cronExpression.getNextValidTimeAfter(currentDate) | |
val delay = FiniteDuration(nextDate.getTime - currentDate.getTime, TimeUnit.MILLISECONDS) | |
log.debug(s"Scheduling $tick at ${nextDate.toInstant}") | |
scheduleOnce(tick, delay) | |
} | |
override def cancel(): Boolean = { | |
val success = !cancelled.getAndSet(true) | |
if (success) cancelCallback.get.foreach(_.invoke(())) | |
success | |
} | |
override def isCancelled: Boolean = cancelled.get | |
} | |
(logic, logic) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ru.dgis.casino.sharpy | |
import akka.actor.{ActorSystem, Cancellable} | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.Sink | |
import akka.stream.testkit.TestSubscriber | |
import akka.testkit.TestKit | |
import org.quartz.CronExpression | |
import org.scalatest.FreeSpecLike | |
import ru.dgis.util.ScalatestTestKit | |
import scala.concurrent.duration._ | |
/** | |
* Based on `akka.stream.scaladsl.TickSourceSpec` | |
*/ | |
class CronSourceTest extends ScalatestTestKit with FreeSpecLike { | |
implicit val actorSystem = ActorSystem() | |
implicit val materializer = ActorMaterializer() | |
def prepareTest(): (TestSubscriber.ManualProbe[String], Cancellable) = { | |
val probe = TestSubscriber.manualProbe[String]() | |
val cronExpression = new CronExpression("* * * * * ? *") | |
val cancellable = CronSource(cronExpression, "tick").to(Sink.fromSubscriber(probe)).run() | |
(probe, cancellable) | |
} | |
"CronSource should" - { | |
"produce ticks" in { | |
val (probe, _) = prepareTest() | |
val sub = probe.expectSubscription() | |
sub.request(2) | |
probe.expectNext(1.second, "tick") | |
probe.expectNoMsg(900.millis) | |
probe.expectNext(200.millis, "tick") | |
sub.cancel() | |
probe.expectNoMsg(1.second) | |
} | |
"buffer ticks when not requested" in { | |
val (probe, _) = prepareTest() | |
val sub = probe.expectSubscription() | |
sub.request(1) | |
probe.expectNext("tick") | |
probe.expectNoMsg(1100.millis) | |
sub.request(1) | |
probe.expectNext(100.millis, "tick") | |
sub.cancel() | |
probe.expectNoMsg(1.second) | |
} | |
"be possible to cancel" in { | |
val (probe, cancellable) = prepareTest() | |
val sub = probe.expectSubscription() | |
sub.request(2) | |
probe.expectNext("tick") | |
probe.expectNoMsg(900.millis) | |
probe.expectNext(200.millis, "tick") | |
cancellable.cancel() | |
TestKit.awaitCond(cancellable.isCancelled, 200.millis) | |
sub.request(3) | |
probe.expectComplete() | |
} | |
"acknowledge cancellation only once" in { | |
val (probe, cancellable) = prepareTest() | |
val sub = probe.expectSubscription() | |
sub.request(2) | |
probe.expectNext("tick") | |
assert(cancellable.cancel() == true) | |
assert(cancellable.cancel() == false) | |
probe.expectComplete() | |
} | |
"have isCancelled mirror the cancellation state" in { | |
val (probe, cancellable) = prepareTest() | |
val sub = probe.expectSubscription() | |
sub.request(2) | |
probe.expectNext("tick") | |
assert(cancellable.isCancelled == false) | |
assert(cancellable.cancel() == true) | |
assert(cancellable.isCancelled == true) | |
probe.expectComplete() | |
} | |
"support being cancelled immediately after its materialization" in { | |
val (probe, cancellable) = prepareTest() | |
assert(cancellable.cancel() == true) | |
val sub = probe.expectSubscription() | |
sub.request(2) | |
probe.expectComplete() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment