-
-
Save ivportilla/f0ab7c418b3f0f0df7749e45c945201e to your computer and use it in GitHub Desktop.
click stream example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.awt.Dimension | |
import java.awt.event.MouseAdapter | |
import java.awt.event.MouseEvent | |
import javax.swing.JFrame | |
import scala.collection.immutable | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.stream.stage._ | |
/**
 * Demo application: opens a Swing window, turns every mouse click inside it into a
 * stream element, and prints the size of each burst of clicks that contains at least
 * two clicks. A burst ends once no further click arrives within 250 ms of the
 * previous one.
 *
 * NOTE(review): `App` is kept so the entry point and the object's members stay
 * unchanged; an explicit `main` method would avoid `App`'s delayed-initialization
 * pitfalls if this ever grows beyond a demo.
 */
object ClickStreamExample extends App {

  implicit class SourceEnriched[A, Mat](stream: Source[A, Mat]) {

    /**
     * Accumulates elements as long as each one arrives within `duration` after the
     * previous element, and emits every accumulated run as one sequence.
     *
     * @param duration maximum gap between two consecutive elements of the same run;
     *                 must be strictly positive
     * @return a source of element runs that keeps the materialized value of `stream`
     * @throws IllegalArgumentException if `duration` is not strictly positive
     */
    def throttle(duration: FiniteDuration): Source[immutable.Seq[A], Mat] = {
      require(duration > Duration.Zero)
      stream.via(new Throttle[A](duration)).withAttributes(Attributes.name("throttle"))
    }
  }

  /**
   * Opens the click window and forwards every `MouseEvent` from a click to `ref`.
   *
   * The window is built and shown on the Swing Event Dispatch Thread: this method is
   * invoked from the thread that materializes the stream, and Swing components must
   * only be created and mutated on the EDT.
   *
   * @param ref actor that receives one message per mouse click
   */
  def mkClickFrame(ref: ActorRef): Unit =
    javax.swing.SwingUtilities.invokeLater(new Runnable {
      override def run(): Unit = {
        val frame = new JFrame("Click Stream Example")
        frame.setPreferredSize(new Dimension(300, 300))
        frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE)
        frame.addMouseListener(new MouseAdapter {
          override def mouseClicked(e: MouseEvent): Unit = ref ! e
        })
        frame.pack()
        frame.setVisible(true)
      }
    })

  implicit val system: ActorSystem = ActorSystem("TestSystem")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // The materialized ActorRef is handed to the window, which feeds it the click events.
  val clickStream: Source[MouseEvent, Unit] = Source
    .actorRef[MouseEvent](bufferSize = 0, OverflowStrategy.fail)
    .mapMaterializedValue(mkClickFrame)

  // Number of clicks per burst, keeping only double clicks and longer bursts.
  val multiClickStream: Source[Int, Unit] = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

  // Report a stream failure instead of silently discarding the failed Future.
  multiClickStream
    .runForeach(println)
    .failed
    .foreach(ex => Console.err.println(s"Click stream failed: $ex"))(system.dispatcher)
}
/**
 * Flow stage that collects incoming elements into a group and emits the group once
 * no further element has arrived within `duration` of the most recent one.
 *
 * Implementation copied from [[akka.stream.impl.fusing.GroupedWithin]] and
 * adapted to our needs.
 *
 * @param duration quiet period after the last element that closes the current group
 * @tparam A element type
 */
final class Throttle[A](duration: FiniteDuration) extends GraphStage[FlowShape[A, immutable.Seq[A]]] {
  val in: Inlet[A] = Inlet[A]("in")
  val out: Outlet[immutable.Seq[A]] = Outlet[immutable.Seq[A]]("out")
  val shape: FlowShape[A, immutable.Seq[A]] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
    // Elements of the group currently being collected.
    private val buf: immutable.VectorBuilder[A] = new immutable.VectorBuilder
    // Number of elements currently held in `buf`; tracked separately so emptiness can
    // be checked without building the vector.
    private var buffered = 0
    // True while a finished group waits in `buf` for downstream demand.
    private var groupClosed = false
    // True once upstream has completed.
    private var finished = false

    private val GroupedWithinTimer = "GroupedWithinTimer"

    override def preStart(): Unit = pull(in)

    /** Adds `elem` to the current group, re-arms the quiet-period timer and asks for more. */
    private def nextElement(elem: A): Unit = {
      buf += elem
      buffered += 1
      // Scheduling under the same key replaces the pending timer, so the quiet period
      // restarts with every element.
      scheduleOnce(GroupedWithinTimer, duration)
      pull(in)
    }

    /** Marks the current group as complete and emits it right away if downstream is ready. */
    private def closeGroup(): Unit = {
      groupClosed = true
      if (isAvailable(out)) emitGroup()
    }

    /** Pushes the collected group, then either starts the next group or completes the stage. */
    private def emitGroup(): Unit = {
      push(out, buf.result())
      buf.clear()
      buffered = 0
      // NOTE(review): an element that was pushed while the group was closed and has
      // not been grabbed yet is not handled on this completion path - verify whether
      // it can be lost when upstream finishes at that moment.
      if (finished) completeStage()
      else startNewGroup()
    }

    /** Re-opens collection, first consuming an element that was held back while the group was closed. */
    private def startNewGroup(): Unit = {
      groupClosed = false
      if (isAvailable(in)) nextElement(grab(in))
      else if (!hasBeenPulled(in)) pull(in)
    }

    setHandler(in, new InHandler {
      override def onPush(): Unit =
        if (!groupClosed)
          nextElement(grab(in)) // otherwise keep the element for next round

      override def onUpstreamFinish(): Unit = {
        finished = true
        if (groupClosed) {
          // A closed group is still waiting for demand. Keep the stage open: the next
          // onPull emits it and emitGroup then completes the stage. Completing here
          // would drop that group.
        } else if (buffered == 0) {
          // Nothing collected, so there is no final group to flush.
          completeStage()
        } else {
          closeGroup()
        }
      }

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = if (groupClosed) emitGroup()

      override def onDownstreamFinish(): Unit = completeStage()
    })

    // The quiet period elapsed without a new element: the group is complete.
    override protected def onTimer(timerKey: Any): Unit = closeGroup()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment