Created
February 8, 2017 16:31
-
-
Save defndaines/c269bd7ffab85d6971ab56d7b8aea873 to your computer and use it in GitHub Desktop.
Kafka façade for use in a Spark job (adapted liberally from elsewhere)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.Future | |
import java.util.concurrent.atomic.AtomicReference | |
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} | |
import scala.collection.JavaConversions._ | |
/** This construct allows for a KafkaPublisher to be reused in an executor. | |
* | |
* Establishing communication to the Kafka cluster is expensive, so this should | |
* improve performance and reduce the risk of connection errors. | |
*/ | |
object KafkaSink { | |
def apply(config: Map[String, Object], topic: String): KafkaSink = { | |
val create = () => { | |
val producer = new KafkaProducer[String, Array[Byte]](config) | |
sys.addShutdownHook { | |
producer.close() | |
} | |
producer | |
} | |
new KafkaSink(create, topic) | |
} | |
} | |
class KafkaSink(createProducer: () => KafkaProducer[String, Array[Byte]], topic: String) extends Serializable { | |
// Lazy producer allows it to be created by executor and correctly serialize. | |
lazy val producer = createProducer() | |
def send(key: String, bytes: Array[Byte], callback: Callback): Future[RecordMetadata] = | |
producer.send(new ProducerRecord(topic, key, bytes), callback) | |
} | |
class KafkaSinkExceptionHandler extends Callback { | |
private val lastException = new AtomicReference[Option[Exception]](None) | |
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = | |
lastException.set(Option(exception)) | |
def throwExceptionIfAny(): Unit = | |
lastException.getAndSet(None).foreach(ex => throw ex) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
And an example usage: