Advertisement
Guest User

Untitled

a guest
Jun 26th, 2017
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 17.81 KB | None | 0 0
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Diagnostics;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8.  
  9.  
  10. namespace Orleans.Runtime.Scheduler
  11. {
  12. [DebuggerDisplay("WorkItemGroup Name={Name} State={state}")]
  13. internal class WorkItemGroup : IWorkItem
  14. {
  15. private enum WorkGroupStatus
  16. {
  17. Waiting = 0,
  18. Runnable = 1,
  19. Running = 2,
  20. Shutdown = 3
  21. }
  22.  
  23. private static readonly Logger appLogger = LogManager.GetLogger("Scheduler.WorkItemGroup", LoggerType.Runtime);
  24. private readonly Logger log;
  25. private readonly OrleansTaskScheduler masterScheduler;
  26. private int state;
  27. private readonly ConcurrentQueue<Task> workItems;
  28.  
  29. private long totalItemsEnQueued; // equals total items queued, + 1
  30. private long totalItemsProcessed;
  31. private readonly QueueTrackingStatistic queueTracking;
  32. private TimeSpan totalQueuingDelay;
  33. private readonly long quantumExpirations;
  34. private readonly int workItemGroupStatisticsNumber;
  35.  
  36. internal ActivationTaskScheduler TaskRunner { get; private set; }
  37.  
  38. public DateTime TimeQueued { get; set; }
  39.  
  40. public TimeSpan TimeSinceQueued
  41. {
  42. get { return Utils.Since(TimeQueued); }
  43. }
  44.  
  45. public ISchedulingContext SchedulingContext { get; set; }
  46.  
  47. public bool IsSystemPriority
  48. {
  49. get { return SchedulingUtils.IsSystemPriorityContext(SchedulingContext); }
  50. }
  51.  
  52. internal bool IsSystemGroup
  53. {
  54. get { return SchedulingUtils.IsSystemContext(SchedulingContext); }
  55. }
  56.  
  57. public string Name { get { return SchedulingContext == null ? "unknown" : SchedulingContext.Name; } }
  58.  
  59. internal int ExternalWorkItemCount => WorkItemCount;
  60.  
  61. private int WorkItemCount => workItems.Count;
  62.  
  63. internal float AverageQueueLenght
  64. {
  65. get
  66. {
  67. #if TRACK_DETAILED_STATS
  68. if (StatisticsCollector.CollectShedulerQueuesStats)
  69. {
  70. return queueTracking.AverageQueueLength;
  71. }
  72. #endif
  73. return 0;
  74. }
  75. }
  76.  
  77. internal float NumEnqueuedRequests
  78. {
  79. get
  80. {
  81. #if TRACK_DETAILED_STATS
  82. if (StatisticsCollector.CollectShedulerQueuesStats)
  83. {
  84. return queueTracking.NumEnqueuedRequests;
  85. }
  86. #endif
  87. return 0;
  88. }
  89. }
  90.  
  91. internal float ArrivalRate
  92. {
  93. get
  94. {
  95. #if TRACK_DETAILED_STATS
  96. if (StatisticsCollector.CollectShedulerQueuesStats)
  97. {
  98. return queueTracking.ArrivalRate;
  99. }
  100. #endif
  101. return 0;
  102. }
  103. }
  104.  
  105. private bool IsActive => !workItems.IsEmpty;
  106.  
  107. // This is the maximum number of work items to be processed in an activation turn.
  108. // If this is set to zero or a negative number, then the full work queue is drained (MaxTimePerTurn allowing).
  109. private const int MaxWorkItemsPerTurn = 0; // Unlimited
  110. // This is a soft time limit on the duration of activation macro-turn (a number of micro-turns).
  111. // If a activation was running its micro-turns longer than this, we will give up the thread.
  112. // If this is set to zero or a negative number, then the full work queue is drained (MaxWorkItemsPerTurn allowing).
  113. public static TimeSpan ActivationSchedulingQuantum { get; set; }
  114. // This is the maximum number of waiting threads (blocked in WaitForResponse) allowed
  115. // per ActivationWorker. An attempt to wait when there are already too many threads waiting
  116. // will result in a TooManyWaitersException being thrown.
  117. //private static readonly int MaxWaitingThreads = 500;
  118.  
  119.  
  120. internal WorkItemGroup(OrleansTaskScheduler sched, ISchedulingContext schedulingContext)
  121. {
  122. masterScheduler = sched;
  123. SchedulingContext = schedulingContext;
  124. state = (int)WorkGroupStatus.Waiting;
  125. workItems = new ConcurrentQueue<Task>();
  126. totalItemsEnQueued = 0;
  127. totalItemsProcessed = 0;
  128. totalQueuingDelay = TimeSpan.Zero;
  129. quantumExpirations = 0;
  130. TaskRunner = new ActivationTaskScheduler(this);
  131. log = IsSystemPriority ? LogManager.GetLogger("Scheduler." + Name + ".WorkItemGroup", LoggerType.Runtime) : appLogger;
  132.  
  133. if (StatisticsCollector.CollectShedulerQueuesStats)
  134. {
  135. queueTracking = new QueueTrackingStatistic("Scheduler." + SchedulingContext.Name);
  136. queueTracking.OnStartExecution();
  137. }
  138.  
  139. if (StatisticsCollector.CollectPerWorkItemStats)
  140. {
  141. workItemGroupStatisticsNumber = SchedulerStatisticsGroup.RegisterWorkItemGroup(SchedulingContext.Name, SchedulingContext,
  142. () =>
  143. {
  144. var sb = new StringBuilder();
  145. sb.Append("QueueLength = " + WorkItemCount);
  146. sb.Append($", State = {state}");
  147. if (state == (int)WorkGroupStatus.Runnable)
  148. {
  149. Task oldestWorkItem;
  150. workItems.TryPeek(out oldestWorkItem);
  151. sb.Append($"; oldest item is {oldestWorkItem?.ToString() ?? "null"} old");
  152. }
  153. return sb.ToString();
  154. });
  155. }
  156. }
  157.  
  158. /// <summary>
  159. /// Adds a task to this activation.
  160. /// If we're adding it to the run list and we used to be waiting, now we're runnable.
  161. /// </summary>
  162. /// <param name="task">The work item to add.</param>
  163. public void EnqueueTask(Task task)
  164. {
  165. #if DEBUG
  166. if (log.IsVerbose2) log.Verbose2("EnqueueWorkItem {0} into {1} when TaskScheduler.Current={2}", task, SchedulingContext, TaskScheduler.Current);
  167. #endif
  168. var previousState = (WorkGroupStatus)Interlocked.CompareExchange(ref state, (int)WorkGroupStatus.Runnable, (int)WorkGroupStatus.Waiting);
  169.  
  170. if (previousState == WorkGroupStatus.Shutdown)
  171. {
  172. ReportWorkGroupProblem(
  173. String.Format("Enqueuing task {0} to a stopped work item group. Going to ignore and not execute it. "
  174. + "The likely reason is that the task is not being 'awaited' properly.", task),
  175. ErrorCode.SchedulerNotEnqueuWorkWhenShutdown);
  176. task.Ignore(); // Ignore this Task, so in case it is faulted it will not cause UnobservedException.
  177. return;
  178. }
  179.  
  180. long thisSequenceNumber = totalItemsEnQueued++;
  181. int count = WorkItemCount;
  182. #if TRACK_DETAILED_STATS
  183. if (StatisticsCollector.CollectShedulerQueuesStats)
  184. queueTracking.OnEnQueueRequest(1, count);
  185.  
  186. if (StatisticsCollector.CollectGlobalShedulerStats)
  187. SchedulerStatisticsGroup.OnWorkItemEnqueue();
  188. #endif
  189.  
  190. workItems.Enqueue(task);
  191. int maxPendingItemsLimit = masterScheduler.MaxPendingItemsLimit.SoftLimitThreshold;
  192. if (maxPendingItemsLimit > 0 && count > maxPendingItemsLimit)
  193. {
  194. log.Warn(ErrorCode.SchedulerTooManyPendingItems, String.Format("{0} pending work items for group {1}, exceeding the warning threshold of {2}",
  195. count, Name, maxPendingItemsLimit));
  196. }
  197.  
  198. if (previousState != WorkGroupStatus.Waiting)
  199. {
  200. return;
  201. // Console.WriteLine(previousState + " " + wqe++);
  202. }
  203. #if DEBUG
  204. if (log.IsVerbose3) log.Verbose3("Add to RunQueue {0}, #{1}, onto {2}", task, thisSequenceNumber, SchedulingContext);
  205. #endif
  206. masterScheduler.RunQueue.Add(this);
  207. }
  208.  
  209. private static int wqe;
  210. /// <summary>
  211. /// Shuts down this work item group so that it will not process any additional work items, even if they
  212. /// have already been queued.
  213. /// </summary>
  214. internal void Stop()
  215. {
  216. if (IsActive)
  217. {
  218. ReportWorkGroupProblem(
  219. String.Format("WorkItemGroup is being stoped while still active. workItemCount = {0}."
  220. + "The likely reason is that the task is not being 'awaited' properly.", WorkItemCount),
  221. ErrorCode.SchedulerWorkGroupStopping);
  222. }
  223.  
  224. var previousState = (WorkGroupStatus)Interlocked.Exchange(ref state, (int)WorkGroupStatus.Shutdown);
  225.  
  226. if (previousState == WorkGroupStatus.Shutdown)
  227. {
  228. log.Warn(ErrorCode.SchedulerWorkGroupShuttingDown, "WorkItemGroup is already shutting down {0}", this.ToString());
  229. return;
  230. }
  231.  
  232. if (StatisticsCollector.CollectPerWorkItemStats)
  233. SchedulerStatisticsGroup.UnRegisterWorkItemGroup(workItemGroupStatisticsNumber);
  234.  
  235. if (StatisticsCollector.CollectGlobalShedulerStats)
  236. SchedulerStatisticsGroup.OnWorkItemDrop(WorkItemCount);
  237.  
  238. if (StatisticsCollector.CollectShedulerQueuesStats)
  239. queueTracking.OnStopExecution();
  240.  
  241. Task task;
  242. while (workItems.TryDequeue(out task))
  243. {
  244. // Ignore all queued Tasks, so in case they are faulted they will not cause UnobservedException.
  245. task.Ignore();
  246. }
  247. }
  248. #region IWorkItem Members
  249.  
  250. public WorkItemType ItemType
  251. {
  252. get { return WorkItemType.WorkItemGroup; }
  253. }
  254.  
  255. // Execute one or more turns for this activation.
  256. // This method is always called in a single-threaded environment -- that is, no more than one
  257. // thread will be in this method at once -- but other asynch threads may still be queueing tasks, etc.
  258. public void Execute()
  259. {
  260. var previousState = (WorkGroupStatus)Interlocked.Exchange(ref state, (int)WorkGroupStatus.Running);
  261. if (previousState == WorkGroupStatus.Shutdown)
  262. {
  263. if (!IsActive) return; // Don't mind if no work has been queued to this work group yet.
  264.  
  265. ReportWorkGroupProblemWithBacktrace(
  266. "Cannot execute work items in a work item group that is in a shutdown state.",
  267. ErrorCode.SchedulerNotExecuteWhenShutdown); // Throws InvalidOperationException
  268. return;
  269. }
  270.  
  271. var thread = WorkerPoolThread.CurrentWorkerThread;
  272.  
  273. try
  274. {
  275. // Process multiple items -- drain the applicationMessageQueue (up to max items) for this physical activation
  276. int count = 0;
  277. var stopwatch = new Stopwatch();
  278. stopwatch.Start();
  279. do
  280. {
  281. //if (state == WorkGroupStatus.Shutdown)
  282. //{
  283. // if (WorkItemCount > 0)
  284. // log.Warn(ErrorCode.SchedulerSkipWorkStopping, "Thread {0} is exiting work loop due to Shutdown state {1} while still having {2} work items in the queue.",
  285. // thread.ToString(), this.ToString(), WorkItemCount);
  286. // else
  287. // if(log.IsVerbose) log.Verbose("Thread {0} is exiting work loop due to Shutdown state {1}. Has {2} work items in the queue.",
  288. // thread.ToString(), this.ToString(), WorkItemCount);
  289.  
  290. // break;
  291. //}
  292.  
  293. // Check the cancellation token (means that the silo is stopping)
  294. if (thread.CancelToken.IsCancellationRequested)
  295. {
  296. log.Warn(ErrorCode.SchedulerSkipWorkCancelled, "Thread {0} is exiting work loop due to cancellation token. WorkItemGroup: {1}, Have {2} work items in the queue.",
  297. thread.ToString(), this.ToString(), WorkItemCount);
  298. break;
  299. }
  300.  
  301. // Get the first Work Item on the list
  302. Task task;
  303. if (!workItems.TryDequeue(out task))
  304. {
  305. // If the list is empty, then we're done
  306. break;
  307. }
  308.  
  309. #if TRACK_DETAILED_STATS
  310. if (StatisticsCollector.CollectGlobalShedulerStats)
  311. SchedulerStatisticsGroup.OnWorkItemDequeue();
  312. #endif
  313.  
  314. #if DEBUG
  315. if (log.IsVerbose2) log.Verbose2("About to execute task {0} in SchedulingContext={1}", task, SchedulingContext);
  316. #endif
  317. var taskStart = stopwatch.Elapsed;
  318.  
  319. try
  320. {
  321. thread.CurrentTask = task;
  322. #if TRACK_DETAILED_STATS
  323. if (StatisticsCollector.CollectTurnsStats)
  324. SchedulerStatisticsGroup.OnTurnExecutionStartsByWorkGroup(workItemGroupStatisticsNumber, thread.WorkerThreadStatisticsNumber, SchedulingContext);
  325. #endif
  326. TaskRunner.RunTask(task);
  327. }
  328. catch (Exception ex)
  329. {
  330. log.Error(ErrorCode.SchedulerExceptionFromExecute, String.Format("Worker thread caught an exception thrown from Execute by task {0}", task), ex);
  331. throw;
  332. }
  333. finally
  334. {
  335. #if TRACK_DETAILED_STATS
  336. if (StatisticsCollector.CollectTurnsStats)
  337. SchedulerStatisticsGroup.OnTurnExecutionEnd(Utils.Since(thread.CurrentStateStarted));
  338.  
  339. if (StatisticsCollector.CollectThreadTimeTrackingStats)
  340. thread.threadTracking.IncrementNumberOfProcessed();
  341. #endif
  342. totalItemsProcessed++;
  343. var taskLength = stopwatch.Elapsed - taskStart;
  344. if (taskLength > OrleansTaskScheduler.TurnWarningLengthThreshold)
  345. {
  346. SchedulerStatisticsGroup.NumLongRunningTurns.Increment();
  347. log.Warn(ErrorCode.SchedulerTurnTooLong3, "Task {0} in WorkGroup {1} took elapsed time {2:g} for execution, which is longer than {3}. Running on thread {4}",
  348. OrleansTaskExtentions.ToString(task), SchedulingContext.ToString(), taskLength, OrleansTaskScheduler.TurnWarningLengthThreshold, thread.ToString());
  349. }
  350. thread.CurrentTask = null;
  351. }
  352. count++;
  353. }
  354. while (((MaxWorkItemsPerTurn <= 0) || (count <= MaxWorkItemsPerTurn)) &&
  355. ((ActivationSchedulingQuantum <= TimeSpan.Zero) || (stopwatch.Elapsed < ActivationSchedulingQuantum)));
  356. stopwatch.Stop();
  357. }
  358. catch (Exception ex)
  359. {
  360. log.Error(ErrorCode.Runtime_Error_100032, String.Format("Worker thread {0} caught an exception thrown from IWorkItem.Execute", thread), ex);
  361. }
  362. finally
  363. {
  364. // Now we're not Running anymore.
  365. // If we left work items on our run list, we're Runnable, and need to go back on the silo run queue;
  366. // If our run list is empty, then we're waiting.
  367. var stateToSet = workItems.IsEmpty ? (int)WorkGroupStatus.Waiting : (int)WorkGroupStatus.Runnable;
  368. var previoussState = (WorkGroupStatus)Interlocked.Exchange(ref state, stateToSet); //, (int)WorkGroupStatus.Waiting
  369. // todo: verify
  370. if ( stateToSet == (int)WorkGroupStatus.Runnable)
  371. {
  372. masterScheduler.RunQueue.Add(this);
  373. }
  374. }
  375. }
  376.  
  377. #endregion
  378.  
  379. public override string ToString()
  380. {
  381. return String.Format("{0}WorkItemGroup:Name={1},WorkGroupStatus={2}",
  382. IsSystemGroup ? "System*" : "",
  383. Name,
  384. state);
  385. }
  386.  
  387. public string DumpStatus()
  388. {
  389. var sb = new StringBuilder();
  390. sb.Append(this);
  391. sb.AppendFormat(". Currently QueuedWorkItems={0}; Total EnQueued={1}; Total processed={2}; Quantum expirations={3}; ",
  392. WorkItemCount, totalItemsEnQueued, totalItemsProcessed, quantumExpirations);
  393.  
  394. if (AverageQueueLenght > 0)
  395. {
  396. sb.AppendFormat("average queue length at enqueue: {0}; ", AverageQueueLenght);
  397. if (!totalQueuingDelay.Equals(TimeSpan.Zero) && totalItemsProcessed > 0)
  398. {
  399. sb.AppendFormat("average queue delay: {0}ms; ", totalQueuingDelay.Divide(totalItemsProcessed).TotalMilliseconds);
  400. }
  401. }
  402.  
  403. sb.AppendFormat("TaskRunner={0}; ", TaskRunner);
  404. if (SchedulingContext != null)
  405. {
  406. sb.AppendFormat("Detailed SchedulingContext=<{0}>", SchedulingContext.DetailedStatus());
  407. }
  408. return sb.ToString();
  409. }
  410.  
  411. private void ReportWorkGroupProblemWithBacktrace(string what, ErrorCode errorCode)
  412. {
  413. var st = Utils.GetStackTrace();
  414. var msg = string.Format("{0} {1}", what, DumpStatus());
  415. log.Warn(errorCode, msg + Environment.NewLine + " Called from " + st);
  416. }
  417.  
  418. private void ReportWorkGroupProblem(string what, ErrorCode errorCode)
  419. {
  420. var msg = string.Format("{0} {1}", what, DumpStatus());
  421. log.Warn(errorCode, msg);
  422. }
  423. }
  424. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement