Last active
January 16, 2016 00:20
-
-
Save ecounysis/8cf923efe9a4ab55f91e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.ComponentModel; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System; | |
namespace Ecounysis.DataProcessing | |
{ | |
// How to use this: | |
// 1. create a subclass | |
// 2. implement the Execute method in the subclass | |
// 3. add work items to the subclass using AddWork method (work items must be independent of each other) | |
// 4. call the Start method on the subclass to Execute the work items in multi-threaded environment | |
// 5. optionally set the PollingInterval and MaxThreads properties prior to calling Start (you can set these later too) | |
/// <summary>
/// Base class that runs a batch of independent work items, each on its own thread,
/// while keeping the number of threads in flight at or below <c>MaxThreads</c>.
/// Subclasses implement <c>Execute</c>, queue items with <c>AddWork</c> and then call
/// <c>Start</c>, which blocks until every queued item has been processed.
/// An instance can complete only one batch; create a new instance for more work.
/// </summary>
/// <typeparam name="T">Type of a single work item handed to <c>Execute</c>.</typeparam>
public abstract class ThreadedWorker<T>
{
    // Visible only within containing type or subclasses.
    #region PROTECTED

    /// <summary>
    /// Maximum number of worker threads allowed to run at the same time (default 5).
    /// Must be positive when <c>Start</c> is called. May be changed while running;
    /// the dispatcher re-reads it before spawning each thread.
    /// </summary>
    protected long MaxThreads
    {
        get
        {
            return Interlocked.Read(ref maxThreads);
        }
        set
        {
            Interlocked.Exchange(ref maxThreads, value);
        }
    }

    /// <summary>
    /// Milliseconds the dispatcher sleeps between checks for a free thread slot
    /// (default 250). Values below 25 are raised to 25; values above
    /// <c>int.MaxValue</c> are lowered to it so the value always fits Thread.Sleep(int).
    /// </summary>
    protected long PollingInterval
    {
        get
        {
            return Interlocked.Read(ref pollingInterval);
        }
        set
        {
            long clamped = Math.Max(MinPollingInterval, Math.Min(value, int.MaxValue));
            Interlocked.Exchange(ref pollingInterval, clamped);
        }
    }

    /// <summary>
    /// Processes one work item. Called once per queued item, each call on its own thread,
    /// so implementations must not rely on ordering between items.
    /// Exceptions are not caught by this class; they surface as unhandled exceptions
    /// on the worker thread.
    /// </summary>
    /// <param name="data">The work item to process.</param>
    protected abstract void Execute(T data);

    /// <summary>
    /// Runs every queued work item and blocks until all of them have finished.
    /// Returns immediately (leaving the worker reusable) when no work has been queued.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The worker is already running, has already completed a batch,
    /// or <c>MaxThreads</c> is not positive.
    /// </exception>
    protected void Start()
    {
        // Claim the running flag atomically so two concurrent callers cannot both proceed.
        if (Interlocked.CompareExchange(ref running, 1, 0) != 0)
        {
            throw new InvalidOperationException("This ThreadedWorker is already running. Please create a new instance of ThreadedWorker to do more work.");
        }

        try
        {
            if (Done)
            {
                throw new InvalidOperationException("This ThreadedWorker has completed its work. Please create a new instance of ThreadedWorker to do more work.");
            }

            if (Interlocked.Read(ref maxThreads) <= 0)
            {
                throw new InvalidOperationException("Please set MaxThreads to a positive value prior to execution.");
            }

            long total = Interlocked.Read(ref workToDo);
            if (total == 0)
            {
                // Nothing to do. The finally block releases the running flag,
                // so work can still be added and Start called again later.
                return;
            }

            using (AutoResetEvent workerFinished = new AutoResetEvent(false))
            {
                foreach (T data in workData)
                {
                    // Dispatcher sleeps until there is an opening. The limit is floored at 1
                    // so a non-positive MaxThreads set mid-run cannot stall the loop forever.
                    while (Interlocked.Read(ref numThreads) >= Math.Max(1L, Interlocked.Read(ref maxThreads)))
                    {
                        Thread.Sleep((int)PollingInterval);
                    }

                    // Copy to a local so the lambda captures this item even on compilers
                    // that share a single foreach variable across iterations.
                    T item = data;

                    // Reserve the slot BEFORE the thread starts; counting inside the thread
                    // would let the dispatcher overshoot MaxThreads.
                    Interlocked.Increment(ref numThreads);

                    new Thread(() =>
                    {
                        try
                        {
                            Execute(item); // Execute is implemented in subclass.
                        }
                        finally
                        {
                            Interlocked.Decrement(ref numThreads);

                            // Only the thread that completes the final item signals, so Set
                            // is called exactly once and never after the event is disposed.
                            if (Interlocked.Increment(ref workDone) == total)
                            {
                                workerFinished.Set();
                            }
                        }
                    }).Start();
                }

                workerFinished.WaitOne();
            }

            // Mark done before the running flag is released so there is no window
            // in which the worker looks idle and restartable.
            setDoneFlag();
        }
        finally
        {
            clearRunningFlag();
        }
    }

    /// <summary>
    /// Queues one work item. Intended to be called from a single thread before <c>Start</c>;
    /// the backing list is not synchronized.
    /// </summary>
    /// <param name="Work">The item to queue; it must be independent of other items.</param>
    /// <exception cref="InvalidOperationException">The worker is running or has completed.</exception>
    protected void AddWork(T Work)
    {
        // While running, worker threads own the data; once done, the worker cannot restart.
        if (Running || Done)
        {
            throw new InvalidOperationException("Cannot add work after job has begun execution.");
        }

        workData.Add(Work);
        Interlocked.Increment(ref workToDo);
    }

    #endregion

    // Visible only within containing type. The default accessibility for members of a struct or class.
    #region PRIVATE

    // Smallest polling interval accepted, in milliseconds.
    const long MinPollingInterval = 25;

    long workToDo;          // number of items queued via AddWork
    long pollingInterval;   // milliseconds between slot checks
    readonly List<T> workData;
    long maxThreads;        // concurrency limit
    long numThreads;        // slots currently reserved by worker threads
    long workDone;          // items whose Execute call has finished
    long running;           // 1 while Start is executing, otherwise 0
    long done;              // non-zero once a batch has completed

    bool Running
    {
        get
        {
            return (Interlocked.Read(ref running) > 0);
        }
    }

    bool Done
    {
        get
        {
            return (Interlocked.Read(ref done) > 0);
        }
    }

    void setDoneFlag()
    {
        Interlocked.Exchange(ref done, 1);
    }

    void clearRunningFlag()
    {
        Interlocked.Exchange(ref running, 0);
    }

    #endregion

    // Fully accessible. The implicit accessibility for members of an enum or interface.
    #region PUBLIC

    /// <summary>
    /// Creates an idle worker with no queued work, a limit of five threads
    /// and a 250 millisecond polling interval.
    /// </summary>
    public ThreadedWorker()
    {
        // we aren't in multi-threaded mode at this point
        numThreads = 0;
        maxThreads = 5;         // default to five threads
        workDone = 0;
        workToDo = 0;
        running = 0;
        done = 0;
        pollingInterval = 250;  // default to 250 milliseconds
        workData = new List<T>();
    }

    #endregion
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment