Created
June 5, 2016 19:13
-
-
Save maciekciolek/c645e28cc708b26177eb1444d077d09a to your computer and use it in GitHub Desktop.
Example of KafkaBatchProducer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, MetricsReporter}
import org.apache.kafka.common.network.Selector
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
import org.apache.kafka.common.requests.{ProduceRequest, RequestSend}
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.{Cluster, TopicPartition}

import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
/**
 * A single key/value pair to be published; both parts are plain strings.
 *
 * @param key   record key
 * @param value record payload
 * @author mciolek
 */
case class SampleRecord(
  key: String,
  value: String
)
/**
 * Minimal batch producer built directly on Kafka's low-level `NetworkClient`.
 *
 * Construction blocks until the connection to the first bootstrap node is ready.
 * No background thread is started here: requests are written and responses are read
 * only while `client.poll` runs, i.e. while [[poolNewData]] is being called, so the
 * futures returned by [[sendBatch]] complete from inside those calls.
 *
 * @param clientId  client id handed to the `NetworkClient` and used as the `client-id` metrics tag
 * @param addresses bootstrap broker addresses, validated by `ClientUtils.parseAndValidateAddresses`
 */
class KafkaBatchProducer(clientId: String, addresses: List[String]) {

  private val props = Map[String, AnyRef]("security.protocol" -> "PLAINTEXT").asJava
  private val time = new SystemTime
  private val channelBuilder = ClientUtils.createChannelBuilder(props)

  private val metricTags = Map("client-id" -> clientId).asJava
  private val metricConfig = new MetricConfig()
    .samples(10)
    .timeWindow(1000, TimeUnit.MILLISECONDS)
    .tags(metricTags)
  private val metricReports = List.empty[MetricsReporter].asJava
  private val metrics = new Metrics(metricConfig, metricReports, time)

  // NOTE(review): argument meanings below follow the Kafka 0.10.0 constructor signatures;
  // confirm against the Kafka version actually on the classpath.
  private val selector = new Selector(
    30000, // max idle time of a connection, in ms
    metrics,
    time,
    "producer", // metric group prefix
    channelBuilder
  )
  private val metadata = new Metadata(
    1000, // refresh backoff, in ms
    10000 // metadata expiry, in ms
  )
  metadata.update(
    Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(addresses.asJava)),
    time.milliseconds
  )

  private val client: KafkaClient = new NetworkClient(
    selector,
    metadata,
    clientId,
    5, // max in-flight requests per connection
    1000, // reconnect backoff, in ms
    102400, // socket send buffer, in bytes
    32768, // socket receive buffer, in bytes
    30000, // request timeout, in ms
    time
  )

  /**
   * Queues one produce request containing `batch` for the given topic partition.
   *
   * The request goes to the partition leader when the current metadata knows it, and to
   * the first known node otherwise (the bootstrap cluster carries no partition data).
   * If that node is not ready, a connection attempt is started so that a later call can
   * succeed once [[poolNewData]] has driven the connection.
   *
   * @param topic     destination topic
   * @param partition destination partition
   * @param batch     records to send; each record's list index is used as its offset in the record set
   * @return a future completed with the raw `ClientResponse` (not inspected here — callers
   *         should check it for disconnects and broker error codes), or failed with
   *         `IllegalStateException` when no target node is ready
   */
  def sendBatch(topic: String, partition: Int, batch: List[SampleRecord]): Future[ClientResponse] = {
    val topicPartition = new TopicPartition(topic, partition)
    // `ready` (unlike `isReady`) also initiates a connection when the node is not connected yet.
    targetNode(topicPartition).filter(node => client.ready(node, time.milliseconds())) match {
      case Some(node) =>
        val promise = Promise[ClientResponse]()
        val partitionRecords = Map(topicPartition -> encode(batch).buffer())
        val acks: Short = 1
        val produceRequest = new ProduceRequest(acks, 1000, partitionRecords.asJava)
        val sendRequest = new RequestSend(
          String.valueOf(node.id()),
          client.nextRequestHeader(ApiKeys.PRODUCE),
          produceRequest.toStruct
        )
        val clientRequest = new ClientRequest(time.milliseconds(), true, sendRequest, new RequestCompletionHandler {
          override def onComplete(response: ClientResponse): Unit =
            // The response should be analyzed here in order to fulfill the promise with success or error.
            promise.success(response)
        })
        client.send(clientRequest, time.milliseconds())
        promise.future

      case None =>
        Future.failed(new IllegalStateException("Leader is not ready."))
    }
  }

  /**
   * Runs one `client.poll` with a 1000 ms timeout.
   *
   * @return the responses returned by that poll
   */
  def poolNewData(): java.util.List[ClientResponse] = client.poll(1000, time.milliseconds())

  /** Closes the network client and the metrics registry created by this producer. */
  def close(): Unit = {
    client.close()
    metrics.close()
  }

  /** Leader of `topicPartition` if the current metadata knows it, else the first known node, else `None`. */
  private def targetNode(topicPartition: TopicPartition) = {
    val cluster = metadata.fetch()
    Option(cluster.leaderFor(topicPartition)).orElse(cluster.nodes().asScala.headOption)
  }

  /** Serializes `batch` into a closed, uncompressed record set, using each list index as the offset. */
  private def encode(batch: List[SampleRecord]): MemoryRecords = {
    val records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE)
    batch.zipWithIndex.foreach { case (SampleRecord(key, value), offset) =>
      records.append(
        offset.toLong,
        time.milliseconds(),
        key.getBytes(StandardCharsets.UTF_8),
        value.getBytes(StandardCharsets.UTF_8)
      )
    }
    records.close()
    records
  }

  // Ugly part...
  // Ensure that Kafka starts connecting to every bootstrap node...
  metadata.fetch().nodes().asScala.foreach(node => client.ready(node, time.milliseconds()))
  // ...and wait until the client is ready to send to the first node. `ready` is used instead of
  // `isReady` so that a failed or dropped connection attempt is retried rather than spinning forever.
  // There is still no upper bound on the wait: construction blocks for as long as the node is unreachable.
  while (!client.ready(metadata.fetch().nodes().get(0), time.milliseconds())) {
    client.poll(1000, time.milliseconds())
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment