Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
- using (var consumer = new
- ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
- .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
- .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
- .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
- .Build())
- {
- consumer.Subscribe(topicName);
- try
- {
- while (true)
- {
- try
- {
- var consumeResult = consumer.Consume();
- Console.WriteLine($"Key: {consumeResult.Message.Key}nValue: {consumeResult.Value}");
- Console.WriteLine(consumeResult.Value.Schema);
- Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
- }
- catch (ConsumeException e)
- {
- Console.WriteLine($"Consume error: {e.Error.Reason}");
- }
- }
- }
- catch (OperationCanceledException)
- {
- // commit final offsets and leave the group.
- consumer.Close();
- }
- }
- Console.WriteLine(consumeResult.Value.Schema);
- Console.WriteLine(consumeResult.Value.Schema["favorite_number"]);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement