Skip to content

Instantly share code, notes, and snippets.

@pochi
Last active August 29, 2015 14:04
Show Gist options
  • Select an option

  • Save pochi/88ea47763e5dc54e0f17 to your computer and use it in GitHub Desktop.

Select an option

Save pochi/88ea47763e5dc54e0f17 to your computer and use it in GitHub Desktop.
About kafka

kafka

kafkaを利用するのに必要なノウハウを記載します。

version

v0.8.1.1

OS

Mac OS /opt/local/repos/kafka

topicを削除する

ローカル環境でがんがん使うとメモリ不足で落ちます。

how to delete topic

pochi >>> ./bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test
deletion succeeded!

Install

Scalaを2.10系にする。

pochi >>> brew info scala 
scala: stable 2.10.4, devel 2.11.0-RC4
http://www.scala-lang.org/
/usr/local/Cellar/scala/2.10.4 (103 files, 32M) *
  Built from source
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/scala.rb
==> Options
--with-docs
        Also install library documentation
--devel
        install development version 2.11.0-RC4
==> Caveats
To use with IntelliJ, set the Scala home to:
  /usr/local/opt/scala/idea

Bash completion has been installed to:
  /usr/local/etc/bash_completion.d
pochi >>> brew versions scala

2.11.1   git checkout 92e1fdd /usr/local/Library/Formula/scala.rb
2.11.0   git checkout 7d1c149 /usr/local/Library/Formula/scala.rb
2.10.4   git checkout d64edec /usr/local/Library/Formula/scala.rb
2.10.3   git checkout 7c65743 /usr/local/Library/Formula/scala.rb
2.10.2   git checkout 03b5e3d /usr/local/Library/Formula/scala.rb
2.10.1   git checkout 39f9385 /usr/local/Library/Formula/scala.rb
2.10.0   git checkout 8ca07aa /usr/local/Library/Formula/scala.rb
2.9.2    git checkout 8896425 /usr/local/Library/Formula/scala.rb
2.9.1-1  git checkout 2e7cbfe /usr/local/Library/Formula/scala.rb
2.9.1    git checkout b78cfbd /usr/local/Library/Formula/scala.rb
2.9.0.1  git checkout cb1ab23 /usr/local/Library/Formula/scala.rb
2.9.0    git checkout 4002978 /usr/local/Library/Formula/scala.rb
2.8.1    git checkout 0e16b9d /usr/local/Library/Formula/scala.rb
2.8.0    git checkout fdb41a3 /usr/local/Library/Formula/scala.rb
brew versions scala  12.59s user 1.19s system 97% cpu 14.106 total
pochi >>> cd /usr/local/Library/Formula && git checkout d64edec scala.rb && brew install scala
pochi >>> scala -version
Scala code runner version 2.10.4 -- Copyright 2002-2013, LAMP/EPFL

Kafkaをとってくる。 Scalaのバージョンごとに用意されている。 v2.10.4に対応したものは以下にある。

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz

プロセス起動

まずはクラスタに必要なzookeeperを動かす。

pochi >>> tar -zxvf kafka_2.10-0.8.1.1.tgz
pochi >>> cd kafka_2.10-0.8.1.1
pochi >>> ./bin/zookeeper-server-start.sh config/zookeeper.properties

次にproducerの受け口となるserverを起動する。

pochi >>> ./bin/kafka-server-start.sh config/server.properties

データを受け取るconsumerも起動する。

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 

実際にデータを吐き出すproducerを起動して同期できるか確認してみる。

pochi >>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Bowbow!

Bowbow!というメッセージがconsumerに出ていたらひとまず動作確認完了。

Description

Kafkaで途中まではbrokerを起動したらうまくメッセージの取得ができていたのに、 あるときからOutOfMemoryエラーがでて原因がわからない。

エラーの中身

Caused by: java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,098] ERROR OOME with size 1213486160 (kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,098] WARN Fetching topic metadata with correlation id 23 for topics [Set(pochi)] from broker [id:0,host:localhost,port:9292] failed (kafka.client.ClientUtils$)
java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,099] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(pochi)] from broker [ArrayBuffer(id:0,host:localhost,port:9292)] failed (kafka.produ$
er.async.DefaultEventHandler)
[2014-10-15 21:53:01,322] ERROR OOME with size 1213486160 (kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,323] WARN Fetching topic metadata with correlation id 24 for topics [Set(pochi)] from broker [id:0,host:localhost,port:9292] failed (kafka.client.ClientUtils$)
java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,324] ERROR fetching topic metadata for topics [Set(pochi)] from broker [ArrayBuffer(id:0,host:localhost,port:9292)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(pochi)] from broker [ArrayBuffer(id:0,host:localhost,port:9292)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,325] ERROR Failed to send requests for topics pochi with correlation ids in [17,24] (kafka.producer.async.DefaultEventHandler)
[2014-10-15 21:53:01,325] ERROR Error in handling batch of 3 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
^C./bin/kafka-console-producer.sh --broker-list localhost:9292 --topic pochi  5.09s user 0.35s system 1% cpu 7:09.79 total

対応策

とりあえず/tmp以下のkafka周りのディレクトリ全部削除して再立ち上げしたら出なくなったが、今後データ増えていくとそうなるということだろうか・・・

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment