Last active
April 29, 2016 23:08
-
-
Save timcharper/24aeb204b9587a55cdbc61db969261ae to your computer and use it in GitHub Desktop.
Demonstrate issue with gracefully stopping actorRef Source
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "proof" | |
organization := "com.timcharper" | |
scalaVersion := "2.11.8" | |
val akkaVersion = "2.4.4" | |
libraryDependencies ++= Seq( | |
"com.typesafe.akka" %% "akka-actor" % akkaVersion, | |
"com.typesafe.akka" %% "akka-stream" % akkaVersion | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package m | |
import akka.stream.scaladsl._ | |
import akka.stream._ | |
import akka.actor._ | |
object Main extends App { | |
implicit val system = ActorSystem("hi") | |
implicit val materializer = ActorMaterializer() | |
implicit val ec = system.dispatcher | |
println("hi there") | |
val (input, completed) = Source.actorRef[Int](100, OverflowStrategy.dropNew). | |
conflate { (prior, overflown) => | |
println(s"dropping overflown ${overflown}") | |
prior | |
}. | |
buffer(10, OverflowStrategy.backpressure). | |
async. | |
toMat(Sink.foreach { n => | |
println(n) | |
Thread.sleep(100) | |
})(Keep.both). | |
run | |
(1 to 50).foreach { n => | |
input ! n | |
println(s"sent ${n}") | |
Thread.sleep(10) | |
} | |
Thread.sleep(100) | |
input ! Status.Success | |
completed.onComplete { result => | |
println(s"all done! Result = ${result}") | |
system.terminate() | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[success] Total time: 1 s, completed Apr 29, 2016 4:59:02 PM | |
> as-conflate-proof hi there | |
as-conflate-proof sent 1 | |
as-conflate-proof 1 | |
as-conflate-proof sent 2 | |
as-conflate-proof sent 3 | |
as-conflate-proof sent 4 | |
as-conflate-proof sent 5 | |
as-conflate-proof sent 6 | |
as-conflate-proof sent 7 | |
as-conflate-proof sent 8 | |
as-conflate-proof sent 9 | |
as-conflate-proof sent 10 | |
as-conflate-proof 2 | |
as-conflate-proof sent 11 | |
as-conflate-proof sent 12 | |
as-conflate-proof sent 13 | |
as-conflate-proof sent 14 | |
as-conflate-proof sent 15 | |
as-conflate-proof sent 16 | |
as-conflate-proof sent 17 | |
as-conflate-proof sent 18 | |
as-conflate-proof sent 19 | |
as-conflate-proof 3 | |
as-conflate-proof sent 20 | |
as-conflate-proof sent 21 | |
as-conflate-proof sent 22 | |
as-conflate-proof sent 23 | |
as-conflate-proof sent 24 | |
as-conflate-proof sent 25 | |
as-conflate-proof sent 26 | |
as-conflate-proof sent 27 | |
as-conflate-proof sent 28 | |
as-conflate-proof dropping overflown 28 | |
as-conflate-proof 4 | |
as-conflate-proof sent 29 | |
as-conflate-proof dropping overflown 29 | |
as-conflate-proof sent 30 | |
as-conflate-proof dropping overflown 30 | |
as-conflate-proof sent 31 | |
as-conflate-proof dropping overflown 31 | |
as-conflate-proof sent 32 | |
as-conflate-proof dropping overflown 32 | |
as-conflate-proof sent 33 | |
as-conflate-proof dropping overflown 33 | |
as-conflate-proof sent 34 | |
as-conflate-proof dropping overflown 34 | |
as-conflate-proof sent 35 | |
as-conflate-proof dropping overflown 35 | |
as-conflate-proof sent 36 | |
as-conflate-proof dropping overflown 36 | |
as-conflate-proof sent 37 | |
as-conflate-proof dropping overflown 37 | |
as-conflate-proof 5 | |
as-conflate-proof sent 38 | |
as-conflate-proof dropping overflown 38 | |
as-conflate-proof sent 39 | |
as-conflate-proof dropping overflown 39 | |
as-conflate-proof sent 40 | |
as-conflate-proof dropping overflown 40 | |
as-conflate-proof sent 41 | |
as-conflate-proof dropping overflown 41 | |
as-conflate-proof sent 42 | |
as-conflate-proof dropping overflown 42 | |
as-conflate-proof sent 43 | |
as-conflate-proof dropping overflown 43 | |
as-conflate-proof sent 44 | |
as-conflate-proof dropping overflown 44 | |
as-conflate-proof sent 45 | |
as-conflate-proof dropping overflown 45 | |
as-conflate-proof sent 46 | |
as-conflate-proof dropping overflown 46 | |
as-conflate-proof sent 47 | |
as-conflate-proof 6 | |
as-conflate-proof dropping overflown 47 | |
as-conflate-proof sent 48 | |
as-conflate-proof dropping overflown 48 | |
as-conflate-proof sent 49 | |
as-conflate-proof dropping overflown 49 | |
as-conflate-proof sent 50 | |
as-conflate-proof dropping overflown 50 | |
as-conflate-proof 7 | |
as-conflate-proof [ERROR] [04/29/2016 16:59:03.887] [hi-akka.actor.default-dispatcher-2] [akka://hi/user/StreamSupervisor-0/flow-0-1-unknown-operation] Error in stage [Batch(1,<function1>,<function1>,<function2>)]: requirement failed: Cannot pull closed port (Batch.in) | |
as-conflate-proof java.lang.IllegalArgumentException: requirement failed: Cannot pull closed port (Batch.in) | |
as-conflate-proof at scala.Predef$.require(Predef.scala:224) | |
as-conflate-proof at akka.stream.stage.GraphStageLogic.pull(GraphStage.scala:346) | |
as-conflate-proof at akka.stream.impl.fusing.Batch$$anon$20$$anon$21.onPush(Ops.scala:598) | |
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:587) | |
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:598) | |
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:539) | |
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:469) | |
as-conflate-proof at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:408) | |
as-conflate-proof at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:593) | |
as-conflate-proof at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:604) | |
as-conflate-proof at akka.actor.Actor$class.aroundReceive(Actor.scala:482) | |
as-conflate-proof at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:519) | |
as-conflate-proof at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) | |
as-conflate-proof at akka.actor.ActorCell.invoke(ActorCell.scala:495) | |
as-conflate-proof at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) | |
as-conflate-proof at akka.dispatch.Mailbox.run(Mailbox.scala:224) | |
as-conflate-proof at akka.dispatch.Mailbox.exec(Mailbox.scala:234) | |
as-conflate-proof at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) | |
as-conflate-proof at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) | |
as-conflate-proof at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) | |
as-conflate-proof at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) | |
as-conflate-proof | |
as-conflate-proof 8 | |
as-conflate-proof 9 | |
as-conflate-proof 10 | |
as-conflate-proof 11 | |
as-conflate-proof 12 | |
as-conflate-proof 13 | |
as-conflate-proof 14 | |
as-conflate-proof 15 | |
as-conflate-proof 16 | |
as-conflate-proof all done! Result = Failure(java.lang.ClassCastException: akka.actor.Status$Success$ cannot be cast to java.lang.Integer) | |
as-conflate-proof ... finished with exit code 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment