Advertisement
Guest User

Untitled

a guest
Apr 22nd, 2019
122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.48 KB | None | 0 0
  1. using System;
  2. using System.Data;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using System.Threading.Tasks.Dataflow;
  8. using Dapper;
  9. using Dapper.Contrib.Extensions;
  10. using MySql.Data.MySqlClient;
  11. using Polly;
  12.  
  13. namespace dotnet_poc {
  14. class Program {
  15. static async Task Main (string[] args) {
  16. var port = 33061;
  17. var server = "localhost";
  18. var database = "homestead";
  19. var username = "root";
  20. var password = "secret";
  21.  
  22. var connectionString = $"Server={server};Port={port};Database={database};Uid={username};Pwd={password};";
  23.  
  24. var batchSize = 2000;
  25. var totalMessages = 1000000;
  26.  
  27. var completed = 0;
  28.  
  29. // string sQuery = "SELECT ID, FirstName, LastName, DateOfBirth FROM Employee WHERE DateOfBirth = @DateOfBirth";
  30.  
  31. // var count = "SELECT * FROM messages";
  32. // var result = await conn.QueryAsync<Message> (count);
  33. // Console.WriteLine (result.First ());
  34.  
  35. var timer = Stopwatch.StartNew ();
  36.  
  37. Console.WriteLine ($"Starting insert on {Environment.ProcessorCount} cpu cores");
  38.  
  39. var makeRecord = new TransformBlock<int, Message> (x =>
  40. new Message {
  41. contact_id = x,
  42. }, new ExecutionDataflowBlockOptions {
  43. MaxDegreeOfParallelism = 2
  44. });
  45.  
  46. var makeBatch = new BatchBlock<Message> (batchSize);
  47.  
  48. var insertPolicy = Policy.Handle<Exception> ().WaitAndRetryForeverAsync (retryAttempt =>
  49. TimeSpan.FromSeconds (Math.Pow (2, retryAttempt)), (exception, timespan) => {
  50. Console.WriteLine ($"Retrying in {timespan}");
  51. });
  52.  
  53. var batchInsert = new TransformBlock<Message[], int> (async x => {
  54. await insertPolicy.ExecuteAsync (async () => {
  55. using (IDbConnection conn = new MySqlConnection (connectionString)) {
  56. var inserted = await conn.InsertAsync (x);
  57. }
  58. });
  59.  
  60. return x.Count ();
  61.  
  62. }, new ExecutionDataflowBlockOptions {
  63. BoundedCapacity = Environment.ProcessorCount,
  64. MaxDegreeOfParallelism = Environment.ProcessorCount
  65. });
  66.  
  67. var reportProgress = new ActionBlock<int> (x => {
  68. var newCompleted = Interlocked.Add (ref completed, x);
  69. var speed = (double) newCompleted / (timer.ElapsedMilliseconds / 1000);
  70. Console.WriteLine ($"{newCompleted} - avg {speed:0.##} / s");
  71. });
  72.  
  73. makeRecord.LinkTo (makeBatch, new DataflowLinkOptions { PropagateCompletion = true });
  74. makeBatch.LinkTo (batchInsert, new DataflowLinkOptions { PropagateCompletion = true });
  75. batchInsert.LinkTo (reportProgress, new DataflowLinkOptions { PropagateCompletion = true });
  76.  
  77. foreach (var seed in Enumerable.Range (0, totalMessages)) {
  78. await makeRecord.SendAsync (seed);
  79. }
  80.  
  81. makeRecord.Complete ();
  82.  
  83. await reportProgress.Completion;
  84.  
  85. timer.Stop ();
  86.  
  87. var finalSpeed = (double) totalMessages / (timer.ElapsedMilliseconds / 1000);
  88. Console.WriteLine ($"\nCompleted insert of {totalMessages} Messages in {timer.Elapsed}.\n\tWrite speed: {finalSpeed}/s");
  89.  
  90. }
  91. }
  92. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement