Kafka Offset Range Provider/Storage Engine
package com.blackberry.bdp.korpse

import scala.collection.mutable
import scala.collection.mutable.{ Map => MutableMap }
import org.apache.spark.streaming.kafka.OffsetRange
import kafka.api._
import kafka.cluster.Broker
import kafka.network.BlockingChannel
import kafka.common.ErrorMapping
import kafka.common.{ TopicAndPartition, OffsetAndMetadata }
import org.slf4j.{ Logger, LoggerFactory }
import java.io.IOException

case class HostAndPort(host: String, port: Int)

case class AllBrokersFailed(str: String) extends Exception(str)

case class CorrelationMismatchException(str: String) extends Exception(str)
object Korpse {

  def apply(seedBrokersStr: String,
            topicsStr: String,
            consumerGroupId: String,
            kafkaClientId: String,
            defaultOffset: Long = -1) = {
    new Korpse(parseBrokerString(seedBrokersStr),
      parseTopicsString(topicsStr),
      consumerGroupId,
      kafkaClientId,
      defaultOffset)
  }

  def parseTopicsString(topicsStr: String): Seq[String] = {
    topicsStr.split("""\s?,\s?""").map(_.trim).toSeq
  }

  def parseBrokerString(seedBrokerStr: String): Seq[HostAndPort] = {
    val seedBrokerBuffer = new mutable.ListBuffer[HostAndPort]()
    for (seedBroker <- seedBrokerStr
        .split("""\s?,\s?""")
        .map(_.trim)) {
      val host :: port :: _ = seedBroker
        .split("""\s?:\s?""").map(_.trim).toList
      seedBrokerBuffer += HostAndPort(host, port.toInt)
    }
    seedBrokerBuffer.toSeq
  }
}
class Korpse(
    seedBrokers: Seq[HostAndPort],
    topics: Seq[String],
    consumerGroupId: String,
    kafkaClientId: String,
    defaultOffset: Long) {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
  var correlationId = 0
  val backOffMs = 500
  val retries = 3
  val backoffExponent = 2
  val partitions = getPartitions()
  var partitionDefaultStarts: Option[Map[TopicAndPartition, Long]] = None
  var topicDefaultStarts: Option[Map[String, Long]] = None
  var globalDefaultStarts: Long = -1
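  // The three vars above are placeholders for overriding the default starting
  // offset per partition, per topic, or globally (declared but not yet consumed)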

  // Gets a channel to a specific broker
  private def getChannel(broker: Broker): Option[BlockingChannel] = {
    getChannel(HostAndPort(broker.host, broker.port), true)
  }

  // Gets a channel from a list of seed brokers
  private def getChannel(seedBrokers: Seq[HostAndPort]): Option[BlockingChannel] = {
    for (seedBroker <- seedBrokers) {
      try {
        return getChannel(seedBroker)
      } catch {
        case ioe: IOException =>
          println(s"BlockingChannel IOException on ${seedBroker.host}:${seedBroker.port}")
      }
    }
    None
  }

  // Gets a channel from a host and port
  private def getChannel(host: HostAndPort,
                         ignoreCoordinatorRedirect: Boolean = false): Option[BlockingChannel] = {
    try {
      println(s"Establishing channel to $host")
      var channel = new BlockingChannel(host.host,
        host.port,
        BlockingChannel.UseDefaultBufferSize,
        BlockingChannel.UseDefaultBufferSize,
        5000)
      correlationId = correlationId + 1
      channel.connect
      channel.send(ConsumerMetadataRequest(consumerGroupId,
        ConsumerMetadataRequest.CurrentVersion,
        correlationId,
        kafkaClientId))
      val metadataResponse = ConsumerMetadataResponse.readFrom(
        channel.receive.buffer)
      if (correlationId != metadataResponse.correlationId)
        throw new CorrelationMismatchException(
          "ConsumerMetadataRequest: " + correlationId + ", " +
            "ConsumerMetadataResponse: " + metadataResponse.correlationId)
      if (metadataResponse.errorCode == ErrorMapping.NoError) {
        if (ignoreCoordinatorRedirect) {
          println("Connected to " + channel.host + ":" + channel.port
            + " and not redirecting to my coordinator for offset mgmt")
          return Option(channel)
        }
        val offsetManager = metadataResponse.coordinatorOpt
        if (offsetManager.isDefined) {
          channel.disconnect
          channel = new BlockingChannel(offsetManager.get.host,
            offsetManager.get.port,
            BlockingChannel.UseDefaultBufferSize,
            BlockingChannel.UseDefaultBufferSize,
            5000)
          channel.connect
          return Option(channel)
        }
      } else {
        println("Error code: " + metadataResponse.errorCode)
        return None
      }
    } catch {
      case ioe: IOException =>
        println(s"BlockingChannel IOException on ${host.host}:${host.port}")
        return None
    }
    None
  }

  /**
   * This mapping of TopicAndPartition to Broker is needed because
   * subsequent OffsetRequests to fetch the latest/earliest
   * offset must be sent to the partition leader only
   */
  private def getPartitions(): Map[TopicAndPartition, Broker] = {
    val tempMap = MutableMap[TopicAndPartition, Broker]()
    correlationId = correlationId + 1
    val channel = getChannel(seedBrokers)
    if (!channel.isDefined)
      throw new Exception("Cannot get channel from seed brokers")
    channel.get.send(new TopicMetadataRequest(topics, correlationId))
    val response = TopicMetadataResponse.readFrom(channel.get.receive.buffer)
    channel.get.disconnect()
    if (correlationId != response.correlationId)
      throw new CorrelationMismatchException(
        "TopicMetadataRequest: " + correlationId + ", " +
          "TopicMetadataResponse: " + response.correlationId)
    for (topicMetadata <- response.topicsMetadata) {
      for (partitionMetadata <- topicMetadata.partitionsMetadata) {
        if (!partitionMetadata.leader.isDefined) {
          println("ERROR: leader not defined for " + partitionMetadata)
        } else {
          println("discovered partition: " + partitionMetadata)
          val partition = TopicAndPartition(topicMetadata.topic,
            partitionMetadata.partitionId)
          tempMap += (partition -> partitionMetadata.leader.get)
        }
      }
    }
    tempMap.toMap
  }

  /**
   * See code references [1] and [2]: there's something odd about
   * the OffsetRequest. I would have expected the Option to be a
   * None instead of always getting back a -1. There's a Kafka
   * parameter named auto.offset.reset that defaults to largest
   * (-1), but it doesn't seem relevant when making API calls
   * directly over a blocking channel. Needs more investigation,
   * as this workaround seems hackish -- dariens
   */
  def getFromOffsets(): Option[Map[TopicAndPartition, Long]] = {
    var fromOffsets = MutableMap[TopicAndPartition, Long]()
    val channel = getChannel(seedBrokers)
    if (!channel.isDefined)
      throw new Exception("Cannot get channel from seed brokers")
    correlationId = correlationId + 1
    val fetchRequest = new OffsetFetchRequest(consumerGroupId,
      partitions.keySet.toSeq, 1,
      correlationId,
      kafkaClientId)
    channel.get.send(fetchRequest)
    val fetchResponse = OffsetFetchResponse.readFrom(channel.get.receive.buffer)
    channel.get.disconnect()
    if (correlationId != fetchResponse.correlationId)
      throw new CorrelationMismatchException(
        "OffsetFetchRequest: " + correlationId + ", " +
          "OffsetFetchResponse: " + fetchResponse.correlationId)
    for ((partition, broker) <- partitions) {
      val metadataAndError = fetchResponse.requestInfo.get(partition)
      if (metadataAndError.isDefined) {
        val errorCode = metadataAndError.get.error
        if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode) {
          println("channel created on " + channel.get.host + ":" + channel.get.port
            + " is not coordinator for group " + consumerGroupId
            + " and client ID " + kafkaClientId + ", retrying")
          Thread sleep backOffMs
          return getFromOffsets()
        } else if (errorCode == ErrorMapping.OffsetsLoadInProgressCode) {
          LOG.warn("offset load in progress for " + partition.topic
            + "-" + partition.partition + ", retrying")
          Thread sleep backOffMs
          return getFromOffsets()
        } else {
          if (metadataAndError.get.offset == -1) {
            println(s"$partition received -1 OffsetFetchResponse, "
              + s"fetching default for time $defaultOffset")
            // Code reference [1] -- why do we need to do this?!
            val offset = getDefaultFromOffset(partition, broker)
            if (offset.isDefined) {
              println(s"$partition default offset for $defaultOffset was " + offset.get)
              fromOffsets += (partition -> offset.get)
            } else {
              throw new Exception(s"couldn't find offset for partition: $partition")
            }
          } else {
            fromOffsets += (partition -> metadataAndError.get.offset)
            println("mapping from offset for partition " + partition
              + " to " + metadataAndError.get.offset)
          }
        }
      } else {
        // Code reference [2] -- does this ever execute, since it appears we get
        // a -1 when there are no offsets committed?
        println(partition + ": committed offset not found, fetching default")
        val offset = getDefaultFromOffset(partition, broker)
        if (offset.isDefined) {
          println(s"$partition default offset for $defaultOffset was " + offset.get)
          fromOffsets += (partition -> offset.get)
        } else {
          throw new Exception("couldn't find any from offset for partition: " + partition)
        }
      }
    }
    Option(fromOffsets.toMap)
  }
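
  /**
   * Fetches the default starting offset for a partition directly from
   * its leader broker via an OffsetRequest for the configured
   * defaultOffset time (-2 for earliest, -1 for latest)
   */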
  def getDefaultFromOffset(partition: TopicAndPartition,
                           broker: Broker): Option[Long] = {
    val channel = getChannel(broker)
    if (!channel.isDefined)
      throw new Exception(s"Cannot get channel for broker $broker")
    correlationId = correlationId + 1
    val reqInfo = Map(partition -> PartitionOffsetRequestInfo(defaultOffset, 1))
    val request = OffsetRequest(reqInfo, 1, correlationId, kafkaClientId)
    channel.get.send(request)
    val response = OffsetResponse.readFrom(channel.get.receive.buffer)
    channel.get.disconnect()
    if (correlationId != response.correlationId)
      throw new CorrelationMismatchException(
        "OffsetRequest: " + correlationId + ", " +
          "OffsetResponse: " + response.correlationId)
    if (response.hasError) {
      println(s"Fetching the offset for time $defaultOffset failed")
      None
    } else {
      val offsetResponse = response.partitionErrorAndOffsets.get(partition).get
      println(s"$partition fetched offset for time $defaultOffset is " + offsetResponse.offsets(0))
      Option(offsetResponse.offsets(0))
    }
  }
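
  /**
   * Commits the untilOffset of each OffsetRange to Kafka with an
   * OffsetCommitRequest, retrying when the consumer coordinator has moved
   */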
  def storeOffsetRanges(offsetRanges: Array[OffsetRange]): Unit = {
    val channel = getChannel(seedBrokers)
    if (!channel.isDefined)
      throw new Exception("Cannot get channel from seed brokers")
    val now = System.currentTimeMillis()
    val offsets = mutable.Map[TopicAndPartition, OffsetAndMetadata]()
    for (osr <- offsetRanges) {
      val partition = TopicAndPartition(osr.topic, osr.partition)
      val oam = OffsetAndMetadata(osr.untilOffset, "metadata?", now)
      offsets += (partition -> oam)
      println("[" + osr.topic + "-" + osr.partition + "] offset: "
        + osr.untilOffset + " added to commit request")
    }
    correlationId = correlationId + 1
    val commitRequest = new OffsetCommitRequest(consumerGroupId, offsets.toMap, 1,
      correlationId, kafkaClientId)
    try {
      channel.get.send(commitRequest)
      val commitResponse = OffsetCommitResponse.readFrom(channel.get.receive.buffer)
      channel.get.disconnect()
      if (correlationId != commitResponse.correlationId)
        throw new CorrelationMismatchException(
          "OffsetCommitRequest: " + correlationId + ", " +
            "OffsetCommitResponse: " + commitResponse.correlationId)
      if (commitResponse.hasError) {
        for ((partition, broker) <- partitions) {
          val status = commitResponse.commitStatus.get(partition)
          if (!status.isDefined) {
            println("WARN: commit status was not defined for " + partition)
          } else {
            val errorCode = status.get
            if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode
              || errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode) {
              println("[" + partition.topic + "-" + partition.partition
                + "] Consumer coordinator has moved, need to retry")
              Thread sleep backOffMs
              storeOffsetRanges(offsetRanges)
            } else if (errorCode == ErrorMapping.OffsetMetadataTooLargeCode) {
              throw new Exception("WARN: partition " + partition
                + ": you must reduce the size of the metadata if you wish to retry")
            }
          }
        }
      } else {
        println("offsets and metadata stored without errors")
      }
    } catch {
      case ioe: IOException => println("an IOException occurred: " + ioe)
    }
  }
}
package com.blackberry.bdp.kafkafilter

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import kafka.message.MessageAndMetadata
import org.apache.spark.{ SparkConf, TaskContext }
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka.{ OffsetRange, HasOffsetRanges }
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.mutable
import com.blackberry.bdp.korpse._
object Main {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]) {
    if (args.length != 6) {
      println("Required arguments:")
      println("\t1 => metadata brokers")
      println("\t2 => topic names (comma separated)")
      println("\t3 => batch length (seconds)")
      println("\t4 => consumer group ID")
      println("\t5 => Kafka client ID")
      println("\t6 => default offset (-2 earliest, -1 latest)")
      System.exit(1)
    }
    val seedBrokerList = args(0)
    val topics = args(1)
    val batchSeconds = args(2).toInt
    val consumerGroup = args(3)
    val kafkaClientId = args(4)
    val defaultOffset = args(5)
    val ssc = new StreamingContext(new SparkConf, Seconds(batchSeconds))
    val kafkaParams = Map(
      "metadata.broker.list" -> seedBrokerList)
    val korpse = Korpse(seedBrokerList,
      topics,
      consumerGroup,
      kafkaClientId,
      defaultOffset.toLong)
    val fromOffsets = korpse.getFromOffsets()
    if (!fromOffsets.isDefined) {
      LOG.error("Unable to determine starting offsets")
      System.exit(1)
    }
    println("\n***\nfrom offsets: " + fromOffsets + "\n***\n")
    fromOffsets.get.foreach((t2) => println(t2._1 + " starting offset: " + t2._2))
    var offsetRanges = Array[OffsetRange]()
    // Create a stream from our offsets that includes the Kafka message, partition, and offset
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long)](
      ssc, kafkaParams, fromOffsets.get,
      (mmd: MessageAndMetadata[String, String]) => (mmd.message(), mmd.partition, mmd.offset))
      .transform { rdd =>
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd
      }
    // Typical application-specific Spark stream processing now follows;
    // as an illustration, count the occurrences of each non-empty message:
    stream
      .filter { case (msg, _, _) => msg.nonEmpty } // you can filter the stream
      .map { case (msg, _, _) => (msg, 1L) }       // ...and apply a mapping function
      .reduceByKey(_ + _)                          // then reduce by key
      .foreachRDD(rdd => {
        rdd.foreach(println)
      })
    // Now we want to persist the offset ranges that we're consuming so we
    // start from where we left off if/when the job is stopped or crashes and is
    // restarted
    stream.foreachRDD { rdd =>
      korpse.storeOffsetRanges(offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
The direct stream has code for interacting with Kafka's offset API; it's just not public.
There's a JIRA for exposing it as a public API:
https://issues.apache.org/jira/browse/SPARK-10963
You can see the PR if you want to apply that patch.
This is an example Spark job that uses the Korpse library to provide and store offsets in Kafka. Direct stream processing in Spark 1.3.1-1.5.1 doesn't use the standard high-level consumer, pushing the responsibility of offset management onto the application. Korpse simplifies this metadata persistence by using the Kafka blocking-channel API to store and retrieve the offsets for all partitions in one or more topics.
The Korpse library and an example Spark job are included in this gist.
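As a minimal sketch of how the pieces fit together (the broker list, topic, group, and client IDs below are hypothetical), a driver constructs a Korpse, seeds the direct stream with getFromOffsets(), and commits with storeOffsetRanges() after each batch:

// Minimal usage sketch; broker list, topic, group, and client IDs are hypothetical
val korpse = Korpse(
  "broker1:9092,broker2:9092", // seed brokers: comma-separated host:port pairs
  "mytopic",                   // one or more comma-separated topics
  "my-consumer-group",         // consumer group the offsets are committed under
  "my-client-id",              // Kafka client ID used in API requests
  -2)                          // default offset time: -2 earliest, -1 latest
val fromOffsets = korpse.getFromOffsets().get // starting offset per TopicAndPartition
// ...create the direct stream from fromOffsets and process each batch, then:
// korpse.storeOffsetRanges(offsetRanges)     // persist where we left off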