Created
May 6, 2011 22:19
-
-
Save kellypleahy/959904 to your computer and use it in GitHub Desktop.
thread pool implementation from the advanced threading class
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
namespace ThreadingApp | |
{ | |
/// <summary>
/// A unit of work that can be queued and later run by a pool worker.
/// </summary>
public interface ITask
{
    /// <summary>
    /// Performs the task's work. Workers call this once for each task
    /// they dequeue.
    /// </summary>
    void Execute();
}
/// <summary>
/// FIFO queue of <see cref="ITask"/> instances intended to be shared between
/// producer and consumer threads. Every access to the underlying
/// <see cref="Queue{T}"/> happens while holding a lock on it.
/// </summary>
public class ThreadSafeTaskQueue
{
    private readonly Queue<ITask> _tasks = new Queue<ITask>();

    /// <summary>
    /// Appends a task to the end of the queue.
    /// </summary>
    /// <param name="task">The task to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="task"/> is null. A null entry would be indistinguishable
    /// from the "queue is empty" result of <see cref="Get"/>.
    /// </exception>
    public void Put(ITask task)
    {
        if (task == null)
        {
            throw new ArgumentNullException("task");
        }

        lock (_tasks)
        {
            _tasks.Enqueue(task);
        }
    }

    /// <summary>
    /// Removes and returns the task at the front of the queue without blocking.
    /// </summary>
    /// <returns>The oldest queued task, or null when the queue is empty.</returns>
    public ITask Get()
    {
        // The emptiness check must happen under the lock: Queue<T> is not safe
        // to read while another thread is enqueuing or dequeuing.
        lock (_tasks)
        {
            return _tasks.Count == 0
                ? null
                : _tasks.Dequeue();
        }
    }
}
/// <summary>
/// Owns a single thread that repeatedly takes tasks from a shared
/// <see cref="ThreadSafeTaskQueue"/> and executes them until shutdown is requested.
/// </summary>
public class Worker
{
    // How long an idle worker waits before polling the queue again.
    private const int IdlePollMilliseconds = 100;

    private readonly ThreadSafeTaskQueue _taskQueue;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly Thread _thread;

    /// <summary>
    /// Creates a worker bound to the given queue. The thread is created but
    /// not started; call <see cref="Start"/> to begin processing.
    /// </summary>
    /// <param name="taskQueue">Queue the worker pulls tasks from.</param>
    public Worker(ThreadSafeTaskQueue taskQueue)
    {
        _taskQueue = taskQueue;
        _cancellationTokenSource = new CancellationTokenSource();
        _thread = new Thread(Execute);
    }

    /// <summary>
    /// Thread entry point: runs queued tasks until cancellation is requested.
    /// </summary>
    private void Execute()
    {
        var token = _cancellationTokenSource.Token;
        while (!token.IsCancellationRequested)
        {
            var task = _taskQueue.Get();
            if (task == null)
            {
                // Wait on the cancellation handle instead of sleeping so that an
                // idle worker wakes up immediately when shutdown is requested.
                token.WaitHandle.WaitOne(IdlePollMilliseconds);
                continue;
            }

            try
            {
                task.Execute();
            }
            catch (Exception ex)
            {
                // An exception escaping the thread entry point would end this
                // worker (and normally the whole process). Report the failure
                // and keep serving the queue.
                Console.Error.WriteLine(
                    "Task failed on thread {0}: {1}",
                    Thread.CurrentThread.ManagedThreadId,
                    ex);
            }
        }
    }

    /// <summary>Starts the worker thread.</summary>
    /// <returns>This worker, to allow call chaining.</returns>
    public Worker Start()
    {
        _thread.Start();
        return this;
    }

    /// <summary>
    /// Requests shutdown without waiting for it. The worker stops after the
    /// task it is currently executing, if any, has finished.
    /// </summary>
    /// <returns>This worker, to allow call chaining.</returns>
    public Worker StartShutdown()
    {
        _cancellationTokenSource.Cancel();
        return this;
    }

    /// <summary>
    /// Blocks until the worker thread has exited, then writes a confirmation
    /// line to the console.
    /// </summary>
    /// <returns>This worker, to allow call chaining.</returns>
    public Worker WaitForShutdown()
    {
        _thread.Join();
        Console.WriteLine("Shut down thread {0}", _thread.ManagedThreadId);
        return this;
    }
}
/// <summary>
/// A resizable set of <see cref="Worker"/> threads that all consume tasks from
/// one shared <see cref="ThreadSafeTaskQueue"/>. Changes to the worker list
/// made after construction are guarded by a lock on the list.
/// </summary>
public class ThreadPool
{
    private readonly List<Worker> _workers = new List<Worker>();
    private readonly ThreadSafeTaskQueue _taskQueue = new ThreadSafeTaskQueue();

    /// <summary>
    /// Creates the pool and starts the requested number of workers.
    /// </summary>
    /// <param name="threads">Number of worker threads to start.</param>
    public ThreadPool(int threads)
    {
        // No lock needed here: the instance is not yet visible to other threads.
        _workers.AddRange(
            Enumerable.Range(0, threads)
                .Select(i => new Worker(_taskQueue)));
        foreach (var w in _workers)
        {
            w.Start();
        }
    }

    /// <summary>Queues a task for execution by one of the workers.</summary>
    /// <param name="task">The task to run.</param>
    public void AddTask(ITask task)
    {
        _taskQueue.Put(task);
    }

    /// <summary>
    /// Stops every worker and blocks until all of them have exited.
    /// Tasks still in the queue are not executed by the stopped workers.
    /// </summary>
    public void Shutdown()
    {
        List<Worker> stopping;
        lock (_workers)
        {
            // Snapshot and clear under the lock so a concurrent Resize cannot
            // modify the list mid-enumeration, and so the list never holds
            // workers that have already been stopped.
            stopping = new List<Worker>(_workers);
            _workers.Clear();
        }

        StopWorkers(stopping);
    }

    /// <summary>
    /// Grows or shrinks the pool to <paramref name="newSize"/> workers. When
    /// shrinking, the surplus workers are signalled and joined before returning.
    /// </summary>
    /// <param name="newSize">Desired number of workers; must not be negative.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="newSize"/> is negative.
    /// </exception>
    public void Resize(int newSize)
    {
        if (newSize < 0)
        {
            throw new ArgumentOutOfRangeException("newSize");
        }

        List<Worker> shuttingDownWorkers;
        lock (_workers)
        {
            Console.WriteLine("Changing pool size from {0} to {1}.", _workers.Count, newSize);
            while (_workers.Count < newSize)
            {
                _workers.Add(new Worker(_taskQueue).Start());
            }

            if (_workers.Count <= newSize)
            {
                return;
            }

            var surplus = _workers.Count - newSize;
            shuttingDownWorkers = _workers.GetRange(newSize, surplus);
            _workers.RemoveRange(newSize, surplus);
        }

        // Wait outside the lock so other callers are not blocked while the
        // surplus workers finish their current task.
        StopWorkers(shuttingDownWorkers);
    }

    /// <summary>
    /// Signals all given workers to stop, then waits for each one to exit.
    /// Signalling everyone first lets the workers wind down in parallel.
    /// </summary>
    private static void StopWorkers(List<Worker> workers)
    {
        foreach (var w in workers)
        {
            w.StartShutdown();
        }

        foreach (var w in workers)
        {
            w.WaitForShutdown();
        }
    }
}
/// <summary>
/// Reference-type box around one integer, so that several objects can share
/// and mutate the same counter.
/// </summary>
public class Holder
{
    /// <summary>The shared value; starts at 0 for a new instance.</summary>
    public int Value;
}
/// <summary>
/// Demo task that increments a shared <see cref="Holder"/> and prints the new
/// value together with the id of the thread that ran it.
/// </summary>
public class SampleTask : ITask
{
    private readonly Holder _holder;

    /// <summary>Creates a task operating on the given shared holder.</summary>
    /// <param name="holder">Counter to increment; also used as the lock object.</param>
    public SampleTask(Holder holder)
    {
        _holder = holder;
    }

    /// <summary>
    /// Increments the holder's value by one and writes
    /// "&lt;value&gt; from &lt;thread id&gt;" to the console, all while holding a lock
    /// on the holder.
    /// </summary>
    public void Execute()
    {
        lock (_holder)
        {
            int incremented = _holder.Value + 1;
            _holder.Value = incremented;

            int threadId = Thread.CurrentThread.ManagedThreadId;
            string line = incremented + " from " + threadId;
            Console.WriteLine(line);
        }
    }
}
/// <summary>
/// Console driver for the pool. A background producer enqueues a
/// <see cref="SampleTask"/> every 25 ms; digit keys resize the pool and
/// Enter shuts everything down and exits.
/// </summary>
class Program
{
    /// <summary>
    /// Starts a 10-worker pool and the producer thread, then handles key presses:
    /// '1'..'9' resize the pool to that many workers, '0' resizes it to 10,
    /// and Enter cancels the producer, shuts the pool down and returns.
    /// </summary>
    static void Main(string[] args)
    {
        var pool = new ThreadPool(10);
        var holder = new Holder();
        var cancellationSource = new CancellationTokenSource();
        var producer = new Thread(() => EnqueueWork(cancellationSource, pool, holder));
        producer.Start();

        while (true)
        {
            var key = Console.ReadKey();
            if (key.Key == ConsoleKey.Enter)
            {
                Console.WriteLine("Cancel requested.");
                cancellationSource.Cancel();
                // Let the producer stop first so nothing is enqueued once the
                // workers start going away.
                producer.Join();
                pool.Shutdown();
                // Leave the loop: without this the program keeps reading keys
                // against a shut-down pool and can never terminate.
                return;
            }

            if (key.KeyChar >= '0' && key.KeyChar <= '9')
            {
                var numThreads = key.KeyChar - '0';
                if (numThreads == 0)
                {
                    // '0' stands for ten workers.
                    numThreads = 10;
                }

                pool.Resize(numThreads);
            }
        }
    }

    /// <summary>
    /// Producer loop: adds a new <see cref="SampleTask"/> to the pool every
    /// 25 ms until cancellation is requested.
    /// </summary>
    /// <param name="cancellationSource">Source whose cancellation ends the loop.</param>
    /// <param name="pool">Pool that receives the tasks.</param>
    /// <param name="holder">Shared counter handed to every task.</param>
    private static void EnqueueWork(CancellationTokenSource cancellationSource, ThreadPool pool, Holder holder)
    {
        while (!cancellationSource.IsCancellationRequested)
        {
            pool.AddTask(new SampleTask(holder));
            Thread.Sleep(25);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment