Skip to content

Instantly share code, notes, and snippets.

@ecounysis
Last active January 16, 2016 00:20
Show Gist options
  • Save ecounysis/8cf923efe9a4ab55f91e to your computer and use it in GitHub Desktop.
Save ecounysis/8cf923efe9a4ab55f91e to your computer and use it in GitHub Desktop.
using System.ComponentModel;
using System.Collections.Generic;
using System.Threading;
using System;
namespace Ecounysis.DataProcessing
{
// How to use this:
// 1. create a subclass
// 2. implement the Execute method in the subclass
// 3. add work items to the subclass using AddWork method (work items must be independent of each other)
// 4. call the Start method on the subclass to Execute the work items in multi-threaded environment
// 5. optionally set the PollingInterval and MaxThreads properties prior to calling Start (you can set these later too)
public abstract class ThreadedWorker<T>
{
// Visible only within containing type or subclasses.
#region PROTECTED
// set by subclass
protected long MaxThreads
{
get
{
return Interlocked.Read(ref maxThreads);
}
set
{
Interlocked.Exchange(ref maxThreads, value);
}
}
// set by subclass
protected long PollingInterval
{
get
{
return Interlocked.Read(ref pollingInterval);
}
set
{
Interlocked.Exchange(ref pollingInterval, (value >= 25) ? value : 25); // 25 is the minimum (25 milliseconds)
}
}
// implemented by the subclass
protected abstract void Execute(T data);
// called by the subclass
protected void Start()
{
if (Done)
{
throw new Exception("This ThreadedWorker has completed its work. Please create a new instance of ThreadedWorker to do more work.");
}
if (Running)
{
throw new Exception("This ThreadedWorker is already running. Please create a new instance of ThreadedWorker to do more work.");
}
else
{
setRunningFlag();
}
if (Interlocked.Read(ref maxThreads) == 0)
{
throw new Exception("Please set MaxThreads to a non-zero value prior to execution.");
}
if (Interlocked.Read(ref workToDo) == 0)
{
// nothing to do
// should it throw an exception or something to let client know this was the result?
// maybe need a return value from Start?
// Monad?
return;
}
AutoResetEvent workerFinished = new AutoResetEvent(false);
foreach (var data in workData)
{
while (Interlocked.Read(ref numThreads) >= Interlocked.Read(ref maxThreads)) // main thread sleeps until there is an opening
{
System.Threading.Thread.Sleep((int)PollingInterval); // wait
}
// after there is an opening
new Thread(() => // spawn a new thread to work on this data
{
Interlocked.Increment(ref numThreads);
Execute(data); // Execute is implemented in subclass.
Interlocked.Decrement(ref numThreads);
Interlocked.Increment(ref workDone);
if (Interlocked.Read(ref workDone) >= Interlocked.Read(ref workToDo))
{
workerFinished.Set();
}
}).Start();
}
workerFinished.WaitOne();
setDoneFlag();
clearRunningFlag();
}
// called by the subclass
protected void AddWork(T Work)
{
if (Running || Done) // when Running is on, we are in multi-threaded mode and don't want to mess with any data
{ // when Done, we can't start again, so there is no reason to add work
throw new Exception("Cannot add work after job has begun execution.");
}
else
{
workData.Add(Work);
workToDo++;
}
}
#endregion
// Visible only within containing type. The default accessibility for members of a struct or class.
#region PRIVATE
long workToDo;
long pollingInterval;
List<T> workData;
long maxThreads;
long numThreads;
long workDone;
long running;
long done;
bool Running
{
get
{
return (Interlocked.Read(ref running) > 0);
}
}
bool Done
{
get
{
return (Interlocked.Read(ref done) > 0);
}
}
void setDoneFlag()
{
Interlocked.Increment(ref done);
}
void setRunningFlag()
{
Interlocked.Increment(ref running);
}
void clearRunningFlag()
{
Interlocked.Exchange(ref running, 0);
}
#endregion
// Fully accessible. The implicit accessibility for members of an enum or interface.
#region PUBLIC
public ThreadedWorker() // constructor
{
// we aren't in multi-threaded mode at this point
numThreads = 0; // default to five threads
maxThreads = 5;
workDone = 0;
workToDo = 0;
running = 0;
done = 0;
pollingInterval = 250; // default to 250 milliseconds
workData = new List<T>();
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment