Advertisement
Guest User

Untitled

a guest
Jun 18th, 2019
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.16 KB | None | 0 0
  /// <summary>
  /// Subscribes to the "player" topic and prints each consumed Avro record (key, value,
  /// schema and the "favorite_number" / "name" fields) to the console until Ctrl+C is pressed.
  /// </summary>
  /// <param name="args">Command-line arguments; not used.</param>
  static void Main(string[] args)
  {
      string bootstrapServers = "localhost:9092";
      string schemaRegistryUrl = "Production163:8081";
      string topicName = "player";
      string groupName = "avro-generic-example-group";

      // Cancellation source that lets Ctrl+C break out of the blocking Consume call.
      // Deliberately not disposed: the key-press handler may still fire while Main unwinds.
      var cts = new System.Threading.CancellationTokenSource();
      Console.CancelKeyPress += (_, e) =>
      {
          // Keep the process alive so the consumer can leave the group cleanly.
          e.Cancel = true;
          cts.Cancel();
      };

      using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
      using (var consumer =
          new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
              .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
              .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
              .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
              .Build())
      {
          consumer.Subscribe(topicName);

          try
          {
              while (true)
              {
                  try
                  {
                      // Passing the token makes Consume throw OperationCanceledException on Ctrl+C;
                      // without it the outer catch (and consumer.Close()) could never run.
                      var consumeResult = consumer.Consume(cts.Token);
                      var record = consumeResult.Value;
                      var schema = record.Schema;

                      Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {record}");
                      Console.WriteLine(schema);

                      // Field metadata for "favorite_number": a custom property lookup, then the field itself.
                      Console.WriteLine(schema["favorite_number"].GetProperty("favorite_number"));
                      Console.WriteLine(schema["favorite_number"]);

                      // Field values carried by this record.
                      Console.WriteLine(record["favorite_number"]);
                      Console.WriteLine(record["name"]);
                  }
                  catch (ConsumeException e)
                  {
                      // A failed consume is reported and the loop keeps polling.
                      Console.WriteLine($"Consume error: {e.Error.Reason}");
                  }
              }
          }
          catch (OperationCanceledException)
          {
              // commit final offsets and leave the group.
              consumer.Close();
          }
      }
  }
  62.  
  // Body of an iterator-style routine (its signature is not part of this paste) that polls the
  // "player" topic once per loop iteration and logs each received Avro record via Debug.Log.
  string bootstrapServers = "localhost:9092";
  string schemaRegistryUrl = "Production163:8081";
  string topicName = "player";
  string groupName = "avro-generic-example-group";

  using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { SchemaRegistryUrl = schemaRegistryUrl }))
  using (var consumer = new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
      .SetKeyDeserializer(new AsyncAvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
      .SetValueDeserializer(new AsyncAvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
      .SetErrorHandler((_, e) => Debug.Log($"Error: {e.Reason}"))
      .Build())
  {
      Debug.Log("subscribe");
      consumer.Subscribe(topicName);

      while (true)
      {
          // A yield cannot live inside a try block that has a catch clause, so only the
          // Consume call itself is guarded; the yield stays outside.
          ConsumeResult<string, GenericRecord> consumeResult = null;
          try
          {
              // NOTE(review): this blocks the calling thread for up to one second per poll — confirm
              // that is acceptable for wherever this routine is driven from.
              consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(1000));
          }
          catch (ConsumeException e)
          {
              Debug.Log($"Consume error: {e.Error.Reason}");
          }

          // Consume(TimeSpan) yields null when no message arrives before the timeout, so the
          // result must be checked before it is dereferenced.
          if (consumeResult != null)
          {
              var record = consumeResult.Value;
              var schema = record.Schema;

              Debug.Log($"Key: {consumeResult.Message.Key}\nValue: {record}");
              Debug.Log(schema);

              // Field metadata for "favorite_number": a custom property lookup, then the field itself.
              Debug.Log(schema["favorite_number"].GetProperty("favorite_number"));
              Debug.Log(schema["favorite_number"]);

              // Field values carried by this record.
              Debug.Log(record["favorite_number"]);
              Debug.Log(record["name"]);
          }

          yield return new WaitForSeconds(1);
      }
  }
  // NOTE(review): the loop above never breaks, so this line is not reached as written.
  Debug.Log("Main end game.");
  126.  
  127.  
  128. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement