Created
June 5, 2014 09:36
-
-
Save tgpfeiffer/b1e765064e983449c6b6 to your computer and use it in GitHub Desktop.
Kafka does not shut down properly
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.log4j.{LogManager, Level} | |
import org.apache.spark.SparkContext | |
import org.apache.spark.streaming.kafka.KafkaUtils | |
import org.apache.spark.streaming.{Seconds, StreamingContext} | |
import scala.concurrent._ | |
import ExecutionContext.Implicits.global | |
/**
 * Reproduces an issue where stopping a Spark StreamingContext does not
 * terminate the Kafka consumer threads, preventing a clean JVM shutdown.
 *
 * The program snapshots the non-daemon threads alive at startup, runs a
 * Kafka-backed stream for a few seconds, stops the StreamingContext from a
 * background task, and then prints the state and stack trace of every
 * non-daemon thread that appeared after startup (i.e. the leftovers).
 */
object KafkaDoesntShutdown {
  def main(args: Array[String]): Unit = {
    // Snapshot of non-daemon threads before Spark/Kafka spawn anything.
    val threadsAtStartTime = currentThreads
    LogManager.getRootLogger().setLevel(Level.WARN)
    // Create the Spark context and a streaming context with 5-second batches.
    val sc = new SparkContext("local[2]", "Kafka Issue")
    val ssc = new StreamingContext(sc, Seconds(5))
    // Connect to ZooKeeper at localhost:2181 as consumer group "1" and read
    // lines from topic "mytopic" with two consumer threads.
    val kvPairs = KafkaUtils.createStream(ssc, "localhost:2181", "1",
      Map("mytopic" -> 2))
    kvPairs.print()
    // Shut down from a background task after 7 seconds, then report every
    // non-daemon thread created since startup that is still alive.
    // `Future { ... }` replaces the deprecated lowercase `future { ... }`.
    Future {
      Thread.sleep(7000)
      // Stop the underlying SparkContext too, and wait for already-received
      // data to be processed (graceful stop).
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("stopped StreamingContext")
      // Any remaining new thread at this point is a shutdown leak; dump its
      // state, interrupt flag, and stack trace.
      currentThreads.diff(threadsAtStartTime).toList.foreach { t =>
        println(t + ": " + t.getState + " -> " + t.isInterrupted)
        // foreach, not map: we only want the println side effect.
        t.getStackTrace.foreach(elem => println(" " + elem))
      }
    }
    // Start the streaming context and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
    println("end of main()")
  }

