Skip to content

Instantly share code, notes, and snippets.

@tgpfeiffer
Created June 5, 2014 09:36
Show Gist options
  • Save tgpfeiffer/b1e765064e983449c6b6 to your computer and use it in GitHub Desktop.
Save tgpfeiffer/b1e765064e983449c6b6 to your computer and use it in GitHub Desktop.
Kafka does not shut down properly
import org.apache.log4j.{LogManager, Level}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.concurrent._
import ExecutionContext.Implicits.global
object KafkaDoesntShutdown {
  import scala.collection.JavaConverters._
  import scala.concurrent.duration._

  /** How long the stream runs before the background task stops it. */
  private val StopDelayMillis = 7000L

  /** Upper bound on how long `main` waits for the stop-and-report task; a hung
    * `StreamingContext.stop` then surfaces as a `TimeoutException` rather than an
    * unexplained hang. */
  private val StopTimeout = 60.seconds

  /**
   * Reproduces the Kafka receiver shutdown problem.
   *
   * Starts a local Spark Streaming job (5 s batches) that prints messages from the Kafka
   * topic "mytopic" (via ZooKeeper at localhost:2181). After `StopDelayMillis`, a
   * background task stops the streaming context. It then prints every non-daemon thread
   * that did not exist at startup, with its state, interrupt flag and stack trace.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    // Snapshot non-daemon threads before Spark/Kafka start, so threads that survive the
    // shutdown can be identified as left over.
    val threadsAtStartTime = currentThreads
    LogManager.getRootLogger().setLevel(Level.WARN)

    // create spark context and streaming context
    val sc = new SparkContext("local[2]", "Kafka Issue")
    val ssc = new StreamingContext(sc, Seconds(5))

    // connect to ZooKeeper at localhost:2181 as consumer group "1" and read "mytopic";
    // the map value is the per-topic consumer thread count passed to KafkaUtils
    val kvPairs = KafkaUtils.createStream(ssc, "localhost:2181", "1",
      Map("mytopic" -> 2))
    kvPairs.print()

    // Stop the streaming context after a delay, then report leftover threads.
    val stopAndReport = Future {
      // the sleep occupies a pool thread, so mark it as blocking
      blocking(Thread.sleep(StopDelayMillis))
      // arguments: stopSparkContext = true, stopGracefully = true
      ssc.stop(true, true)
      println("stopped StreamingContext")
      printThreads(currentThreads.diff(threadsAtStartTime))
    }

    // start the streaming context
    ssc.start()
    ssc.awaitTermination()
    // awaitTermination() can return before stop() has finished (previous runs printed
    // "end of main()" ahead of "stopped StreamingContext"), so wait for the report
    // explicitly. Await.result also rethrows any exception from stop() instead of
    // letting the failed future be dropped silently.
    Await.result(stopAndReport, StopTimeout)
    println("end of main()")
  }

  /** Prints each thread with its state and interrupt flag, followed by its stack trace. */
  private def printThreads(threads: Iterable[Thread]): Unit =
    threads.foreach { t =>
      println(s"$t: ${t.getState} -> ${t.isInterrupted}")
      t.getStackTrace.foreach(elem => println(s" $elem"))
    }

