Guest User

Untitled

a guest
May 16th, 2018
129
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 13.01 KB | None | 0 0
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Data;
  5. using System.Data.SqlClient;
  6. using System.Data.SqlTypes;
  7. using System.Linq;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10.  
  11. namespace TimestampCursor
  12. {
  13. public class Program
  14. {
  /// <summary>
  /// Process entry point. Starts <see cref="MainAsync"/> and blocks until it has finished.
  /// </summary>
  /// <param name="args">Command-line arguments; not read by this program.</param>
  public static void Main(string[] args)
  {
      // Block synchronously on the async workflow; GetResult() surfaces any failure to the caller.
      var workflow = MainAsync();
      workflow.GetAwaiter().GetResult();
  }
  19.  
  /// <summary>
  /// Runs the whole experiment: prepares the table, starts one polling reader and
  /// <c>writerCount</c> concurrent writers, waits for all of them to finish, and then
  /// performs one final read so the last comparison is printed after every writer is done.
  /// </summary>
  /// <returns>A task that completes when the final read has been reported.</returns>
  private static async Task MainAsync()
  {
      const int writerCount = 16;

      // CancellationTokenSource is IDisposable; scope it so it is released when the run ends.
      using (var cancellationTokenSource = new CancellationTokenSource())
      {
          var context = new Context(
              desiredRecordCount: 2000,
              connectionString: "CONNECTION STRING HERE",
              cancellationTokenSource: cancellationTokenSource);

          await InitializeTableAsync(context);

          // Reader and writers are started before anything is awaited so they run concurrently.
          var readerTask = ReadRecordsAsync(context);
          var writerTasks = Enumerable
              .Range(0, writerCount)
              .Select(i => UpdateRandomRecordsAsync(context, i))
              .ToList();

          // The reader stops once a writer has requested cancellation (queue drained).
          await readerTask;
          await Task.WhenAll(writerTasks);

          Console.WriteLine();

          // One last pass picks up anything written after the reader's final poll.
          await ReadRecordsOnceAsync(context);
      }
  }
  42.  
  // Creates dbo.Records only when it does not exist yet.
  private const string CreateTableSql = @"
      IF NOT EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON (t.schema_id = s.schema_id) WHERE s.name = 'dbo' and t.name = 'Records')
      CREATE TABLE Records (
          RecordKey INT IDENTITY(1, 1) PRIMARY KEY,
          LastEdited DATETIME NOT NULL
      )";

  // Creates the index on LastEdited only when it does not exist yet.
  private const string CreateIndexSql = @"
      IF NOT EXISTS (SELECT * FROM sys.indexes WHERE name = 'IX_Records_LastEdited' AND object_id = OBJECT_ID('dbo.Records'))
      CREATE NONCLUSTERED INDEX IX_Records_LastEdited ON dbo.Records (LastEdited ASC)";

  // Creates the trigger that overwrites LastEdited with GETUTCDATE() on insert/update.
  // CREATE TRIGGER must be the first statement of its batch, hence the sp_executesql wrapper.
  private const string CreateTriggerSql = @"
      IF NOT EXISTS (SELECT * FROM sys.triggers WHERE object_id = OBJECT_ID('dbo.Trigger_Records_LastEdited'))
      EXEC dbo.sp_executesql @statement = N'
      CREATE TRIGGER dbo.Trigger_Records_LastEdited ON dbo.Records
      AFTER INSERT, UPDATE
      AS
      BEGIN
          IF (UPDATE(LastEdited))
          BEGIN
              UPDATE dbo.Records
              SET LastEdited = GETUTCDATE()
              FROM INSERTED
              WHERE dbo.Records.RecordKey = INSERTED.RecordKey
          END
      END
      '";

  /// <summary>
  /// Prepares the database and the in-memory state for a run: ensures the table, index and
  /// trigger exist, tops the table up to <c>context.DesiredRecordCount</c> rows, and then
  /// snapshots every row into the initial, reader and writer dictionaries of
  /// <paramref name="context"/>. Progress is written to the console.
  /// </summary>
  /// <param name="context">Run state; its connection string is used and its mutable properties are populated.</param>
  /// <returns>A task that completes when initialization has finished.</returns>
  private static async Task InitializeTableAsync(Context context)
  {
      using (var connection = new SqlConnection(context.ConnectionString))
      {
          await connection.OpenAsync();

          await RunSetupStatementAsync(connection, "Creating the table", CreateTableSql);
          await RunSetupStatementAsync(connection, "Creating the index", CreateIndexSql);
          await RunSetupStatementAsync(connection, "Creating the trigger", CreateTriggerSql);

          Console.Write($"{GetActorTag("INIT")} Counting existing records...");
          var recordCount = await CountRecordsAsync(connection);
          Console.WriteLine($" {recordCount} found.");

          // Never negative: a table that already holds more rows than desired is left as it is.
          var recordsToInsert = Math.Max(0, context.DesiredRecordCount - recordCount);
          Console.Write($"{GetActorTag("INIT")} Inserting {recordsToInsert} records...");
          if (recordsToInsert > 0)
          {
              await InsertRecordsAsync(connection, recordsToInsert);
          }
          Console.WriteLine(" done.");

          Console.Write($"{GetActorTag("INIT")} Fetching initial values...");
          await SeedContextAsync(context);
          Console.WriteLine(" done.");
      }
  }

  // Announces one setup step on the console, executes its SQL batch, then reports completion.
  private static async Task RunSetupStatementAsync(SqlConnection connection, string description, string commandText)
  {
      Console.Write($"{GetActorTag("INIT")} {description}...");
      using (var command = connection.CreateCommand())
      {
          command.CommandText = commandText;
          await command.ExecuteNonQueryAsync();
      }
      Console.WriteLine(" done.");
  }

  // Returns the number of rows currently in the Records table.
  private static async Task<int> CountRecordsAsync(SqlConnection connection)
  {
      using (var command = connection.CreateCommand())
      {
          command.CommandText = "SELECT COUNT(*) FROM Records";
          return (int)(await command.ExecuteScalarAsync());
      }
  }

  // Inserts the given number of rows one at a time, reusing a single parameterized command.
  private static async Task InsertRecordsAsync(SqlConnection connection, int recordsToInsert)
  {
      using (var command = connection.CreateCommand())
      {
          command.CommandText = "INSERT INTO Records (LastEdited) VALUES (@LastEdited)";

          var lastEditedParameter = command.CreateParameter();
          lastEditedParameter.ParameterName = "LastEdited";
          lastEditedParameter.SqlDbType = SqlDbType.DateTime;
          command.Parameters.Add(lastEditedParameter);

          for (var i = 0; i < recordsToInsert; i++)
          {
              lastEditedParameter.SqlValue = DateTime.UtcNow;
              await command.ExecuteNonQueryAsync();
          }
      }
  }

  // Reads every row from the smallest SQL datetime onwards and fills the context's
  // snapshots, cursors, writer work queue and lock object from that single read.
  private static async Task SeedContextAsync(Context context)
  {
      context.InitialRecordKeyToLastEdited = new Dictionary<int, DateTime>();
      context.InitialCursor = await UpdateRecordKeyToLastEdited(context, SqlDateTime.MinValue.Value, context.InitialRecordKeyToLastEdited, isReader: false);
      context.AllRecordKeys = context.InitialRecordKeyToLastEdited.Keys.OrderBy(x => x).ToList();

      // Reader and writer each get their own copy of the snapshot to evolve independently.
      context.ReaderRecordKeyToLastEdited = new Dictionary<int, DateTime>(context.InitialRecordKeyToLastEdited);
      context.ReaderCursor = context.InitialCursor;

      context.WriterRecordKeyToLastEdited = new Dictionary<int, DateTime>(context.InitialRecordKeyToLastEdited);

      // Ordering by a fresh GUID shuffles the keys so writers touch rows in random order.
      var availableWriterRecordKeys = context
          .AllRecordKeys
          .OrderBy(x => Guid.NewGuid())
          .Take(context.DesiredRecordCount);

      context.AvailableWriterRecordKeys = new ConcurrentQueue<int>(availableWriterRecordKeys);
      context.WriterLock = new object();
  }
  148.  
  /// <summary>
  /// Reader loop: repeatedly performs one read pass and then waits 250 ms, until
  /// cancellation has been requested on the context's token source.
  /// </summary>
  /// <param name="context">Shared run state, including the cancellation token source.</param>
  /// <returns>A task that completes once cancellation is observed at the top of the loop.</returns>
  private static async Task ReadRecordsAsync(Context context)
  {
      // Yield first so the caller gets its task back before any read work starts.
      await Task.Yield();

      var pollInterval = TimeSpan.FromMilliseconds(250);
      var stopToken = context.CancellationTokenSource.Token;

      while (true)
      {
          if (stopToken.IsCancellationRequested)
          {
              return;
          }

          await ReadRecordsOnceAsync(context);

          // The delay is not tied to the token, so the loop may outlive cancellation by one interval.
          await Task.Delay(pollInterval);
      }
  }
  160.  
  /// <summary>
  /// Performs a single read pass: pulls the rows at or after the reader cursor into the
  /// reader's dictionary, stores the advanced cursor, and prints whether the reader's view
  /// matches the writers' view.
  /// </summary>
  /// <param name="context">Shared run state holding the reader cursor and both dictionaries.</param>
  /// <returns>A task that completes after the comparison result has been printed.</returns>
  private static async Task ReadRecordsOnceAsync(Context context)
  {
      var advancedCursor = await UpdateRecordKeyToLastEdited(
          context,
          context.ReaderCursor,
          context.ReaderRecordKeyToLastEdited,
          isReader: true);
      context.ReaderCursor = advancedCursor;

      // The comparison itself prints one line per mismatching key before this summary line.
      string verdict;
      if (AreReaderAndWriterEqual(context))
      {
          verdict = "No missing records so far.";
      }
      else
      {
          verdict = "Found missing records!";
      }

      Console.WriteLine($"{GetActorTag("READER")} {verdict}");
  }
  167.  
  /// <summary>
  /// Writer loop: takes record keys from the shared queue one at a time, updates each row's
  /// <c>LastEdited</c>, reads the stored value back, and records it in the writer dictionary.
  /// When the queue is empty the writer requests cancellation for the whole run and exits.
  /// </summary>
  /// <param name="context">Shared run state (queue, writer dictionary, lock, token source).</param>
  /// <param name="writerId">Number used only to label this writer's console output.</param>
  /// <returns>A task that completes when the queue is drained or cancellation was requested.</returns>
  private static async Task UpdateRandomRecordsAsync(Context context, int writerId)
  {
      const string updateThenReadBackSql = @"
          UPDATE Records
          SET LastEdited = @LastEdited
          WHERE RecordKey = @RecordKey;

          SELECT LastEdited
          FROM Records
          WHERE RecordKey = @RecordKey";

      // Yield first so the caller gets its task back before any write work starts.
      await Task.Yield();

      var actorTag = GetActorTag("WRITER " + writerId.ToString().PadLeft(2));

      for (; ; )
      {
          if (context.CancellationTokenSource.Token.IsCancellationRequested)
          {
              break;
          }

          int recordKey;
          var gotWork = context.AvailableWriterRecordKeys.TryDequeue(out recordKey);
          if (!gotWork)
          {
              // Nothing left to update: signal every reader and writer to stop.
              context.CancellationTokenSource.Cancel();
              return;
          }

          DateTime storedLastEdited;
          using (var connection = new SqlConnection(context.ConnectionString))
          {
              await connection.OpenAsync();

              using (var command = connection.CreateCommand())
              {
                  command.CommandText = updateThenReadBackSql;

                  var lastEditedParameter = command.Parameters.Add("LastEdited", SqlDbType.DateTime);
                  lastEditedParameter.SqlValue = DateTime.UtcNow;

                  var recordKeyParameter = command.Parameters.Add("RecordKey", SqlDbType.Int);
                  recordKeyParameter.SqlValue = recordKey;

                  // The scalar is the value returned by the trailing SELECT, i.e. what the table holds after the update.
                  var scalar = await command.ExecuteScalarAsync();
                  storedLastEdited = (DateTime)scalar;
                  Console.WriteLine($"{actorTag} Updated record key {recordKey} to {storedLastEdited:O}.");
              }
          }

          // The writer dictionary is shared by all writers and read by the comparison, so guard it.
          lock (context.WriterLock)
          {
              context.WriterRecordKeyToLastEdited[recordKey] = storedLastEdited;
          }
      }
  }
  210.  
  /// <summary>
  /// Fetches every row whose <c>LastEdited</c> is at or after <paramref name="cursor"/>,
  /// copies the fetched values into <paramref name="recordKeyToLastEdited"/>, and returns
  /// the new cursor position.
  /// </summary>
  /// <param name="context">Shared run state used to reach the database.</param>
  /// <param name="cursor">Lower bound (inclusive) for the rows to fetch.</param>
  /// <param name="recordKeyToLastEdited">Dictionary that is updated in place with the fetched rows.</param>
  /// <param name="isReader">When true, the number of fetched rows is logged under the READER tag.</param>
  /// <returns>
  /// The largest <c>LastEdited</c> among the fetched rows, or <paramref name="cursor"/>
  /// unchanged when no rows were fetched.
  /// </returns>
  private static async Task<DateTime> UpdateRecordKeyToLastEdited(Context context, DateTime cursor, Dictionary<int, DateTime> recordKeyToLastEdited, bool isReader)
  {
      var fetched = await ReadAllRecordsAsync(context, cursor);

      if (isReader)
      {
          Console.WriteLine($"{GetActorTag("READER")} Found {fetched.Count} records after the cursor {cursor:O}.");
      }

      if (fetched.Count == 0)
      {
          // Nothing new: the cursor stays where it was.
          return cursor;
      }

      // Single pass: store each row and track the newest timestamp seen.
      var newest = fetched[0].LastEdited;
      foreach (var row in fetched)
      {
          recordKeyToLastEdited[row.RecordKey] = row.LastEdited;

          if (row.LastEdited > newest)
          {
              newest = row.LastEdited;
          }
      }

      return newest;
  }
  232.  
  /// <summary>
  /// Compares the reader's and the writers' recorded <c>LastEdited</c> value for every known
  /// record key and prints one line per key where they differ.
  /// </summary>
  /// <param name="context">Shared run state holding both dictionaries, the key list and the lock.</param>
  /// <returns><c>true</c> when every key has the same value on both sides; otherwise <c>false</c>.</returns>
  private static bool AreReaderAndWriterEqual(Context context)
  {
      // Hold the writer lock for the whole scan so writers cannot change their dictionary mid-comparison.
      lock (context.WriterLock)
      {
          var mismatchFound = false;

          foreach (var recordKey in context.AllRecordKeys)
          {
              var readerValue = context.ReaderRecordKeyToLastEdited[recordKey];
              var writerValue = context.WriterRecordKeyToLastEdited[recordKey];

              if (readerValue == writerValue)
              {
                  continue;
              }

              Console.WriteLine($"{GetActorTag("READER")} Record key {recordKey}. Reader has {readerValue:O}. Writer has {writerValue:O}.");
              mismatchFound = true;
          }

          return !mismatchFound;
      }
  }
  250.  
  /// <summary>
  /// Queries the <c>Records</c> table for every row whose <c>LastEdited</c> is greater than
  /// or equal to <paramref name="cursor"/>, ordered by <c>LastEdited</c> ascending.
  /// </summary>
  /// <param name="context">Shared run state supplying the connection string.</param>
  /// <param name="cursor">Inclusive lower bound for <c>LastEdited</c>.</param>
  /// <returns>The matching rows in ascending <c>LastEdited</c> order; empty when none match.</returns>
  private static async Task<List<Record>> ReadAllRecordsAsync(Context context, DateTime cursor)
  {
      var records = new List<Record>();

      using (var connection = new SqlConnection(context.ConnectionString))
      {
          await connection.OpenAsync();

          using (var command = connection.CreateCommand())
          {
              command.CommandText = @"
                  SELECT RecordKey, LastEdited
                  FROM Records
                  WHERE LastEdited >= @Cursor
                  ORDER BY LastEdited ASC";

              command.Parameters.Add("Cursor", SqlDbType.DateTime).Value = cursor;

              // The data reader is disposable; release it as soon as the rows are consumed.
              using (var reader = await command.ExecuteReaderAsync())
              {
                  while (await reader.ReadAsync())
                  {
                      var recordKey = reader.GetInt32(0);
                      var lastEdited = reader.GetDateTime(1);
                      records.Add(new Record(recordKey, lastEdited));
                  }
              }
          }
      }

      return records;
  }
  280.  
  /// <summary>
  /// Builds the bracketed log prefix for an actor, right-padding the name with spaces to
  /// nine characters so that log columns line up. Longer names are not truncated.
  /// </summary>
  /// <param name="name">Actor name, e.g. "INIT" or "READER".</param>
  /// <returns>The padded name wrapped in square brackets.</returns>
  private static string GetActorTag(string name)
  {
      const int tagWidth = 9;
      var paddedName = name.PadRight(tagWidth);
      return "[" + paddedName + "]";
  }
  285.  
  /// <summary>
  /// Immutable pair of a record's key and its <c>LastEdited</c> value as read from the table.
  /// </summary>
  private class Record
  {
      /// <summary>Value of the row's <c>RecordKey</c> column.</summary>
      public int RecordKey { get; }

      /// <summary>Value of the row's <c>LastEdited</c> column.</summary>
      public DateTime LastEdited { get; }

      /// <summary>Creates a record from the two column values.</summary>
      /// <param name="recordKey">The row's key.</param>
      /// <param name="lastEdited">The row's last-edited timestamp.</param>
      public Record(int recordKey, DateTime lastEdited)
      {
          this.RecordKey = recordKey;
          this.LastEdited = lastEdited;
      }
  }
  297.  
  /// <summary>
  /// Mutable bag of state shared by initialization, the reader and the writers of one run.
  /// The three constructor-supplied values are fixed; everything else is assigned later
  /// through the public setters.
  /// </summary>
  private class Context
  {
      /// <summary>Creates the context with its fixed settings.</summary>
      /// <param name="desiredRecordCount">Number of rows the table should hold at minimum.</param>
      /// <param name="connectionString">Connection string used for every database connection.</param>
      /// <param name="cancellationTokenSource">Source used to signal that the run should stop.</param>
      public Context(int desiredRecordCount, string connectionString, CancellationTokenSource cancellationTokenSource)
      {
          this.DesiredRecordCount = desiredRecordCount;
          this.ConnectionString = connectionString;
          this.CancellationTokenSource = cancellationTokenSource;
      }

      // ---- Fixed settings -------------------------------------------------

      /// <summary>Number of rows the table should hold at minimum.</summary>
      public int DesiredRecordCount { get; }

      /// <summary>Connection string used for every database connection.</summary>
      public string ConnectionString { get; }

      /// <summary>Source used to request and observe the end of the run.</summary>
      public CancellationTokenSource CancellationTokenSource { get; }

      // ---- State populated after construction -----------------------------

      /// <summary>Every record key known to the run.</summary>
      public List<int> AllRecordKeys { get; set; }

      /// <summary>Cursor value obtained from the initial full read.</summary>
      public DateTime InitialCursor { get; set; }

      /// <summary>Snapshot of key to <c>LastEdited</c> taken by the initial full read.</summary>
      public Dictionary<int, DateTime> InitialRecordKeyToLastEdited { get; set; }

      /// <summary>Key to <c>LastEdited</c> as observed by the reader.</summary>
      public Dictionary<int, DateTime> ReaderRecordKeyToLastEdited { get; set; }

      /// <summary>Key to <c>LastEdited</c> as recorded by the writers.</summary>
      public Dictionary<int, DateTime> WriterRecordKeyToLastEdited { get; set; }

      /// <summary>Queue of record keys still waiting to be updated by a writer.</summary>
      public ConcurrentQueue<int> AvailableWriterRecordKeys { get; set; }

      /// <summary>Lock object guarding access to <see cref="WriterRecordKeyToLastEdited"/>.</summary>
      public object WriterLock { get; set; }

      /// <summary>Current position of the reader's cursor.</summary>
      public DateTime ReaderCursor { get; set; }
  }
  320. }
  321. }
Add Comment
Please, Sign In to add comment