  /** Returns the set of currently-alive non-daemon threads. */
  def currentThreads: Set[Thread] = {
    // getAllStackTraces returns a java.util.Map[Thread, Array[StackTraceElement]];
    // toArray yields Array[AnyRef], so collect both filters and downcasts.
    Thread.getAllStackTraces().keySet().toArray.collect {
      case t: Thread if !t.isDaemon => t
    }.toSet
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ sbt "run-main KafkaDoesntShutdown" | |
14/06/05 18:25:07 WARN zookeeper.ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable | |
14/06/05 18:25:07 WARN consumer.ZookeeperConsumerConnector: [1_hostname-1401960306956-ea5b0e67], No broker partitions consumed by consumer thread 1_hostname-1401960306956-ea5b0e67-1 for topic fluentd | |
14/06/05 18:25:07 WARN storage.BlockManager: Block input-0-1401960307200 already exists on this machine; not re-adding it | |
14/06/05 18:25:07 WARN storage.BlockManager: Block input-0-1401960307400 already exists on this machine; not re-adding it | |
14/06/05 18:25:08 WARN storage.BlockManager: Block input-0-1401960308000 already exists on this machine; not re-adding it | |
14/06/05 18:25:09 WARN storage.BlockManager: Block input-0-1401960309000 already exists on this machine; not re-adding it | |
------------------------------------------- | |
Time: 1401960310000 ms | |
------------------------------------------- | |
[... Kafka output ...] | |
14/06/05 18:25:10 WARN storage.BlockManager: Block input-0-1401960310000 already exists on this machine; not re-adding it | |
14/06/05 18:25:11 WARN storage.BlockManager: Block input-0-1401960311000 already exists on this machine; not re-adding it | |
14/06/05 18:25:12 WARN storage.BlockManager: Block input-0-1401960312000 already exists on this machine; not re-adding it | |
14/06/05 18:25:13 WARN storage.BlockManager: Block input-0-1401960313000 already exists on this machine; not re-adding it | |
14/06/05 18:25:13 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver | |
14/06/05 18:25:13 WARN receiver.ReceiverSupervisorImpl: Stopped executor without error | |
14/06/05 18:25:13 WARN scheduler.ReceiverTracker: All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,KafkaReceiver-0,null,false,localhost,Stopped by driver,)) | |
------------------------------------------- | |
Time: 1401960315000 ms | |
------------------------------------------- | |
[... Kafka output ...] | |
------------------------------------------- | |
Time: 1401960320000 ms | |
------------------------------------------- | |
end of main() | |
14/06/05 18:25:21 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found | |
14/06/05 18:25:21 WARN network.ConnectionManager: All connections not cleaned up | |
stopped StreamingContext | |
Thread[pool-12-thread-1,5,run-main-group-0]: WAITING -> false | |
sun.misc.Unsafe.park(Native Method) | |
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) | |
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) | |
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) | |
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63) | |
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) | |
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) | |
kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) | |
scala.collection.Iterator$class.foreach(Iterator.scala:727) | |
kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) | |
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) | |
kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25) | |
org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:127) | |
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) | |
java.util.concurrent.FutureTask.run(FutureTask.java:262) | |
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) | |
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) | |
java.lang.Thread.run(Thread.java:744) | |
Thread[pool-12-thread-2,5,run-main-group-0]: WAITING -> false | |
sun.misc.Unsafe.park(Native Method) | |
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) | |
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) | |
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) | |
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63) | |
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) | |
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) | |
kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) | |
scala.collection.Iterator$class.foreach(Iterator.scala:727) | |
kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) | |
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) | |
kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25) | |
org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:127) | |
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) | |
java.util.concurrent.FutureTask.run(FutureTask.java:262) | |
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) | |
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) | |
java.lang.Thread.run(Thread.java:744) | |
Thread[1_hostname-1401960306956-ea5b0e67-leader-finder-thread,5,run-main-group-0]: WAITING -> false | |
sun.misc.Unsafe.park(Native Method) | |
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) | |
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) | |
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61) | |
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) | |
Thread[ConsumerFetcherThread-1_hostname-1401960306956-ea5b0e67-0-0,5,run-main-group-0]: RUNNABLE -> false | |
sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) | |
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) | |
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79) | |
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) | |
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) | |
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221) | |
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) | |
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) | |
kafka.utils.Utils$.read(Utils.scala:375) | |
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) | |
kafka.network.Receive$class.readCompletely(Transmission.scala:56) | |
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) | |
kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) | |
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73) | |
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) | |
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) | |
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) | |
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) | |
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) | |
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) | |
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) | |
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) | |
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) | |
kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) | |
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) | |
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) | |
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ sbt "run-main KafkaDoesntShutdown" | |
[info] Loading project definition from /home/tgp/workspace/spark-playground/project | |
[info] Set current project to spark-playground (in build file:/home/tgp/workspace/spark-playground/) | |
[info] Compiling 1 Scala source to /home/tgp/workspace/spark-playground/target/scala-2.10/classes... | |
[info] Running KafkaDoesntShutdown | |
14/06/05 18:32:04 WARN zookeeper.ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable | |
------------------------------------------- | |
Time: 1401960725000 ms | |
------------------------------------------- | |
14/06/05 18:32:05 WARN consumer.ZookeeperConsumerConnector: [1_hostname-1401960724884-87a111e6], No broker partitions consumed by consumer thread 1_hostname-1401960724884-87a111e6-1 for topic mytopic | |
14/06/05 18:32:05 WARN storage.BlockManager: Block input-0-1401960725200 already exists on this machine; not re-adding it | |
14/06/05 18:32:06 WARN storage.BlockManager: Block input-0-1401960726000 already exists on this machine; not re-adding it | |
14/06/05 18:32:07 WARN storage.BlockManager: Block input-0-1401960727000 already exists on this machine; not re-adding it | |
14/06/05 18:32:08 WARN storage.BlockManager: Block input-0-1401960728000 already exists on this machine; not re-adding it | |
14/06/05 18:32:09 WARN storage.BlockManager: Block input-0-1401960729000 already exists on this machine; not re-adding it | |
------------------------------------------- | |
Time: 1401960730000 ms | |
------------------------------------------- | |
[... Kafka output ...] | |
14/06/05 18:32:10 WARN storage.BlockManager: Block input-0-1401960730000 already exists on this machine; not re-adding it | |
14/06/05 18:32:11 WARN storage.BlockManager: Block input-0-1401960731000 already exists on this machine; not re-adding it | |
14/06/05 18:32:11 WARN consumer.SimpleConsumer: Reconnect due to socket error: null | |
14/06/05 18:32:11 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver | |
14/06/05 18:32:11 WARN receiver.ReceiverSupervisorImpl: Stopped executor without error | |
14/06/05 18:32:11 WARN scheduler.ReceiverTracker: All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,MyOwnKafkaReceiver-0,null,false,localhost,Stopped by driver,)) | |
------------------------------------------- | |
Time: 1401960735000 ms | |
------------------------------------------- | |
[... Kafka output ...] | |
------------------------------------------- | |
Time: 1401960740000 ms | |
------------------------------------------- | |
end of main() | |
14/06/05 18:32:21 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found | |
14/06/05 18:32:21 WARN network.ConnectionManager: All connections not cleaned up | |
stopped StreamingContext | |
Thread[pool-12-thread-1,5,run-main-group-0]: WAITING -> false | |
sun.misc.Unsafe.park(Native Method) | |
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) | |
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) | |
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) | |
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) | |
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) | |
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) | |
java.lang.Thread.run(Thread.java:744) | |
Thread[pool-12-thread-2,5,run-main-group-0]: WAITING -> false | |
sun.misc.Unsafe.park(Native Method) | |
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) | |
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) | |
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) | |
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) | |
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) | |
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) | |
java.lang.Thread.run(Thread.java:744) |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.