Skip to content

Instantly share code, notes, and snippets.

@joan38
Last active May 19, 2018 19:06
Show Gist options
  • Save joan38/b98e83071ba93e2aa8cce62930e363dd to your computer and use it in GitHub Desktop.
Save joan38/b98e83071ba93e2aa8cce62930e363dd to your computer and use it in GitHub Desktop.
/** Describes how a value of type `V` is mapped onto a keyed, timestamped record.
  *
  * Instances are looked up implicitly by `Producer` and `KafkaProducerOps.send`,
  * which use them to obtain the destination topic, the key and the timestamp
  * for each value being sent.
  *
  * @tparam K type of the record key derived from a value
  * @tparam V type of the value being sent
  */
trait Record[K, V] {

  /** Name of the topic that values of type `V` are sent to. */
  def topic: String

  /** Derives the record key from `value`. */
  def key(value: V): K

  /** Derives the record timestamp from `value`.
    *
    * NOTE(review): presumably epoch milliseconds, since the result is handed
    * to `ProducerRecord` as its timestamp — confirm against implementations.
    */
  def timestamp(value: V): Long
}
/** Two-step factory for `KafkaProducer` instances.
  *
  * The value type is given explicitly first (`Producer[V]`); the resulting
  * builder is then applied to the configuration, with the key type `K`
  * left as a separate type parameter of that second step.
  */
object Producer {

  /** First step: fixes the value type `V` and returns the builder for it. */
  def apply[V]: ProducerBuilder[V] = new ProducerBuilder[V]

  /** Second step of the factory, already bound to the value type `V`. */
  class ProducerBuilder[V] {

    /** Creates a `KafkaProducer` from `config` and the implicit serializers.
      *
      * @param config          producer configuration passed straight to `KafkaProducer`
      * @param record          evidence that `V` has a `Record[K, V]` instance; it is not
      *                        used in the body (presumably it is what lets `K` be
      *                        inferred at the call site — confirm)
      * @param keySerializer   serializer for keys of type `K`
      * @param valueSerializer serializer for values of type `V`
      * @tparam K key type of the producer
      */
    def apply[K](config: Properties)(
        implicit record: Record[K, V],
        keySerializer: Serializer[K],
        valueSerializer: Serializer[V]
    ): KafkaProducer[K, V] =
      new KafkaProducer[K, V](config, keySerializer, valueSerializer)
  }
}
/** Adds a `Record`-driven `send` to `KafkaProducer` that returns a Scala `Future`.
  *
  * @param kafkaProducer the producer the extra method is added to
  * @tparam K key type of the producer
  * @tparam V value type of the producer
  */
implicit class KafkaProducerOps[K, V](kafkaProducer: KafkaProducer[K, V]) {

  /** Sends `value`, taking topic, key and timestamp from the implicit `record`.
    *
    * The whole send-and-wait runs inside `Future { ... }`, so a non-fatal
    * exception thrown while sending or while waiting for the result fails the
    * returned future instead of propagating to the caller.
    *
    * The `ExecutionContext` required by `Future { ... }` is not a parameter of
    * this method; it has to be supplied by the enclosing scope.
    *
    * @param value  the value to send
    * @param record describes how `value` maps to topic, key and timestamp
    * @return the metadata reported by the producer once the send completes
    */
  def send(value: V)(implicit record: Record[K, V]): Future[RecordMetadata] = Future {
    // `.get()` parks the current pool thread until the send has completed.
    // Marking the wait as `blocking` lets a fork-join based ExecutionContext
    // (such as the global one) compensate with extra threads instead of being
    // starved when many sends are in flight at once.
    scala.concurrent.blocking {
      // `null` is passed as the partition argument.
      // NOTE(review): presumably this leaves partition selection to the
      // producer's partitioner — confirm against the ProducerRecord docs.
      kafkaProducer
        .send(new ProducerRecord(record.topic, null, record.timestamp(value), record.key(value), value))
        .get()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment