Last active
June 15, 2016 12:45
-
-
Save lazyval/2e4b5a53d61d3098a37a41a8d5b1ffcb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
scalaVersion := "2.11.8" | |
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kafka.api.FetchRequestBuilder | |
import kafka.cluster.Broker | |
import kafka.javaapi.{FetchRequest, TopicMetadataRequest} | |
import kafka.javaapi.consumer.SimpleConsumer | |
import kafka.message.Message | |
import scala.collection.JavaConverters._ | |
class Reader(anyBroker: String, topic: String, partition: Int, offset: Int) { | |
val ClientId = "kafka-fetch-size-client" | |
val SoTimeoutMs = 20000 | |
private val connection = { | |
val Broker(id, host, port) = findLeader(anyBroker, topic, partition) | |
new SimpleConsumer(host, port, SoTimeoutMs, 1024 * 1024, ClientId) | |
} | |
def read(fetchSize: Int): Seq[Message] = { | |
val request = new FetchRequestBuilder() | |
.addFetch(topic, partition, offset, fetchSize) | |
.build() | |
val it = connection.fetch(request).messageSet(topic, partition).iterator().asScala | |
it.map(msg => msg.message).toSeq | |
} | |
private def findLeader(anyBroker: String, topic: String, partition: Int): Broker = { | |
val Array(host, port) = anyBroker.split(":") | |
val metadata = new SimpleConsumer(host, port.toInt, SoTimeoutMs, 1024 * 1024, ClientId) | |
val whoIsLeader = new TopicMetadataRequest(java.util.Collections.singletonList(topic)) | |
val answer = metadata.send(whoIsLeader) | |
val leader = for { | |
topicMeta <- answer.topicsMetadata.asScala if topicMeta.topic == topic | |
partitionMeta <- topicMeta.partitionsMetadata.asScala if partitionMeta.partitionId == partition | |
} yield partitionMeta.leader | |
metadata.close() | |
println(s"Found leader for partition, it's ${leader.head}") | |
leader.head | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment