Guest User

Untitled

a guest
May 22nd, 2018
110
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.27 KB | None | 0 0
  1. import cats.effect.Async
  2. import org.apache.kafka.clients.producer.{
  3. ProducerRecord,
  4. RecordMetadata,
  5. KafkaProducer => ApacheKafkaProducer
  6. }
  7. import org.apache.kafka.clients.producer.ProducerConfig
  8. import org.apache.kafka.common.serialization.Serializer
  9. import fs2._
  10.  
  11. import scala.util.Try
  12.  
  13. trait ProducerOps[F[_], K, V] {
  14. def close: F[Unit]
  15. def sendAsync(record: ProducerRecord[K, V]): F[Unit]
  16. def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata]
  17. }
  18.  
  19. trait Producer[F[_], K, V] {
  20. def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata]
  21. def sendAsync: Sink[F, ProducerRecord[K, V]]
  22. }
  23.  
  24. object ProducerOps {
  25. def apply[F[_], K: Serializer, V: Serializer](
  26. underlyingProducer: ApacheKafkaProducer[K, V]
  27. )(implicit F: Async[F]): ProducerOps[F, K, V] = new ProducerOps[F, K, V] {
  28. def close: F[Unit] = F.delay {
  29. underlyingProducer.close()
  30. }
  31.  
  32. def sendAsync(record: ProducerRecord[K, V]): F[Unit] =
  33. F.delay {
  34. underlyingProducer.send(record)
  35. ()
  36. }
  37.  
  38. def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata] =
  39. F.async[RecordMetadata] { cb =>
  40. F.delay {
  41. underlyingProducer.send(message, (metadata: RecordMetadata, exception: Exception) => {
  42. if (exception eq null) {
  43. cb(Right(metadata))
  44. } else {
  45. cb(Left(exception))
  46. }
  47. })
  48. ()
  49. }
  50. ()
  51. }
  52. }
  53. }
  54.  
  55. object Producer {
  56. def apply[F[_], K: Serializer, V: Serializer](
  57. producerSettings: ProducerSettings[K, V]
  58. )(implicit F: Async[F]): Producer[F, K, V] =
  59. new Producer[F, K, V] {
  60. private def createProducerOps: F[ProducerOps[F, K, V]] =
  61. F.suspend {
  62. val javaProps =
  63. producerSettings.props.foldLeft(new java.util.Properties) {
  64. case (p, (k, v)) => p.put(k, v); p
  65. }
  66.  
  67. F.fromTry(
  68. Try(
  69. ProducerOps[F, K, V](
  70. new ApacheKafkaProducer(
  71. javaProps,
  72. producerSettings.keySerializer,
  73. producerSettings.valueSerializer
  74. )
  75. )
  76. )
  77. )
  78. }
  79.  
  80. private def withOps[I, O](f: ProducerOps[F, K, V] => Pipe[F, I, O]): Pipe[F, I, O] =
  81. (input: Stream[F, I]) => Stream.bracket(createProducerOps)(ops => f(ops)(input), _.close)
  82.  
  83. def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata] = withOps { ops => input =>
  84. input.evalMap(ops.sendSync)
  85. }
  86.  
  87. def sendAsync: Sink[F, ProducerRecord[K, V]] = withOps { ops => input =>
  88. input.evalMap(ops.sendAsync)
  89. }
  90. }
  91. }
  92.  
  93. final case class ProducerSettings[K: Serializer, V: Serializer](
  94. props: Map[String, String] = Map.empty
  95. ) {
  96. val keySerializer: Serializer[K] = util.serializer[K]
  97. val valueSerializer: Serializer[V] = util.serializer[V]
  98.  
  99. def withProperty(key: String, value: String): ProducerSettings[K, V] =
  100. ProducerSettings[K, V](props.updated(key, value))
  101.  
  102. def withBootstrapServers(bootstrapServers: String): ProducerSettings[K, V] =
  103. withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  104.  
  105. def withAcks(acks: String): ProducerSettings[K, V] =
  106. withProperty(ProducerConfig.ACKS_CONFIG, acks)
  107. }
  108.  
  109. object util {
  110. def serializer[T](implicit s: Serializer[T]): Serializer[T] = s
  111. }
Add Comment
Please, Sign In to add comment