Skip to content

Instantly share code, notes, and snippets.

@Lachee
Last active August 29, 2019 03:01
Show Gist options
  • Save Lachee/2e1a4eebaa5fa800052ace7ebffaa523 to your computer and use it in GitHub Desktop.
Save Lachee/2e1a4eebaa5fa800052ace7ebffaa523 to your computer and use it in GitHub Desktop.
A wrapper around a process to provide safe asyncronous
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace AsyncProcess
{
public class ThreadedProcess : IDisposable
{
public ProcessStartInfo StartInfo { get; }
private Process _process;
private SemaphoreSlim _queueSemaphore;
private SemaphoreSlim _threadSemaphore;
private Queue<string> _queue;
private Thread _thread;
private volatile State state = State.Offline;
private enum State : int { Offline = 0, Killing = 1, Exiting = 2, Starting = 3, Running = 4, Aborting = 5 }
public event Action Exited;
public ThreadedProcess(ProcessStartInfo startInfo, int capacity = 100)
{
this.StartInfo = startInfo;
this.StartInfo.RedirectStandardOutput = true;
this.StartInfo.UseShellExecute = false;
this.StartInfo.CreateNoWindow = false;
this._queueSemaphore = new SemaphoreSlim(1, 1);
this._threadSemaphore = new SemaphoreSlim(1, 1);
this._queue = new Queue<string>(capacity);
}
/// <summary>Starts the process</summary>
public void Start()
{
if (_thread != null)
throw new InvalidOperationException("Cannot start the process if it is already running.");
Log("Starting Thread");
_thread = new Thread(p_RunThread);
_thread.Start();
}
/// <summary>Stops the process</summary>
public void Stop()
{
Log("Aborting State");
state = State.Aborting;
}
/// <summary>Stops the process and asyncronously waits it to finish cleanup.</summary>
/// <returns></returns>
public async Task StopAsync()
{
Stop();
Log("Waiting for thread to cleanup.");
await this._threadSemaphore.WaitAsync();
this._threadSemaphore.Release();
}
/// <summary>
/// Dequeues all the content that has been read by the stream so far.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<string[]> ReadStandardOutputAsync()
{
await this._queueSemaphore.WaitAsync();
try
{
List<string> tmp = new List<string>(_queue.Count);
while (_queue.TryDequeue(out var str)) tmp.Add(str);
return tmp.ToArray();
}
finally
{
this._queueSemaphore.Release();
}
}
private void p_RunThread()
{
StringBuilder sb = new StringBuilder();
int charread = 0;
char[] buffer = new char[2048];
this._threadSemaphore.Wait();
try
{
//Start the process
p_StartProcess();
//Whiel we are in a running state and have a process
while (state == State.Running && _process != null)
{
while ((charread = _process.StandardOutput.ReadBlock(buffer, 0, buffer.Length)) > 0)
{
sb.Append(buffer, 0, charread);
if (charread < buffer.Length)
{
p_EnqueueContent(sb.ToString());
sb.Clear();
}
}
//Sleep on it.
Thread.Sleep(2000);
}
}
catch (Exception e)
{
//An exception has occured
LogError(e);
}
finally
{
//Finally kill the process, then release our semaphore.
p_KillProcess();
this._threadSemaphore.Release();
}
}
private void p_EnqueueContent(string content)
{
//Wait for our turn to add more content
this._queueSemaphore.Wait();
try
{
//Add the content
this._queue.Enqueue(content);
}
finally
{
//Finally release the queue
this._queueSemaphore.Release();
}
}
private bool p_StartProcess()
{
Log("Starting Process...");
if (_process != null)
throw new InvalidOperationException("Cannot start the process while it is already running.");
//Update the state
state = State.Starting;
//Create the process and hook into our events
Log("Creating Process...");
_process = new Process()
{
EnableRaisingEvents = true,
StartInfo = StartInfo
};
_process.Exited += ProcessExited;
//Start the procecss
Log("Starting Process...");
var success = _process.Start();
if (!success)
{
Log("Failed, cleaning up...");
p_KillProcess();
return false;
}
else
{
Log("Success");
state = State.Running;
return true;
}
}
private void ProcessExited(object sender, EventArgs e)
{
Log("Process has exited.");
//We are already killing the process, abort!
if (state == State.Killing)
return;
//Actually kill the process
Log("Exiting Process ourselves...");
state = State.Exiting;
p_KillProcess();
}
private bool p_KillProcess()
{
if (_process == null)
return true;
//Update the state
state = state == State.Exiting ? state : State.Killing;
try
{
//Kill the process if we are exiting
if (!_process.HasExited && state != State.Exiting)
{
Log("Killing Process");
_process.Kill();
//Read to end then wait for exit
Log("Reading until end...");
_process.StandardOutput.ReadToEnd();
Log("Waiting for exit");
_process.WaitForExit();
}
}
catch (System.InvalidOperationException e)
{
LogError(e, "IOE: {0}");
}
//Dispose of the process
Log("Disposing and cleaning up process");
_process.Dispose();
_process = null;
Log("Killing finished");
state = State.Offline;
Exited?.Invoke();
return true;
}
#region Logging
private void LogError(Exception e, string format = "ERRO: {0}") { Console.WriteLine(format, e.Message); }
private void LogError(string err) { Console.WriteLine("ERRO: " + err); }
private void LogWarning(string warn) { Console.WriteLine("WARN: " + warn); }
private void Log(string log) { Console.WriteLine("INFO: " + log); }
/// <summary>
/// Disposes the object.
/// </summary>
public void Dispose()
{
Stop();
_queueSemaphore.Dispose();
_threadSemaphore.Dispose();
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment