@timcharper
Last active April 29, 2016 23:08
Demonstrate issue with gracefully stopping actorRef Source
name := "proof"
organization := "com.timcharper"
scalaVersion := "2.11.8"
val akkaVersion = "2.4.4"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion
)
package m

import akka.stream.scaladsl._
import akka.stream._
import akka.actor._

object Main extends App {
  implicit val system = ActorSystem("hi")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  println("hi there")

  val (input, completed) = Source.actorRef[Int](100, OverflowStrategy.dropNew).
    // When downstream is not ready, keep the prior element and discard the newer one.
    conflate { (prior, overflown) =>
      println(s"dropping overflown ${overflown}")
      prior
    }.
    buffer(10, OverflowStrategy.backpressure).
    async.
    // Slow consumer: roughly one element per 100 ms.
    toMat(Sink.foreach { n =>
      println(n)
      Thread.sleep(100)
    })(Keep.both).
    run

  // Fast producer: one element per 10 ms.
  (1 to 50).foreach { n =>
    input ! n
    println(s"sent ${n}")
    Thread.sleep(10)
  }
  Thread.sleep(100)
  // Note: this sends the Status.Success companion object, not a Status.Success(...) instance.
  input ! Status.Success
  completed.onComplete { result =>
    println(s"all done! Result = ${result}")
    system.terminate()
  }
}
[success] Total time: 1 s, completed Apr 29, 2016 4:59:02 PM
> as-conflate-proof hi there
as-conflate-proof sent 1
as-conflate-proof 1
as-conflate-proof sent 2
as-conflate-proof sent 3
as-conflate-proof sent 4
as-conflate-proof sent 5
as-conflate-proof sent 6
as-conflate-proof sent 7
as-conflate-proof sent 8
as-conflate-proof sent 9
as-conflate-proof sent 10
as-conflate-proof 2
as-conflate-proof sent 11
as-conflate-proof sent 12
as-conflate-proof sent 13
as-conflate-proof sent 14
as-conflate-proof sent 15
as-conflate-proof sent 16
as-conflate-proof sent 17
as-conflate-proof sent 18
as-conflate-proof sent 19
as-conflate-proof 3
as-conflate-proof sent 20
as-conflate-proof sent 21
as-conflate-proof sent 22
as-conflate-proof sent 23
as-conflate-proof sent 24
as-conflate-proof sent 25
as-conflate-proof sent 26
as-conflate-proof sent 27
as-conflate-proof sent 28
as-conflate-proof dropping overflown 28
as-conflate-proof 4
as-conflate-proof sent 29
as-conflate-proof dropping overflown 29
as-conflate-proof sent 30
as-conflate-proof dropping overflown 30
as-conflate-proof sent 31
as-conflate-proof dropping overflown 31
as-conflate-proof sent 32
as-conflate-proof dropping overflown 32
as-conflate-proof sent 33
as-conflate-proof dropping overflown 33
as-conflate-proof sent 34
as-conflate-proof dropping overflown 34
as-conflate-proof sent 35
as-conflate-proof dropping overflown 35
as-conflate-proof sent 36
as-conflate-proof dropping overflown 36
as-conflate-proof sent 37
as-conflate-proof dropping overflown 37
as-conflate-proof 5
as-conflate-proof sent 38
as-conflate-proof dropping overflown 38
as-conflate-proof sent 39
as-conflate-proof dropping overflown 39
as-conflate-proof sent 40
as-conflate-proof dropping overflown 40
as-conflate-proof sent 41
as-conflate-proof dropping overflown 41
as-conflate-proof sent 42
as-conflate-proof dropping overflown 42
as-conflate-proof sent 43
as-conflate-proof dropping overflown 43
as-conflate-proof sent 44
as-conflate-proof dropping overflown 44
as-conflate-proof sent 45
as-conflate-proof dropping overflown 45
as-conflate-proof sent 46
as-conflate-proof dropping overflown 46
as-conflate-proof sent 47
as-conflate-proof 6
as-conflate-proof dropping overflown 47
as-conflate-proof sent 48
as-conflate-proof dropping overflown 48
as-conflate-proof sent 49
as-conflate-proof dropping overflown 49
as-conflate-proof sent 50
as-conflate-proof dropping overflown 50
as-conflate-proof 7
as-conflate-proof [ERROR] [04/29/2016 16:59:03.887] [hi-akka.actor.default-dispatcher-2] [akka://hi/user/StreamSupervisor-0/flow-0-1-unknown-operation] Error in stage [Batch(1,<function1>,<function1>,<function2>)]: requirement failed: Cannot pull closed port (Batch.in)
as-conflate-proof java.lang.IllegalArgumentException: requirement failed: Cannot pull closed port (Batch.in)
as-conflate-proof at scala.Predef$.require(Predef.scala:224)
as-conflate-proof at akka.stream.stage.GraphStageLogic.pull(GraphStage.scala:346)
as-conflate-proof at akka.stream.impl.fusing.Batch$$anon$20$$anon$21.onPush(Ops.scala:598)
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:587)
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:598)
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:539)
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:469)
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:408)
as-conflate-proof at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:593)
as-conflate-proof at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:604)
as-conflate-proof at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
as-conflate-proof at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:519)
as-conflate-proof at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
as-conflate-proof at akka.actor.ActorCell.invoke(ActorCell.scala:495)
as-conflate-proof at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
as-conflate-proof at akka.dispatch.Mailbox.run(Mailbox.scala:224)
as-conflate-proof at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
as-conflate-proof at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
as-conflate-proof at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
as-conflate-proof at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
as-conflate-proof at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
as-conflate-proof
as-conflate-proof 8
as-conflate-proof 9
as-conflate-proof 10
as-conflate-proof 11
as-conflate-proof 12
as-conflate-proof 13
as-conflate-proof 14
as-conflate-proof 15
as-conflate-proof 16
as-conflate-proof all done! Result = Failure(java.lang.ClassCastException: akka.actor.Status$Success$ cannot be cast to java.lang.Integer)
as-conflate-proof ... finished with exit code 0
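The final Failure in the output names akka.actor.Status$Success$, which is the class of the Status.Success companion object. That suggests the bare `Status.Success` message was treated as a stream element rather than as a completion signal. As a point of comparison only, here is a minimal sketch that completes an actorRef Source with a `Status.Success(())` instance instead. It assumes the same Akka 2.4.4 dependencies as the build definition above; the object name `CompleteWithInstance` and the helper `collect` are hypothetical, and the conflate, buffer, and async stages are left out, so it says nothing about the "Cannot pull closed port (Batch.in)" error.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names; not part of the original gist.
object CompleteWithInstance {

  // Sends 1..n to an actorRef Source, completes it, and returns what the Sink collected.
  def collect(n: Int): Vector[Int] = {
    implicit val system = ActorSystem("sketch")
    implicit val materializer = ActorMaterializer()
    try {
      val (input, completed) =
        Source.actorRef[Int](100, OverflowStrategy.dropNew)
          .toMat(Sink.fold[Vector[Int], Int](Vector.empty)(_ :+ _))(Keep.both)
          .run()

      (1 to n).foreach(input ! _)

      // An instance of the Status.Success case class, not the companion object.
      // The payload is ignored by the source; () is used as a placeholder.
      input ! Status.Success(())

      // Block until the stream has completed (sketch only; avoid blocking in real code).
      Await.result(completed, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(collect(5))
}
```

Whether this changes the behavior of the full pipeline in the gist has not been checked here; the sketch only isolates the difference between the two message forms.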