using System; using System.Collections; using System.Collections.Generic; using System.Linq; using System.Text; using System.Collections.Specialized; using System.ComponentModel; using System.Threading; using System.Threading.Tasks; using System.Collections.ObjectModel; using System.Collections.Concurrent; using RoboSharp.EventArgObjects; using RoboSharp.Interfaces; using RoboSharp.Results; namespace RoboSharp { /// /// Contains a private List{IRoboCommand} object with controlled methods for access to it.
/// Attempting to modify the list while = true results in being thrown. /// Implements the following:
///
/// -- Allow enumerating through the collection that is stored in a private list -- Also see
/// -- Allow subscription to collection changes against the list
/// -- Most properties will trigger events when updated.
/// -- Allow disposal of all objects in the list. ///
/// /// /// public sealed class RoboQueue : IRoboQueue, IDisposable, INotifyPropertyChanged, IEnumerable, INotifyCollectionChanged { #region < Constructors > /// /// Initialize a new (empty) object. /// public RoboQueue() { Init(); Commands = new ReadOnlyCollection(CommandList); } /// /// Initialize a new (empty) object with a specificed Name. /// /// public RoboQueue(string name, int maxConcurrentJobs = 1) { Init(name, maxConcurrentJobs); Commands = new ReadOnlyCollection(CommandList); } /// /// Initialize a new object that contains the supplied . /// /// public RoboQueue(IRoboCommand roboCommand, string name = "", int maxConcurrentJobs = 1) { CommandList.Add(roboCommand); Init(name, maxConcurrentJobs); Commands = new ReadOnlyCollection(CommandList); } /// /// Initialize a new object that contains the supplied collection. /// /// IRoboCommand(s) to populate the list with. /// /// public RoboQueue(IEnumerable roboCommand, string name = "", int maxConcurrentJobs = 1) { CommandList.AddRange(roboCommand); Init(name, maxConcurrentJobs); Commands = new ReadOnlyCollection(CommandList); } private void Init(string name = "", int maxConcurrentJobs = 1) { NameField = name; MaxConcurrentJobsField = maxConcurrentJobs; } #endregion #region < Fields > private readonly ObservableList CommandList = new ObservableList(); private RoboQueueProgressEstimator Estimator; private bool disposedValue; private CancellationTokenSource TaskCancelSource; private string NameField; private bool WasCancelledField = false; private bool IsPausedField = false; private bool IsCopyOperationRunningField = false; private bool IsListOperationRunningField = false; private bool ListOnlyCompletedField = false; private bool CopyOpCompletedField = false; private int MaxConcurrentJobsField; private int JobsStartedField; private int JobsCompleteField; private int JobsCompletedSuccessfullyField; #endregion #region < Properties Dependent on CommandList > /// /// Checks property of all items in the list. ///
INotifyPropertyChanged is not raised when this property changes. ///
public bool AnyRunning => CommandList.Any(c => c.IsRunning); /// /// Checks property of all items in the list. ///
INotifyPropertyChanged is not raised when this property changes. ///
public bool AnyPaused => CommandList.Any(c => c.IsPaused); /// /// Checks property of all items in the list. ///
INotifyPropertyChanged is not raised when this property changes. ///
public bool AnyCancelled => CommandList.Any(c => c.IsCancelled); /// /// Check the list and get the count of RoboCommands that are either in the 'Run' or 'Paused' state.
/// (Paused state is included since these can be resumed at any time) ///
public int JobsCurrentlyRunning => CommandList.Where((C) => C.IsRunning | C.IsPaused).Count(); /// Number of RoboCommands in the list public int ListCount => CommandList.Count; #endregion #region < Properties > /// /// Name of this collection of RoboCommands /// public string Name { get => NameField; private set { if (value != NameField) { NameField = value; OnPropertyChanged("Name"); } } } /// /// Wraps the private into a ReadOnlyCollection for public consumption and data binding. /// public ReadOnlyCollection Commands { get; } /// /// /// This object will produce the sum of all the ProgressEstimator objects generated by the commands within the list. /// After the first request, the values will be updated every 250ms while the Queue is still running. /// public IProgressEstimator ProgressEstimator => Estimator; /// /// Indicates if a task is currently running or paused.
/// When true, prevents starting new tasks and prevents modication of the list. ///
public bool IsRunning => IsCopyOperationRunning || IsListOnlyRunning; /// /// This is set true when is called while any of the items in the list were running, and set false when or is called. /// public bool IsPaused { get => IsPausedField; private set { if (value != IsPausedField) { IsPausedField = value; OnPropertyChanged("IsPaused"); } } } /// /// Flag is set to TRUE if the 'Stop' command is issued. Reset to False when starting a new operation. /// public bool WasCancelled { get => WasCancelledField; private set { if (value != WasCancelledField) { WasCancelledField = value; OnPropertyChanged("WasCancelled"); } } } /// Indicates if the StartAll task is currently running. public bool IsCopyOperationRunning { get => IsCopyOperationRunningField; private set { if (value != IsCopyOperationRunningField) { bool running = IsRunning; IsCopyOperationRunningField = value; OnPropertyChanged("IsCopyOperationRunning"); if (IsRunning != running) OnPropertyChanged("IsRunning"); } } } /// Indicates if the StartAll_ListOnly task is currently running. public bool IsListOnlyRunning { get => IsListOperationRunningField; private set { if (value != IsListOperationRunningField) { bool running = IsRunning; IsListOperationRunningField = value; OnPropertyChanged("IsListOnlyRunning"); if (IsRunning != running) OnPropertyChanged("IsRunning"); } } } /// Indicates if the StartAll_ListOnly() operation has been completed. public bool ListOnlyCompleted { get => ListOnlyCompletedField; private set { if (value != ListOnlyCompletedField) { ListOnlyCompletedField = value; OnPropertyChanged("ListOnlyCompleted"); } } } /// Indicates if the StartAll() operation has been completed. public bool CopyOperationCompleted { get => CopyOpCompletedField; private set { if (value != CopyOpCompletedField) { CopyOpCompletedField = value; OnPropertyChanged("CopyOperationCompleted"); } } } /// /// Specify the max number of RoboCommands to execute at the same time.
/// Set Value to 0 to allow infinite number of jobs (Will issue all start commands at same time)
/// Default Value = 1;
///
public int MaxConcurrentJobs { get => MaxConcurrentJobsField; set { int newVal = value > 0 ? value : IsRunning & MaxConcurrentJobsField > 0 ? 1 : 0; //Allow > 0 at all times //If running, set value to 1 if (newVal != MaxConcurrentJobsField) { MaxConcurrentJobsField = newVal; OnPropertyChanged("MaxConcurrentJobs"); } } } /// /// Report how many tasks has completed during the run.
/// This value is reset to 0 when a new run starts, and increments as each job exits. ///
public int JobsComplete { get => JobsCompleteField; private set { if (value != JobsCompleteField) { JobsCompleteField = value; OnPropertyChanged("JobsComplete"); } } } /// /// Report how many tasks has completed successfully during the run.
/// This value is reset to 0 when a new run starts, and increments as each job exits. ///
public int JobsCompletedSuccessfully { get => JobsCompletedSuccessfullyField; private set { if (value != JobsCompletedSuccessfullyField) { JobsCompletedSuccessfullyField = value; OnPropertyChanged("JobsCompletedSuccessfully"); } } } /// /// Report how many tasks have been started during the run.
/// This value is reset to 0 when a new run starts, and increments as each job starts. ///
public int JobsStarted { get => JobsStartedField; private set { if (value != JobsStartedField) { JobsStartedField = value; OnPropertyChanged("JobsStarted"); } } } /// /// Contains the results from the most recent run started via /// Any time StartALL_ListOnly is called, a new RoboQueueResults object will be created.
///
public IRoboQueueResults ListResults => ListResultsObj; private RoboQueueResults ListResultsObj { get; set; } /// /// Contains the results from the most recent run started via /// Any time StartALL is called, a new RoboQueueResults object will be created.
///
public IRoboQueueResults RunResults => RunResultsObj; private RoboQueueResults RunResultsObj { get; set; } /* * Possible To-Do: Code in ConcurrentQueue objects if issues arise with items being added to the ResultsObj lists. * private ConcurrentQueue ListResultsQueue = new ConcurrentQueue(); * private ConcurrentQueue RunResultsQueue = new ConcurrentQueue(); */ #endregion #region < Events > #region < IRoboCommand Events > /// /// This bind to every IRoboCommand in the list. public event RoboCommand.FileProcessedHandler OnFileProcessed; /// /// This bind to every RoboCommand in the list. public event RoboCommand.CommandErrorHandler OnCommandError; /// /// This bind to every RoboCommand in the list. public event RoboCommand.ErrorHandler OnError; /// /// This will occur for every RoboCommand in the list. public event RoboCommand.CommandCompletedHandler OnCommandCompleted; /// /// This bind to every RoboCommand in the list. public event RoboCommand.CopyProgressHandler OnCopyProgressChanged; #endregion #region < ListUpdated Events > /// Occurs when the gets updated public event RoboCopyResultsList.ResultsListUpdated ListResultsUpdated; /// Occurs when the gets updated public event RoboCopyResultsList.ResultsListUpdated RunResultsUpdated; #endregion #region < ProgressUpdater Event > /// Handles public delegate void ProgressUpdaterCreatedHandler(RoboQueue sender, ProgressEstimatorCreatedEventArgs e); /// /// Occurs when a is created when starting a new task, allowing binding to occur within the event subscriber.
/// This event will occur once per Start. See notes on for more details. ///
public event ProgressUpdaterCreatedHandler OnProgressEstimatorCreated; #endregion #region < CommandStarted Event > /// Handles public delegate void CommandStartedHandler(RoboQueue sender, RoboQueueCommandStartedEventArgs e); /// /// Occurs each time a Command has started succesfully /// public event CommandStartedHandler OnCommandStarted; #endregion #region < RunComplete Event > /// Handles public delegate void RunCompletedHandler(RoboQueue sender, RoboQueueCompletedEventArgs e); /// /// Occurs after when the task started by the StartAll and StartAll_ListOnly methods has finished executing. /// public event RunCompletedHandler RunCompleted; #endregion #region < UnhandledException Fault > /// /// Occurs if the RoboQueue task is stopped due to an unhandled exception. Occurs instead of ///
Also occurs if any of the RoboCommand objects raise ///
public event UnhandledExceptionEventHandler TaskFaulted; #endregion #endregion #region < Methods > /// /// Get the current instance of the object /// /// New instance of the list. public RoboQueueResults GetListResults() => ListResultsObj; /// /// Get the current of the object /// /// New instance of the list. public RoboQueueResults GetRunResults() => RunResultsObj; /// /// Run against all items in the list. /// public void StopAll() { //If a TaskCancelSource is present, request cancellation. The continuation tasks null the value out then call this method to ensure everything stopped once they complete. if (TaskCancelSource != null && !TaskCancelSource.IsCancellationRequested) { IsPaused = false; TaskCancelSource.Cancel(); // Cancel the IRoboCommand Task //IRoboCommand Continuation Task will call StopAllTask() method to ensure all processes are stopped & diposed. } else if (TaskCancelSource == null) { //This is supplied to allow stopping all commands if consumer manually looped through the list instead of using the Start* methods. CommandList.ForEach((c) => c.Stop()); IsCopyOperationRunning = false; IsListOnlyRunning = false; IsPaused = false; } WasCancelled = true; } /// /// Loop through the items in the list and issue on any commands where is true. /// public void PauseAll() { CommandList.ForEach((c) => { if (c.IsRunning) c.Pause(); }); IsPaused = IsRunning || AnyPaused; } /// /// Loop through the items in the list and issue on any commands where is true. /// public void ResumeAll() { CommandList.ForEach((c) => { if (c.IsPaused) c.Resume(); }); IsPaused = false; } #endregion #region < Run List-Only Mode > /// /// Set all IRoboCommand objects to ListOnly mode, run them, then set all RoboCommands back to their previous ListOnly mode setting. /// /// public Task StartAll_ListOnly(string domain = "", string username = "", string password = "") { if (IsRunning) throw new InvalidOperationException("Cannot start a new RoboQueue Process while this RoboQueue is already running."); IsListOnlyRunning = true; ListOnlyCompleted = false; ListResultsObj = new RoboQueueResults(); ListResultsUpdated?.Invoke(this, new ResultListUpdatedEventArgs(ListResults)); //Run the commands Task Run = StartJobs(domain, username, password, true); Task ResultsTask = Run.ContinueWith((continuation) => { //Set Flags IsListOnlyRunning = false; IsPaused = false; ListOnlyCompleted = !WasCancelled && !continuation.IsFaulted; // If some fault occurred while processing, throw the exception to caller if (continuation.IsFaulted) { TaskFaulted?.Invoke(this, new UnhandledExceptionEventArgs(continuation.Exception, true)); throw continuation.Exception; } ListResultsObj.EndTime= DateTime.Now; RunCompleted?.Invoke(this, new RoboQueueCompletedEventArgs(ListResultsObj, true)); return (IRoboQueueResults)ListResultsObj; }, CancellationToken.None ); return ResultsTask; } #endregion #region < Run User-Set Parameters > /// public Task StartAll(string domain = "", string username = "", string password = "") { if (IsRunning) throw new InvalidOperationException("Cannot start a new RoboQueue Process while this RoboQueue is already running."); IsCopyOperationRunning = true; CopyOperationCompleted = false; RunResultsObj = new RoboQueueResults(); RunResultsUpdated?.Invoke(this, new ResultListUpdatedEventArgs(RunResults)); Task Run = StartJobs(domain, username, password, false); Task ResultsTask = Run.ContinueWith((continuation) => { IsCopyOperationRunning = false; IsPaused = false; CopyOperationCompleted = !WasCancelled && !continuation.IsFaulted; // If some fault occurred while processing, throw the exception to caller if (continuation.IsFaulted) { TaskFaulted?.Invoke(this, new UnhandledExceptionEventArgs(continuation.Exception, true)); throw continuation.Exception; } RunResultsObj.EndTime = DateTime.Now; RunCompleted?.Invoke(this, new RoboQueueCompletedEventArgs(RunResultsObj, false)); return (IRoboQueueResults)RunResultsObj; }, CancellationToken.None ); return ResultsTask; } #endregion #region < StartJobs Method > /// /// Create Task that Starts all RoboCommands. /// /// , , and are applied to all IRoboCommand objects during this run. /// New Task that finishes after all RoboCommands have stopped executing private Task StartJobs(string domain = "", string username = "", string password = "", bool ListOnlyMode = false) { Debugger.Instance.DebugMessage("Starting Parallel execution of RoboQueue"); TaskCancelSource = new CancellationTokenSource(); CancellationToken cancellationToken = TaskCancelSource.Token; var SleepCancelToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken).Token; List TaskList = new List(); JobsStarted = 0; JobsComplete = 0; JobsCompletedSuccessfully = 0; WasCancelled = false; IsPaused = false; //Create a Task to Start all the RoboCommands Task StartAll = Task.Factory.StartNew(async () => { //Reset results of all commands in the list foreach (RoboCommand cmd in CommandList) cmd.ResetResults(); Estimator = new RoboQueueProgressEstimator(); OnProgressEstimatorCreated?.Invoke(this, new ProgressEstimatorCreatedEventArgs(Estimator)); //Start all commands, running as many as allowed foreach (IRoboCommand cmd in CommandList) { if (cancellationToken.IsCancellationRequested) break; //Assign the events RoboCommand.CommandCompletedHandler handler = (o, e) => RaiseCommandCompleted(o, e, ListOnlyMode); cmd.OnCommandCompleted += handler; cmd.OnCommandError += this.OnCommandError; cmd.OnCopyProgressChanged += this.OnCopyProgressChanged; cmd.OnError += this.OnError; cmd.OnFileProcessed += this.OnFileProcessed; cmd.OnProgressEstimatorCreated += Cmd_OnProgressEstimatorCreated; cmd.TaskFaulted += TaskFaulted; //Start the job //Once the job ends, unsubscribe events Task C = !ListOnlyMode ? cmd.Start(domain, username, password) : cmd.Start_ListOnly(domain, username, password); Task T = C.ContinueWith((t) => { cmd.OnCommandCompleted -= handler; cmd.OnCommandError -= this.OnCommandError; cmd.OnCopyProgressChanged -= this.OnCopyProgressChanged; cmd.OnError -= this.OnError; cmd.OnFileProcessed -= this.OnFileProcessed; if (t.IsFaulted) throw t.Exception; // If some fault occurred while processing, throw the exception to caller }, CancellationToken.None); TaskList.Add(T); //Add the continuation task to the list. //Raise Events JobsStarted++; OnPropertyChanged("JobsStarted"); if (cmd.IsRunning) OnCommandStarted?.Invoke(this, new RoboQueueCommandStartedEventArgs(cmd)); //Declare that a new command in the queue has started. OnPropertyChanged("JobsCurrentlyRunning"); //Notify the Property Changes //Check if more jobs are allowed to run if (IsPaused) cmd.Pause(); //Ensure job that just started gets paused if Pausing was requested while (!cancellationToken.IsCancellationRequested && (IsPaused || (MaxConcurrentJobs > 0 && JobsCurrentlyRunning >= MaxConcurrentJobs && TaskList.Count < CommandList.Count))) await ThreadEx.CancellableSleep(500, SleepCancelToken); } //End of ForEachLoop //Asynchronous wait for either cancellation is requested OR all jobs to finish. //- Task.WaitAll is blocking -> not ideal, and also throws if cancellation is requested -> also not ideal. //- Task.WhenAll is awaitable, but does not provide allow cancellation //- If Cancelled, the 'WhenAll' task continues to run, but the ContinueWith task here will stop all tasks, thus completing the WhenAll task if (!cancellationToken.IsCancellationRequested) { var tcs = new TaskCompletionSource(); _ = cancellationToken.Register(() => tcs.TrySetResult(null)); _ = await Task.WhenAny(Task.WhenAll(TaskList.ToArray()), tcs.Task); } }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Current).Unwrap(); //Continuation Task return results to caller Task ContinueWithTask = StartAll.ContinueWith(async (continuation) => { Estimator?.CancelTasks(); if (cancellationToken.IsCancellationRequested) { //If cancellation was requested -> Issue the STOP command to all commands in the list Debugger.Instance.DebugMessage("RoboQueue Task Was Cancelled"); await StopAllTask(TaskList); } else if (continuation.IsFaulted) { Debugger.Instance.DebugMessage("RoboQueue Task Faulted"); await StopAllTask(TaskList); throw continuation.Exception; } else { Debugger.Instance.DebugMessage("RoboQueue Task Completed"); } TaskCancelSource?.Dispose(); TaskCancelSource = null; }, CancellationToken.None).Unwrap(); return ContinueWithTask; } private async Task StopAllTask(IEnumerable StartedTasks) { CommandList.ForEach((c) => c.Stop()); await Task.WhenAll(StartedTasks); IsCopyOperationRunning = false; IsListOnlyRunning = false; IsPaused = false; TaskCancelSource.Dispose(); TaskCancelSource = null; } private void Cmd_OnProgressEstimatorCreated(IRoboCommand sender, ProgressEstimatorCreatedEventArgs e) { Estimator?.BindToProgressEstimator(e.ResultsEstimate); sender.OnProgressEstimatorCreated -= Cmd_OnProgressEstimatorCreated; } /// /// Intercept OnCommandCompleted from each IRoboCommand, react, then raise this object's OnCommandCompleted event /// private void RaiseCommandCompleted(IRoboCommand sender, RoboCommandCompletedEventArgs e, bool ListOnlyBinding) { if (ListOnlyBinding) { ListResultsObj.Add(sender.GetResults()); ListResultsUpdated?.Invoke(this, new ResultListUpdatedEventArgs(ListResults)); } else { RunResultsObj.Add(sender.GetResults()); RunResultsUpdated?.Invoke(this, new ResultListUpdatedEventArgs(RunResults)); } //Notify the Property Changes if (!sender.IsCancelled) { JobsCompletedSuccessfully++; } JobsComplete++; OnPropertyChanged("JobsCurrentlyRunning"); OnCommandCompleted?.Invoke(sender, e); } #endregion #region < IDisposable Implementation > private void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { Estimator?.UnBind(); //IRoboCommand objects attach to a process, so must be in the 'unmanaged' section. foreach (IRoboCommand cmd in CommandList) cmd.Dispose(); CommandList.Clear(); } disposedValue = true; } } /// /// Finalizer -> Ensures that all IRoboCommand objects get disposed of properly when program exits /// ~RoboQueue() { // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method Dispose(disposing: false); } /// /// Dispose all IRoboCommand objects contained in the list. - This will kill any Commands that have = true (default)
///
public void Dispose() { // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method Dispose(disposing: true); GC.SuppressFinalize(this); } #endregion #region < INotifyPropertyChanged, INotifyCollectionChanged, IEnumerable > /// public event PropertyChangedEventHandler PropertyChanged; private void OnPropertyChanged(string propertyName) => PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName)); /// public event NotifyCollectionChangedEventHandler CollectionChanged { add { CommandList.CollectionChanged += value; } remove { CommandList.CollectionChanged -= value; } } /// /// Gets the enumerator for the enumeating through this object's objects /// public IEnumerator GetEnumerator() { return Commands.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable)Commands).GetEnumerator(); } #endregion #region < List Access Methods > /// /// Exception thrown when attempting to run a method accesses the list backing a RoboQueue object while the tasks are in progress. /// public class ListAccessDeniedException : Exception { /// This functionality is disabled if == true. /// private const string StandardMsg = "Running methods that modify the list of RoboCommands methods while RoboQueue.IsRunning = TRUE is prohibited."; internal ListAccessDeniedException() : base(StandardMsg) { } internal ListAccessDeniedException(string message) : base($"{StandardMsg}\n{message}") { } internal ListAccessDeniedException(string message, Exception innerException) : base(message, innerException) { } } #region < Add > /// /// public void AddCommand(IRoboCommand item) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.Add(item); OnPropertyChanged("ListCount"); OnPropertyChanged("Commands"); } /// /// public void AddCommand(int index, IRoboCommand item) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.Insert(index, item); OnPropertyChanged("ListCount"); OnPropertyChanged("Commands"); } /// /// public void AddCommand(IEnumerable collection) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.AddRange(collection); OnPropertyChanged("ListCount"); OnPropertyChanged("Commands"); } #endregion #region < Remove > /// /// public void RemoveCommand(IRoboCommand item) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.Remove(item); OnPropertyChanged("ListCount"); OnPropertyChanged("Commands"); } /// /// public void RemoveCommand(int index) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.RemoveAt(index); OnPropertyChanged("ListCount"); OnPropertyChanged("Commands"); } /// /// public void RemoveCommand(int index, int count) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.RemoveRange(index, count); OnPropertyChanged("ListCount"); OnPropertyChanged("Commands"); } /// /// public void RemovCommand(Predicate match) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.RemoveAll(match); OnPropertyChanged("ListCount"); OnPropertyChanged("Commands"); } /// /// public void ClearCommandList() { if (IsRunning) throw new ListAccessDeniedException(); CommandList.Clear(); OnPropertyChanged("ListCount"); OnPropertyChanged("Commands"); } /// Performs then public void ReplaceCommand(IRoboCommand item, int index) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.Replace(index, item); } #endregion #region < Find / Contains / Etc > /// public bool Contains(IRoboCommand item) => CommandList.Contains(item); /// /// public void ForEach(Action action) { if (IsRunning) throw new ListAccessDeniedException(); CommandList.ForEach(action); } /// public List FindAll(Predicate predicate) => CommandList.FindAll(predicate); /// public IRoboCommand Find(Predicate predicate) => CommandList.Find(predicate); /// public int IndexOf(IRoboCommand item) => CommandList.IndexOf(item); #endregion #endregion } }