|
package externalservices.kafka |
|
|
|
import java.util.Properties |
|
|
|
import application.utils.ConfigurationHelpers |
|
import com.google.inject.{ImplementedBy, Inject, Singleton} |
|
import com.sksamuel.avro4s.RecordFormat |
|
import org.apache.avro.generic.GenericRecord |
|
import org.apache.kafka.clients.producer.KafkaProducer |
|
import play.api.Configuration |
|
import play.api.inject.ApplicationLifecycle |
|
|
|
import scala.concurrent.Promise |
|
import scala.util.Try |
|
|
|
/**
 * Factory for Avro-encoded Kafka topics that all share one underlying
 * [[org.apache.kafka.clients.producer.KafkaProducer]].
 *
 * The producer is created from `config.properties` when this singleton is
 * constructed. A stop hook is registered on the supplied
 * [[play.api.inject.ApplicationLifecycle]] that flushes and then closes the
 * producer.
 *
 * @param config    source of the properties used to build the Kafka producer
 * @param lifecycle lifecycle on which the producer's stop hook is registered
 */
@Singleton
class AvroTopicFactory @Inject() (config: KafkaAvroConfig, lifecycle: ApplicationLifecycle) {

  // Deliberately not called `producer`: that name belongs to the public
  // `producer[T]` method below, and sharing it made references ambiguous to read.
  private val kafkaProducer = new KafkaProducer[String, GenericRecord](config.properties)

  lifecycle.addStopHook(() => {
    // close() sits in a `finally` so the producer is still released when
    // flush() throws. Any failure is captured by the Try and reported through
    // the future returned to the lifecycle.
    val shutdown = Try {
      try kafkaProducer.flush()
      finally kafkaProducer.close()
    }

    Promise.fromTry(shutdown).future
  })

  /**
   * Create a new ProducerTopic named `topic` for records of type T, backed by
   * the producer owned by this factory. Requires an implicit RecordFormat[T].
   *
   * @param topic        topic name
   * @param recordFormat com.sksamuel.avro4s.RecordFormat for T
   * @tparam T message type for topic
   * @return ProducerTopic with a .publish method
   */
  def producer[T](topic: String)(implicit recordFormat: RecordFormat[T]): ProducerTopic[T] =
    ProducerTopic[T](topic, kafkaProducer)
}
|
|
|
/**
 * Settings required to configure a Kafka producer, together with their
 * rendering as a `java.util.Properties` instance.
 *
 * Bound by default to [[KafkaAvroConfigFromApplication]].
 */
@ImplementedBy(classOf[KafkaAvroConfigFromApplication])
trait KafkaAvroConfig {

  /** Value stored under the `bootstrap.servers` property. */
  def kafkaUrl: String

  /** Value stored under the `acks` property. */
  def acks: String

  /** Value stored under the `retries` property. */
  def retries: String

  /** Value stored under the `key.serializer` property. */
  def keySerializer: String

  /** Value stored under the `value.serializer` property. */
  def valueSerializer: String

  /** Value stored under the `schema.registry.url` property. */
  def schemaRegistryUrl: String

  /**
   * Builds a fresh `Properties` object on every call, holding one entry per
   * setting declared on this trait.
   *
   * @return a new `Properties` populated with the six producer settings
   */
  def properties: Properties = {
    // Listed in the same order the settings were originally read.
    val settings = Seq(
      "bootstrap.servers"   -> kafkaUrl,
      "acks"                -> acks,
      "retries"             -> retries,
      "key.serializer"      -> keySerializer,
      "value.serializer"    -> valueSerializer,
      "schema.registry.url" -> schemaRegistryUrl
    )

    val result = new Properties()
    settings.foreach { case (key, value) => result.setProperty(key, value) }
    result
  }
}
|
|
|
/**
 * [[KafkaAvroConfig]] whose connection settings are read from the injected
 * [[play.api.Configuration]] under the `listservice.kafka.*` keys.
 *
 * Every setting is a `val`, so each key is looked up once, while the instance
 * is being constructed.
 *
 * @param configuration application configuration the settings are read from
 */
@Singleton
class KafkaAvroConfigFromApplication @Inject() (configuration: Configuration)
    extends KafkaAvroConfig
    with ConfigurationHelpers {

  // NOTE(review): `requireString` is presumably supplied by ConfigurationHelpers
  // and fails on a missing key; confirm against that trait.
  override val kafkaUrl: String =
    configuration.requireString("listservice.kafka.kafkaUrl")

  override val acks: String =
    configuration.requireString("listservice.kafka.acks")

  override val retries: String =
    configuration.requireString("listservice.kafka.retries")

  override val schemaRegistryUrl: String =
    configuration.requireString("listservice.kafka.schemaRegistryUrl")

  // The serializers are fixed rather than read from configuration; keys and
  // values both use the same serializer class.
  private val avroSerializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"

  override val keySerializer: String = avroSerializer

  override val valueSerializer: String = avroSerializer
}