  /** Returns the currently live non-daemon threads (daemon threads do not keep the JVM alive). */
  def currentThreads: Set[Thread] =
    Thread.getAllStackTraces().keySet().asScala.iterator.filterNot(_.isDaemon).toSet
}
$ sbt "run-main KafkaDoesntShutdown"
14/06/05 18:25:07 WARN zookeeper.ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable
14/06/05 18:25:07 WARN consumer.ZookeeperConsumerConnector: [1_hostname-1401960306956-ea5b0e67], No broker partitions consumed by consumer thread 1_hostname-1401960306956-ea5b0e67-1 for topic fluentd
14/06/05 18:25:07 WARN storage.BlockManager: Block input-0-1401960307200 already exists on this machine; not re-adding it
14/06/05 18:25:07 WARN storage.BlockManager: Block input-0-1401960307400 already exists on this machine; not re-adding it
14/06/05 18:25:08 WARN storage.BlockManager: Block input-0-1401960308000 already exists on this machine; not re-adding it
14/06/05 18:25:09 WARN storage.BlockManager: Block input-0-1401960309000 already exists on this machine; not re-adding it
-------------------------------------------
Time: 1401960310000 ms
-------------------------------------------
[... Kafka output ...]
14/06/05 18:25:10 WARN storage.BlockManager: Block input-0-1401960310000 already exists on this machine; not re-adding it
14/06/05 18:25:11 WARN storage.BlockManager: Block input-0-1401960311000 already exists on this machine; not re-adding it
14/06/05 18:25:12 WARN storage.BlockManager: Block input-0-1401960312000 already exists on this machine; not re-adding it
14/06/05 18:25:13 WARN storage.BlockManager: Block input-0-1401960313000 already exists on this machine; not re-adding it
14/06/05 18:25:13 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
14/06/05 18:25:13 WARN receiver.ReceiverSupervisorImpl: Stopped executor without error
14/06/05 18:25:13 WARN scheduler.ReceiverTracker: All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,KafkaReceiver-0,null,false,localhost,Stopped by driver,))
-------------------------------------------
Time: 1401960315000 ms
-------------------------------------------
[... Kafka output ...]
-------------------------------------------
Time: 1401960320000 ms
-------------------------------------------
end of main()
14/06/05 18:25:21 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
14/06/05 18:25:21 WARN network.ConnectionManager: All connections not cleaned up
stopped StreamingContext
Thread[pool-12-thread-1,5,run-main-group-0]: WAITING -> false
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:127)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
java.util.concurrent.FutureTask.run(FutureTask.java:262)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Thread[pool-12-thread-2,5,run-main-group-0]: WAITING -> false
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:63)
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:127)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
java.util.concurrent.FutureTask.run(FutureTask.java:262)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Thread[1_hostname-1401960306956-ea5b0e67-leader-finder-thread,5,run-main-group-0]: WAITING -> false
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Thread[ConsumerFetcherThread-1_hostname-1401960306956-ea5b0e67-0-0,5,run-main-group-0]: RUNNABLE -> false
sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
kafka.utils.Utils$.read(Utils.scala:375)
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
kafka.network.Receive$class.readCompletely(Transmission.scala:56)
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
$ sbt "run-main KafkaDoesntShutdown"
[info] Loading project definition from /home/tgp/workspace/spark-playground/project
[info] Set current project to spark-playground (in build file:/home/tgp/workspace/spark-playground/)
[info] Compiling 1 Scala source to /home/tgp/workspace/spark-playground/target/scala-2.10/classes...
[info] Running KafkaDoesntShutdown
14/06/05 18:32:04 WARN zookeeper.ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable
-------------------------------------------
Time: 1401960725000 ms
-------------------------------------------
14/06/05 18:32:05 WARN consumer.ZookeeperConsumerConnector: [1_hostname-1401960724884-87a111e6], No broker partitions consumed by consumer thread 1_hostname-1401960724884-87a111e6-1 for topic mytopic
14/06/05 18:32:05 WARN storage.BlockManager: Block input-0-1401960725200 already exists on this machine; not re-adding it
14/06/05 18:32:06 WARN storage.BlockManager: Block input-0-1401960726000 already exists on this machine; not re-adding it
14/06/05 18:32:07 WARN storage.BlockManager: Block input-0-1401960727000 already exists on this machine; not re-adding it
14/06/05 18:32:08 WARN storage.BlockManager: Block input-0-1401960728000 already exists on this machine; not re-adding it
14/06/05 18:32:09 WARN storage.BlockManager: Block input-0-1401960729000 already exists on this machine; not re-adding it
-------------------------------------------
Time: 1401960730000 ms
-------------------------------------------
[... Kafka output ...]
14/06/05 18:32:10 WARN storage.BlockManager: Block input-0-1401960730000 already exists on this machine; not re-adding it
14/06/05 18:32:11 WARN storage.BlockManager: Block input-0-1401960731000 already exists on this machine; not re-adding it
14/06/05 18:32:11 WARN consumer.SimpleConsumer: Reconnect due to socket error: null
14/06/05 18:32:11 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
14/06/05 18:32:11 WARN receiver.ReceiverSupervisorImpl: Stopped executor without error
14/06/05 18:32:11 WARN scheduler.ReceiverTracker: All of the receivers have not deregistered, Map(0 -> ReceiverInfo(0,MyOwnKafkaReceiver-0,null,false,localhost,Stopped by driver,))
-------------------------------------------
Time: 1401960735000 ms
-------------------------------------------
[... Kafka output ...]
-------------------------------------------
Time: 1401960740000 ms
-------------------------------------------
end of main()
14/06/05 18:32:21 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
14/06/05 18:32:21 WARN network.ConnectionManager: All connections not cleaned up
stopped StreamingContext
Thread[pool-12-thread-1,5,run-main-group-0]: WAITING -> false
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Thread[pool-12-thread-2,5,run-main-group-0]: WAITING -> false
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment