Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package externalservices.kafka
- import java.util.Properties
- import application.utils.ConfigurationHelpers
- import com.google.inject.{ImplementedBy, Inject, Singleton}
- import com.sksamuel.avro4s.RecordFormat
- import org.apache.avro.generic.GenericRecord
- import org.apache.kafka.clients.producer.KafkaProducer
- import play.api.Configuration
- import play.api.inject.ApplicationLifecycle
- import scala.concurrent.Promise
- import scala.util.Try
- @Singleton
- class AvroTopicFactory @Inject() (config: KafkaAvroConfig, lifecycle: ApplicationLifecycle) {
- private val producer = new KafkaProducer[String, GenericRecord](config.properties)
- lifecycle.addStopHook( () => {
- val t = Try {
- producer.flush()
- producer.close()
- }
- val p = Promise.fromTry(t)
- p.future
- })
- /**
- * Create a new ProducerTopic with name "topic" for records of type T
- * with play.api.lifecycle. Requires an implicit RecordFormat[T]
- * @param topic topic name
- * @param recordFormat com.sksamuel.avro4s.RecordFormat for T
- * @tparam T message type for topic
- * @return ProducerTopic with a .publish method
- */
- def producer[T](topic: String)(implicit recordFormat: RecordFormat[T]): ProducerTopic[T] = {
- ProducerTopic[T](topic, producer)
- }
- }
- @ImplementedBy(classOf[KafkaAvroConfigFromApplication])
- trait KafkaAvroConfig {
- def kafkaUrl: String
- def acks: String
- def retries: String
- def keySerializer: String
- def valueSerializer: String
- def schemaRegistryUrl: String
- def properties: Properties = {
- val props = new Properties()
- props.put("bootstrap.servers", kafkaUrl)
- props.put("acks", acks)
- props.put("retries", retries)
- props.put("key.serializer", keySerializer)
- props.put("value.serializer", valueSerializer)
- props.put("schema.registry.url", schemaRegistryUrl)
- props
- }
- }
- @Singleton
- class KafkaAvroConfigFromApplication @Inject()(configuration: Configuration )
- extends KafkaAvroConfig
- with ConfigurationHelpers {
- override val kafkaUrl: String = configuration.requireString("listservice.kafka.kafkaUrl")
- override val acks: String = configuration.requireString("listservice.kafka.acks")
- override val retries: String = configuration.requireString("listservice.kafka.retries")
- override val schemaRegistryUrl: String = configuration.requireString("listservice.kafka.schemaRegistryUrl")
- // no config for these options
- override val keySerializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
- override val valueSerializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement