Created
December 14, 2019 14:51
-
-
Save RobertBouillon/5eca4e1e49977aad96e95b7345477d4c to your computer and use it in GitHub Desktop.
C# Worker Model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Linq;
using System.Text;
namespace System.Threading | |
{ | |
public abstract class Worker : System.Threading.Workers.IWorker | |
{ | |
#region Fields | |
// Name given at construction; stays null when the parameterless constructor is used.
private readonly string _name;

// Thread running WorkerLoop; assigned by Start().
private Thread _workerThread;

// Lifecycle flags. They are written by Start()/Stop() on the caller's thread and
// read (or written) by WorkerLoop on the worker thread, hence volatile.
private volatile bool _isStarted;
private volatile bool _isStopping;
private volatile bool _isWorking;

// Sleep interval used by the default WaitForWork() implementation.
private TimeSpan _waitDelay = TimeSpan.FromMilliseconds(100);
#endregion | |
#region Properties | |
/// <summary>The thread most recently created by <c>Start()</c>; null before the first start.</summary>
protected Thread WorkerThread
{
  get { return _workerThread; }
}

/// <summary>The name supplied at construction, or null if none was given.</summary>
public virtual string Name
{
  get { return _name; }
}

/// <summary>True while <c>Stop</c> is in progress; the worker loop exits when it observes this flag.</summary>
public bool IsStopping
{
  get => _isStopping;
  protected set => _isStopping = value;
}

/// <summary>True from a successful <c>Start()</c> until <c>Stop</c> completes.</summary>
public bool IsStarted
{
  get => _isStarted;
  protected set => _isStarted = value;
}

/// <summary>True while the worker loop is inside a work iteration.</summary>
public bool IsWorking
{
  get => _isWorking;
  protected set => _isWorking = value;
}

/// <summary>How long the default <c>WaitForWork()</c> sleeps when there is nothing to do.</summary>
public TimeSpan WaitDelay
{
  get => _waitDelay;
  protected set => _waitDelay = value;
}
#endregion | |
#region Constructors | |
/// <summary>Creates a worker without a name.</summary>
public Worker()
{
}

/// <summary>Creates a worker with the given name.</summary>
/// <param name="name">Name of the worker; also used to name the worker thread.</param>
/// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
protected Worker(string name)
{
  _name = name ?? throw new ArgumentNullException(nameof(name));
}
#endregion | |
#region Methods | |
/// <summary>
/// Creates and starts the background thread that runs <c>WorkerLoop</c>.
/// </summary>
/// <param name="name">
/// Name to give the thread. Null, empty or whitespace leaves the thread unnamed.
/// </param>
/// <returns>The thread, already started.</returns>
protected virtual Thread CreateThread(String name)
{
  Thread worker = new Thread(WorkerLoop);

  // Use the argument rather than the _name field: the parameter is what was
  // validated, and overrides/callers may pass a name different from the field.
  if (!String.IsNullOrWhiteSpace(name))
    worker.Name = name;

  worker.Priority = ThreadPriority.Normal;
  // Background thread: it does not keep the process alive on its own.
  worker.IsBackground = true;
  worker.Start();
  return worker;
}
/// <summary>
/// Starts the worker: raises <c>Starting</c>, creates the worker thread, then raises <c>Started</c>.
/// </summary>
/// <returns>
/// True when the worker thread was created; false when <c>OnStarting()</c> returned false
/// (presumably because a <c>Starting</c> handler cancelled the start - confirm against the
/// <c>CancelEventArgs.Invoke</c> extension, which is defined outside this file).
/// </returns>
/// <exception cref="InvalidOperationException">The worker is already started.</exception>
public bool Start()
{
  if (_isStarted)
    throw new InvalidOperationException("Cannot start worker: worker already started");

  if (!OnStarting())
    return false;

  // Set here so no one can call Start again (saves declaring and managing _isStarting).
  _isStarted = true;
  try
  {
    _workerThread = CreateThread(_name);
  }
  catch
  {
    // Without this rollback a failed thread creation would leave the worker flagged as
    // started with no thread: Start() could never be retried and Stop() would
    // dereference a null _workerThread.
    _isStarted = false;
    throw;
  }

  OnStarted();
  return true;
}
/// <summary>
/// Stops the worker with a 10-second timeout, aborting the thread if it does not exit in time.
/// </summary>
/// <returns>See <see cref="Stop(TimeSpan, bool)"/>.</returns>
public bool Stop() => Stop(TimeSpan.FromSeconds(10), true);

/// <summary>
/// Signals the worker loop to exit and waits for the worker thread to finish.
/// </summary>
/// <param name="timeout">Maximum time to wait for the worker thread to exit.</param>
/// <param name="kill">
/// When true, <c>CancelWork()</c> is called if work is in progress and the thread is aborted
/// if it has not exited within <paramref name="timeout"/>. When false, a timeout is reported
/// through <c>OnError</c> instead.
/// </param>
/// <returns>
/// True when the stop sequence ran; false when <c>OnStopping()</c> returned false.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The worker has not been started, or a stop is already in progress.
/// </exception>
public virtual bool Stop(TimeSpan timeout, bool kill)
{
  if (!_isStarted)
    throw new InvalidOperationException("Cannot stop worker: worker has not started.");
  if (_isStopping)
    throw new InvalidOperationException("Cannot stop worker: worker is stopping.");

  if (!OnStopping())
    return false;

  // WorkerLoop polls this flag and exits once it is set.
  _isStopping = true;
  try
  {
    if (_isWorking && kill)
      CancelWork();

    if (!_workerThread.Join(timeout))
    {
      if (kill)
      {
        // NOTE(review): Thread.Abort throws PlatformNotSupportedException on .NET Core / .NET 5+.
        _workerThread.Abort();
      }
      else
      {
        OnError(new Exception(String.Format("Unable to stop worker: {0}", _name)));
      }
    }

    // NOTE(review): this also runs when the join timed out with kill == false, so the
    // worker is reported as stopped although its thread may still be running.
    _isStarted = false;
    OnStopped();
  }
  catch (Exception)
  {
    // Exceptions are deliberately not routed to OnError: Stop runs on the calling thread,
    // outside of the worker, so failures must bubble up to the caller.
    // Only break when a debugger is actually attached, and rethrow with "throw;" so the
    // original stack trace is preserved.
    if (System.Diagnostics.Debugger.IsAttached)
      System.Diagnostics.Debugger.Break();
    throw;
  }
  finally
  {
    _isStopping = false;
  }
  return true;
}
/// <summary>Performs one unit of work. Called by the worker loop on the worker thread.</summary>
protected abstract void Work();

/// <summary>
/// Called by <c>Stop</c> when work is in progress and the caller asked to kill the worker.
/// The default implementation does nothing.
/// </summary>
protected virtual void CancelWork()
{
}

/// <summary>Whether there is work to do right now. The default is always true.</summary>
protected virtual bool HasWork
{
  get { return true; }
}

/// <summary>
/// Called by the worker loop when <c>HasWork</c> is false. The default sleeps for the wait delay.
/// </summary>
protected virtual void WaitForWork()
{
  Thread.Sleep(_waitDelay);
}
/// <summary>
/// Body of the worker thread. Until a stop is requested it either waits for work or runs
/// one work iteration: <c>OnWorking</c>, <c>Work</c>, then <c>OnWorked</c> with the elapsed time.
/// Exceptions thrown during an iteration are reported through <c>OnError</c> and the loop continues.
/// </summary>
protected virtual void WorkerLoop()
{
  Stopwatch timer = Stopwatch.StartNew();

  while (!_isStopping)
  {
    if (HasWork)
    {
      _isWorking = true;
      try
      {
        // A false result from OnWorking skips this iteration; the finally block
        // still clears the working flag before the loop goes around again.
        if (OnWorking())
        {
          timer.Restart();
          Work();
          OnWorked(timer.Elapsed);
        }
      }
      catch (Exception failure)
      {
        OnError(failure);
      }
      finally
      {
        _isWorking = false;
      }
    }
    else
    {
      WaitForWork();
    }
  }
}
#endregion | |
#region Events | |
/// <summary>Raised by <c>Start()</c> before the worker thread is created.</summary>
public event CancelEventHandler Starting;

/// <summary>
/// Raises <c>Starting</c> with fresh, non-cancelled event args.
/// </summary>
/// <returns>
/// The result of the external <c>Invoke</c> extension; <c>Start()</c> proceeds only when it is true
/// (presumably "not cancelled" - confirm against the extension's definition).
/// </returns>
protected bool OnStarting()
{
  CancelEventArgs args = new CancelEventArgs(false);
  return args.Invoke(OnStarting);
}

/// <summary>Invokes the <c>Starting</c> handlers, if any.</summary>
protected virtual void OnStarting(CancelEventArgs e)
{
  CancelEventHandler handler = Starting;
  if (handler != null)
    handler(this, e);
}

/// <summary>Raised by <c>Start()</c> after the worker thread has been created.</summary>
public event EventHandler Started;

/// <summary>Raises <c>Started</c> with empty event args.</summary>
protected void OnStarted()
{
  OnStarted(EventArgs.Empty);
}

/// <summary>Invokes the <c>Started</c> handlers, if any.</summary>
protected virtual void OnStarted(EventArgs e)
{
  EventHandler handler = Started;
  if (handler != null)
    handler(this, e);
}
/// <summary>Raised by <c>Stop</c> before the worker loop is signalled to exit.</summary>
public event CancelEventHandler Stopping;

/// <summary>
/// Raises <c>Stopping</c> with fresh, non-cancelled event args.
/// </summary>
/// <returns>
/// The result of the external <c>Invoke</c> extension; <c>Stop</c> proceeds only when it is true
/// (presumably "not cancelled" - confirm against the extension's definition).
/// </returns>
protected bool OnStopping()
{
  CancelEventArgs args = new CancelEventArgs(false);
  return args.Invoke(OnStopping);
}

/// <summary>Invokes the <c>Stopping</c> handlers, if any.</summary>
protected virtual void OnStopping(CancelEventArgs e)
{
  CancelEventHandler handler = Stopping;
  if (handler != null)
    handler(this, e);
}

/// <summary>Raised by <c>Stop</c> once the stop sequence has completed.</summary>
public event EventHandler Stopped;

/// <summary>Raises <c>Stopped</c> with empty event args.</summary>
protected void OnStopped()
{
  OnStopped(EventArgs.Empty);
}

/// <summary>Invokes the <c>Stopped</c> handlers, if any.</summary>
protected virtual void OnStopped(EventArgs e)
{
  EventHandler handler = Stopped;
  if (handler != null)
    handler(this, e);
}
#region ErrorEventArgs Subclass | |
/// <summary>Event data for the <c>Error</c> event: carries the exception being reported.</summary>
public class ErrorEventArgs : EventArgs
{
  /// <summary>The exception that was reported. Never null.</summary>
  public Exception Exception { get; private set; }

  /// <param name="exception">The exception to report.</param>
  /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
  internal ErrorEventArgs(Exception exception)
  {
    Exception = exception ?? throw new ArgumentNullException(nameof(exception));
  }
}
#endregion | |
/// <summary>Raised when an exception is reported by the worker loop or by <c>Stop</c>.</summary>
public event EventHandler<ErrorEventArgs> Error;

/// <summary>Wraps <paramref name="exception"/> in event args and raises <c>Error</c>.</summary>
protected void OnError(Exception exception)
{
  ErrorEventArgs args = new ErrorEventArgs(exception);
  OnError(args);
}

/// <summary>Invokes the <c>Error</c> handlers, if any.</summary>
protected virtual void OnError(ErrorEventArgs e)
{
  EventHandler<ErrorEventArgs> handler = Error;
  if (handler != null)
    handler(this, e);
}
#region WorkPerformedEventArgs Subclass | |
/// <summary>Event data for the <c>Worked</c> event: how long one work iteration took.</summary>
public class WorkPerformedEventArgs : EventArgs
{
  /// <summary>Elapsed time of the work iteration.</summary>
  public TimeSpan Duration { get; private set; }

  /// <param name="duration">Elapsed time of the work iteration.</param>
  internal WorkPerformedEventArgs(TimeSpan duration)
  {
    // TimeSpan is a value type and can never be null, so the former
    // "duration == null" guard was unreachable and has been removed.
    Duration = duration;
  }
}
#endregion | |
/// <summary>Raised by the worker loop after each completed work iteration.</summary>
public event EventHandler<WorkPerformedEventArgs> Worked;

/// <summary>Wraps <paramref name="duration"/> in event args and raises <c>Worked</c>.</summary>
protected void OnWorked(TimeSpan duration)
{
  WorkPerformedEventArgs args = new WorkPerformedEventArgs(duration);
  OnWorked(args);
}

/// <summary>Invokes the <c>Worked</c> handlers, if any.</summary>
protected virtual void OnWorked(WorkPerformedEventArgs e)
{
  EventHandler<WorkPerformedEventArgs> handler = Worked;
  if (handler != null)
    handler(this, e);
}

/// <summary>Raised by the worker loop before each work iteration.</summary>
public event CancelEventHandler Working;

/// <summary>
/// Raises <c>Working</c> with fresh, non-cancelled event args.
/// </summary>
/// <returns>
/// The result of the external <c>Invoke</c> extension; the worker loop runs <c>Work()</c> only
/// when it is true (presumably "not cancelled" - confirm against the extension's definition).
/// </returns>
protected bool OnWorking()
{
  CancelEventArgs args = new CancelEventArgs(false);
  return args.Invoke(OnWorking);
}

/// <summary>Invokes the <c>Working</c> handlers, if any.</summary>
protected virtual void OnWorking(CancelEventArgs e)
{
  CancelEventHandler handler = Working;
  if (handler != null)
    handler(this, e);
}
#endregion | |
} | |
} | |
namespace System.Threading.Workers | |
{ | |
/// <summary>
/// A worker that, when idle, blocks on a wait handle instead of sleeping, so it can be
/// woken as soon as the handle is signalled.
/// </summary>
public abstract class EventWorker : Worker
{
  /// <summary>Creates an unnamed event worker.</summary>
  public EventWorker()
  {
  }

  /// <summary>Creates an event worker with the given name.</summary>
  public EventWorker(string name)
    : base(name)
  {
  }

  /// <summary>The handle the worker waits on while it has no work.</summary>
  protected abstract WaitHandle Handle { get; }

  /// <summary>
  /// Waits on <see cref="Handle"/> for at most the wait delay; the result of the wait is ignored.
  /// </summary>
  protected override void WaitForWork()
  {
    Handle.WaitOne(WaitDelay);
  }
}
} | |
namespace System.Threading.Workers | |
{ | |
public abstract class QueueWorker<T> : EventWorker, IEnumerable<T> | |
{ | |
#region Fields | |
// Pending work items. Assigned once during construction, hence readonly.
private readonly IProducerConsumerCollection<T> _queue;

// Signalled by Enqueue to wake the worker while it is waiting for work.
// NOTE(review): this handle is never disposed anywhere in this class.
private readonly AutoResetEvent _handle = new AutoResetEvent(false);

// The item most recently taken from the queue; read back when raising Worked.
private T _item;
#endregion | |
#region Constructors | |
/// <summary>Creates an unnamed worker backed by a new <c>ConcurrentQueue</c>.</summary>
public QueueWorker()
{
  _queue = new ConcurrentQueue<T>();
}

/// <summary>Creates an unnamed worker backed by the supplied collection.</summary>
/// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
public QueueWorker(IProducerConsumerCollection<T> queue)
{
  _queue = queue ?? throw new ArgumentNullException(nameof(queue));
}

/// <summary>Creates a named worker backed by a new <c>ConcurrentQueue</c>.</summary>
public QueueWorker(string name)
  : this(name, new ConcurrentQueue<T>())
{
}

/// <summary>Creates a named worker backed by the supplied collection.</summary>
/// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
public QueueWorker(string name, IProducerConsumerCollection<T> queue)
  : base(name)
{
  _queue = queue ?? throw new ArgumentNullException(nameof(queue));
}
#endregion | |
#region Methods | |
/// <summary>
/// Adds <paramref name="item"/> to the queue and signals the worker that work is available.
/// </summary>
/// <param name="item">The item to process.</param>
/// <remarks>
/// Retries until the underlying collection accepts the item, so this call blocks for as
/// long as <c>TryAdd</c> keeps returning false.
/// </remarks>
public virtual void Enqueue(T item)
{
  // Keep retrying, but back off between attempts instead of spinning a core at 100%:
  // SpinWait spins briefly and then starts yielding the thread.
  SpinWait backoff = new SpinWait();
  while (!_queue.TryAdd(item))
    backoff.SpinOnce();

  _handle.Set();
}
#endregion | |
#region Abstract Declarations | |
/// <summary>Processes a single item taken from the queue.</summary>
/// <param name="item">The item to process.</param>
public abstract void Work(T item);
#endregion
#region Overrides
/// <summary>True while the queue holds at least one item.</summary>
protected override bool HasWork
{
  get { return _queue.Count > 0; }
}

/// <summary>The event that <c>Enqueue</c> signals; the idle worker waits on it.</summary>
protected override WaitHandle Handle
{
  get { return _handle; }
}
/// <summary>
/// Blocks the caller, polling at the wait-delay interval, until the queue is empty.
/// </summary>
/// <remarks>
/// Only the queue count is checked: an item that has already been taken from the queue
/// may still be processing when this method returns.
/// </remarks>
public void Flush()
{
  for (;;)
  {
    if (_queue.Count <= 0)
      return;

    Thread.Sleep(WaitDelay);
  }
}
/// <summary>Stops the worker; delegates unchanged to the base implementation.</summary>
public override bool Stop(TimeSpan timeout, bool kill) => base.Stop(timeout, kill);
/// <summary>
/// Takes one item from the queue, if one is available, records it as the current item
/// and hands it to <c>Work(T)</c>. Does nothing when no item could be taken.
/// </summary>
protected override void Work()
{
  T next;
  if (!_queue.TryTake(out next))
    return;

  // Remember the item so OnWorked can report it alongside the duration.
  _item = next;
  Work(next);
}
#endregion | |
#region Events | |
#region WorkPerformedEventArgs Subclass | |
/// <summary>
/// Event data for the queue worker's <c>Worked</c> event: the processed item and how long it took.
/// </summary>
public class WorkPerformedEventArgs : EventArgs
{
  #region Fields
  private readonly TimeSpan _duration;
  private readonly T _item;
  #endregion
  #region Properties
  /// <summary>The item that was processed. Never null.</summary>
  public T Item
  {
    get { return _item; }
  }

  /// <summary>Elapsed time of the work iteration.</summary>
  public TimeSpan Duration
  {
    get { return _duration; }
  }
  #endregion
  #region Constructors
  /// <param name="duration">Elapsed time of the work iteration.</param>
  /// <param name="item">The item that was processed.</param>
  /// <exception cref="ArgumentNullException"><paramref name="item"/> is null.</exception>
  internal WorkPerformedEventArgs(TimeSpan duration, T item)
  {
    // TimeSpan is a value type and can never be null, so the former
    // "duration == null" guard was unreachable and has been removed.
    // NOTE(review): Enqueue accepts null items, so this guard can throw for a null item
    // that was processed successfully - confirm that is the intended contract.
    if (item == null)
      throw new ArgumentNullException(nameof(item));

    _duration = duration;
    _item = item;
  }
  #endregion
}
#endregion | |
/// <summary>
/// Raised after each work iteration with the processed item and its duration.
/// Hides the base class event of the same name.
/// </summary>
public new event EventHandler<WorkPerformedEventArgs> Worked;

/// <summary>
/// Raises this class's <c>Worked</c> event with the current item, then lets the base class
/// raise its own <c>Worked</c> event.
/// </summary>
protected override void OnWorked(Worker.WorkPerformedEventArgs e)
{
  EventHandler<WorkPerformedEventArgs> handler = Worked;

  // The item-carrying args are only built when someone is subscribed.
  if (handler != null)
    handler(this, new WorkPerformedEventArgs(e.Duration, _item));

  base.OnWorked(e);
}
/// <summary>Returns an enumerator over the items currently held by the underlying queue.</summary>
public IEnumerator<T> GetEnumerator()
{
  return _queue.GetEnumerator();
}

/// <summary>Non-generic counterpart of <see cref="GetEnumerator"/>.</summary>
IEnumerator IEnumerable.GetEnumerator()
{
  return _queue.GetEnumerator();
}
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment