Skip to content

Instantly share code, notes, and snippets.

@tifletcher
Created February 25, 2017 00:03
Show Gist options
  • Save tifletcher/5ebbcc14f330f2848939c56a1ea53980 to your computer and use it in GitHub Desktop.
Save tifletcher/5ebbcc14f330f2848939c56a1ea53980 to your computer and use it in GitHub Desktop.
avro topic / producer factory

Uses avro4s, KafkaProducer, and KafkaAvroSerializer to publish Avro-encoded messages to Kafka.

setup:

  val kafkaTopic = deliveryConfig.getString("kafka.topic").get
  import NotificationRequest.Implicits.notificationRequestFormatter
  val notificationRequestTopic = avroTopicFactory.producer[NotificationRequest](kafkaTopic)

usage:

              notificationRequestTopic.publish(notificationRequest.message_id, notificationRequest)
package externalservices.kafka
import java.util.Properties
import application.utils.ConfigurationHelpers
import com.google.inject.{ImplementedBy, Inject, Singleton}
import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.KafkaProducer
import play.api.Configuration
import play.api.inject.ApplicationLifecycle
import scala.concurrent.Promise
import scala.util.Try
/**
  * Factory for `ProducerTopic` instances that all share one `KafkaProducer`.
  *
  * The producer is constructed once from `config.properties`. A stop hook registered on
  * `lifecycle` flushes and closes it.
  *
  * @param config    supplies the `java.util.Properties` used to construct the producer
  * @param lifecycle lifecycle on which the producer's stop hook is registered
  */
@Singleton
class AvroTopicFactory @Inject() (config: KafkaAvroConfig, lifecycle: ApplicationLifecycle) {

  // Deliberately not called `producer`: a val and a method of the same name in one
  // class are rejected by the compiler as a double definition.
  private val kafkaProducer = new KafkaProducer[String, GenericRecord](config.properties)

  lifecycle.addStopHook( () => {
    // close() runs in a `finally` so that a failing flush() cannot leave the producer unclosed.
    // Any exception raised is captured by the Try and surfaces as a failed Future.
    val shutdown = Try {
      try kafkaProducer.flush()
      finally kafkaProducer.close()
    }
    Promise.fromTry(shutdown).future
  })

  /**
    * Create a new ProducerTopic with name `topic` for records of type T, backed by this
    * factory's shared producer. Requires an implicit RecordFormat[T].
    *
    * @param topic        topic name
    * @param recordFormat com.sksamuel.avro4s.RecordFormat for T
    * @tparam T message type for topic
    * @return ProducerTopic with a .publish method
    */
  def producer[T](topic: String)(implicit recordFormat: RecordFormat[T]): ProducerTopic[T] =
    ProducerTopic[T](topic, kafkaProducer)
}
/**
  * Settings required to configure a Kafka producer, plus a ready-made
  * `java.util.Properties` view of them.
  *
  * Implementations provide the individual values; `properties` assembles them under the
  * corresponding configuration keys.
  */
@ImplementedBy(classOf[KafkaAvroConfigFromApplication])
trait KafkaAvroConfig {
  def kafkaUrl: String
  def acks: String
  def retries: String
  def keySerializer: String
  def valueSerializer: String
  def schemaRegistryUrl: String

  /**
    * Builds a fresh `Properties` object on every call, populated with the six settings
    * declared on this trait.
    *
    * @return a new `Properties` instance containing one entry per setting
    */
  def properties: Properties = {
    val entries = Seq(
      "bootstrap.servers"   -> kafkaUrl,
      "acks"                -> acks,
      "retries"             -> retries,
      "key.serializer"      -> keySerializer,
      "value.serializer"    -> valueSerializer,
      "schema.registry.url" -> schemaRegistryUrl
    )
    entries.foldLeft(new Properties()) { case (props, (key, value)) =>
      props.put(key, value)
      props
    }
  }
}
/**
  * [[KafkaAvroConfig]] whose connection settings are read from the injected
  * `play.api.Configuration` under the `listservice.kafka.*` keys.
  *
  * All lookups happen at construction time via `requireString`.
  * NOTE(review): `requireString` is not defined in this file; presumably it comes from
  * `ConfigurationHelpers` and fails when the key is absent -- confirm.
  *
  * @param configuration application configuration to read the settings from
  */
@Singleton
class KafkaAvroConfigFromApplication @Inject() (configuration: Configuration)
    extends KafkaAvroConfig
    with ConfigurationHelpers {

  // Single place through which every mandatory setting is looked up.
  private def requiredSetting(path: String): String = configuration.requireString(path)

  override val kafkaUrl: String          = requiredSetting("listservice.kafka.kafkaUrl")
  override val acks: String              = requiredSetting("listservice.kafka.acks")
  override val retries: String           = requiredSetting("listservice.kafka.retries")
  override val schemaRegistryUrl: String = requiredSetting("listservice.kafka.schemaRegistryUrl")

  // The serializers are fixed here rather than read from configuration.
  override val keySerializer: String   = "io.confluent.kafka.serializers.KafkaAvroSerializer"
  override val valueSerializer: String = "io.confluent.kafka.serializers.KafkaAvroSerializer"
}
package com.yournamespace
import java.util.UUID
import com.sksamuel.avro4s.RecordFormat
import core.services.NotificationService.NotificationContext
import shapeless.{:+:, CNil, Coproduct}
/**
  * Message describing a notification to be sent.
  *
  * @param message_id identifier of this message
  * @param user_id    identifier of the user the notification concerns
  * @param content    payload, one of the alternatives of
  *                   `NotificationRequest.NotificationRequestContentType`
  */
case class NotificationRequest(
  message_id: String,
  user_id: String,
  content: NotificationRequest.NotificationRequestContentType
)

object NotificationRequest {

  /** Coproduct of the supported payload types; currently `ListActivityEmailContent` only. */
  type NotificationRequestContentType = ListActivityEmailContent :+: CNil

  /** Import `Implicits.notificationRequestFormatter` to obtain the avro4s record format. */
  object Implicits {
    implicit val notificationRequestFormatter: RecordFormat[NotificationRequest] =
      RecordFormat[NotificationRequest]
  }

  /**
    * Builds a request from a notification context.
    *
    * The message id is a freshly generated random UUID, the user id is taken from
    * `context.user.userId`, and the content is a `ListActivityEmailContent` built from
    * the context and wrapped in the content coproduct.
    *
    * @param context context to derive the request from
    * @return a new request with a unique `message_id`
    */
  def apply(context: NotificationContext): NotificationRequest =
    new NotificationRequest(
      message_id = UUID.randomUUID().toString,
      user_id = context.user.userId.toString,
      content = Coproduct[NotificationRequestContentType](ListActivityEmailContent(context))
    )
}
package externalservices.kafka
import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import scala.concurrent.{Future, Promise}
/**
  * Represents a kafka topic with a value schema created for T and a key schema of String.
  *
  * @param topic    topic name
  * @param producer producer -- `kafka.clients.producer`
  * @tparam T type of message which will be published to topic
  */
case class ProducerTopic[T](topic: String, producer: KafkaProducer[String, GenericRecord])(implicit recordFormatter: RecordFormat[T]) {

  /**
    * Publish a message to this topic. Returns a `scala.concurrent.Future` which is completed
    * from the producer's send callback.
    *
    * The future fails with the callback's exception whenever one is supplied. Otherwise it
    * succeeds with the record metadata, or with `None` if the callback supplied no metadata.
    *
    * @param key   message key
    * @param value message value; converted to a `GenericRecord` with the implicit RecordFormat
    * @return `kafka.clients.producer.RecordMetadata` (when available) in a scala Future
    */
  def publish(key: String, value: T): Future[Option[RecordMetadata]] = {
    val record = new ProducerRecord[String, GenericRecord](topic, key, recordFormatter.to(value))
    val ack = Promise[Option[RecordMetadata]]()
    val callback = new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        // The exception alone decides success or failure:
        // * Per the Callback javadoc, `exception` is null if and only if no error occurred.
        // * `metadata` is NOT a reliable failure signal. Depending on the client version a
        //   failed send may carry a non-null placeholder metadata next to the exception, and
        //   null metadata has been observed on successfully published messages.
        // Requiring `metadata == null` as well would report such failed sends as successes.
        if (exception != null) {
          ack.failure(exception)
        }
        else {
          // Option(...) turns a null metadata on a successful send into None.
          ack.success(Option(metadata))
        }
    }
    producer.send(record, callback)
    ack.future
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment