Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Diagnostics;
- using System.Threading.Tasks;
- using Confluent.Kafka;
- using Confluent.Kafka.Admin;
- using Polly;
- using Polly.Retry;
- namespace KafkaTransientFaultHandler
- {
- class MainClass
- {
- private static AsyncRetryPolicy BrokerWaitAndRetry(short retryCount, short waitSeconds)
- {
- var pauseBetweenFailures = TimeSpan.FromSeconds(waitSeconds);
- var retryPolicy = Policy
- .Handle<KafkaException>()
- .WaitAndRetryAsync(retryCount, i => pauseBetweenFailures, onRetry: (Exception e, TimeSpan time) =>
- {
- Console.WriteLine($"ONRETRY :: Waited for timepsan {time}");
- });
- return retryPolicy;
- }
- private static IAdminClient CreateAdminClient(string brokers, int socketTimeOutMs = 60000)
- {
- return new AdminClientBuilder(
- new AdminClientConfig()
- {
- BootstrapServers = brokers,
- Debug = "all",
- SocketTimeoutMs = socketTimeOutMs,
- }
- )
- .Build();
- }
- private static async Task CreateTopic(IAdminClient client, string topic)
- {
- var stopWatch = new Stopwatch();
- stopWatch.Start();
- try
- {
- Console.WriteLine($"SENDING REQUEST TO CREATE TOPIC :: {topic}");
- await client.CreateTopicsAsync(
- new TopicSpecification[] {
- new TopicSpecification()
- {
- Name = topic,
- NumPartitions = 3,
- ReplicationFactor = 1
- }
- },
- new CreateTopicsOptions
- {
- /** Set low request timeout for basic testing, otherwise would use a higher timeout of 60 seconds*/
- RequestTimeout = TimeSpan.FromSeconds(20)
- }
- );
- PrintRuntime(stopWatch.Elapsed);
- Console.WriteLine($"SUCCESSFULLY CREATED TOPIC :: {topic}");
- }
- catch (KafkaException ke)
- {
- Console.WriteLine($"EXCEPTION ENCOUNTERED CREATING TOPIC :: {topic}");
- Console.Error.WriteLine($"Kafka exception : {ke.Error.Code}");
- Console.Error.WriteLine($"Kafka error : {ke.Error.Reason}");
- Console.Error.WriteLine($"Kafka fatal : {ke.Error.IsFatal}");
- Console.Error.WriteLine($"Kafka broker : {ke.Error.IsBrokerError}");
- Console.Error.WriteLine($"Kafka local error : {ke.Error.IsLocalError}");
- PrintRuntime(stopWatch.Elapsed);
- throw ke;
- }
- }
- private static void PrintRuntime(TimeSpan ts)
- {
- string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}",
- ts.Hours, ts.Minutes, ts.Seconds,
- ts.Milliseconds / 10);
- Console.WriteLine("RunTime " + elapsedTime);
- }
- private static void PrintUsage() => Console.WriteLine("Usage: .. <reuse|new> <broker,broker,..> <topic> [topic...]");
- /**
- * Small test console app to investigate transient failure handling for creating a topic using AdminClient when the broker is not online, i.e. unavailable.
- *
- * Usage:
- * ------
- * dotnet run reuse localhost:9092 mytopic: Try and create a topic using a wait and retry failure handling strategy. On each retry reuse same admin client instance
- * dotnet run new localhost:9092 mytopic: Try and create a topic using a wait and retry failure handling strategy. On each try use a new admin client instance
- *
- * Analysis:
- * ---------
- * When the same client instance is used to create a topic upon a retry attempt: The program blocks when the rdkafka poll thread detects that a broker is connected and
- * enters the main thread. The CreateTopicsAsync is not resumed, blocking occurs.
- *
- * When a new client instance is used to create a topic upon each retry attempt: Success, the topic is created and the program exits.
- *
- *
- * Is the blocking behaviour happening because the rdkafka library hides/suppresses multiple instances of a local time out error?
- *
- *
- * How to test
- * -----------
- * Start this console app and wait until see console output informing that ONRETRY delegate has been triggered.
- * Then, start the kafka broker, e.g. from docker-compose stack
- *
- * The program will block when reusing the same client instance for each retry attempt.
- * The program will succeed when using a new client instance for each retry attempt.
- *
- * Questions
- * --------
- *
- * Does confluent Kafka SDK have built in support for transient failure handling for AdminClient when a broker is unavailable?
- * Is it possible to use the same client instance for retry attempts of CreateTopic operation, rather than create a new client instance each time?
- */
- public static async Task Main(string[] args)
- {
- if (args.Length < 3)
- {
- PrintUsage();
- return;
- }
- const short retryAttempts = 3;
- const short delayInterval = 2;
- var strategy = args[0];
- var brokers = args[1];
- var topic = args[2];
- Console.WriteLine($"Creating polly wait and retry transient failure policy with {retryAttempts} retries and a {delayInterval} second delay");
- var transientFailurePolicy = BrokerWaitAndRetry(retryAttempts, delayInterval);
- Console.WriteLine($"Creating admin client using broker(s) {brokers}, with a socket timeout of 20s for testing...");
- var client = CreateAdminClient(brokers, 20000);
- if(strategy.Equals("reuse")) {
- Console.WriteLine($"Trying to create topic {topic} using the wait and retry policy that reuses a client instance");
- await transientFailurePolicy.ExecuteAsync(async () => await CreateTopic(client, topic));
- }
- else if(strategy.Equals("new")) {
- Console.WriteLine($"Trying to create topic {topic} using the wait and retry policy that creates a new admin client for each retry attempt");
- await transientFailurePolicy.ExecuteAsync(async () =>
- {
- client.Dispose();
- client = CreateAdminClient(brokers, 20000);
- await CreateTopic(client, topic);
- Console.WriteLine($"TRANSIENT FAILURE STRATEGY SUCCESSFUL!");
- });
- }
- else
- {
- Console.Error.WriteLine("Unrecognised admin client strategy");
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment