using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Sharp7.Rx.Basics
{
    /// <summary>
    /// Provides a task scheduler that ensures a maximum concurrency level while
    /// running on top of the ThreadPool.
    /// </summary>
    /// <remarks>
    /// from http://msdn.microsoft.com/en-us/library/ee789351.aspx
    /// </remarks>
    internal class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>Whether the current thread is processing work items.</summary>
        [ThreadStatic]
        private static bool currentThreadIsProcessingItems;

        /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
        private readonly int maxDegreeOfParallelism;

        /// <summary>The list of tasks to be executed.</summary>
        private readonly LinkedList<Task> tasks = new LinkedList<Task>(); // protected by lock(tasks)

        /// <summary>
        /// The number of worker delegates that are currently queued to, or running on,
        /// the ThreadPool on behalf of this scheduler.
        /// </summary>
        private int delegatesQueuedOrRunning; // protected by lock(tasks)

        /// <summary>
        /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
            }

            this.maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public sealed override int MaximumConcurrencyLevel => maxDegreeOfParallelism;

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>An enumerable of the tasks currently scheduled.</returns>
        /// <exception cref="NotSupportedException">
        /// The task list lock is held by another thread and could not be taken immediately.
        /// </exception>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            var lockTaken = false;
            try
            {
                // Non-blocking attempt: this method never waits for the lock.
                Monitor.TryEnter(tasks, ref lockTaken);
                if (lockTaken)
                {
                    // Snapshot the list so the caller can enumerate it without holding the lock.
                    return tasks.ToArray();
                }

                throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(tasks);
                }
            }
        }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed. If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (tasks)
            {
                tasks.AddLast(task);
                if (delegatesQueuedOrRunning < maxDegreeOfParallelism)
                {
                    ++delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (tasks)
            {
                return tasks.Remove(task);
            }
        }

        /// <summary>Attempts to execute the specified task on the current thread.</summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to this scheduler.</param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining
            if (!currentThreadIsProcessingItems)
            {
                return false;
            }

            // If the task was previously queued, remove it from the queue
            if (taskWasPreviouslyQueued)
            {
                TryDequeue(task);
            }

            // Try to run the task.
            return TryExecuteTask(task);
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (tasks)
                        {
                            // When there are no more items to be processed,
                            // note that we're done processing, and get out.
                            if (tasks.Count == 0)
                            {
                                --delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the queue
                            item = tasks.First.Value;
                            tasks.RemoveFirst();
                        }

                        // Execute the task we pulled out of the queue (outside the lock)
                        TryExecuteTask(item);
                    }
                }
                // We're done processing items on the current thread
                finally
                {
                    currentThreadIsProcessingItems = false;
                }
            }, null);
        }
    }
}