Advertisement
Guest User

Untitled

a guest
Feb 24th, 2017
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.51 KB | None | 0 0
  1. package externalservices.kafka
  2.  
  3. import java.util.Properties
  4.  
  5. import application.utils.ConfigurationHelpers
  6. import com.google.inject.{ImplementedBy, Inject, Singleton}
  7. import com.sksamuel.avro4s.RecordFormat
  8. import org.apache.avro.generic.GenericRecord
  9. import org.apache.kafka.clients.producer.KafkaProducer
  10. import play.api.Configuration
  11. import play.api.inject.ApplicationLifecycle
  12.  
  13. import scala.concurrent.Promise
  14. import scala.util.Try
  15.  
  /**
   * Factory for `ProducerTopic` instances that all share a single Kafka producer.
   *
   * One `KafkaProducer[String, GenericRecord]` is created from `config.properties`
   * when this singleton is constructed. A stop hook registered on `lifecycle`
   * flushes and closes that producer.
   *
   * @param config    supplies the producer properties
   * @param lifecycle lifecycle on which the producer shutdown hook is registered
   */
  @Singleton
  class AvroTopicFactory @Inject() (config: KafkaAvroConfig, lifecycle: ApplicationLifecycle) {

    // Deliberately not named `producer`: a val and a def may not share a name
    // within one class, and the public factory method below owns that name.
    private val kafkaProducer = new KafkaProducer[String, GenericRecord](config.properties)

    lifecycle.addStopHook(() => {
      // The shutdown work runs synchronously inside the hook; the returned
      // future is already completed and carries any failure captured by Try.
      val shutdown = Try {
        // close() must run even when flush() throws, otherwise the producer leaks.
        try kafkaProducer.flush()
        finally kafkaProducer.close()
      }

      Promise.fromTry(shutdown).future
    })

    /**
     * Create a new ProducerTopic named `topic` for records of type T, backed by
     * the producer shared by this factory.
     *
     * @param topic        topic name
     * @param recordFormat implicit com.sksamuel.avro4s.RecordFormat for T
     * @tparam T message type for the topic
     * @return a ProducerTopic built from `topic` and the shared producer
     */
    def producer[T](topic: String)(implicit recordFormat: RecordFormat[T]): ProducerTopic[T] =
      ProducerTopic[T](topic, kafkaProducer)
  }
  43.  
  /**
   * Settings needed to build the Kafka producer used for Avro records.
   *
   * Implementations provide the individual values; `properties` assembles them
   * into a `java.util.Properties` keyed by the producer configuration names.
   */
  @ImplementedBy(classOf[KafkaAvroConfigFromApplication])
  trait KafkaAvroConfig {
    /** Value stored under "bootstrap.servers". */
    def kafkaUrl: String
    /** Value stored under "acks". */
    def acks: String
    /** Value stored under "retries". */
    def retries: String
    /** Value stored under "key.serializer". */
    def keySerializer: String
    /** Value stored under "value.serializer". */
    def valueSerializer: String
    /** Value stored under "schema.registry.url". */
    def schemaRegistryUrl: String

    /**
     * Build a fresh `Properties` instance from the settings above.
     *
     * @return a new Properties object; a new one is created on every call
     */
    def properties: Properties = {
      val props = new Properties()

      // setProperty keeps keys and values typed as String, unlike the untyped
      // Hashtable#put that Properties inherits.
      Seq(
        "bootstrap.servers"   -> kafkaUrl,
        "acks"                -> acks,
        "retries"             -> retries,
        "key.serializer"      -> keySerializer,
        "value.serializer"    -> valueSerializer,
        "schema.registry.url" -> schemaRegistryUrl
      ).foreach { case (key, value) => props.setProperty(key, value) }

      props
    }
  }
  66.  
  /**
   * [[KafkaAvroConfig]] whose connection settings are read from the application
   * configuration under the `listservice.kafka` prefix when the instance is
   * constructed. Both serializers are fixed to the Confluent Avro serializer.
   *
   * @param configuration application configuration the settings are read from
   */
  @Singleton
  class KafkaAvroConfigFromApplication @Inject() (configuration: Configuration)
    extends KafkaAvroConfig
    with ConfigurationHelpers {

    override val kafkaUrl: String =
      configuration.requireString("listservice.kafka.kafkaUrl")

    override val acks: String =
      configuration.requireString("listservice.kafka.acks")

    override val retries: String =
      configuration.requireString("listservice.kafka.retries")

    override val schemaRegistryUrl: String =
      configuration.requireString("listservice.kafka.schemaRegistryUrl")

    // No config for these options: both key and value use the same serializer class.
    override val keySerializer: String = "io.confluent.kafka.serializers.KafkaAvroSerializer"
    override val valueSerializer: String = "io.confluent.kafka.serializers.KafkaAvroSerializer"
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement