Guest User

Untitled

a guest
Jul 23rd, 2018
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.46 KB | None | 0 0
  1. import java.util.Properties
  2. import akka.Done
  3. import akka.actor.ActorSystem
  4. import akka.kafka.scaladsl.Producer
  5. import akka.kafka.{ProducerMessage, ProducerSettings}
  6. import akka.stream.ActorMaterializer
  7. import akka.stream.scaladsl.{Sink, Source}
  8. import cats.effect.IO
  9. import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
  10. import org.apache.avro.generic.GenericRecord
  11. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
  12. import org.apache.logging.log4j.scala.Logging
  13.  
  14. import scala.concurrent.ExecutionContext.Implicits.global
  15.  
  16. class KafkaService[T: ToRecord: FromRecord](configuration: Configuration) extends Logging {
  17.  
  18. implicit val system = ActorSystem("FlowProducerMain")
  19. implicit val materializer = ActorMaterializer()
  20.  
  21. val bootstrapServers = s"${configuration.kafkaHost}:9092"
  22. val schemaRegistryServer = s"${configuration.kafkaHost}:8081"
  23.  
  24. val producerSettings = ProducerSettings(system, new TypedKafkaAvroSerializer[String], new TypedKafkaAvroSerializer[GenericRecord])
  25. .withProperty("schema.registry.url", schemaRegistryServer)
  26. .withBootstrapServers(bootstrapServers)
  27.  
  28. //FIXME not working if we create producer from properties (open a bug in reactive kafka?)
  29. //val kafkaProducer = producerSettings.createKafkaProducer()
  30.  
  31. val props = new Properties()
  32. props.put("bootstrap.servers", bootstrapServers)
  33. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[TypedKafkaAvroSerializer[String]])
  34. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[TypedKafkaAvroSerializer[GenericRecord]])
  35. props.put("schema.registry.url", schemaRegistryServer)
  36. val kafkaProducer = new KafkaProducer[String, GenericRecord](props)
  37.  
  38. val format = RecordFormat[T]
  39.  
  40. def sendToProducer(topic :String, input: T, key: Option[String]): IO[Done] = {
  41.  
  42. val record = format.to(input)
  43.  
  44. //if key is null the partion will be chosen randomly
  45. val productRecord = new ProducerRecord(topic, key.orNull, record)
  46.  
  47. val completion = Source
  48. .single(ProducerMessage.Message(productRecord, ()))
  49. .via(Producer.flow(producerSettings, kafkaProducer))
  50. .runWith(Sink.foreach { result =>
  51. logger.debug(s"sending data to kafka : ${result.message}/${result.metadata}")
  52. })
  53.  
  54. completion.failed.foreach(error => logger.error(error.getMessage, error))
  55.  
  56. IO.fromFuture(IO(completion))
  57. }
  58.  
  59. def closeProducer() = {
  60. logger.info("closing kafka producer")
  61. kafkaProducer.flush()
  62. kafkaProducer.close()
  63. }
  64.  
  65. }
Add Comment
Please, Sign In to add comment