Created
November 20, 2013 15:14
-
-
Save squito/7564825 to your computer and use it in GitHub Desktop.
Some Scala utils for working with the Kafka SimpleConsumer. I realized there was a way for me to use the ConsumerGroup API instead, so I'm putting this here just in case it's useful to me later on ...
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kafka.api._
import kafka.common.KafkaException
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import scala.annotation.tailrec
import scala.util._
/** Helpers for talking to Kafka brokers through the low-level `SimpleConsumer` API. */
object KafkaSimpleConsumerUtils {

  /** Socket timeout (ms) handed to every `SimpleConsumer` created by [[findConsumerAndDo]]. */
  private val SocketTimeoutMs = 100000

  /** Buffer size (bytes) handed to every `SimpleConsumer` created by [[findConsumerAndDo]]. */
  private val BufferSizeBytes = 64 * 1024

  /**
   * Asks the broker behind `consumer` for one offset of `topic`/`partition` relative to `whichTime`.
   *
   * @param consumer   an already constructed consumer; it is not closed by this method
   * @param topic      topic to query
   * @param partition  partition id within `topic`
   * @param whichTime  time bound forwarded to the offset request
   *                   (presumably `OffsetRequest.LatestTime` / `EarliestTime` or a timestamp — confirm with callers)
   * @param clientName client id attached to the offset request
   * @return `Success(offset)` with the first offset the broker returned, or a `Failure` holding a
   *         `KafkaException` when the response reports an error or contains no offsets.
   *         Exceptions thrown by the consumer call itself are NOT captured and still propagate.
   */
  def getLastOffset(consumer: SimpleConsumer, topic: String, partition: Int, whichTime: Long, clientName: String): Try[Long] = {
    val tap = new TopicAndPartition(topic, partition)
    // Request at most one offset; clientName identifies this client to the broker.
    val request = new kafka.api.OffsetRequest(
      Map(tap -> PartitionOffsetRequestInfo(whichTime, 1)),
      clientId = clientName)
    val response = consumer.getOffsetsBefore(request)
    val partitionResponse = response.partitionErrorAndOffsets(tap)

    if (response.hasError) {
      Failure(new KafkaException(
        s"Error fetching offset data from the broker. Reason: error code ${partitionResponse.error}"))
    } else {
      // Offsets are sorted in descending order, so the first one is the one we want.
      partitionResponse.offsets.headOption match {
        case Some(offset) => Success(offset)
        case None =>
          Failure(new KafkaException(s"Broker returned no offsets for topic $topic, partition $partition"))
      }
    }
  }

  /**
   * Tries each seed broker in order: builds a `SimpleConsumer` for it and runs `f` against it.
   * The first broker for which `f` completes without throwing wins.
   *
   * A consumer whose `f` call failed is closed before the next seed is tried. The failure itself
   * is not reported; only the exhaustion of all seeds is visible to the caller (as `None`).
   *
   * @param seedBrokers  broker host names to try, in order
   * @param port         port used for every seed broker
   * @param consumerName client id given to each consumer
   * @param f            action to run against the consumer
   * @return the still-open consumer together with the result of `f`, or `None` when every seed
   *         failed. The caller owns the returned consumer and is responsible for closing it.
   */
  @tailrec
  def findConsumerAndDo[T](seedBrokers: List[String], port: Int, consumerName: String)(f: SimpleConsumer => T): Option[(SimpleConsumer, T)] =
    seedBrokers match {
      case Nil => None
      case seed :: others =>
        val attempt: Try[(SimpleConsumer, T)] =
          Try(new SimpleConsumer(seed, port, SocketTimeoutMs, BufferSizeBytes, consumerName)).flatMap { consumer =>
            Try(f(consumer))
              .map(result => (consumer, result))
              .recoverWith { case ex =>
                // Do not leak the connection of a broker we are giving up on (best effort).
                Try(consumer.close())
                Failure(ex)
              }
          }
        attempt match {
          case Success(found) => Some(found)
          case Failure(_)     => findConsumerAndDo(others, port, consumerName)(f)
        }
    }

  /**
   * Looks up the metadata of `topic`/`partition` through the first seed broker that answers.
   *
   * Note that a broker which answers but does not know the partition ends the search with
   * `None`; further seeds are only tried when the request itself throws.
   *
   * @param seedBrokers  broker host names to try, in order
   * @param port         port used for every seed broker
   * @param consumerName client id given to the temporary consumer
   * @param topic        topic to look up
   * @param partition    partition id within `topic`
   * @return the partition's metadata, or `None` if no seed answered or the partition is unknown
   */
  def findLeader(seedBrokers: List[String], port: Int, consumerName: String, topic: String, partition: Int): Option[PartitionMetadata] =
    findConsumerAndDo(seedBrokers, port, consumerName) { consumer =>
      findLeader(consumer, topic, partition)
    }.flatMap { case (consumer, metadata) =>
      // The consumer only existed for this single lookup; release its connection.
      Try(consumer.close())
      metadata
    }

  /**
   * Sends a topic-metadata request for `topic` over `consumer` and picks out the entry for `partition`.
   *
   * @param consumer  consumer used to send the request; it is not closed by this method
   * @param topic     topic to look up
   * @param partition partition id within `topic`
   * @return the first partition metadata in the response whose `partitionId` equals `partition`,
   *         or `None` when the response contains no such partition
   */
  def findLeader(consumer: SimpleConsumer, topic: String, partition: Int): Option[PartitionMetadata] = {
    val request: TopicMetadataRequest = new TopicMetadataRequest(Seq(topic), 0)
    val response: TopicMetadataResponse = consumer.send(request)
    // Iterators keep the scan lazy: stop at the first matching partition.
    response.topicsMetadata.iterator
      .flatMap(_.partitionsMetadata.iterator)
      .find(_.partitionId == partition)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment