Advertisement
Guest User

Untitled

a guest
Sep 14th, 2021
60
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.08 KB | None | 0 0
  1. abstract class EsServiceSimple[F[_]: Sync, A <: GeneratedMessage, AA <: Message, B, C <: GeneratedMessage, CC <: Message](implicit
  2.     aJavaSupport: JavaProtoSupport[A, AA],
  3.     cJavaSupport: JavaProtoSupport[C, CC]
  4. ) extends EsService[F, A, B, C] {
  5.   def simpleHandler: EventHandler[F, A, Either[B, C]]
  6.  
  7.   implicit def settings[T <: Message]: ProtobufSettings[F, T] =
  8.     ProtobufSettings(
  9.       SchemaRegistryClientSettings[F](esConfig.schemaRegistry.url)
  10.     ).withAutoRegisterSchemas(esConfig.schemaRegistry.autoRegister)
  11.       .withValueSubjectNameStrategy(esConfig.schemaRegistry.subjectNameStrategy)
  12.  
  13.   override def consumerSettings: ConsumerSettings[F, String, Either[Throwable, A]] =
  14.     ConsumerSettings[F, String, Either[Throwable, A]]
  15.       .withIsolationLevel(IsolationLevel.ReadCommitted)
  16.       .withAutoOffsetReset(AutoOffsetReset.Earliest)
  17.       .withBootstrapServers(esConfig.kafka.brokerUrl)
  18.       .withGroupId(esConfig.kafka.groupId)
  19.       .withProperty("security.protocol", esConfig.kafka.securityProtocol.entryName)
  20.       .withProperties(esConfig.kafka.extraConsumerProps)
  21.  
  22.   override def producerSettings: ProducerSettings[F, String, Either[common.Error, C]] =
  23.     ProducerSettings[F, String, Either[common.Error, C]]
  24.       .withBootstrapServers(esConfig.kafka.brokerUrl)
  25.       .withRetries(esConfig.kafka.producerRetries)
  26.       .withProperty("security.protocol", esConfig.kafka.securityProtocol.entryName)
  27.       .withProperties(esConfig.kafka.extraProducerProps)
  28.  
  29.   override def eventHandler: EventHandler[F, Either[Throwable, A], Either[B, C]] =
  30.     _.evalMap { value =>
  31.       value
  32.         .traverse {
  33.           case Left(_: SerializationException) => value.drop[A].pure[F]
  34.           case Left(_: ClassCastException)     => value.drop[A].pure[F]
  35.           case Left(otherwise)                 => otherwise.raiseError[F, KafkaValue[F, A]]
  36.           case Right(cmd)                      => value.as(cmd).pure[F]
  37.         }
  38.         .map(_.andThen(identity))
  39.     }
  40.       .map(_.filterByHeaders(_.aggregateType == esConfig.aggregateType))
  41.       .through(simpleHandler)
  42. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement