Guest User

Untitled

a guest
Jan 22nd, 2018
158
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.80 KB | None | 0 0
  1. []2018-01-22T06:41:15,797Z INFO
  2. o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[]
  3. []2018-01-22T06:41:15,828Z INFO o.s.k.l.KafkaMessageListenerContainer
  4. - partitions assigned:[com.combine.domain.addUser-0]
  5. []2018-01-22T06:42:30,962Z ERROR o.s.k.listener.LoggingErrorHandler -
  6. Error while processing: ConsumerRecord(topic =
  7. com.combine.domain.addUser, partition = 0,
  8. offset = 1, key = null, value = AddUserEventModel(name=Test-User-
  9. Kafka-2, address=Test-User-Kafka-2, age=26))
  10. org.springframework.kafka.KafkaException: No method found for class com.example.data.combine.eventmodel.AddUserEventModel
  11. at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92)
  12. at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:146)
  13. at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60)
  14. at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:131)
  15. at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:101)
  16. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:618)
  17. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1500(KafkaMessageListenerContainer.java:236)
  18. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:797)
  19. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  20. at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  21. at java.lang.Thread.run(Thread.java:745)
  22. []2018-01-22T08:21:20,074Z INFO o.s.k.l.KafkaMessageListenerContainer-
  23. partitions revoked:[com.combine.domain.addUser-0]
  24. []2018-01-22T08:21:20,081Z INFO o.s.k.l.KafkaMessageListenerContainer-
  25. partitions assigned:[com.combine.domain.addUser-0]
  26.  
  27. []2018-01-22T06:42:30,800Z INFO c.e.d.c.controller.MongoController
  28. - # send user by kafka model : AddUserEventModel(name=Test-User-
  29. Kafka-2, address=Test-User-Kafka-2, age=26) with parameter
  30. []2018-01-22T06:42:30,800Z INFO c.e.d.c.publisher.AddUserPublished
  31. - #sending addUserEventModel
  32. []2018-01-22T06:42:30,809Z INFO o.a.k.c.producer.ProducerConfig -
  33. ProducerConfig values:
  34. compression.type = none
  35. metric.reporters = []
  36. metadata.max.age.ms = 300000
  37. metadata.fetch.timeout.ms = 60000
  38. reconnect.backoff.ms = 50
  39. sasl.kerberos.ticket.renew.window.factor = 0.8
  40. bootstrap.servers = [localhost:9092]
  41. retry.backoff.ms = 100
  42. sasl.kerberos.kinit.cmd = /usr/bin/kinit
  43. buffer.memory = 33554432
  44. timeout.ms = 30000
  45. key.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
  46. sasl.kerberos.service.name = null
  47. sasl.kerberos.ticket.renew.jitter = 0.05
  48. ssl.keystore.type = JKS
  49. ssl.trustmanager.algorithm = PKIX
  50. block.on.buffer.full = false
  51. ssl.key.password = null
  52. max.block.ms = 60000
  53. sasl.kerberos.min.time.before.relogin = 60000
  54. connections.max.idle.ms = 540000
  55. ssl.truststore.password = null
  56. max.in.flight.requests.per.connection = 5
  57. metrics.num.samples = 2
  58. client.id =
  59. ssl.endpoint.identification.algorithm = null
  60. ssl.protocol = TLS
  61. request.timeout.ms = 30000
  62. ssl.provider = null
  63. ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  64. acks = 1
  65. batch.size = 16384
  66. ssl.keystore.location = null
  67. receive.buffer.bytes = 32768
  68. ssl.cipher.suites = null
  69. ssl.truststore.type = JKS
  70. security.protocol = PLAINTEXT
  71. retries = 0
  72. max.request.size = 1048576
  73. value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
  74. ssl.truststore.location = null
  75. ssl.keystore.password = null
  76. ssl.keymanager.algorithm = SunX509
  77. metrics.sample.window.ms = 30000
  78. partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
  79. send.buffer.bytes = 131072
  80. linger.ms = 0
  81. []2018-01-22T06:42:30,832Z INFO o.a.k.common.utils.AppInfoParser -
  82. Kafka version : 0.9.0.1
  83. []2018-01-22T06:42:30,833Z INFO o.a.k.common.utils.AppInfoParser -
  84. Kafka commitId : 23c69d62a0cabf06
  85. []2018-01-22T08:21:15,047Z INFO o.a.k.c.c.i.AbstractCoordinator -
  86. Marking the coordinator 2147483647 dead.
  87. []2018-01-22T08:21:19,485Z ERROR o.a.k.c.c.i.ConsumerCoordinator -
  88. Error UNKNOWN_MEMBER_ID occurred while committing offsets for group
  89. com.combine.domain.addUser
  90. []2018-01-22T08:21:19,486Z WARN o.a.k.c.c.i.ConsumerCoordinator -
  91. Auto offset commit failed: Commit cannot be completed due to group
  92. rebalance
  93. []2018-01-22T08:21:19,487Z ERROR o.a.k.c.c.i.ConsumerCoordinator -
  94. Error UNKNOWN_MEMBER_ID occurred while committing offsets for
  95. group com.combine.domain.addUser
  96. []2018-01-22T08:21:19,487Z WARN o.a.k.c.c.i.ConsumerCoordinator -
  97. Auto offset commit failed:
  98. []2018-01-22T08:21:20,075Z INFO o.a.k.c.c.i.AbstractCoordinator -
  99. Attempt to join group com.combine.domain.addUser failed due to
  100. unknown member id, resetting and retrying.
  101.  
  102. <dependency>
  103. <groupId>org.springframework.kafka</groupId>
  104. <artifactId>spring-kafka</artifactId>
  105. <version>${spring-kafka-version}</version>
  106. </dependency>
  107.  
  108. @Configuration
  109. public class BaseKafkaConfiguration {
  110.  
  111. @Value("${spring.kafka.bootstrap-servers}")
  112. private String servers;
  113.  
  114. @Bean
  115. public ProducerFactory<String, AddUserEventModel> producerFactory() {
  116. Map<String, Object> property = new HashMap<>();
  117. property.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  118. property.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  119. JsonSerializer.class);
  120. property.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  121. JsonSerializer.class);
  122. return new DefaultKafkaProducerFactory<>(property);
  123. }
  124. @Bean
  125. public KafkaTemplate<String, AddUserEventModel> kafkaTemplate() {
  126. return new KafkaTemplate<>(producerFactory());
  127. }
  128. }
  129.  
  130. @EnableKafka
  131. @Configuration
  132. public class BasicConsumerConfig {
  133.  
  134. @Value("${spring.kafka.bootstrap-servers}")
  135. private String servers;
  136.  
  137. public ConsumerFactory<String, AddUserEventModel>
  138. kafkaConsumerFactory() {
  139. Map<String, Object> props = new HashMap<>();
  140. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  141. props.put(ConsumerConfig.GROUP_ID_CONFIG,
  142. DomainEventNames.COM_COMBINE_DOMAIN_ADD_USER);
  143. return new DefaultKafkaConsumerFactory<>(props, new
  144. StringDeserializer(), new JsonDeserializer<>(AddUserEventModel.class));
  145. }
  146.  
  147. @Bean
  148. public ConcurrentKafkaListenerContainerFactory<String,
  149. AddUserEventModel> containerFactory() {
  150. ConcurrentKafkaListenerContainerFactory<String, AddUserEventModel>
  151. factory =new ConcurrentKafkaListenerContainerFactory<>();
  152. factory.setConsumerFactory(kafkaConsumerFactory());
  153. return factory;
  154. }
  155. }
  156.  
  157. @Slf4j
  158. @Component
  159. public class AddUserPublished {
  160. @Autowired
  161. private KafkaTemplate<String, AddUserEventModel> kafkaTemplate;
  162. public void publish(AddUserEventModel addUserEventModel) {
  163. log.info("#sending addUserEventModel");
  164. kafkaTemplate.send(DomainEventNames.COM_COMBINE_DOMAIN_ADD_USER,
  165. addUserEventModel);
  166. try {
  167. TimeUnit.MILLISECONDS.sleep(2000);
  168. } catch (InterruptedException e) {
  169. log.error("exception at addUserEventModel while thread sleep", e);
  170. }
  171. }
  172. }
  173.  
  174. @Component
  175. @KafkaListener(topics =
  176. DomainEventNames.COM_COMBINE_DOMAIN_ADD_USER, containerFactory =
  177. "containerFactory")
  178. @Slf4j
  179. public class AddUserConsumer {
  180. @Autowired
  181. private MongoUserRepository mongoUserRepository;
  182. @Autowired
  183. private ObjectMapper objectMapper;
  184. public void addUserConsumer(AddUserEventModel addUserEventModel)
  185. {
  186. log.info("#AdduserConsumer consuming addUserEventModel :
  187. {} ", addUserEventModel);
  188. try {
  189. MongoUser mongoUser = new MongoUser();
  190. BeanUtils.copyProperties(addUserEventModel, mongoUser);
  191. this.mongoUserRepository.save(mongoUser);
  192. log.info("#SuccessFully saved consumed object : {}",
  193. mongoUser);
  194. } catch (Exception e) {
  195. log.error("#AddUserConsumer exception during consume
  196. addUserEventModel : {}, with error : {}", addUserEventModel,
  197. e);
  198. }
  199. }
  200. }
  201.  
  /**
   * Holder for domain event names shared by the Kafka producer and consumer.
   *
   * <p>Not instantiable; it only exposes constants.
   */
  public final class DomainEventNames {

    /** Name used both as the add-user topic and as the consumer group id. */
    public static final String COM_COMBINE_DOMAIN_ADD_USER = "com.combine.domain.addUser";

    /** Prevents instantiation of this constants holder. */
    private DomainEventNames() {
    }
  }
  206.  
  207. spring.kafka.bootstrap-servers=localhost:9092
  208.  
  209. com.combine.domain.addUser
Add Comment
Please, Sign In to add comment