-
-
Save oising/6f8a09b1038c2940c3c49dd334e30f61 to your computer and use it in GitHub Desktop.
A PowerShell Job wrapper for System.Threading.Tasks.Task to bridge the gap between asynchronous operations in C# and PowerShell.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Management.Automation; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace Casion.PowerShell | |
{ | |
/// <summary> | |
/// <para type='description'>A PowerShell Job that represents one or more asynchronous <see cref="Task"/> instances | |
/// that belong to a logical group (such as multiple tasks created by a single cmdlet invocation). | |
/// This allows standard PowerShell job operations such as Wait-Job and Receive-Job to be used with the <see cref="Task"/> class.</para> | |
/// </summary> | |
public sealed class TaskJob : Job | |
{ | |
private volatile bool _hasFailed, _hasStopped; | |
private readonly int _childJobsCount; | |
private volatile int _childJobsFinishedCount; | |
private readonly object _syncObject = new object(); | |
/// <summary> | |
/// Returns <see langword="true"/> if this job has unread data in <see cref="Job.Output"/> or <see cref="Job.Error"/>. | |
/// </summary> | |
public override bool HasMoreData => ChildJobs.Any(j => j.HasMoreData); | |
/// <summary> | |
/// The host executing this job. | |
/// </summary> | |
public override string Location => Environment.MachineName; | |
/// <summary> | |
/// The status of the child jobs. | |
/// </summary> | |
public override string StatusMessage => string.Join(", ", ChildJobs.Select(j => j.StatusMessage)); | |
/// <summary> | |
/// Determines if the <see cref="CancellationTokenSource"/> used by the child jobs should be disposed when the child jobs are disposed. The default value is <see langword="true"/>. | |
/// </summary> | |
public bool DisposeCancellationTokenSourceOnDisposed | |
{ | |
set | |
{ | |
foreach (var job in ChildJobs) | |
{ | |
if (job is TaskChildJob tjob) | |
{ | |
tjob.DisposeCancellationTokenSourceOnDisposed = value; | |
} | |
} | |
} | |
} | |
/// <summary> | |
/// Check if all child jobs are finished; if so, mark this job as finished. | |
/// </summary> | |
private void OnChildJobCompleted(object? sender, EventArgs? arguments) | |
{ | |
if (sender is Job job) | |
{ | |
_hasFailed = _hasFailed || job.JobStateInfo.State == JobState.Failed; | |
_hasStopped = _hasStopped || job.JobStateInfo.State == JobState.Stopped; | |
} | |
lock (_syncObject) | |
{ | |
if (++_childJobsFinishedCount >= _childJobsCount) | |
{ | |
SetJobState( | |
_hasFailed ? JobState.Failed : | |
_hasStopped ? JobState.Stopped : | |
JobState.Completed); | |
} | |
} | |
} | |
/// <summary> | |
/// Stops child jobs. | |
/// </summary> | |
public override void StopJob() | |
{ | |
foreach (var job in ChildJobs) | |
{ | |
job.StopJob(); | |
} | |
} | |
/// <summary> | |
/// Indicates whether any of the tasks passed to this task are expected to write an object to <see cref="Job.Output"/>. | |
/// </summary> | |
public bool ExpectingOutput => ChildJobs.Any(c => c is TaskChildJob tjob && tjob.ExpectingOutput); | |
private TaskJob(string? name, string? command, IEnumerable<(Task, CancellationTokenSource?)> tasks) | |
: base(command, name) | |
{ | |
if (tasks.All(t => t.Item1 is null) || tasks.Count() == 0) | |
throw new ArgumentException($"One or more {nameof(Task)} values must be provided."); | |
PSJobTypeName = nameof(TaskJob); | |
SetJobState(JobState.Running); | |
lock (_syncObject) | |
{ | |
foreach (var pair in tasks ?? throw new ArgumentNullException(nameof(tasks))) | |
{ | |
if (pair.Item1 is null) continue; | |
var childJob = new TaskChildJob(task: pair.Item1, | |
cancellationTokenSource: pair.Item2, | |
command: command); | |
ChildJobs.Add(childJob); | |
childJob.StateChanged += OnChildJobCompleted; | |
} | |
_childJobsCount = ChildJobs.Count; | |
} | |
} | |
public static TaskJob StartJob(string? name, string? command, IEnumerable<(Task, CancellationTokenSource?)> tasks) | |
{ | |
return new TaskJob(name, command, tasks); | |
} | |
public static TaskJob StartJob(string? name, string? command, IEnumerable<Task> tasks, CancellationTokenSource? cancellationTokenSource) | |
=> StartJob(name, command, tasks.Select(t => (t, cancellationTokenSource))); | |
public static TaskJob StartJob(string? name, string? command, Task task, CancellationTokenSource? cancellationTokenSource) | |
=> StartJob(name, command, new[] { task }, cancellationTokenSource); | |
/// <summary> | |
/// A PowerShell job representing a single <see cref="Task"/>. | |
/// </summary> | |
private sealed class TaskChildJob : Job | |
{ | |
internal event EventHandler? TaskFinished; | |
private readonly Task _task; | |
private readonly CancellationTokenSource? _cts; | |
public override bool HasMoreData => Error.Count > 0 || Output.Count > 0; | |
public override string Location => Environment.MachineName; | |
public override string StatusMessage => _task.Status.ToString(); | |
// task.Result type is not VoidTaskResult. If Task.Result doesn't exist, returns false. | |
public bool ExpectingOutput => !_task.GetType().GetProperty("Result")?.GetValue(_task)?.GetType()?.Name?.Equals("VoidTaskResult") ?? false; | |
/// <summary> | |
/// Indicates if the <see cref="CancellationTokenSource"/> passed to this job's constructor (if any) should be disposed when the job is disposed. | |
/// Defaults to <see langword="true"/>. | |
/// </summary> | |
public bool DisposeCancellationTokenSourceOnDisposed | |
{ | |
set | |
{ | |
_disposeCTS = value; | |
} | |
} | |
private bool _disposeCTS = true; | |
/// <summary> | |
/// If a <see cref="CancellationTokenSource"/> was passed to this job's constructor, it will be cancelled. | |
/// Otherwise, the job state will just be set to <see cref="JobState.Stopped"/>. | |
/// </summary> | |
public override void StopJob() | |
{ | |
SetJobState(JobState.Stopping); | |
// to prevent the job from hanging, we'll say the job is stopped | |
// if we can't stop it. Otherwise, we'll cancel _cts and let the | |
// .ContinueWith() invocation set the job's state. | |
if (_cts is null) | |
{ | |
SetJobState(JobState.Stopped); | |
TaskFinished?.Invoke(this, new EventArgs()); | |
} | |
else | |
{ | |
_cts.Cancel(); | |
} | |
} | |
protected override void Dispose(bool disposing) | |
{ | |
if (disposing) | |
{ | |
_task.Dispose(); | |
if (_disposeCTS) | |
{ | |
_cts?.Dispose(); | |
} | |
} | |
base.Dispose(disposing); | |
} | |
internal TaskChildJob(Task task, CancellationTokenSource? cancellationTokenSource = null, string? command = null) | |
: base(command) | |
{ | |
PSJobTypeName = nameof(TaskChildJob); | |
SetJobState(JobState.Running); | |
_task = task ?? throw new ArgumentNullException(nameof(task)); | |
_task.ContinueWith(OnTaskCompleted); | |
_cts = cancellationTokenSource; | |
} | |
/// <summary> | |
/// Executed when the <see cref="Task"/> this job represents has completed. | |
/// </summary> | |
/// <param name="task"><see cref="_task"/> after completing.</param> | |
private async Task OnTaskCompleted(Task task) | |
{ | |
// 1. Is there any reason for this method to return Task? | |
// 2. Is there any reason to say _task = task; _task.ContinueWith() instead of just _task = task.ContinueWith()? | |
// When this method does not return null that returns Task<Task>. But ... does that matter? | |
if (task.IsCanceled) | |
{ | |
SetJobState(JobState.Stopped); | |
TaskFinished?.Invoke(this, new EventArgs()); | |
} | |
else | |
{ | |
try | |
{ | |
if (ExpectingOutput) | |
{ | |
var invokeResult = await (dynamic)task; | |
Output.Add(PSObject.AsPSObject(invokeResult)); | |
} | |
else | |
{ | |
await task; | |
} | |
SetJobState(JobState.Completed); | |
TaskFinished?.Invoke(this, new EventArgs()); | |
} | |
catch (Exception haltingException) | |
{ | |
Error.Add(new ErrorRecord( | |
haltingException, | |
"TaskException", | |
ErrorCategory.NotSpecified, | |
task) | |
{ | |
ErrorDetails = new ErrorDetails($"An exception occurred in the task. {haltingException.Message}"), | |
}); | |
SetJobState(JobState.Failed); | |
TaskFinished?.Invoke(this, new EventArgs()); | |
} | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment