Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Data;
- using System.Diagnostics;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Threading.Tasks.Dataflow;
- using Dapper;
- using Dapper.Contrib.Extensions;
- using MySql.Data.MySqlClient;
- using Polly;
- namespace dotnet_poc {
- class Program {
- static async Task Main (string[] args) {
- var port = 33061;
- var server = "localhost";
- var database = "homestead";
- var username = "root";
- var password = "secret";
- var connectionString = $"Server={server};Port={port};Database={database};Uid={username};Pwd={password};";
- var batchSize = 2000;
- var totalMessages = 1000000;
- var completed = 0;
- // string sQuery = "SELECT ID, FirstName, LastName, DateOfBirth FROM Employee WHERE DateOfBirth = @DateOfBirth";
- // var count = "SELECT * FROM messages";
- // var result = await conn.QueryAsync<Message> (count);
- // Console.WriteLine (result.First ());
- var timer = Stopwatch.StartNew ();
- Console.WriteLine ($"Starting insert on {Environment.ProcessorCount} cpu cores");
- var makeRecord = new TransformBlock<int, Message> (x =>
- new Message {
- contact_id = x,
- }, new ExecutionDataflowBlockOptions {
- MaxDegreeOfParallelism = 2
- });
- var makeBatch = new BatchBlock<Message> (batchSize);
- var insertPolicy = Policy.Handle<Exception> ().WaitAndRetryForeverAsync (retryAttempt =>
- TimeSpan.FromSeconds (Math.Pow (2, retryAttempt)), (exception, timespan) => {
- Console.WriteLine ($"Retrying in {timespan}");
- });
- var batchInsert = new TransformBlock<Message[], int> (async x => {
- await insertPolicy.ExecuteAsync (async () => {
- using (IDbConnection conn = new MySqlConnection (connectionString)) {
- var inserted = await conn.InsertAsync (x);
- }
- });
- return x.Count ();
- }, new ExecutionDataflowBlockOptions {
- BoundedCapacity = Environment.ProcessorCount,
- MaxDegreeOfParallelism = Environment.ProcessorCount
- });
- var reportProgress = new ActionBlock<int> (x => {
- var newCompleted = Interlocked.Add (ref completed, x);
- var speed = (double) newCompleted / (timer.ElapsedMilliseconds / 1000);
- Console.WriteLine ($"{newCompleted} - avg {speed:0.##} / s");
- });
- makeRecord.LinkTo (makeBatch, new DataflowLinkOptions { PropagateCompletion = true });
- makeBatch.LinkTo (batchInsert, new DataflowLinkOptions { PropagateCompletion = true });
- batchInsert.LinkTo (reportProgress, new DataflowLinkOptions { PropagateCompletion = true });
- foreach (var seed in Enumerable.Range (0, totalMessages)) {
- await makeRecord.SendAsync (seed);
- }
- makeRecord.Complete ();
- await reportProgress.Completion;
- timer.Stop ();
- var finalSpeed = (double) totalMessages / (timer.ElapsedMilliseconds / 1000);
- Console.WriteLine ($"\nCompleted insert of {totalMessages} Messages in {timer.Elapsed}.\n\tWrite speed: {finalSpeed}/s");
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement