Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Data;
- using System.Data.SqlClient;
- using System.Data.SqlTypes;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- namespace TimestampCursor
- {
- public class Program
- {
/// <summary>
/// Synchronous process entry point. Starts the asynchronous run and blocks
/// until it finishes, letting any exception from it propagate.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
public static void Main(string[] args)
    => MainAsync().GetAwaiter().GetResult();
/// <summary>
/// Runs the whole experiment: prepares the database and shared state, starts
/// one reader and <c>writerCount</c> writers, waits for all of them, and then
/// performs one final read to report whether the reader missed any update.
/// </summary>
/// <remarks>
/// Every actor task is wrapped so that a failure in any one of them requests
/// cancellation for the others. Without that, a run in which the writers fault
/// before draining their queue would leave the reader polling forever and the
/// failure would never surface.
/// </remarks>
private static async Task MainAsync()
{
    const int writerCount = 16;

    // CancellationTokenSource is disposable; scope it to the run.
    using (var cancellationTokenSource = new CancellationTokenSource())
    {
        var context = new Context(
            desiredRecordCount: 2000,
            connectionString: "CONNECTION STRING HERE",
            cancellationTokenSource: cancellationTokenSource);

        await InitializeTableAsync(context);

        // The reader is started first, then the writers, as before.
        var readerTask = CancelOnFailureAsync(ReadRecordsAsync(context), cancellationTokenSource);
        var writerTasks = Enumerable
            .Range(0, writerCount)
            .Select(i => CancelOnFailureAsync(UpdateRandomRecordsAsync(context, i), cancellationTokenSource))
            .ToList();

        // Await everything together so no actor is left running unobserved
        // when another one fails.
        await Task.WhenAll(writerTasks.Concat(new[] { readerTask }));

        Console.WriteLine();

        // All writers are done; one last read decides the final verdict.
        await ReadRecordsOnceAsync(context);
    }
}

/// <summary>
/// Awaits <paramref name="actorTask"/> and, if it throws, requests cancellation
/// on <paramref name="cancellationTokenSource"/> before rethrowing.
/// </summary>
private static async Task CancelOnFailureAsync(Task actorTask, CancellationTokenSource cancellationTokenSource)
{
    try
    {
        await actorTask;
    }
    catch
    {
        cancellationTokenSource.Cancel();
        throw;
    }
}
/// <summary>
/// Prepares the database and the shared <see cref="Context"/> for a run:
/// creates the table, index and trigger if they do not exist, tops the table
/// up to <c>context.DesiredRecordCount</c> rows, and then captures the initial
/// state used by the reader and the writers.
/// </summary>
/// <param name="context">Run-wide state; its mutable properties are populated here.</param>
private static async Task InitializeTableAsync(Context context)
{
    using (var connection = new SqlConnection(context.ConnectionString))
    {
        await connection.OpenAsync();

        // The table is created schema-qualified so it matches the existence
        // check and the dbo-qualified index/trigger statements below.
        await RunInitStepAsync(connection, "Creating the table", @"
            IF NOT EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON (t.schema_id = s.schema_id) WHERE s.name = 'dbo' and t.name = 'Records')
            CREATE TABLE dbo.Records (
                RecordKey INT IDENTITY(1, 1) PRIMARY KEY,
                LastEdited DATETIME NOT NULL
            )");

        await RunInitStepAsync(connection, "Creating the index", @"
            IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = 'IX_Records_LastEdited' AND object_id = OBJECT_ID('dbo.Records'))
            CREATE NONCLUSTERED INDEX IX_Records_LastEdited ON dbo.Records (LastEdited ASC)");

        // CREATE TRIGGER has to be the first statement of its batch, hence the
        // nested sp_executesql call.
        await RunInitStepAsync(connection, "Creating the trigger", @"
            IF NOT EXISTS (SELECT * FROM sys.triggers WHERE object_id = OBJECT_ID('dbo.Trigger_Records_LastEdited'))
            EXEC dbo.sp_executesql @statement = N'
            CREATE TRIGGER dbo.Trigger_Records_LastEdited ON dbo.Records
            AFTER INSERT, UPDATE
            AS
            BEGIN
                IF (UPDATE(LastEdited))
                BEGIN
                    UPDATE dbo.Records
                    SET LastEdited = GETUTCDATE()
                    FROM INSERTED
                    WHERE dbo.Records.RecordKey = INSERTED.RecordKey
                END
            END
            '");

        await SeedRecordsAsync(connection, context.DesiredRecordCount);

        Console.Write($"{GetActorTag("INIT")} Fetching initial values...");
        await InitializeContextStateAsync(context);
        Console.WriteLine(" done.");
    }
}

/// <summary>
/// Executes one non-query statement, printing "<paramref name="description"/>..."
/// before it and " done." after it.
/// </summary>
private static async Task RunInitStepAsync(SqlConnection connection, string description, string sql)
{
    Console.Write($"{GetActorTag("INIT")} {description}...");
    using (var command = connection.CreateCommand())
    {
        command.CommandText = sql;
        await command.ExecuteNonQueryAsync();
    }

    Console.WriteLine(" done.");
}

/// <summary>
/// Counts the existing rows and inserts as many more as are needed to reach
/// <paramref name="desiredRecordCount"/> (none if the table already has enough).
/// </summary>
private static async Task SeedRecordsAsync(SqlConnection connection, int desiredRecordCount)
{
    Console.Write($"{GetActorTag("INIT")} Counting existing records...");
    int recordCount;
    using (var command = connection.CreateCommand())
    {
        command.CommandText = "SELECT COUNT(*) FROM dbo.Records";
        recordCount = (int)(await command.ExecuteScalarAsync());
    }

    Console.WriteLine($" {recordCount} found.");

    var recordsToInsert = Math.Max(0, desiredRecordCount - recordCount);
    Console.Write($"{GetActorTag("INIT")} Inserting {recordsToInsert} records...");
    if (recordsToInsert > 0)
    {
        using (var command = connection.CreateCommand())
        {
            command.CommandText = "INSERT INTO dbo.Records (LastEdited) VALUES (@LastEdited)";
            var lastEditedParameter = command.CreateParameter();
            lastEditedParameter.ParameterName = "LastEdited";
            lastEditedParameter.SqlDbType = SqlDbType.DateTime;
            command.Parameters.Add(lastEditedParameter);

            // One command, re-executed with a fresh timestamp per row.
            for (var i = 0; i < recordsToInsert; i++)
            {
                lastEditedParameter.Value = DateTime.UtcNow;
                await command.ExecuteNonQueryAsync();
            }
        }
    }

    Console.WriteLine(" done.");
}

/// <summary>
/// Reads every row once and seeds the context: the initial snapshot and cursor,
/// the reader's and writer's private copies of that snapshot, the shuffled
/// queue of keys the writers will update, and the writer lock.
/// </summary>
private static async Task InitializeContextStateAsync(Context context)
{
    context.InitialRecordKeyToLastEdited = new Dictionary<int, DateTime>();

    // Starting from the smallest SQL datetime makes the first fetch return every row.
    context.InitialCursor = await UpdateRecordKeyToLastEdited(
        context,
        SqlDateTime.MinValue.Value,
        context.InitialRecordKeyToLastEdited,
        isReader: false);

    context.AllRecordKeys = context.InitialRecordKeyToLastEdited.Keys.OrderBy(x => x).ToList();
    context.ReaderRecordKeyToLastEdited = new Dictionary<int, DateTime>(context.InitialRecordKeyToLastEdited);
    context.ReaderCursor = context.InitialCursor;
    context.WriterRecordKeyToLastEdited = new Dictionary<int, DateTime>(context.InitialRecordKeyToLastEdited);

    // Ordering by a fresh GUID shuffles the keys; each key is handed to a writer at most once.
    var availableWriterRecordKeys = context
        .AllRecordKeys
        .OrderBy(x => Guid.NewGuid())
        .Take(context.DesiredRecordCount);
    context.AvailableWriterRecordKeys = new ConcurrentQueue<int>(availableWriterRecordKeys);
    context.WriterLock = new object();
}
/// <summary>
/// Reader loop: until cancellation is requested on the context's token,
/// performs one read-and-compare pass and then waits 250 ms.
/// </summary>
/// <param name="context">Run-wide state, including the cancellation source.</param>
private static async Task ReadRecordsAsync(Context context)
{
    // Hand control back to the caller before doing any work.
    await Task.Yield();

    var pollInterval = TimeSpan.FromMilliseconds(250);
    while (true)
    {
        if (context.CancellationTokenSource.Token.IsCancellationRequested)
        {
            return;
        }

        await ReadRecordsOnceAsync(context);
        await Task.Delay(pollInterval);
    }
}
/// <summary>
/// Performs a single reader pass: fetches records at or after the reader's
/// cursor, advances the cursor, and prints whether the reader's view matches
/// the writers' view.
/// </summary>
/// <param name="context">Run-wide state holding the reader cursor and both views.</param>
private static async Task ReadRecordsOnceAsync(Context context)
{
    var advancedCursor = await UpdateRecordKeyToLastEdited(
        context,
        context.ReaderCursor,
        context.ReaderRecordKeyToLastEdited,
        isReader: true);
    context.ReaderCursor = advancedCursor;

    string verdict;
    if (AreReaderAndWriterEqual(context))
    {
        verdict = "No missing records so far.";
    }
    else
    {
        verdict = "Found missing records!";
    }

    Console.WriteLine($"{GetActorTag("READER")} {verdict}");
}
/// <summary>
/// Writer loop: repeatedly takes a record key from the shared queue, updates
/// that row's <c>LastEdited</c>, reads the stored value back, and records it in
/// the writers' view. When the queue is empty the writer requests cancellation
/// and exits.
/// </summary>
/// <param name="context">Run-wide state shared by all actors.</param>
/// <param name="writerId">Number used only to label this writer's console output.</param>
private static async Task UpdateRandomRecordsAsync(Context context, int writerId)
{
    // Hand control back to the caller before doing any work.
    await Task.Yield();

    while (true)
    {
        if (context.CancellationTokenSource.Token.IsCancellationRequested)
        {
            return;
        }

        int recordKey;
        var dequeued = context.AvailableWriterRecordKeys.TryDequeue(out recordKey);
        if (!dequeued)
        {
            // Nothing left to update: tell every actor to stop.
            context.CancellationTokenSource.Cancel();
            return;
        }

        DateTime storedLastEdited;
        using (var connection = new SqlConnection(context.ConnectionString))
        {
            await connection.OpenAsync();
            using (var command = connection.CreateCommand())
            {
                // Update, then read back the value that was actually stored.
                command.CommandText = @"
                    UPDATE Records
                    SET LastEdited = @LastEdited
                    WHERE RecordKey = @RecordKey;
                    SELECT LastEdited
                    FROM Records
                    WHERE RecordKey = @RecordKey";

                var lastEditedParameter = command.Parameters.Add("LastEdited", SqlDbType.DateTime);
                lastEditedParameter.SqlValue = DateTime.UtcNow;
                var recordKeyParameter = command.Parameters.Add("RecordKey", SqlDbType.Int);
                recordKeyParameter.SqlValue = recordKey;

                var scalar = await command.ExecuteScalarAsync();
                storedLastEdited = (DateTime)scalar;

                var writerTag = GetActorTag("WRITER " + writerId.ToString().PadLeft(2));
                Console.WriteLine($"{writerTag} Updated record key {recordKey} to {storedLastEdited:O}.");
            }
        }

        // The writers' dictionary is shared with the reader's comparison pass.
        lock (context.WriterLock)
        {
            context.WriterRecordKeyToLastEdited[recordKey] = storedLastEdited;
        }
    }
}
/// <summary>
/// Fetches the records whose <c>LastEdited</c> is at or after
/// <paramref name="cursor"/>, writes them into
/// <paramref name="recordKeyToLastEdited"/>, and returns the new cursor.
/// </summary>
/// <param name="context">Run-wide state, passed through to the query.</param>
/// <param name="cursor">Lower bound (inclusive) for the fetch.</param>
/// <param name="recordKeyToLastEdited">Map updated in place with the fetched values.</param>
/// <param name="isReader">When true, a READER progress line is printed.</param>
/// <returns>
/// The largest <c>LastEdited</c> among the fetched records, or
/// <paramref name="cursor"/> unchanged when nothing was fetched.
/// </returns>
private static async Task<DateTime> UpdateRecordKeyToLastEdited(Context context, DateTime cursor, Dictionary<int, DateTime> recordKeyToLastEdited, bool isReader)
{
    var fetched = await ReadAllRecordsAsync(context, cursor);

    if (isReader)
    {
        Console.WriteLine($"{GetActorTag("READER")} Found {fetched.Count} records after the cursor {cursor:O}.");
    }

    if (fetched.Count == 0)
    {
        return cursor;
    }

    // Single pass: apply each record and track the newest timestamp seen.
    var newest = fetched[0].LastEdited;
    foreach (var item in fetched)
    {
        recordKeyToLastEdited[item.RecordKey] = item.LastEdited;
        if (item.LastEdited > newest)
        {
            newest = item.LastEdited;
        }
    }

    return newest;
}
/// <summary>
/// Compares the reader's and the writers' <c>LastEdited</c> value for every
/// known record key, printing one line per mismatch.
/// </summary>
/// <param name="context">Run-wide state holding both views and the writer lock.</param>
/// <returns><c>true</c> when every key has the same value in both views.</returns>
private static bool AreReaderAndWriterEqual(Context context)
{
    var mismatchFound = false;

    // Writers mutate their dictionary under this lock, so hold it while comparing.
    lock (context.WriterLock)
    {
        foreach (var key in context.AllRecordKeys)
        {
            var readerValue = context.ReaderRecordKeyToLastEdited[key];
            var writerValue = context.WriterRecordKeyToLastEdited[key];
            if (readerValue == writerValue)
            {
                continue;
            }

            Console.WriteLine($"{GetActorTag("READER")} Record key {key}. Reader has {readerValue:O}. Writer has {writerValue:O}.");
            mismatchFound = true;
        }
    }

    return !mismatchFound;
}
/// <summary>
/// Queries the <c>Records</c> table for every row whose <c>LastEdited</c> is at
/// or after <paramref name="cursor"/>, ordered by <c>LastEdited</c> ascending.
/// </summary>
/// <param name="context">Supplies the connection string.</param>
/// <param name="cursor">Inclusive lower bound on <c>LastEdited</c>.</param>
/// <returns>The matching records, in the order the query returned them.</returns>
private static async Task<List<Record>> ReadAllRecordsAsync(Context context, DateTime cursor)
{
    var records = new List<Record>();

    using (var connection = new SqlConnection(context.ConnectionString))
    {
        await connection.OpenAsync();

        using (var command = connection.CreateCommand())
        {
            command.CommandText = @"
                SELECT RecordKey, LastEdited
                FROM Records
                WHERE LastEdited >= @Cursor
                ORDER BY LastEdited ASC";
            command.Parameters.Add("Cursor", SqlDbType.DateTime).Value = cursor;

            // The reader is disposable; release it as soon as the rows are consumed.
            using (var reader = await command.ExecuteReaderAsync())
            {
                while (await reader.ReadAsync())
                {
                    var recordKey = reader.GetInt32(0);
                    var lastEdited = reader.GetDateTime(1);
                    records.Add(new Record(recordKey, lastEdited));
                }
            }
        }
    }

    return records;
}
/// <summary>
/// Formats an actor name as a bracketed console prefix, right-padding the name
/// with spaces to at least nine characters.
/// </summary>
/// <param name="name">Actor name, e.g. "INIT" or "READER".</param>
/// <returns>The padded name wrapped in square brackets.</returns>
private static string GetActorTag(string name)
    => "[" + name.PadRight(9) + "]";
/// <summary>
/// Immutable snapshot of one row: its key and its <c>LastEdited</c> value.
/// </summary>
private class Record
{
    /// <summary>The row's <c>RecordKey</c> value.</summary>
    public int RecordKey { get; }

    /// <summary>The row's <c>LastEdited</c> value.</summary>
    public DateTime LastEdited { get; }

    /// <summary>Creates a snapshot from the two column values.</summary>
    public Record(int recordKey, DateTime lastEdited)
    {
        this.RecordKey = recordKey;
        this.LastEdited = lastEdited;
    }
}
/// <summary>
/// Run-wide state shared by the initializer, the reader and the writers.
/// The three constructor arguments are fixed for the run; the remaining
/// properties are assigned after construction.
/// </summary>
private class Context
{
    /// <summary>Stores the fixed settings for the run.</summary>
    public Context(int desiredRecordCount, string connectionString, CancellationTokenSource cancellationTokenSource)
    {
        this.DesiredRecordCount = desiredRecordCount;
        this.ConnectionString = connectionString;
        this.CancellationTokenSource = cancellationTokenSource;
    }

    // Fixed settings.

    /// <summary>Number of rows the table should hold.</summary>
    public int DesiredRecordCount { get; }

    /// <summary>Connection string used for every database connection.</summary>
    public string ConnectionString { get; }

    /// <summary>Source used to tell the reader and writers to stop.</summary>
    public CancellationTokenSource CancellationTokenSource { get; }

    // State populated after construction.

    /// <summary>Every record key known at initialization, in ascending order.</summary>
    public List<int> AllRecordKeys { get; set; }

    /// <summary>Cursor value obtained from the initial full read.</summary>
    public DateTime InitialCursor { get; set; }

    /// <summary>Snapshot of key-to-LastEdited taken by the initial full read.</summary>
    public Dictionary<int, DateTime> InitialRecordKeyToLastEdited { get; set; }

    /// <summary>The reader's view of key-to-LastEdited.</summary>
    public Dictionary<int, DateTime> ReaderRecordKeyToLastEdited { get; set; }

    /// <summary>The writers' view of key-to-LastEdited; accessed under <see cref="WriterLock"/>.</summary>
    public Dictionary<int, DateTime> WriterRecordKeyToLastEdited { get; set; }

    /// <summary>Keys still waiting to be updated by a writer.</summary>
    public ConcurrentQueue<int> AvailableWriterRecordKeys { get; set; }

    /// <summary>Monitor object guarding <see cref="WriterRecordKeyToLastEdited"/>.</summary>
    public object WriterLock { get; set; }

    /// <summary>The reader's current cursor.</summary>
    public DateTime ReaderCursor { get; set; }
}
- }
- }
Add Comment
Please, Sign In to add comment