Advertisement
Guest User

Untitled

a guest
Jun 26th, 2019
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.88 KB | None | 0 0
  // Consumes Avro-encoded messages from `topicName` and prints each message's key,
  // value and value schema to the console until the user presses Ctrl+C.
  //
  // Inputs taken from the enclosing scope: schemaRegistryUrl, bootstrapServers,
  // groupName, topicName.
  using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
  using (var cts = new System.Threading.CancellationTokenSource())
  using (var consumer = new ConsumerBuilder<string, GenericRecord>(
          new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
      // The Avro deserializers are async; the consumer is built with their sync-over-async adapters.
      .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
      .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
      .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
      .Build())
  {
      // Ctrl+C requests cancellation instead of killing the process, so the
      // OperationCanceledException handler below can actually run.
      ConsoleCancelEventHandler onCancel = (_, args) =>
      {
          args.Cancel = true;
          cts.Cancel();
      };
      Console.CancelKeyPress += onCancel;

      consumer.Subscribe(topicName);

      try
      {
          while (true)
          {
              try
              {
                  // Passing the token makes Consume throw OperationCanceledException
                  // once cancellation is requested; without it the loop could never exit.
                  var consumeResult = consumer.Consume(cts.Token);

                  Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");

                  // consumeResult is scoped to this try block, so it has to be inspected here.
                  Console.WriteLine(consumeResult.Value.Schema);
                  // NOTE(review): this indexes the record's *schema*, so it prints the field
                  // definition for "favorite_number", not the field's value in this message.
                  // Confirm that is what is wanted.
                  Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
              }
              catch (ConsumeException e)
              {
                  // A failed consume is reported and the loop keeps polling.
                  Console.WriteLine($"Consume error: {e.Error.Reason}");
              }
          }
      }
      catch (OperationCanceledException)
      {
          // commit final offsets and leave the group.
          consumer.Close();
      }
      finally
      {
          // Detach before `cts` is disposed so a late Ctrl+C cannot call Cancel() on a disposed source.
          Console.CancelKeyPress -= onCancel;
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement