daily pastebin goal
17%
SHARE
TWEET

Untitled

a guest May 16th, 2018 108 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Data;
  5. using System.Data.SqlClient;
  6. using System.Data.SqlTypes;
  7. using System.Linq;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10.  
  11. namespace TimestampCursor
  12. {
  13.     public class Program
  14.     {
  15.         public static void Main(string[] args)
  16.         {
  17.             MainAsync().GetAwaiter().GetResult();
  18.         }
  19.  
  20.         private static async Task MainAsync()
  21.         {
  22.             var writerCount = 16;
  23.             var context = new Context(
  24.                 desiredRecordCount: 2000,
  25.                 connectionString: "CONNECTION STRING HERE",
  26.                 cancellationTokenSource: new CancellationTokenSource());
  27.  
  28.             await InitializeTableAsync(context);
  29.  
  30.             var readerTask = ReadRecordsAsync(context);
  31.             var writerTasks = Enumerable
  32.                 .Range(0, writerCount)
  33.                 .Select(i => UpdateRandomRecordsAsync(context, i))
  34.                 .ToList();
  35.  
  36.             await Task.WhenAll(readerTask);
  37.             await Task.WhenAll(writerTasks);
  38.  
  39.             Console.WriteLine();
  40.             await ReadRecordsOnceAsync(context);
  41.         }
  42.  
  43.         private static async Task InitializeTableAsync(Context context)
  44.         {
  45.             using (var connection = new SqlConnection(context.ConnectionString))
  46.             {
  47.                 await connection.OpenAsync();
  48.  
  49.                 Console.Write($"{GetActorTag("INIT")} Creating the table...");
  50.                 using (var command = connection.CreateCommand())
  51.                 {
  52.                     command.CommandText = @"
  53.                     IF NOT EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON (t.schema_id = s.schema_id) WHERE s.name = 'dbo' and t.name = 'Records')
  54.                         CREATE TABLE Records (
  55.                             RecordKey INT IDENTITY(1, 1) PRIMARY KEY,
  56.                             LastEdited DATETIME NOT NULL
  57.                         )";
  58.                     await command.ExecuteNonQueryAsync();
  59.                 }
  60.                 Console.WriteLine(" done.");
  61.  
  62.                 Console.Write($"{GetActorTag("INIT")} Creating the index...");
  63.                 using (var command = connection.CreateCommand())
  64.                 {
  65.                     command.CommandText = @"
  66.                     IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = 'IX_Records_LastEdited' AND object_id = OBJECT_ID('dbo.Records'))
  67.                         CREATE NONCLUSTERED INDEX IX_Records_LastEdited ON dbo.Records (LastEdited ASC)";
  68.                     await command.ExecuteNonQueryAsync();
  69.                 }
  70.                 Console.WriteLine(" done.");
  71.  
  72.                 Console.Write($"{GetActorTag("INIT")} Creating the trigger...");
  73.                 using (var command = connection.CreateCommand())
  74.                 {
  75.                     command.CommandText = @"
  76.                         IF NOT EXISTS (SELECT * FROM sys.triggers WHERE object_id = OBJECT_ID('dbo.Trigger_Records_LastEdited'))
  77.                         EXEC dbo.sp_executesql @statement = N'
  78.                             CREATE TRIGGER dbo.Trigger_Records_LastEdited ON dbo.Records
  79.                             AFTER INSERT, UPDATE
  80.                             AS
  81.                             BEGIN
  82.                                 IF (UPDATE(LastEdited))
  83.                                 BEGIN
  84.                                     UPDATE dbo.Records
  85.                                     SET LastEdited = GETUTCDATE()
  86.                                     FROM INSERTED
  87.                                     WHERE dbo.Records.RecordKey = INSERTED.RecordKey
  88.                                 END
  89.                             END
  90.                         '";
  91.                     await command.ExecuteNonQueryAsync();
  92.                 }
  93.                 Console.WriteLine(" done.");
  94.  
  95.                 Console.Write($"{GetActorTag("INIT")} Counting existing records...");
  96.                 int recordCount;
  97.                 using (var command = connection.CreateCommand())
  98.                 {
  99.                     command.CommandText = "SELECT COUNT(*) FROM Records";
  100.                     recordCount = (int)(await command.ExecuteScalarAsync());
  101.                 }
  102.                 Console.WriteLine($" {recordCount} found.");
  103.  
  104.                 var recordsToInsert = Math.Max(0, context.DesiredRecordCount - recordCount);
  105.                 Console.Write($"{GetActorTag("INIT")} Inserting {recordsToInsert} records...");
  106.                 if (recordsToInsert > 0)
  107.                 {
  108.                     using (var command = connection.CreateCommand())
  109.                     {
  110.                         command.CommandText = "INSERT INTO Records (LastEdited) VALUES (@LastEdited)";
  111.  
  112.                         var lastEditedParameter = command.CreateParameter();
  113.                         lastEditedParameter.ParameterName = "LastEdited";
  114.                         lastEditedParameter.SqlDbType = SqlDbType.DateTime;
  115.                         command.Parameters.Add(lastEditedParameter);
  116.  
  117.                         for (var i = 0; i < recordsToInsert; i++)
  118.                         {
  119.                             lastEditedParameter.SqlValue = DateTime.UtcNow;
  120.                             await command.ExecuteNonQueryAsync();
  121.                         }
  122.                     }
  123.                 }
  124.                 Console.WriteLine(" done.");
  125.  
  126.                 Console.Write($"{GetActorTag("INIT")} Fetching initial values...");
  127.  
  128.                 context.InitialRecordKeyToLastEdited = new Dictionary<int, DateTime>();
  129.                 context.InitialCursor = await UpdateRecordKeyToLastEdited(context, SqlDateTime.MinValue.Value, context.InitialRecordKeyToLastEdited, isReader: false);
  130.                 context.AllRecordKeys = context.InitialRecordKeyToLastEdited.Keys.OrderBy(x => x).ToList();
  131.  
  132.                 context.ReaderRecordKeyToLastEdited = new Dictionary<int, DateTime>(context.InitialRecordKeyToLastEdited);
  133.                 context.ReaderCursor = context.InitialCursor;
  134.  
  135.                 context.WriterRecordKeyToLastEdited = new Dictionary<int, DateTime>(context.InitialRecordKeyToLastEdited);
  136.  
  137.                 var availableWriterRecordKeys = context
  138.                     .AllRecordKeys
  139.                     .OrderBy(x => Guid.NewGuid())
  140.                     .Take(context.DesiredRecordCount);
  141.  
  142.                 context.AvailableWriterRecordKeys = new ConcurrentQueue<int>(availableWriterRecordKeys);
  143.                 context.WriterLock = new object();
  144.  
  145.                 Console.WriteLine(" done.");
  146.             }
  147.         }
  148.  
  149.         private static async Task ReadRecordsAsync(Context context)
  150.         {
  151.             await Task.Yield();
  152.  
  153.             while (!context.CancellationTokenSource.Token.IsCancellationRequested)
  154.             {
  155.                 await ReadRecordsOnceAsync(context);
  156.  
  157.                 await Task.Delay(TimeSpan.FromMilliseconds(250));
  158.             }
  159.         }
  160.  
  161.         private static async Task ReadRecordsOnceAsync(Context context)
  162.         {
  163.             context.ReaderCursor = await UpdateRecordKeyToLastEdited(context, context.ReaderCursor, context.ReaderRecordKeyToLastEdited, isReader: true);
  164.  
  165.             Console.WriteLine($"{GetActorTag("READER")} {(AreReaderAndWriterEqual(context) ? "No missing records so far." : "Found missing records!")}");
  166.         }
  167.  
  168.         private static async Task UpdateRandomRecordsAsync(Context context, int writerId)
  169.         {
  170.             await Task.Yield();
  171.  
  172.             while (!context.CancellationTokenSource.Token.IsCancellationRequested)
  173.             {
  174.                 if (!context.AvailableWriterRecordKeys.TryDequeue(out var chosenRecordKey))
  175.                 {
  176.                     context.CancellationTokenSource.Cancel();
  177.                     return;
  178.                 }
  179.  
  180.                 DateTime lastEdited;
  181.                 using (var connection = new SqlConnection(context.ConnectionString))
  182.                 {
  183.                     await connection.OpenAsync();
  184.  
  185.                     using (var command = connection.CreateCommand())
  186.                     {
  187.                         command.CommandText = @"
  188.                             UPDATE Records
  189.                             SET LastEdited = @LastEdited
  190.                             WHERE RecordKey = @RecordKey;
  191.                            
  192.                             SELECT LastEdited
  193.                             FROM Records
  194.                             WHERE RecordKey = @RecordKey";
  195.  
  196.                         command.Parameters.Add("LastEdited", SqlDbType.DateTime).SqlValue = DateTime.UtcNow;
  197.                         command.Parameters.Add("RecordKey", SqlDbType.Int).SqlValue = chosenRecordKey;
  198.  
  199.                         lastEdited = (DateTime)(await command.ExecuteScalarAsync());
  200.                         Console.WriteLine($"{GetActorTag("WRITER " + writerId.ToString().PadLeft(2))} Updated record key {chosenRecordKey} to {lastEdited:O}.");
  201.                     }
  202.                 }
  203.  
  204.                 lock (context.WriterLock)
  205.                 {
  206.                     context.WriterRecordKeyToLastEdited[chosenRecordKey] = lastEdited;
  207.                 }
  208.             }
  209.         }
  210.  
  211.         private static async Task<DateTime> UpdateRecordKeyToLastEdited(Context context, DateTime cursor, Dictionary<int, DateTime> recordKeyToLastEdited, bool isReader)
  212.         {
  213.             var latestRecords = await ReadAllRecordsAsync(context, cursor);
  214.  
  215.             if (isReader)
  216.             {
  217.                 Console.WriteLine($"{GetActorTag("READER")} Found {latestRecords.Count} records after the cursor {cursor:O}.");
  218.             }
  219.  
  220.             if (latestRecords.Any())
  221.             {
  222.                 foreach (var record in latestRecords)
  223.                 {
  224.                     recordKeyToLastEdited[record.RecordKey] = record.LastEdited;
  225.                 }
  226.  
  227.                 cursor = latestRecords.Max(x => x.LastEdited);
  228.             }
  229.  
  230.             return cursor;
  231.         }
  232.  
  233.         private static bool AreReaderAndWriterEqual(Context context)
  234.         {
  235.             var output = true;
  236.             lock (context.WriterLock)
  237.             {
  238.                 foreach (var key in context.AllRecordKeys)
  239.                 {
  240.                     if (context.ReaderRecordKeyToLastEdited[key] != context.WriterRecordKeyToLastEdited[key])
  241.                     {
  242.                         Console.WriteLine($"{GetActorTag("READER")} Record key {key}. Reader has {context.ReaderRecordKeyToLastEdited[key]:O}. Writer has {context.WriterRecordKeyToLastEdited[key]:O}.");
  243.                         output = false;
  244.                     }
  245.                 }
  246.  
  247.                 return output;
  248.             }
  249.         }
  250.  
  251.         private static async Task<List<Record>> ReadAllRecordsAsync(Context context, DateTime cursor)
  252.         {
  253.             using (var connection = new SqlConnection(context.ConnectionString))
  254.             {
  255.                 await connection.OpenAsync();
  256.  
  257.                 var records = new List<Record>();
  258.                 using (var command = connection.CreateCommand())
  259.                 {
  260.                     command.CommandText = @"
  261.                         SELECT RecordKey, LastEdited
  262.                         FROM Records
  263.                         WHERE LastEdited >= @Cursor
  264.                         ORDER BY LastEdited ASC";
  265.  
  266.                     var cursorParameter = command.Parameters.Add("Cursor", SqlDbType.DateTime).Value = cursor;
  267.  
  268.                     var reader = await command.ExecuteReaderAsync();
  269.                     while (await reader.ReadAsync())
  270.                     {
  271.                         var recordKey = reader.GetInt32(0);
  272.                         var lastEdited = reader.GetDateTime(1);
  273.                         records.Add(new Record(recordKey, lastEdited));
  274.                     }
  275.                 }
  276.  
  277.                 return records;
  278.             }
  279.         }
  280.  
  281.         private static string GetActorTag(string name)
  282.         {
  283.             return $"[{name.PadRight(9)}]";
  284.         }
  285.  
  286.         private class Record
  287.         {
  288.             public Record(int recordKey, DateTime lastEdited)
  289.             {
  290.                 RecordKey = recordKey;
  291.                 LastEdited = lastEdited;
  292.             }
  293.  
  294.             public int RecordKey { get; }
  295.             public DateTime LastEdited { get; }
  296.         }
  297.  
  298.         private class Context
  299.         {
  300.             public Context(int desiredRecordCount, string connectionString, CancellationTokenSource cancellationTokenSource)
  301.             {
  302.                 DesiredRecordCount = desiredRecordCount;
  303.                 ConnectionString = connectionString;
  304.                 CancellationTokenSource = cancellationTokenSource;
  305.             }
  306.  
  307.             public int DesiredRecordCount { get; }
  308.             public string ConnectionString { get; }
  309.             public CancellationTokenSource CancellationTokenSource { get; }
  310.  
  311.             public List<int> AllRecordKeys { get; set; }
  312.             public DateTime InitialCursor { get; set; }
  313.             public Dictionary<int, DateTime> InitialRecordKeyToLastEdited { get; set; }
  314.             public Dictionary<int, DateTime> ReaderRecordKeyToLastEdited { get; set; }
  315.             public Dictionary<int, DateTime> WriterRecordKeyToLastEdited { get; set; }
  316.             public ConcurrentQueue<int> AvailableWriterRecordKeys { get; set; }
  317.             public object WriterLock { get; set; }
  318.             public DateTime ReaderCursor { get; set; }
  319.         }
  320.     }
  321. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top