Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.Properties
- import akka.Done
- import akka.actor.ActorSystem
- import akka.kafka.scaladsl.Producer
- import akka.kafka.{ProducerMessage, ProducerSettings}
- import akka.stream.ActorMaterializer
- import akka.stream.scaladsl.{Sink, Source}
- import cats.effect.IO
- import com.sksamuel.avro4s.{FromRecord, RecordFormat, ToRecord}
- import org.apache.avro.generic.GenericRecord
- import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
- import org.apache.logging.log4j.scala.Logging
- import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Publishes values of type `T` to Kafka as Avro `GenericRecord`s, using the
 * Confluent schema registry for serialization.
 *
 * A single `KafkaProducer` is created per instance; call [[closeProducer]]
 * when done to flush and release it.
 *
 * @param configuration supplies the Kafka host (broker assumed on :9092,
 *                      schema registry on :8081)
 * @tparam T any type with avro4s `ToRecord`/`FromRecord` instances
 */
class KafkaService[T: ToRecord: FromRecord](configuration: Configuration) extends Logging {

  implicit val system: ActorSystem = ActorSystem("FlowProducerMain")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val bootstrapServers = s"${configuration.kafkaHost}:9092"
  val schemaRegistryServer = s"${configuration.kafkaHost}:8081"

  val producerSettings: ProducerSettings[String, GenericRecord] =
    ProducerSettings(system, new TypedKafkaAvroSerializer[String], new TypedKafkaAvroSerializer[GenericRecord])
      .withProperty("schema.registry.url", schemaRegistryServer)
      .withBootstrapServers(bootstrapServers)

  // FIXME not working if we create producer from properties (open a bug in reactive kafka?)
  // val kafkaProducer = producerSettings.createKafkaProducer()
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[TypedKafkaAvroSerializer[String]])
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[TypedKafkaAvroSerializer[GenericRecord]])
  props.put("schema.registry.url", schemaRegistryServer)

  // Built from raw Properties instead of producerSettings.createKafkaProducer()
  // because of the FIXME above.
  val kafkaProducer = new KafkaProducer[String, GenericRecord](props)

  // Converts T <-> GenericRecord via avro4s.
  val format: RecordFormat[T] = RecordFormat[T]

  /**
   * Sends a single record to `topic`.
   *
   * The Akka stream is materialized lazily inside the returned `IO`, so
   * nothing runs until the effect is executed (previously the stream was
   * started eagerly at call time, breaking IO's laziness contract).
   *
   * @param topic destination topic
   * @param input value to serialize and send
   * @param key   optional record key; if None the key is null and the
   *              producer's default partitioner picks the partition
   * @return an IO completing with Done once the record is sent; failures
   *         are logged and propagated through the IO
   */
  def sendToProducer(topic: String, input: T, key: Option[String]): IO[Done] = {
    val record = format.to(input)
    // If the key is null, the partition is chosen by the producer's default
    // partitioner (round-robin/sticky over available partitions).
    val producerRecord = new ProducerRecord(topic, key.orNull, record)
    IO.fromFuture(IO {
      // Deferred: the stream only starts when the IO is run.
      val completion = Source
        .single(ProducerMessage.Message(producerRecord, ()))
        .via(Producer.flow(producerSettings, kafkaProducer))
        .runWith(Sink.foreach { result =>
          logger.debug(s"sending data to kafka : ${result.message}/${result.metadata}")
        })
      // Log send failures; the failure still propagates via the returned IO.
      completion.failed.foreach(error => logger.error(error.getMessage, error))
      completion
    })
  }

  /**
   * Flushes pending records and closes the underlying KafkaProducer.
   * NOTE(review): the ActorSystem/materializer are not terminated here —
   * confirm whether their lifecycle is managed by the caller.
   */
  def closeProducer(): Unit = {
    logger.info("closing kafka producer")
    kafkaProducer.flush()
    kafkaProducer.close()
  }
}
Add Comment
Please, Sign In to add comment