Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @Autowired
- lateinit var jackson2ObjectMapperBuilder: Jackson2ObjectMapperBuilder
- @Autowired
- lateinit var kafkaProperties: KafkaProperties
- @Bean
- fun jackson2ObjectMapperBuilderCustomizer(): Jackson2ObjectMapperBuilderCustomizer {
- return Jackson2ObjectMapperBuilderCustomizer { jacksonObjectMapperBuilder ->
- jacksonObjectMapperBuilder.deserializerByType(LocalDateTime::class.java , LocalDateTimeDeserializer(
- DateTimeFormatter.ISO_DATE) )
- jacksonObjectMapperBuilder.modulesToInstall(JavaTimeModule(), KotlinModule())
- }
- }
- @Bean
- fun kafkaTemplate(): KafkaTemplate<String, NoteEvent> {
- return KafkaTemplate<String, NoteEvent>(kafkaProducerFactory())
- }
- @Bean
- fun kafkaConsumerFactory(): ConsumerFactory<String, NoteEvent> {
- val objectMapper = jackson2ObjectMapperBuilder.build() as ObjectMapper
- objectMapper.registerModule(JavaTimeModule())
- val jsonDeserializer = JsonDeserializer<NoteEvent>(objectMapper)
- jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)
- return DefaultKafkaConsumerFactory<String, NoteEvent>(
- kafkaProperties.buildConsumerProperties(),
- StringDeserializer(),
- jsonDeserializer
- )
- }
- @Bean
- fun kafkaProducerFactory(): ProducerFactory<String, NoteEvent> {
- val jsonSerializer = JsonSerializer<NoteEvent>(jackson2ObjectMapperBuilder.build())
- jsonSerializer.configure(kafkaProperties.buildProducerProperties(), false)
- return DefaultKafkaProducerFactory<String, NoteEvent>(
- kafkaProperties.buildProducerProperties(),
- StringSerializer() , jsonSerializer
- )
- }
- @Bean
- fun defaultKafkaConsumerFactory(): ConsumerFactory<Any, Any> {
- val objectMapper = jackson2ObjectMapperBuilder.build() as ObjectMapper
- objectMapper.registerModule(JavaTimeModule())
- val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
- jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)
- val kafkaConsumerFactory = DefaultKafkaConsumerFactory<Any, Any>(
- kafkaProperties.buildConsumerProperties()
- )
- kafkaConsumerFactory.setKeyDeserializer(jsonDeserializer)
- return kafkaConsumerFactory
- }
- @Bean
- fun defaultKafkaProducerFactory(): ProducerFactory<Any, Any> {
- val factory = DefaultKafkaProducerFactory<Any, Any>(
- kafkaProperties.buildProducerProperties()
- )
- val transactionIdPrefix = kafkaProperties.producer
- .transactionIdPrefix
- if (transactionIdPrefix != null) {
- factory.setTransactionIdPrefix(transactionIdPrefix)
- }
- return factory
- }
- }
- spring.datasource.url: jdbc:mysql://localhost:3306/notes
- spring.datasource.username: root
- spring.datasource.password:
- logging.level.org.hibernate.SQL: debug
- spring.jpa.database: MYSQL
- spring.jpa.open-in-view: true
- spring.jpa.show-sql: true
- spring.data.jpa.repositories.bootstrap-mode: default
- spring.jpa.database-platform: org.hibernate.dialect.MySQL5Dialect
- spring.jpa.hibernate.ddl-auto: update
- logging.level.org.springframework: DEBUG
- spring:
- datasource:
- driver-class-name: com.mysql.cj.jdbc.Driver
- spring.kafka.bootstrap-servers: 192.168.169.22:9092
- spring.kafka.consumer.group-id: noteGroup
- spring.kafka.consumer.auto-offset-reset: earliest
- spring.kafka.consumer.properties.spring.json.trusted.packages: com.remusrd.notesample.domain.event
- spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
- spring.kafka.consumer.properties.spring.json.add.type.headers: true
- spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
- package com.remusrd.notesample.service
- import arrow.core.Option
- import arrow.core.getOrElse
- import arrow.data.NonEmptyList
- import com.remusrd.notesample.data.NoteRepository
- import com.remusrd.notesample.domain.Note
- import com.remusrd.notesample.domain.event.NoteEvent
- import org.springframework.beans.factory.annotation.Autowired
- import org.springframework.kafka.core.KafkaTemplate
- import org.springframework.stereotype.Service
- import org.springframework.transaction.annotation.Transactional
- @Service
- @Transactional
- class JpaNoteService : NoteService {
- val TOPIC_NAME = "notes"
- @Autowired
- private lateinit var noteRepository: NoteRepository
- @Autowired
- private lateinit var kafkaTemplate: KafkaTemplate<String, NoteEvent>
- override fun getAllNotes(): Option<NonEmptyList<Note>> =
- NonEmptyList.fromList(noteRepository.findAll())
- override fun createNote(note: Option<Note>) : Note {
- note.map {
- kafkaTemplate.send(TOPIC_NAME, NoteEvent.Created(it))
- }
- return note.getOrElse { Note(id=0) }
- }
- @Override
- @Transactional(readOnly = true)
- override fun getNotesByAuthor(author: String): Option<NonEmptyList<Note>> {
- val noteList = noteRepository.findByAuthor(author)
- return NonEmptyList.fromList(noteList)
- }
- }
- package com.remusrd.notesample.service
- import org.springframework.kafka.annotation.KafkaListener
- import org.springframework.messaging.Message
- import org.springframework.stereotype.Component
- @Component
- class createdNotesConsumer {
- @KafkaListener(topics = ["notes"], groupId = "noteGroup")
- fun recieve(noteEvent: Message<Any>) {
- println("received" + noteEvent + noteEvent.javaClass)
- }
- }
- package com.remusrd.notesample.domain
- import java.time.LocalDateTime
- import javax.persistence.*
- @Entity
- @Table(name = "note")
- data class Note(
- @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
- val id: Long,
- val content: String = "",
- val creationDate: LocalDateTime = LocalDateTime.now(),
- val lastModified: LocalDateTime = LocalDateTime.now(),
- val author: String = ""
- )
- buildscript {
- ext {
- kotlinVersion = "1.3.10"
- springBootVersion = "2.1.1.RELEASE"
- springCloudVersion = "Greenwich.M3"
- arrow_version = "0.8.1"
- }
- repositories {
- mavenCentral()
- }
- dependencies {
- classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
- classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")
- classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}")
- classpath("org.jetbrains.kotlin:kotlin-noarg:${kotlinVersion}")
- classpath("io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE")
- }
- }
- apply plugin: "kotlin"
- apply plugin: "kotlin-spring"
- apply plugin: "org.springframework.boot"
- apply plugin: "io.spring.dependency-management"
- apply plugin: "kotlin-allopen"
- apply plugin: "kotlin-noarg"
- apply plugin: "kotlin-jpa"
- group "com.remusrd"
- version "0.0.1-SNAPSHOT"
- sourceCompatibility = 1.8
- repositories {
- mavenCentral()
- maven { url 'http://repo.spring.io/milestone' }
- }
- noArg{
- annotation("com.remusrd.notesample.domain.annotation.NoArg")
- }
- allOpen{
- annotation("com.remusrd.notesample.domain.annotation.Open")
- }
- dependencies {
- // Kotlin
- implementation "org.jetbrains.kotlin:kotlin-stdlib"
- implementation "org.jetbrains.kotlin:kotlin-reflect"
- implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
- implementation "io.arrow-kt:arrow-core:$arrow_version"
- implementation "io.arrow-kt:arrow-data:$arrow_version"
- // Spring Boot
- implementation "org.springframework.cloud:spring-cloud-starter-netflix-eureka-client"
- implementation "org.springframework.boot:spring-boot-starter-web:$springBootVersion"
- implementation "org.springframework.boot:spring-boot-starter-data-jpa:$springBootVersion"
- implementation "org.springframework.kafka:spring-kafka"
- implementation "com.fasterxml.jackson.module:jackson-module-kotlin:2.9.7"
- // BBDD
- implementation "mysql:mysql-connector-java:8.0.13"
- implementation "com.h2database:h2:1.4.197"
- // Test
- testImplementation "junit:junit:4.12"
- testImplementation("org.springframework.boot:spring-boot-starter-test")
- }
- dependencyManagement {
- imports {
- mavenBom "org.springframework.boot:spring-boot-dependencies:${springBootVersion}"
- mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
- }
- }
- compileKotlin {
- kotlinOptions {
- freeCompilerArgs = ["-Xjsr305=strict"]
- jvmTarget = "1.8"
- }
- }
- compileTestKotlin {
- kotlinOptions {
- freeCompilerArgs = ["-Xjsr305=strict"]
- jvmTarget = "1.8"
- }
- }
- 2018-12-05 16:48:56.884 ERROR 8331 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
- org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition notes-0 at offset 0. If needed, please seek past the record to continue consumption.
- Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 110, 111, 116, 101, 34, 58, 123, 34, 105, 100, 34, 58, 48, 44, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 72, 111, 108, 97, 32, 113, 117, -61, -87, 32, 116, 97, 108, 34, 44, 34, 99, 114, 101, 97, 116, 105, 111, 110, 68, 97, 116, 101, 34, 58, 34, 50, 48, 49, 56, 45, 49, 50, 45, 48, 53, 32, 49, 54, 58, 52, 53, 58, 53, 57, 34, 44, 34, 108, 97, 115, 116, 77, 111, 100, 105, 102, 105, 101, 100, 34, 58, 34, 50, 48, 49, 56, 45, 49, 50, 45, 48, 53, 32, 49, 54, 58, 52, 53, 58, 53, 57, 34, 44, 34, 97, 117, 116, 104, 111, 114, 34, 58, 34, 82, 105, 99, 104, 97, 114, 100, 34, 125, 125]] from topic [notes]
- Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.time.LocalDateTime` (no Creators, like default construct, exist): no String-argument constructor/factory method to deserialize from String value ('2018-12-05 16:45:59')
- at [Source: (byte[])"{"note":{"id":0,"content":"Hola qué tal","creationDate":"2018-12-05 16:45:59","lastModified":"2018-12-05 16:45:59","author":"Richard"}}"; line: 1, column: 58] (through reference chain: com.remusrd.notesample.domain.event.NoteEvent$Modified["note"]->com.remusrd.notesample.domain.Note["creationDate"])
- at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1452) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1028) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1373) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:171) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.7.jar:2.9.7]
- at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.7.jar:2.9.7]
- at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:328) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
- at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041) ~[kafka-clients-2.0.1.jar:na]
- at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110) ~[kafka-clients-2.0.1.jar:na]
- at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223) ~[kafka-clients-2.0.1.jar:na]
- at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072) ~[kafka-clients-2.0.1.jar:na]
- at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562) ~[kafka-clients-2.0.1.jar:na]
- at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523) ~[kafka-clients-2.0.1.jar:na]
- at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230) ~[kafka-clients-2.0.1.jar:na]
- at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187) ~[kafka-clients-2.0.1.jar:na]
- at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154) ~[kafka-clients-2.0.1.jar:na]
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
- at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
- at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
- at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_171]
- at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_171]
- at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Add Comment
Please, Sign In to add comment