Notes on what you need to know to use Kafka.
v0.8.1.1
Mac OS /opt/local/repos/kafka
Switch Scala to the 2.10 series.
pochi >>> brew info scala
scala: stable 2.10.4, devel 2.11.0-RC4
http://www.scala-lang.org/
/usr/local/Cellar/scala/2.10.4 (103 files, 32M) *
Built from source
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/scala.rb
==> Options
--with-docs
Also install library documentation
--devel
install development version 2.11.0-RC4
==> Caveats
To use with IntelliJ, set the Scala home to:
/usr/local/opt/scala/idea
Bash completion has been installed to:
/usr/local/etc/bash_completion.d
pochi >>> brew versions scala
2.11.1 git checkout 92e1fdd /usr/local/Library/Formula/scala.rb
2.11.0 git checkout 7d1c149 /usr/local/Library/Formula/scala.rb
2.10.4 git checkout d64edec /usr/local/Library/Formula/scala.rb
2.10.3 git checkout 7c65743 /usr/local/Library/Formula/scala.rb
2.10.2 git checkout 03b5e3d /usr/local/Library/Formula/scala.rb
2.10.1 git checkout 39f9385 /usr/local/Library/Formula/scala.rb
2.10.0 git checkout 8ca07aa /usr/local/Library/Formula/scala.rb
2.9.2 git checkout 8896425 /usr/local/Library/Formula/scala.rb
2.9.1-1 git checkout 2e7cbfe /usr/local/Library/Formula/scala.rb
2.9.1 git checkout b78cfbd /usr/local/Library/Formula/scala.rb
2.9.0.1 git checkout cb1ab23 /usr/local/Library/Formula/scala.rb
2.9.0 git checkout 4002978 /usr/local/Library/Formula/scala.rb
2.8.1 git checkout 0e16b9d /usr/local/Library/Formula/scala.rb
2.8.0 git checkout fdb41a3 /usr/local/Library/Formula/scala.rb
brew versions scala 12.59s user 1.19s system 97% cpu 14.106 total
pochi >>> cd /usr/local/Library/Formula && git checkout d64edec scala.rb && brew install scala
pochi >>> scala -version
Scala code runner version 2.10.4 -- Copyright 2002-2013, LAMP/EPFL
Download Kafka. A separate build is provided for each Scala version; the one matching Scala 2.10.4 (the 2.10 build) is here:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
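The file name in the URL above follows the pattern kafka_<Scala major.minor>-<Kafka version>.tgz. As a small sketch, the helper below derives that name from a full Scala version; the function name kafka_tarball_name is made up for these notes and is not part of Kafka.

```shell
# Hypothetical helper: build the Kafka tarball name from a full Scala version
# (e.g. 2.10.4) and a Kafka version (e.g. 0.8.1.1).
kafka_tarball_name() {
  scala_full=$1
  kafka_version=$2
  # Keep only "major.minor" of the Scala version: 2.10.4 -> 2.10
  scala_binary=$(printf '%s\n' "$scala_full" | cut -d. -f1,2)
  printf 'kafka_%s-%s.tgz\n' "$scala_binary" "$kafka_version"
}

kafka_tarball_name 2.10.4 0.8.1.1   # prints: kafka_2.10-0.8.1.1.tgz
```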
First, start ZooKeeper, which the cluster requires.
pochi >>> tar -zxvf kafka_2.10-0.8.1.1.tgz
pochi >>> cd kafka_2.10-0.8.1.1
pochi >>> ./bin/zookeeper-server-start.sh config/zookeeper.properties
Next, start the server (broker) that producers send to.
pochi >>> ./bin/kafka-server-start.sh config/server.properties
Also start a consumer to receive the data.
pochi >>> ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Start a producer that actually emits data and check that the messages reach the consumer.
pochi >>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Bowbow!
If the message Bowbow! shows up in the consumer, the basic smoke test is done.
For a while, starting the broker was enough to fetch messages without problems, but at some point an OutOfMemoryError started appearing and I can't tell why.
Caused by: java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,098] ERROR OOME with size 1213486160 (kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,098] WARN Fetching topic metadata with correlation id 23 for topics [Set(pochi)] from broker [id:0,host:localhost,port:9292] failed (kafka.client.ClientUtils$)
java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,099] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(pochi)] from broker [ArrayBuffer(id:0,host:localhost,port:9292)] failed (kafka.producer.async.DefaultEventHandler)
[2014-10-15 21:53:01,322] ERROR OOME with size 1213486160 (kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,323] WARN Fetching topic metadata with correlation id 24 for topics [Set(pochi)] from broker [id:0,host:localhost,port:9292] failed (kafka.client.ClientUtils$)
java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,324] ERROR fetching topic metadata for topics [Set(pochi)] from broker [ArrayBuffer(id:0,host:localhost,port:9292)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(pochi)] from broker [ArrayBuffer(id:0,host:localhost,port:9292)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.lang.OutOfMemoryError: Java heap space
[2014-10-15 21:53:01,325] ERROR Failed to send requests for topics pochi with correlation ids in [17,24] (kafka.producer.async.DefaultEventHandler)
[2014-10-15 21:53:01,325] ERROR Error in handling batch of 3 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
^C./bin/kafka-console-producer.sh --broker-list localhost:9292 --topic pochi 5.09s user 0.35s system 1% cpu 7:09.79 total
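One detail in the log may be worth a second look: the size 1213486160 is identical in every OOME line. The sketch below splits that number into its four big-endian bytes and prints them as ASCII, on the assumption that the client read the first four bytes of the reply as a message length. The function name decode_size is made up for these notes.

```shell
# Hypothetical helper: print the four big-endian bytes of a 32-bit integer as ASCII.
decode_size() {
  n=$1
  for bits in 24 16 8 0; do
    byte=$(( ($n >> $bits) & 255 ))
    # Turn the byte value into an octal escape and let printf emit the character.
    printf "\\$(printf '%03o' "$byte")"
  done
  printf '\n'
}

decode_size 1213486160   # prints: HTTP
```

If that reading is right, whatever answered on localhost:9292 replied with HTTP rather than the Kafka protocol, and the client tried to allocate a buffer of roughly 1.2 GB for it. This is only one possible explanation; the log alone does not prove it.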
For now, deleting all the Kafka-related directories under /tmp and restarting made the error go away, but does that mean it will come back as the data grows...?
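The failing session in the log sent to localhost:9292, while the earlier smoke test used localhost:9092, so a port mismatch may be worth ruling out before blaming data volume. A rough sketch, assuming the broker port is set by a port= line in config/server.properties as in the stock 0.8.1.1 config; the function name broker_port and the client_port variable are made up for these notes.

```shell
# Hypothetical helper: read the broker port from a Kafka server.properties file.
broker_port() {
  # Take the last uncommented "port=" line and print only its value.
  sed -n 's/^port=\([0-9][0-9]*\).*$/\1/p' "$1" | tail -n 1
}

# Compare the configured port with the one passed to --broker-list.
client_port=9292
if [ -f config/server.properties ]; then
  configured=$(broker_port config/server.properties)
  if [ "$configured" != "$client_port" ]; then
    echo "broker listens on $configured but the producer targets $client_port"
  fi
fi
```

Run it from the kafka_2.10-0.8.1.1 directory. The /tmp directories mentioned above are presumably /tmp/kafka-logs (log.dirs) and /tmp/zookeeper (dataDir), which are the defaults in the stock config files.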