Advertisement
Guest User

Untitled

a guest
Jun 20th, 2019
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.03 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading.Tasks;
  5.  
  6. namespace ConsoleApp16
  7. {
  8. class Program
  9. {
  10. static void Main(string[] args)
  11. {
  12. Console.WriteLine("User IDs 1, 2, 3");
  13. ProcessMessages(GetTestMessages(1, 2, 3), 4);
  14.  
  15. Console.WriteLine("User IDs empty");
  16. ProcessMessages(GetTestMessages(), 4);
  17.  
  18. Console.WriteLine("User IDs 1, 2, 3, 4, 5, 6, 7, 8, 9, 10");
  19. ProcessMessages(GetTestMessages(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 4);
  20.  
  21. Console.WriteLine("User IDs 2, 2, 2, 1, 1, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 6, 7, 8, 9, 10");
  22. ProcessMessages(GetTestMessages(2, 2, 2, 1, 1, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 6, 7, 8, 9, 10), 4);
  23.  
  24. Console.ReadLine();
  25. }
  26.  
  27. private static IEnumerable<Message> GetTestMessages(params int[] userIds)
  28. {
  29. int i = 1;
  30. foreach (var userId in userIds)
  31. yield return new Message { MessageId = i++, UserId = userId };
  32. }
  33.  
  34. private class Message
  35. {
  36. public int MessageId { get; set; }
  37. public int UserId { get; set; }
  38. //... Real message properties
  39. }
  40.  
  41. private static void ProcessMessages(IEnumerable<Message> incomingMessages, int nThreads)
  42. {
  43. var tasks = GetPartitionedMessages(incomingMessages, nThreads)
  44. .Select((messages, i) => Task.Run(() => DoMessageBusinessLogic(messages, i)))
  45. .ToArray();
  46. Task.WaitAll(tasks);
  47. }
  48.  
  49. private static void DoMessageBusinessLogic(IEnumerable<Message> messages, int threadIdx)
  50. {
  51. foreach (var message in messages)
  52. Console.WriteLine($"Thread ID: {threadIdx}, MsgId: {message.MessageId}, UserId: {message.UserId}");
  53. }
  54.  
  55. private static IEnumerable<IEnumerable<Message>> GetPartitionedMessages(IEnumerable<Message> messages, int nPartitions)
  56. {
  57. var orderedMessages = messages.OrderBy(x => x.UserId).ThenBy(x => x.MessageId).ToList();
  58. int? lastUserId = null;
  59. int maxPartitionSize = (int)Math.Ceiling(orderedMessages.Count / (double)nPartitions);
  60. var partitions = new List<List<Message>>();
  61. List<Message> currentPartition = null;
  62.  
  63. foreach (var message in orderedMessages)
  64. {
  65. if (lastUserId == message.UserId)
  66. {
  67. currentPartition.Add(message);
  68. }
  69. else
  70. {
  71. lastUserId = message.UserId;
  72. if (currentPartition == null || currentPartition.Count >= maxPartitionSize)
  73. {
  74. currentPartition = new List<Message>();
  75. partitions.Add(currentPartition);
  76. }
  77.  
  78. currentPartition.Add(message);
  79. }
  80. }
  81.  
  82. return partitions;
  83. }
  84. }
  85. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement