Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- abstract class EsServiceSimple[F[_]: Sync, A <: GeneratedMessage, AA <: Message, B, C <: GeneratedMessage, CC <: Message](implicit
- aJavaSupport: JavaProtoSupport[A, AA],
- cJavaSupport: JavaProtoSupport[C, CC]
- ) extends EsService[F, A, B, C] {
- def simpleHandler: EventHandler[F, A, Either[B, C]]
- implicit def settings[T <: Message]: ProtobufSettings[F, T] =
- ProtobufSettings(
- SchemaRegistryClientSettings[F](esConfig.schemaRegistry.url)
- ).withAutoRegisterSchemas(esConfig.schemaRegistry.autoRegister)
- .withValueSubjectNameStrategy(esConfig.schemaRegistry.subjectNameStrategy)
- override def consumerSettings: ConsumerSettings[F, String, Either[Throwable, A]] =
- ConsumerSettings[F, String, Either[Throwable, A]]
- .withIsolationLevel(IsolationLevel.ReadCommitted)
- .withAutoOffsetReset(AutoOffsetReset.Earliest)
- .withBootstrapServers(esConfig.kafka.brokerUrl)
- .withGroupId(esConfig.kafka.groupId)
- .withProperty("security.protocol", esConfig.kafka.securityProtocol.entryName)
- .withProperties(esConfig.kafka.extraConsumerProps)
- override def producerSettings: ProducerSettings[F, String, Either[common.Error, C]] =
- ProducerSettings[F, String, Either[common.Error, C]]
- .withBootstrapServers(esConfig.kafka.brokerUrl)
- .withRetries(esConfig.kafka.producerRetries)
- .withProperty("security.protocol", esConfig.kafka.securityProtocol.entryName)
- .withProperties(esConfig.kafka.extraProducerProps)
- override def eventHandler: EventHandler[F, Either[Throwable, A], Either[B, C]] =
- _.evalMap { value =>
- value
- .traverse {
- case Left(_: SerializationException) => value.drop[A].pure[F]
- case Left(_: ClassCastException) => value.drop[A].pure[F]
- case Left(otherwise) => otherwise.raiseError[F, KafkaValue[F, A]]
- case Right(cmd) => value.as(cmd).pure[F]
- }
- .map(_.andThen(identity))
- }
- .map(_.filterByHeaders(_.aggregateType == esConfig.aggregateType))
- .through(simpleHandler)
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement