Guest User

Untitled

a guest
Dec 5th, 2018
155
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.96 KB | None | 0 0
  /** Jackson builder injected by Spring; the factory methods in this class build their mappers from it. */
  @Autowired lateinit var jackson2ObjectMapperBuilder: Jackson2ObjectMapperBuilder

  /** Kafka settings injected by Spring; supplies the consumer and producer property maps used in this class. */
  @Autowired lateinit var kafkaProperties: KafkaProperties
  5.  
  /**
   * Customizes the Jackson builder: overrides how [LocalDateTime] values are deserialized and
   * installs the Java-time and Kotlin modules.
   *
   * A [LocalDateTime] deserializer needs a formatter that carries both a date and a time part.
   * `DateTimeFormatter.ISO_DATE` is date-only, so it can never produce a [LocalDateTime].
   * The pattern used here matches the `yyyy-MM-dd HH:mm:ss` timestamps present in the
   * serialized note events (for example `2018-12-05 16:45:59`).
   *
   * @return a customizer for the `Jackson2ObjectMapperBuilder`.
   */
  @Bean
  fun jackson2ObjectMapperBuilderCustomizer(): Jackson2ObjectMapperBuilderCustomizer {
      val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
      return Jackson2ObjectMapperBuilderCustomizer { builder ->
          builder.deserializerByType(
              LocalDateTime::class.java,
              LocalDateTimeDeserializer(dateTimeFormatter)
          )
          builder.modulesToInstall(JavaTimeModule(), KotlinModule())
      }
  }
  14.  
  /**
   * Template for publishing [NoteEvent] values with String keys.
   *
   * @return a [KafkaTemplate] backed by [kafkaProducerFactory].
   */
  @Bean
  fun kafkaTemplate(): KafkaTemplate<String, NoteEvent> =
      KafkaTemplate<String, NoteEvent>(kafkaProducerFactory())
  19.  
  /**
   * Consumer factory for String keys and JSON-encoded [NoteEvent] values.
   *
   * The value deserializer uses an ObjectMapper built from [jackson2ObjectMapperBuilder]
   * (with the Java-time module registered on it) and is configured from the consumer
   * properties of [kafkaProperties].
   *
   * @return a consumer factory using a String key deserializer and the JSON value deserializer.
   */
  @Bean
  fun kafkaConsumerFactory(): ConsumerFactory<String, NoteEvent> {
      val mapper = jackson2ObjectMapperBuilder.build<ObjectMapper>()
      mapper.registerModule(JavaTimeModule())

      // The second argument of configure() is the isKey flag: false marks a value deserializer.
      val valueDeserializer = JsonDeserializer<NoteEvent>(mapper).apply {
          configure(kafkaProperties.buildConsumerProperties(), false)
      }

      return DefaultKafkaConsumerFactory<String, NoteEvent>(
          kafkaProperties.buildConsumerProperties(),
          StringDeserializer(),
          valueDeserializer
      )
  }
  32.  
  /**
   * Producer factory for String keys and JSON-encoded [NoteEvent] values.
   *
   * The value serializer uses an ObjectMapper built from [jackson2ObjectMapperBuilder] and is
   * configured from the producer properties of [kafkaProperties].
   *
   * @return a producer factory using a String key serializer and the JSON value serializer.
   */
  @Bean
  fun kafkaProducerFactory(): ProducerFactory<String, NoteEvent> {
      // The second argument of configure() is the isKey flag: false marks a value serializer.
      val valueSerializer = JsonSerializer<NoteEvent>(jackson2ObjectMapperBuilder.build()).apply {
          configure(kafkaProperties.buildProducerProperties(), false)
      }

      return DefaultKafkaProducerFactory<String, NoteEvent>(
          kafkaProperties.buildProducerProperties(),
          StringSerializer(),
          valueSerializer
      )
  }
  42.  
  /**
   * Generic consumer factory whose record values are decoded with a Jackson-based
   * JSON deserializer.
   *
   * The deserializer is built on an ObjectMapper from [jackson2ObjectMapperBuilder] with the
   * Java-time module registered, and is configured as a *value* deserializer (isKey = false).
   * It is therefore installed with `setValueDeserializer`; installing it as the key
   * deserializer would leave record values to whatever deserializer the consumer properties
   * name, which does not use this ObjectMapper.
   *
   * @return a consumer factory built from the consumer properties of [kafkaProperties], with
   *         the JSON deserializer applied to record values.
   */
  @Bean
  fun defaultKafkaConsumerFactory(): ConsumerFactory<Any, Any> {
      val objectMapper = jackson2ObjectMapperBuilder.build<ObjectMapper>()
      objectMapper.registerModule(JavaTimeModule())

      val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
      // The second argument is the isKey flag: false marks a value deserializer.
      jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)

      val consumerFactory = DefaultKafkaConsumerFactory<Any, Any>(
          kafkaProperties.buildConsumerProperties()
      )
      // Keys keep the deserializer named in the consumer properties; only values use JSON.
      consumerFactory.setValueDeserializer(jsonDeserializer)
      return consumerFactory
  }
  55.  
  /**
   * Generic producer factory built from the producer properties of [kafkaProperties].
   *
   * When a transaction id prefix is set in the producer settings, it is applied to the factory.
   *
   * @return the configured producer factory.
   */
  @Bean
  fun defaultKafkaProducerFactory(): ProducerFactory<Any, Any> {
      val producerFactory = DefaultKafkaProducerFactory<Any, Any>(
          kafkaProperties.buildProducerProperties()
      )
      // Only set the prefix when one is actually configured.
      kafkaProperties.producer.transactionIdPrefix?.let { prefix ->
          producerFactory.setTransactionIdPrefix(prefix)
      }
      return producerFactory
  }
  68. }
  69.  
  # Data source (local MySQL instance).
  spring.datasource.url: jdbc:mysql://localhost:3306/notes
  spring.datasource.username: root
  spring.datasource.password:

  logging.level.org.hibernate.SQL: debug

  # JPA / Hibernate.
  spring.jpa.database: MYSQL
  spring.jpa.open-in-view: true
  spring.jpa.show-sql: true
  spring.data.jpa.repositories.bootstrap-mode: default

  spring.jpa.database-platform: org.hibernate.dialect.MySQL5Dialect
  spring.jpa.hibernate.ddl-auto: update

  logging.level.org.springframework: DEBUG
  # Nested form: child keys must be indented under their parent to stay part of spring.datasource.
  spring:
    datasource:
      driver-class-name: com.mysql.cj.jdbc.Driver

  # Kafka.
  spring.kafka.bootstrap-servers: 192.168.169.22:9092
  spring.kafka.consumer.group-id: noteGroup
  spring.kafka.consumer.auto-offset-reset: earliest
  spring.kafka.consumer.properties.spring.json.trusted.packages: com.remusrd.notesample.domain.event
  spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  # spring.json.add.type.headers is a JsonSerializer setting, so it belongs to the producer properties.
  spring.kafka.producer.properties.spring.json.add.type.headers: true
  spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  96.  
  97. package com.remusrd.notesample.service
  98.  
  99. import arrow.core.Option
  100. import arrow.core.getOrElse
  101. import arrow.data.NonEmptyList
  102. import com.remusrd.notesample.data.NoteRepository
  103. import com.remusrd.notesample.domain.Note
  104. import com.remusrd.notesample.domain.event.NoteEvent
  105. import org.springframework.beans.factory.annotation.Autowired
  106. import org.springframework.kafka.core.KafkaTemplate
  107. import org.springframework.stereotype.Service
  108. import org.springframework.transaction.annotation.Transactional
  109.  
  110.  
  /**
   * [NoteService] implementation that reads notes through [NoteRepository] and publishes
   * note-creation events to Kafka through a [KafkaTemplate].
   */
  @Service
  @Transactional
  class JpaNoteService : NoteService {
      /** Name of the Kafka topic that note events are sent to. */
      val TOPIC_NAME = "notes"

      @Autowired
      private lateinit var noteRepository: NoteRepository
      @Autowired
      private lateinit var kafkaTemplate: KafkaTemplate<String, NoteEvent>

      /**
       * Loads all notes from the repository.
       *
       * @return the result of `NonEmptyList.fromList` applied to every stored note.
       */
      override fun getAllNotes(): Option<NonEmptyList<Note>> =
          NonEmptyList.fromList(noteRepository.findAll())

      /**
       * Publishes a `NoteEvent.Created` for the given note, if one is present.
       *
       * NOTE(review): this method only sends the Kafka event; it never calls
       * `noteRepository`, so nothing is persisted here. Confirm that this is intended.
       *
       * @param note the note to announce; when empty, no event is sent.
       * @return the note that was passed in, or `Note(id = 0)` when [note] is empty.
       */
      override fun createNote(note: Option<Note>): Note {
          // map is used only for its side effect; the mapped Option is discarded.
          note.map { created -> kafkaTemplate.send(TOPIC_NAME, NoteEvent.Created(created)) }
          return note.getOrElse { Note(id = 0) }
      }

      /**
       * Looks up the notes written by one author inside a read-only transaction.
       *
       * @param author author name passed to `NoteRepository.findByAuthor`.
       * @return the result of `NonEmptyList.fromList` applied to the matching notes.
       */
      @Transactional(readOnly = true)
      override fun getNotesByAuthor(author: String): Option<NonEmptyList<Note>> =
          NonEmptyList.fromList(noteRepository.findByAuthor(author))
  }
  138.  
  139. package com.remusrd.notesample.service
  140.  
  141. import org.springframework.kafka.annotation.KafkaListener
  142. import org.springframework.messaging.Message
  143. import org.springframework.stereotype.Component
  144.  
  /** Spring component with a Kafka listener that prints each message received on the `notes` topic. */
  @Component
  class createdNotesConsumer {

      /**
       * Prints the incoming message followed by its runtime class.
       *
       * @param noteEvent the message handed over by the Kafka listener.
       */
      @KafkaListener(topics = ["notes"], groupId = "noteGroup")
      fun recieve(noteEvent: Message<Any>) {
          val messageClass = noteEvent.javaClass
          println("received$noteEvent$messageClass")
      }
  }
  154.  
  155. package com.remusrd.notesample.domain
  156.  
  157. import java.time.LocalDateTime
  158. import javax.persistence.*
  159.  
  /**
   * JPA entity mapped to the `note` table.
   *
   * @property id primary key, generated with the IDENTITY strategy.
   * @property content note text; defaults to an empty string.
   * @property creationDate defaults to the time the instance is constructed.
   * @property lastModified defaults to the time the instance is constructed.
   * @property author author name; defaults to an empty string.
   */
  @Entity
  @Table(name = "note")
  data class Note(
      @Id
      @GeneratedValue(strategy = GenerationType.IDENTITY)
      val id: Long,

      val content: String = "",

      val creationDate: LocalDateTime = LocalDateTime.now(),

      val lastModified: LocalDateTime = LocalDateTime.now(),

      val author: String = ""
  )
  170.  
  // Build-script classpath: versions are declared once in `ext` and reused below.
  buildscript {
      ext {
          kotlinVersion = "1.3.10"
          springBootVersion = "2.1.1.RELEASE"
          springCloudVersion = "Greenwich.M3"
          arrow_version = "0.8.1"
      }
      repositories {
          mavenCentral()
      }
      dependencies {
          classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
          classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")
          classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}")
          classpath("org.jetbrains.kotlin:kotlin-noarg:${kotlinVersion}")
          classpath("io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE")
      }
  }

  apply plugin: "kotlin"
  apply plugin: "kotlin-spring"
  apply plugin: "org.springframework.boot"
  apply plugin: "io.spring.dependency-management"
  apply plugin: "kotlin-allopen"
  apply plugin: "kotlin-noarg"
  apply plugin: "kotlin-jpa"


  group "com.remusrd"
  version "0.0.1-SNAPSHOT"

  sourceCompatibility = 1.8

  repositories {
      mavenCentral()
      // Resolve milestone artifacts over HTTPS rather than plain HTTP.
      maven { url 'https://repo.spring.io/milestone' }
  }

  // Project annotations that trigger the no-arg and all-open compiler plugins.
  noArg {
      annotation("com.remusrd.notesample.domain.annotation.NoArg")
  }
  allOpen {
      annotation("com.remusrd.notesample.domain.annotation.Open")
  }

  dependencies {
      // Kotlin
      implementation "org.jetbrains.kotlin:kotlin-stdlib"
      implementation "org.jetbrains.kotlin:kotlin-reflect"
      // Declared once; the version comes from the imported Spring Boot BOM.
      implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
      implementation "io.arrow-kt:arrow-core:$arrow_version"
      implementation "io.arrow-kt:arrow-data:$arrow_version"

      // Spring Boot
      implementation "org.springframework.cloud:spring-cloud-starter-netflix-eureka-client"
      implementation "org.springframework.boot:spring-boot-starter-web:$springBootVersion"
      implementation "org.springframework.boot:spring-boot-starter-data-jpa:$springBootVersion"
      implementation "org.springframework.kafka:spring-kafka"

      // Databases
      implementation "mysql:mysql-connector-java:8.0.13"
      implementation "com.h2database:h2:1.4.197"

      // Test
      testImplementation "junit:junit:4.12"
      testImplementation("org.springframework.boot:spring-boot-starter-test")
  }

  dependencyManagement {
      imports {
          mavenBom "org.springframework.boot:spring-boot-dependencies:${springBootVersion}"
          mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
      }
  }

  // Main and test compilation share the same Kotlin compiler options.
  [compileKotlin, compileTestKotlin].each { kotlinTask ->
      kotlinTask.kotlinOptions {
          freeCompilerArgs = ["-Xjsr305=strict"]
          jvmTarget = "1.8"
      }
  }
  261.  
  262. 2018-12-05 16:48:56.884 ERROR 8331 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
  263.  
  264. org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition notes-0 at offset 0. If needed, please seek past the record to continue consumption.
  265. Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 110, 111, 116, 101, 34, 58, 123, 34, 105, 100, 34, 58, 48, 44, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 72, 111, 108, 97, 32, 113, 117, -61, -87, 32, 116, 97, 108, 34, 44, 34, 99, 114, 101, 97, 116, 105, 111, 110, 68, 97, 116, 101, 34, 58, 34, 50, 48, 49, 56, 45, 49, 50, 45, 48, 53, 32, 49, 54, 58, 52, 53, 58, 53, 57, 34, 44, 34, 108, 97, 115, 116, 77, 111, 100, 105, 102, 105, 101, 100, 34, 58, 34, 50, 48, 49, 56, 45, 49, 50, 45, 48, 53, 32, 49, 54, 58, 52, 53, 58, 53, 57, 34, 44, 34, 97, 117, 116, 104, 111, 114, 34, 58, 34, 82, 105, 99, 104, 97, 114, 100, 34, 125, 125]] from topic [notes]
  266. Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.time.LocalDateTime` (no Creators, like default construct, exist): no String-argument constructor/factory method to deserialize from String value ('2018-12-05 16:45:59')
  267. at [Source: (byte[])"{"note":{"id":0,"content":"Hola qué tal","creationDate":"2018-12-05 16:45:59","lastModified":"2018-12-05 16:45:59","author":"Richard"}}"; line: 1, column: 58] (through reference chain: com.remusrd.notesample.domain.event.NoteEvent$Modified["note"]->com.remusrd.notesample.domain.Note["creationDate"])
  268. at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.9.7.jar:2.9.7]
  269. at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1452) ~[jackson-databind-2.9.7.jar:2.9.7]
  270. at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1028) ~[jackson-databind-2.9.7.jar:2.9.7]
  271. at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.9.7.jar:2.9.7]
  272. at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.9.7.jar:2.9.7]
  273. at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1373) ~[jackson-databind-2.9.7.jar:2.9.7]
  274. at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:171) ~[jackson-databind-2.9.7.jar:2.9.7]
  275. at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161) ~[jackson-databind-2.9.7.jar:2.9.7]
  276. at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.7.jar:2.9.7]
  277. at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar:2.9.7]
  278. at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar:2.9.7]
  279. at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.7.jar:2.9.7]
  280. at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar:2.9.7]
  281. at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar:2.9.7]
  282. at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.7.jar:2.9.7]
  283. at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.7.jar:2.9.7]
  284. at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:328) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
  285. at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041) ~[kafka-clients-2.0.1.jar:na]
  286. at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110) ~[kafka-clients-2.0.1.jar:na]
  287. at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223) ~[kafka-clients-2.0.1.jar:na]
  288. at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072) ~[kafka-clients-2.0.1.jar:na]
  289. at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562) ~[kafka-clients-2.0.1.jar:na]
  290. at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523) ~[kafka-clients-2.0.1.jar:na]
  291. at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230) ~[kafka-clients-2.0.1.jar:na]
  292. at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187) ~[kafka-clients-2.0.1.jar:na]
  293. at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154) ~[kafka-clients-2.0.1.jar:na]
  294. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
  295. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
  296. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
  297. at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_171]
  298. at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_171]
  299. at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Add Comment
Please, Sign In to add comment