Guest User

Kafka Confluent .NET AdminClient Transient Failure Program

a guest
Dec 31st, 2020
342
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 7.27 KB | None | 0 0
  1. using System;
  2. using System.Diagnostics;
  3. using System.Threading.Tasks;
  4.  
  5. using Confluent.Kafka;
  6. using Confluent.Kafka.Admin;
  7. using Polly;
  8. using Polly.Retry;
  9.  
  10.  
  namespace KafkaTransientFaultHandler
  {
      /**
       * <summary>
       * Small test console app that investigates transient failure handling when creating a
       * topic through the Confluent AdminClient while the broker is not online.
       * </summary>
       */
      class MainClass
      {
          /**
           * <summary>
           * Builds a Polly policy that retries on <see cref="KafkaException"/>, pausing a fixed
           * interval between attempts and logging the pause that was waited before each retry.
           * </summary>
           * <param name="retryCount">Number of retries performed after the initial attempt.</param>
           * <param name="waitSeconds">Fixed pause, in seconds, applied before every retry.</param>
           * <returns>The configured asynchronous retry policy.</returns>
           */
          private static AsyncRetryPolicy BrokerWaitAndRetry(short retryCount, short waitSeconds)
          {
              var pauseBetweenFailures = TimeSpan.FromSeconds(waitSeconds);

              return Policy
                  .Handle<KafkaException>()
                  .WaitAndRetryAsync(
                      retryCount,
                      // Same pause for every attempt; the attempt number is deliberately ignored.
                      _ => pauseBetweenFailures,
                      onRetry: (Exception exception, TimeSpan time) =>
                      {
                          Console.WriteLine($"ONRETRY :: Waited for timepsan {time}");
                      });
          }


          /**
           * <summary>
           * Creates an admin client for the given bootstrap servers with full client debug
           * output enabled. The caller owns the returned client and must dispose it.
           * </summary>
           * <param name="brokers">Comma separated bootstrap server list.</param>
           * <param name="socketTimeOutMs">Value assigned to SocketTimeoutMs, in milliseconds.</param>
           * <returns>A newly built admin client.</returns>
           */
          private static IAdminClient CreateAdminClient(string brokers, int socketTimeOutMs = 60000)
          {
              var config = new AdminClientConfig
              {
                  BootstrapServers = brokers,
                  Debug = "all",
                  SocketTimeoutMs = socketTimeOutMs,
              };

              return new AdminClientBuilder(config).Build();
          }

          /**
           * <summary>
           * Requests creation of a single topic (3 partitions, replication factor 1) and prints
           * how long the request took, both on success and on failure.
           * </summary>
           * <param name="client">Admin client used to send the request.</param>
           * <param name="topic">Name of the topic to create.</param>
           * <exception cref="KafkaException">
           * Rethrown after its error details have been written to standard error, so that a
           * surrounding retry policy can react to it.
           * </exception>
           */
          private static async Task CreateTopic(IAdminClient client, string topic)
          {
              var stopWatch = Stopwatch.StartNew();

              try
              {
                  Console.WriteLine($"SENDING REQUEST TO CREATE TOPIC :: {topic}");

                  var specification = new TopicSpecification
                  {
                      Name = topic,
                      NumPartitions = 3,
                      ReplicationFactor = 1
                  };

                  var options = new CreateTopicsOptions
                  {
                      /** Set low request timeout for basic testing, otherwise would use a higher timeout of 60 seconds*/
                      RequestTimeout = TimeSpan.FromSeconds(20)
                  };

                  await client.CreateTopicsAsync(new TopicSpecification[] { specification }, options);

                  PrintRuntime(stopWatch.Elapsed);

                  Console.WriteLine($"SUCCESSFULLY CREATED TOPIC :: {topic}");
              }
              catch (KafkaException ke)
              {
                  Console.WriteLine($"EXCEPTION ENCOUNTERED CREATING TOPIC :: {topic}");
                  Console.Error.WriteLine($"Kafka exception : {ke.Error.Code}");
                  Console.Error.WriteLine($"Kafka error : {ke.Error.Reason}");
                  Console.Error.WriteLine($"Kafka fatal : {ke.Error.IsFatal}");
                  Console.Error.WriteLine($"Kafka broker : {ke.Error.IsBrokerError}");
                  Console.Error.WriteLine($"Kafka local error : {ke.Error.IsLocalError}");

                  PrintRuntime(stopWatch.Elapsed);

                  // Bare rethrow keeps the original stack trace; "throw ke;" would reset it.
                  throw;
              }
          }

          /**
           * <summary>
           * Prints an elapsed time as "RunTime hh:mm:ss.cc", where cc is hundredths of a second.
           * </summary>
           * <param name="ts">Elapsed time to print.</param>
           */
          private static void PrintRuntime(TimeSpan ts)
          {
              // Milliseconds (0-999) reduced to two digits of hundredths.
              var hundredths = ts.Milliseconds / 10;
              var elapsedTime = $"{ts.Hours:00}:{ts.Minutes:00}:{ts.Seconds:00}.{hundredths:00}";

              Console.WriteLine($"RunTime {elapsedTime}");
          }

          /**
           * <summary>Prints the expected command line arguments.</summary>
           */
          private static void PrintUsage() => Console.WriteLine("Usage: .. <reuse|new> <broker,broker,..> <topic> [topic...]");


          /**
           * Small test console app to investigate transient failure handling for creating a topic using AdminClient when the broker is not online, i.e. unavailable.
           *
           * Usage:
           * ------
           * dotnet run reuse localhost:9092 mytopic:    Try to create a topic using a wait and retry failure handling strategy. On each retry the same admin client instance is reused.
           * dotnet run new localhost:9092 mytopic:      Try to create a topic using a wait and retry failure handling strategy. On each attempt a new admin client instance is used.
           *
           * Every admin client created here is disposed once it is no longer needed.
           *
           * Analysis:
           * ---------
           * When the same client instance is used to create a topic upon a retry attempt:    The program blocks when the rdkafka poll thread detects that a broker is connected and
           *                                                                                  enters the main thread. The CreateTopicsAsync call is not resumed, so blocking occurs.
           *
           * When a new client instance is used to create a topic upon each retry attempt:    Success, the topic is created and the program exits.
           *
           *
           * Is the blocking behaviour happening because the rdkafka library hides/suppresses multiple instances of a local time out error?
           *
           *
           * How to test
           * -----------
           * Start this console app and wait until the console output reports that the ONRETRY delegate has been triggered.
           * Then start the kafka broker, e.g. from a docker-compose stack.
           *
           * The program will block when reusing the same client instance for each retry attempt.
           * The program will succeed when using a new client instance for each retry attempt.
           *
           * Questions
           * --------
           *
           * Does the Confluent Kafka SDK have built in support for transient failure handling for AdminClient when a broker is unavailable?
           * Is it possible to use the same client instance for retry attempts of the CreateTopic operation, rather than create a new client instance each time?
           */
          public static async Task Main(string[] args)
          {
              if (args.Length < 3)
              {
                  PrintUsage();
                  return;
              }

              const short retryAttempts = 3;
              const short delayInterval = 2;
              // Single source for the 20s socket timeout mentioned in the log message below.
              const int socketTimeoutMs = 20000;

              var strategy = args[0];
              var brokers = args[1];
              var topic = args[2];

              Console.WriteLine($"Creating polly wait and retry transient failure policy with {retryAttempts} retries and a {delayInterval} second delay");
              var transientFailurePolicy = BrokerWaitAndRetry(retryAttempts, delayInterval);

              Console.WriteLine($"Creating admin client using broker(s) {brokers}, with a socket timeout of 20s for testing...");

              if (strategy.Equals("reuse", StringComparison.Ordinal))
              {
                  Console.WriteLine($"Trying to create topic {topic} using the wait and retry policy that reuses a client instance");

                  // One client shared by all attempts; released when the policy finishes or throws.
                  using (var client = CreateAdminClient(brokers, socketTimeoutMs))
                  {
                      await transientFailurePolicy.ExecuteAsync(async () => await CreateTopic(client, topic));
                  }
              }
              else if (strategy.Equals("new", StringComparison.Ordinal))
              {
                  Console.WriteLine($"Trying to create topic {topic} using the wait and retry policy that creates a new admin client for each retry attempt");

                  await transientFailurePolicy.ExecuteAsync(async () =>
                  {
                      // Fresh client per attempt; disposed whether the attempt succeeds or fails.
                      using (var client = CreateAdminClient(brokers, socketTimeoutMs))
                      {
                          await CreateTopic(client, topic);
                      }

                      Console.WriteLine($"TRANSIENT FAILURE STRATEGY SUCCESSFUL!");
                  });
              }
              else
              {
                  Console.Error.WriteLine("Unrecognised admin client strategy");
              }
          }
      }
  }
  177.  
Advertisement
Add Comment
Please, Sign In to add comment