Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import cats.effect.Async
- import org.apache.kafka.clients.producer.{
- ProducerRecord,
- RecordMetadata,
- KafkaProducer => ApacheKafkaProducer
- }
- import org.apache.kafka.clients.producer.ProducerConfig
- import org.apache.kafka.common.serialization.Serializer
- import fs2._
- import scala.util.Try
/**
 * Low-level, effectful operations on a single Kafka producer.
 *
 * @tparam F effect type in which every operation is expressed
 * @tparam K record key type
 * @tparam V record value type
 */
trait ProducerOps[F[_], K, V] {

  /** Sends `message` and yields the [[RecordMetadata]] reported for it. */
  def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata]

  /** Sends `record` without yielding any information about the outcome. */
  def sendAsync(record: ProducerRecord[K, V]): F[Unit]

  /** Closes the producer. */
  def close: F[Unit]
}
/**
 * Stream-level interface for publishing [[ProducerRecord]]s.
 *
 * @tparam F effect type of the streams
 * @tparam K record key type
 * @tparam V record value type
 */
trait Producer[F[_], K, V] {

  /** Sink that consumes records and emits only `Unit` values. */
  def sendAsync: Sink[F, ProducerRecord[K, V]]

  /** Pipe that turns every incoming record into its [[RecordMetadata]]. */
  def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata]
}
/** Factory for [[ProducerOps]] instances backed by an Apache Kafka producer. */
object ProducerOps {
  import scala.util.control.NonFatal

  /**
   * Wraps an already constructed Kafka producer in the [[ProducerOps]] interface.
   *
   * @param underlyingProducer the Kafka producer every operation delegates to
   * @param F                  `Async` instance used to suspend side effects and
   *                           to bridge Kafka's completion callback
   * @return operations that delegate to `underlyingProducer`
   */
  def apply[F[_], K: Serializer, V: Serializer](
    underlyingProducer: ApacheKafkaProducer[K, V]
  )(implicit F: Async[F]): ProducerOps[F, K, V] = new ProducerOps[F, K, V] {

    /** Suspends `underlyingProducer.close()`. */
    def close: F[Unit] = F.delay(underlyingProducer.close())

    /**
     * Hands `record` to the producer and completes as soon as `send` returns.
     * The future returned by `send` is discarded, so a delivery failure
     * reported through it is not surfaced to the caller.
     */
    def sendAsync(record: ProducerRecord[K, V]): F[Unit] =
      F.delay {
        underlyingProducer.send(record)
        ()
      }

    /**
     * Sends `message` and completes with the metadata (or the exception)
     * passed to Kafka's completion callback.
     */
    def sendSync(message: ProducerRecord[K, V]): F[RecordMetadata] =
      F.async[RecordMetadata] { cb =>
        // `send` must be invoked directly here: the registration function is
        // what `F.async` executes. Wrapping the call in `F.delay` only builds
        // an effect value that is then thrown away, so the record would never
        // be sent and `cb` would never be called.
        try {
          underlyingProducer.send(
            message,
            (metadata: RecordMetadata, exception: Exception) =>
              if (exception eq null) cb(Right(metadata)) else cb(Left(exception))
          )
          ()
        } catch {
          // An exception thrown by `send` itself is routed to the callback so
          // the effect fails instead of depending on how `F` treats a throwing
          // registration function.
          case NonFatal(e) => cb(Left(e))
        }
      }
  }
}
/** Factory for stream-based [[Producer]] instances. */
object Producer {

  /**
   * Builds a [[Producer]] whose pipes create a fresh Kafka producer from
   * `producerSettings` when the resulting stream runs and close it through
   * `Stream.bracket`.
   *
   * @param producerSettings properties and serializers for the Kafka producer
   * @param F                `Async` instance used to suspend producer creation
   */
  def apply[F[_], K: Serializer, V: Serializer](
    producerSettings: ProducerSettings[K, V]
  )(implicit F: Async[F]): Producer[F, K, V] =
    new Producer[F, K, V] {

      /** Copies the Scala settings map into a `java.util.Properties`. */
      private def toJavaProperties(entries: Map[String, String]): java.util.Properties = {
        val properties = new java.util.Properties
        entries.foreach { case (name, value) => properties.put(name, value) }
        properties
      }

      /** Suspended construction of the Kafka producer; a failure captured by `Try` becomes a failed `F`. */
      private def createProducerOps: F[ProducerOps[F, K, V]] =
        F.suspend {
          val javaProps = toJavaProperties(producerSettings.props)
          val attempt = Try {
            val kafkaProducer = new ApacheKafkaProducer[K, V](
              javaProps,
              producerSettings.keySerializer,
              producerSettings.valueSerializer
            )
            ProducerOps[F, K, V](kafkaProducer)
          }
          F.fromTry(attempt)
        }

      /** Runs the pipe built by `f` against a bracketed producer. */
      private def withOps[I, O](f: ProducerOps[F, K, V] => Pipe[F, I, O]): Pipe[F, I, O] =
        (records: Stream[F, I]) =>
          Stream.bracket(createProducerOps)(
            producerOps => f(producerOps)(records),
            producerOps => producerOps.close
          )

      def sendSync: Pipe[F, ProducerRecord[K, V], RecordMetadata] =
        withOps[ProducerRecord[K, V], RecordMetadata] { producerOps => records =>
          records.evalMap(record => producerOps.sendSync(record))
        }

      def sendAsync: Sink[F, ProducerRecord[K, V]] =
        withOps[ProducerRecord[K, V], Unit] { producerOps => records =>
          records.evalMap(record => producerOps.sendAsync(record))
        }
    }
}
/**
 * Immutable configuration for a Kafka producer.
 *
 * Each `with*` method returns a new instance and leaves the receiver unchanged.
 *
 * @param props producer properties, keyed by Kafka configuration name
 * @tparam K record key type; a `Serializer[K]` must be implicitly available
 * @tparam V record value type; a `Serializer[V]` must be implicitly available
 */
final case class ProducerSettings[K: Serializer, V: Serializer](
  props: Map[String, String] = Map.empty
) {

  /** The key serializer captured from implicit scope at construction. */
  val keySerializer: Serializer[K] = implicitly[Serializer[K]]

  /** The value serializer captured from implicit scope at construction. */
  val valueSerializer: Serializer[V] = implicitly[Serializer[V]]

  /** Returns settings with `key` set to `value`, replacing any previous value. */
  def withProperty(key: String, value: String): ProducerSettings[K, V] =
    ProducerSettings[K, V](props + (key -> value))

  /** Sets `ProducerConfig.ACKS_CONFIG`. */
  def withAcks(acks: String): ProducerSettings[K, V] =
    withProperty(ProducerConfig.ACKS_CONFIG, acks)

  /** Sets `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG`. */
  def withBootstrapServers(bootstrapServers: String): ProducerSettings[K, V] =
    withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
}
/** Small helpers shared by the producer code. */
object util {

  /**
   * Summons the `Serializer[T]` found in implicit scope.
   *
   * @param s the implicitly resolved serializer
   * @return `s`, unchanged
   */
  def serializer[T](implicit s: Serializer[T]): Serializer[T] = {
    s
  }
}
Add Comment
Please, Sign In to add comment