Advertisement
Guest User

Untitled

a guest
Jun 26th, 2019
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.17 KB | None | 0 0
  1. import akka.actor.ActorSystem
  2. import akka.kafka.scaladsl.Consumer
  3. import akka.kafka.{ConsumerSettings, Subscriptions}
  4. import akka.stream.ActorMaterializer
  5. import akka.stream.scaladsl.Sink
  6. import org.apache.kafka.clients.consumer.ConsumerConfig
  7. import org.apache.kafka.common.serialization.StringDeserializer
  8.  
  9. import scala.util.{Failure, Success}
  10.  
  11. object App {
  12. def main(args: Array[String]): Unit = {
  13.  
  14.  
  15. implicit val system = ActorSystem("SAP-SENDER")
  16. implicit val executor = system.dispatcher
  17. implicit val materilizer = ActorMaterializer()
  18.  
  19. val config = system.settings.config.getConfig("akka.kafka.consumer")
  20.  
  21. val consumerSettings: ConsumerSettings[String, String] =
  22. ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
  23. .withBootstrapServers("localhost:9003")
  24. .withGroupId("SAPSENDER")
  25. .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  26.  
  27. Consumer
  28. .plainSource(
  29. consumerSettings,
  30. Subscriptions.topics("TEST-TOPIC")
  31. )
  32. .runWith(Sink.foreach(println))
  33. .onComplete{
  34. case Success(_) => println("Goood")
  35. case Failure(ex) =>
  36. println(s"I am failed ==============> ${ex.getMessage}")
  37. system.terminate()
  38. }
  39.  
  40. }
  41. }
  42.  
  43. 19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
  44. 19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] No broker available to send FindCoordinator request
  45. 19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
  46. 19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] Coordinator discovery failed, refreshing metadata
  47. 19:03:47.342 [SAP-SENDER-akka.kafka.default-dispatcher-15] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
  48. 19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
  49. 19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] No broker available to send FindCoordinator request
  50. 19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
  51. 19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=SAPSENDER] Coordinator discovery failed, refreshing metadata
  52. 19:03:47.412 [SAP-SENDER-akka.kafka.default-dispatcher-17] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=SAPSENDER] Give up sending metadata request since no node is available
  53. 19:03:47.478 [SAP-SENDER-akka.kafka.default-dispatcher-20] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=SAPSENDER] Pausing partitions []
  54.  
  55. java.net.ConnectException: Connection refused
  56. at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  57. at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
  58. at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
  59. at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:173)
  60. at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:515)
  61. at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
  62. at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
  63. at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
  64. at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
  65. at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
  66. at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
  67. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
  68. at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
  69. at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
  70. at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
  71. at akka.kafka.internal.KafkaConsumerActor.poll(KafkaConsumerActor.scala:380)
  72. at akka.kafka.internal.KafkaConsumerActor.akka$kafka$internal$KafkaConsumerActor$$receivePoll(KafkaConsumerActor.scala:360)
  73. at akka.kafka.internal.KafkaConsumerActor$$anonfun$receive$1.applyOrElse(KafkaConsumerActor.scala:221)
  74. at akka.actor.Actor.aroundReceive(Actor.scala:539)
  75. at akka.actor.Actor.aroundReceive$(Actor.scala:537)
  76. at akka.kafka.internal.KafkaConsumerActor.akka$actor$Timers$$super$aroundReceive(KafkaConsumerActor.scala:142)
  77. at akka.actor.Timers.aroundReceive(Timers.scala:51)
  78. at akka.actor.Timers.aroundReceive$(Timers.scala:40)
  79. at akka.kafka.internal.KafkaConsumerActor.aroundReceive(KafkaConsumerActor.scala:142)
  80. at akka.actor.ActorCell.receiveMessage(ActorCell.scala:610)
  81. at akka.actor.ActorCell.invoke(ActorCell.scala:579)
  82. at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
  83. at akka.dispatch.Mailbox.run(Mailbox.scala:229)
  84. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  85. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  86. at java.lang.Thread.run(Thread.java:748)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement