Skip to content

Instantly share code, notes, and snippets.

@Justasic
Last active October 20, 2017 10:19
Show Gist options
  • Select an option

  • Save Justasic/8452518 to your computer and use it in GitHub Desktop.

Select an option

Save Justasic/8452518 to your computer and use it in GitHub Desktop.
A dynamic threading engine I built for a game I never made. You should only interact with the ThreadHandler class. Requires C++14 or C++17.
/*
* Copyright (c) 2013-2017, Justin Crawford <Justin@stacksmash.net>
*
* Permission to use, copy, modify, and/or distribute this software for any purpose
* with or without fee is hereby granted, provided that the above copyright notice
* and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO
* THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
* DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
* IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "ThreadEngine.h"
#include <chrono>
#include <cstdio>
#include <cstring>
#include <list>
#include <mutex>
#include <utility>
#include <vector>
#ifndef NDEBUG
# include <cstdio>
// Debug logging. Several messages print std::thread::id with "%p"; there is no portable printf
// conversion for std::thread::id, so GCC/Clang's -Wformat warning is disabled instead.
// Unfortunately this disables *all* -Wformat checks in this file, not just that one.
// The pragma is GCC/Clang-specific, so it is guarded to avoid unknown-pragma warnings elsewhere
// (e.g. MSVC C4068).
# if defined(__GNUC__)
#  pragma GCC diagnostic ignored "-Wformat"
# endif
# define d_printf(...) printf(__VA_ARGS__)
#else
// Release builds: expand to a no-op expression so `d_printf(...);` remains a well-formed
// statement everywhere (including as an if/else body). The arguments are not evaluated.
# define d_printf(...) ((void)0)
#endif
// Per-thread staging queue used by ThreadHandler::AddQueue(). Jobs are pushed here without
// taking any lock, and ThreadHandler::Submit() later moves them into the global
// ThreadHandler::funcs queue while holding queueLock.
#ifndef _WIN32
thread_local functions_t threadQueue;
// Read the system's 1, 5 and 15 minute load averages from /proc/loadavg.
//
// Returns a pointer to a 3-element static array {1 min, 5 min, 15 min}, or nullptr if
// /proc/loadavg cannot be opened. If the file cannot be parsed, all three values are 0.
// NOTE: the array is shared by every caller, so it is overwritten by the next call and is
// not safe to use from several threads at once.
float *GetLoadAvg()
{
	FILE *f = fopen("/proc/loadavg", "r");
	if (!f)
		return nullptr;
	static float loads[3];
	float parsed[3] = {0.0f, 0.0f, 0.0f};
	// Only the first three fields are needed. The previous format also parsed the last-pid
	// field with "%lu" into a pid_t, writing an unsigned long over a (typically 4-byte) int.
	const int fields = fscanf(f, "%f %f %f", &parsed[0], &parsed[1], &parsed[2]);
	fclose(f);
	// Never publish half-parsed data: report "no load" if any field is missing.
	if (fields != 3)
		parsed[0] = parsed[1] = parsed[2] = 0.0f;
	for (int i = 0; i < 3; ++i)
		loads[i] = parsed[i];
	return loads;
}
#else
// Windows variant of the per-thread staging queue. Per the note in the ThreadHandler
// constructor, thread_local support is worked around with a pointer. The ThreadHandler
// constructor allocates a queue for the constructing thread and WorkerThread::Main() one for
// each worker. Any other thread keeps a null pointer here.
// TODO(review): confirm AddQueue()/Submit() are never called from such threads.
__declspec(thread) functions_t* threadQueue;
float *GetLoadAvg()
{
	// Windows stub: every load average is reported as zero until a real measurement exists.
	// TODO: Calculate load averages on windows
	// See: http://blog.sflow.com/2011/02/windows-load-average.html
	// We need to use the WMI class 'Win32_PerfFormattedData_PerfNet_ServerWorkQueues'
	// to get the processor queue length to make this calculation.
	static float ret[3];
	for (float &avg : ret)
		avg = 0.0f;
	return ret;
}
#endif
// Enables the 5us / 2s / 5s duration literals used by the scheduler and shutdown code.
using namespace std::chrono_literals;
// Guards ThreadHandler::funcs, the global job queue.
std::mutex queueLock;
// Guards ThreadHandler::Threads, the list of live workers.
std::mutex ThreadListLock;
// Decay factors for the 1, 5 and 15 minute load averages, sampled every 5 seconds.
static const double EXP_1  = LoadAverage<double>::CalculateMagic(5.0, 60.0);
static const double EXP_5  = LoadAverage<double>::CalculateMagic(5.0, 5.0 * 60.0);
static const double EXP_15 = LoadAverage<double>::CalculateMagic(5.0, 15.0 * 60.0);
// Construct an idle handler: load averages start at zero and no threads exist until
// Initialize() is called.
ThreadHandler::ThreadHandler() : loads({{LoadAverage<double>(EXP_1), LoadAverage<double>(EXP_5), LoadAverage<double>(EXP_15)}}),
#ifdef THREAD_USE_LOOP
	// Guarded so the handler still compiles when THREAD_USE_LOOP is not defined and the
	// header declares `quitting` only inside `#ifdef THREAD_USE_LOOP`.
	quitting(false),
#endif
	totalConcurrentThreads(0)
{
	// NOTE: Windows thread_local stuff is not complete currently, we must work around
	// this with pointers. This initializes a queue for the constructing (main) thread.
	// Each worker thread allocates its own queue in WorkerThread::Main().
#ifdef _WIN32
	threadQueue = new functions_t;
#endif
}
ThreadHandler::~ThreadHandler()
{
	// Stop the scheduler and tear down every worker before the handler goes away.
	Shutdown();
#ifdef _WIN32
	// Free the staging queue the constructor allocated. threadQueue is thread-local, so this
	// releases the queue of whichever thread runs the destructor.
	delete threadQueue;
#endif
}
// Start the engine: reset the global job queue, record the hardware thread count, spawn the
// first worker and (with THREAD_USE_LOOP) launch the scheduler thread.
// May be called again after Shutdown() to restart the engine.
void ThreadHandler::Initialize()
{
	d_printf("Thread engine initializing\n");
	// Start from an empty global job queue.
	this->funcs = functions_t();
	// hardware_concurrency() returns 0 when the value is not well defined or computable.
	// Left at 0, the scheduler's saturation test (5 minute system load >= totalConcurrentThreads)
	// would always succeed and no additional worker could ever be spawned, so fall back to 1.
	const unsigned int hw = std::thread::hardware_concurrency();
	this->totalConcurrentThreads = (hw != 0) ? hw : 1;
	d_printf("Total supported threads: %u\n", this->totalConcurrentThreads);
	// Create the first worker; WorkerThread's constructor registers it in this->Threads.
	new WorkerThread(this);
	// Hold the list lock while finishing setup, so the scheduler started below cannot inspect
	// the worker list until Initialize() returns.
	std::unique_lock<std::mutex> lck(ThreadListLock);
	d_printf("Spawned %zu threads\n", this->Threads.size());
#ifdef THREAD_USE_LOOP
	// Never overwrite a running scheduler: assigning to a joinable std::thread calls
	// std::terminate(). Resetting quitting allows a restart after Shutdown().
	if (!this->loadthrd.joinable())
	{
		this->quitting = false;
		// Send our scheduler into its own idle thread.
		this->loadthrd = std::thread(&ThreadHandler::Schedule, this);
	}
#endif
}
void ThreadHandler::Shutdown()
{
d_printf("Thread engine shutting down...\n");
// Wake all threads, make this->funcs read only, finish work, shutdown threads
this->JoinThreads();
// We avoid a race condition because all threads have been joined at this point.
d_printf("%lu jobs left and will not be processed.\n", this->funcs.size());
for (auto it = this->Threads.begin(), it_end = this->Threads.end(); it != it_end;)
delete *(it++);
}
void ThreadHandler::Schedule()
{
#ifdef THREAD_USE_LOOP
while (!this->quitting.load())
{
#endif
// First we start out by measuring the queue length of our own queue
// and calculating load times from that.
queueLock.lock();
unsigned long workqueue = this->funcs.size();
queueLock.unlock();
// Now get how many threads have been running beyond their run limit of 1 usec
// This will help prevent "Dead Threading", a condition where one thread is consumed
// for an unreasonable amount of time (eg, we're waiting for user input) and therefore
// will be added to the load average.
std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
ThreadListLock.lock();
for (auto it : this->Threads)
{
if (it->IsRunning.load() && (it->start.load() + 5us) < now)
{
// If a thread is busy (even if the work queue is empty) we increment it
// so that we don't stall from dead threads.
workqueue++;
}
// Since we're iterating, also check for zombies and deallocate them.
if (it->quitting.load() && !it->IsRunning.load())
{
d_printf("[ThreadEngine][Scheduler] Found zombie thread %p, deallocating...\n", it->GetID());
delete it;
}
}
ThreadListLock.unlock();
// Now we calculate our load time and see if we need more threads or not.
this->loads[0] += workqueue;
this->loads[1] += workqueue;
this->loads[2] += workqueue;
// Due to how floating points work, we need to convert this from double to int in the most
// "natural" way possible so we don't prematurely spawn a thread simply because we passed the 0.5
// threshold. We must change our floating point environment to ensure we're using nearest rounding.
int CurrentRound = fegetround();
fesetround(FE_TONEAREST);
uint32_t load5 = lrint(this->loads[1]());
fesetround(CurrentRound);
// Debug :D
d_printf("[ThreadEngine][Scheduler] Current thread engine load: %.2f (1 min), %.2f (5 min), %.2f (15 min) with %ld item(s) in queue with %ld workers\n", this->loads[0](), this->loads[1](),
this->loads[2](), workqueue, this->Threads.size());
// We have our load time for 5 minutes for this thread. Now we calculate
// whether or not we need a new thread or dump existing threads.
ThreadListLock.lock();
// If we need more threads (eg, the load is greater than the threads currently working)
if (load5 >= this->Threads.size())
{
// If we want to request a new thread, check the system load times and ensure we don't
// have a large amount of processor congestion by measuring the 5 minute load time
// of the processor.
float *avg = GetLoadAvg();
if (lrint(avg[1]) >= this->totalConcurrentThreads)
{
// We cannot spawn another thread, the CPU is already saturated with work. Don't bother continuing.
d_printf("[ThreadEngine][Scheduler] WARNING: System CPU is saturated, will not spawn new threads as desired due to high load averages. (%2f 5-minute load, %d max load threshold)\n",
avg[1], this->totalConcurrentThreads);
goto end;
}
// We need more threads, all our threads are saturated.
d_printf("[ThreadEngine][Scheduler] Thread engine requires more threads, %.2f >= %zu\n", this->loads[1](), this->Threads.size());
// Spawn another thread. We do this one at a time to allow for more of an "evolution" factor
// otherwise we may spawn like 5000 threads to handle a sudden influx of data which
// most kernels will not like you much if you do that.
ThreadListLock.unlock();
new WorkerThread(this);
}
// if we should despawn threads (eg, the threads are idle and little work is being done.)
if (load5 + THREAD_MINIMUM < this->Threads.size())
{
// we need less threads, but make sure we're not gonna be less than the minimum threads desired.
if (this->Threads.size() - THREAD_MINIMUM != THREAD_MINIMUM-1)
{
d_printf("[ThreadEngine][Scheduler] Thread engine can release some threads, %.2f < %zu\n", this->loads[1](), this->Threads.size());
WorkerThread *th = this->Threads.back();
ThreadListLock.unlock();
// Sanity check, sometimes we might have screwed up somewhere and gotten to this case.
if (!th)
{
d_printf("[ThreadEngine][Scheduler] ERROR: there are no more threads to quit??\n");
goto end;
}
// If the thread is running, we cannot join it (and we cannot forcefully terminate it either due
// to the implementation of std::thread) so we can only ask it to leave once it's finished with
// it's job. Unfortunately this leaves a dead thread in the thread list so the scheduler will
// deallocate it's class on the next iteration of the class (so long as the thread is no longer
// running anymore)
if (th->IsRunning.load() == true)
{
d_printf("[ThreadEngine][Scheduler] Thread %p is currently busy but asked to terminate.\n", th->GetID());
th->quitting = true;
goto end;
}
th->Join();
delete th;
}
}
end:
// Ensure we're unlocked.
ThreadListLock.unlock();
#ifdef THREAD_USE_LOOP
// Suspend this thread for 5 seconds.
d_printf("[ThreadEngine][Scheduler] Entering 5 second sleep...\n");
std::this_thread::sleep_for(5s);
} // end of while (!this->quitting.load())
d_printf("[ThreadEngine][Scheduler] Exiting scheduler thread\n");
this->loadthrd.join();
#endif
}
void ThreadHandler::JoinThreads()
{
std::unique_lock<std::mutex> lck(ThreadListLock);
for (const auto &t : this->Threads)
t->Join();
// Wait for confirmation that threads have joined.
for (const auto &t : this->Threads)
{
while (t->IsRunning.load() && t->quitting.load())
{
d_printf("[ThreadEngine] Thread %p refusing to join, waiting 2 seconds for thread to join...\n", t->GetID());
std::this_thread::sleep_for(2s);
// In the future, allow for non-interruptable threads to force-terminate?
}
}
// quit the scheduler thread.
this->quitting = true;
this->loadthrd.join();
}
void ThreadHandler::WakeThreads()
{
	// Hold the list lock so workers cannot be added or removed while we notify them.
	std::lock_guard<std::mutex> guard(ThreadListLock);
	for (auto worker = this->Threads.begin(); worker != this->Threads.end(); ++worker)
		(*worker)->Wake();
}
bool ThreadHandler::Submit(bool noStall)
{
// Acquire a lock. if noStall, return -1 so the function can continue, otherwise wait until the lock is acquired
if (noStall)
{
if (!queueLock.try_lock())
return false;
}
else
queueLock.lock();
#ifndef _WIN32
// Submit the functions pending
while (!threadQueue.empty())
{
// Remove function from thread local stack
function_t func = threadQueue.front();
threadQueue.pop();
// Move to global stack so threads can pick what is needed
this->funcs.push(func);
}
#else
// Submit the functions pending
while (!threadQueue->empty())
{
// Remove function from thread local stack
function_t func = threadQueue->front();
threadQueue->pop();
// Move to global stack so threads can pick what is needed
this->funcs.push(func);
}
#endif
// release our lock
queueLock.unlock();
// Notify the threads that there is work
this->WakeThreads();
return true;
}
/******************************************************************/
// Create a worker for `thr`, start its thread running Main(), and register it in
// thr->Threads under ThreadListLock.
WorkerThread::WorkerThread(ThreadHandler *thr) : wakeThread(false),
	thr(thr), IsRunning(false), start(std::chrono::steady_clock::time_point()),
	ThreadID(std::thread::id()), quitting(false)
{
	std::unique_lock<std::mutex> lck(ThreadListLock);
	// Every member the worker or its observers read (wakeThread, thr, IsRunning, start,
	// ThreadID, quitting) is initialized above, before the thread is launched here. Without
	// this, start and ThreadID would be uninitialized atomics that GetID() could read before
	// Main() stores the real ID.
	this->th = std::thread(&WorkerThread::Main, this);
	// Add the thread to the thread list
	thr->Threads.push_back(this);
	d_printf("Thread ID: %p\n", this->th.get_id());
}
// Stop and join the worker's thread if that has not happened yet, then unregister it from the
// owning handler's thread list.
WorkerThread::~WorkerThread()
{
	// Destroying a joinable std::thread calls std::terminate(), so make sure the worker has
	// exited first. This happens before taking ThreadListLock because a job the worker may
	// still be running can need that lock itself (Submit() -> WakeThreads()).
	if (this->th.joinable())
	{
		this->quitting = true;
		this->Wake();
		this->th.join();
	}
	std::unique_lock<std::mutex> lck(ThreadListLock);
	// Remove the thread from the thread list
	this->thr->Threads.remove(this);
}
void WorkerThread::Sleep()
{
	// Block on the condition variable until Wake() raises wakeThread, then consume the flag
	// so the next Sleep() blocks again.
	std::unique_lock<std::mutex> guard(this->m);
	while (!this->wakeThread.load())
		this->cv.wait(guard);
	this->wakeThread.store(false);
}
// Wake the worker if it is idle in Sleep(). Does nothing for a busy worker: after its current
// job returns, Main() loops back and re-checks the job queue on its own.
void WorkerThread::Wake()
{
	if (this->IsRunning.load())
		return;
	{
		// Publish the flag while holding the sleep mutex. Sleep() evaluates its predicate under
		// this mutex, so this closes the window in which the notification could fire after the
		// predicate check but before the wait begins (a lost wake-up leaving the worker asleep).
		std::lock_guard<std::mutex> lk(this->m);
		this->wakeThread = true;
	}
	this->cv.notify_all();
}
// Ask the worker to quit and, if it is idle, wake and join it. A busy worker is only flagged:
// it leaves Main() after its current job and must be joined later.
// Safe to call more than once.
void WorkerThread::Join()
{
	d_printf("Thread %p joining to next thread...\n", this->GetID());
	this->quitting = true;
	if (this->IsRunning.load())
	{
		d_printf("Cannot join thread %p because it's currently running...\n", this->GetID());
		return;
	}
	// std::thread::join() throws std::system_error if the thread was already joined or if a
	// thread tries to join itself. quitting is already set, so there is nothing more to do.
	if (!this->th.joinable() || this->th.get_id() == std::this_thread::get_id())
		return;
	this->Wake();
	this->th.join();
}
void WorkerThread::Main()
{
#ifdef _WIN32
functions_t *queue = new functions_t;
threadQueue = queue;
#endif
this->ThreadID = std::this_thread::get_id();
while (!this->quitting)
{
// Acquire a lock on the queue and get the data
queueLock.lock();
// Check the funcs are in the queue and ready to be processed
if (!this->thr->funcs.empty())
{
// Get a local copy of the function
function_t func = this->thr->funcs.front();
this->thr->funcs.pop();
// Release our lock so other threads can use it before we run our function.
queueLock.unlock();
// So we don't need an atomic, we set our runtime before our isrunning flag.
this->start = std::chrono::steady_clock::now();
// Set that we're busy.
this->IsRunning.store(true);
// Run the function (we released the lock in case the function takes a long time to run)
func();
//d_printf("[Thread %p] Finished running function. Took %.6f seconds\n", this->GetID(), std::chrono::duration_cast<std::chrono::duration<double>>(this->start.load() - std::chrono::steady_clock::now()).count());
// We're not busy anymore.
this->IsRunning.store(false);
this->start = std::chrono::steady_clock::time_point();
}
else
{
queueLock.unlock();
this->Sleep();
}
}
#ifdef _WIN32
// Clean up our shit.
delete queue;
#endif
d_printf("Thread %p exiting...\n", this->GetID());
}
/*
* Copyright (c) 2013-2017, Justin Crawford <Justin@stacksmash.net>
*
* Permission to use, copy, modify, and/or distribute this software for any purpose
* with or without fee is hereby granted, provided that the above copyright notice
* and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO
* THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
* DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
* IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include <functional>
#include <queue>
#include <thread>
#include <mutex>
#include <atomic>
#include <list>
#include <cmath>
#include <cfenv>
#include <array>
#include <condition_variable>
// Scheduler mode. With THREAD_USE_LOOP defined (the default), Initialize() starts a dedicated
// thread that runs ThreadHandler::Schedule() every 5 seconds. To drive the scheduler yourself,
// define THREAD_NO_LOOP before including this header and call ThreadHandler::Schedule() every
// 5 seconds. If you call it more or less often than that, recalculate the load-average magic
// values (EXP_1/EXP_5/EXP_15) so the scheduler keeps working correctly.
#if !defined(THREAD_USE_LOOP) && !defined(THREAD_NO_LOOP)
# define THREAD_USE_LOOP 1
#endif
// Minimum number of worker threads to keep at all times; the scheduler only releases threads
// down to this number. Default is 1. With zero, the engine only spawns a thread once enough
// work has been submitted to raise the rounded 5 minute load average, which can take a long
// time. May be overridden by defining THREAD_MINIMUM before including this header.
#ifndef THREAD_MINIMUM
# define THREAD_MINIMUM 1
#endif
/////////////////////////////////////////////////////////////////
// One queued job, and the FIFO of jobs used by both the global and the per-thread queues.
using function_t = std::function<void ()>;
using functions_t = std::queue<function_t>;
class ThreadHandler;
// One worker of a ThreadHandler. It owns a std::thread running Main(), which pulls jobs from
// the handler's global queue and sleeps on a condition variable while the queue is empty.
class WorkerThread
{
	// Mutex used for sleeping the thread (paired with cv).
	std::mutex m;
	std::condition_variable cv;
	// Raised by Wake() and consumed by Sleep().
	std::atomic<bool> wakeThread;
	std::thread th; // Inherit std::thread instead?
	// The handler this worker belongs to (and takes jobs from).
	ThreadHandler *thr;
protected:
	// Used for the scheduler, this is set when the thread is *actively* doing something
	std::atomic<bool> IsRunning;
	// When IsRunning is true, this is the monotonic time the current job started.
	// Used by the scheduler to determine whether this thread is too "overloaded".
	// When IsRunning is false, this is reset to the clock's epoch. Initialized here so it is
	// never read uninitialized.
	std::atomic<std::chrono::steady_clock::time_point> start{std::chrono::steady_clock::time_point()};
	// The worker's thread ID, stored by Main(). Holds a default std::thread::id until then, so
	// GetID() is always safe to call.
	std::atomic<std::thread::id> ThreadID{std::thread::id()};
public:
	WorkerThread(ThreadHandler*);
	~WorkerThread();
	// Workers are registered by address in their handler's list and run Main() on `this`,
	// so they must never be copied.
	WorkerThread(const WorkerThread &) = delete;
	WorkerThread &operator=(const WorkerThread &) = delete;
	friend class ThreadHandler;
	// Put the thread into an idle state
	void Sleep();
	// Wake the thread from idle state
	void Wake();
	// Execute jobs - this is the body of the worker's thread.
	void Main();
	// Join the thread to the main thread (used for shutdown)
	void Join();
	// Get our thread ID
	inline std::thread::id GetID() const
	{
		return this->ThreadID.load();
	}
	// Set when the thread has been asked to quit.
	std::atomic<bool> quitting;
};
// Class: LoadAverage
//
// Basic: Calculates exponentially-decaying (Linux-style) load averages.
//
// Description:
// This class keeps a load average organized and clean. It can calculate Linux-style load
// averages at any sample rate for any time frame (the standard is a sample rate of 5 seconds
// with time frames of 1, 5 and 15 minutes). It takes a "magic" decay factor, which is the
// output of CalculateMagic() below; the magic can be pre-calculated once and kept static.
// T is meant to be a floating point type. The object converts implicitly to T, so it can be
// used like a number without explicitly calling a getter.
//
// Each sample (Add() / operator+=) updates the average as:
//   load(t) = load(t-1) * m + n * (1 - m)
// where:
//   m = the magic decay factor
//   n = the sampled load amount (e.g. the length of a queue)
//
// When the same n is fed repeatedly, load approaches n geometrically. In exact arithmetic it
// never reaches n, so the thread engine rounds to the nearest integer.
//
// Calculating our magic:
//   m = e ^ (-s / T)
// where:
//   e = Euler's number
//   s = sample rate (seconds between samples)
//   T = time frame (e.g. 60 for the 1 minute average)
// The Linux kernel additionally scales this value by 2^11 to use fixed-point arithmetic. This
// class works directly in floating point, so the factor is used unscaled. std::exp cannot be
// evaluated at compile time in C++14/17, so compute the magic once at startup and reuse it.
template<typename T>
class LoadAverage
{
	// Our current load average right now.
	T loadavg;
	// The decay factor m (see above).
	T magic;
public:
	// Make sure we know what we are.
	typedef T datatype;
	// Decay factor for a given sample rate and time frame (both in the same unit). This lets
	// you adjust the sampling rate (useful in event loops) without changing what the average
	// means.
	static inline T CalculateMagic(T SampleRate, T TimeFrame)
	{
		return std::exp(-SampleRate / TimeFrame);
	}
	LoadAverage(T magic) : loadavg(0.0), magic(magic) {}
	// A zero magic makes every Add() replace the average with the latest sample.
	LoadAverage() : loadavg(0.0), magic(0.0) {}
	// Get the load average.
	T operator()() const { return this->loadavg; }
	// Add a sample to the load average.
	LoadAverage &operator +=(size_t n) { this->Add(n); return *this; }
	// Fold one sample n into the average: load = load * m + n * (1 - m).
	void Add(size_t n)
	{
		this->loadavg *= this->magic;
		this->loadavg += static_cast<T>(n) * (1 - this->magic);
	}
	T Get() const { return this->loadavg; }
	// Implement the cast operator so C++ can implicit cast this class.
	operator T() const { return this->loadavg; }
};
class ThreadHandler
{
protected:
void JoinThreads();
void WakeThreads();
std::list<WorkerThread*> Threads;
// Our local job queue load times: 1 minute, 5 minute, and 15 minutes.
std::array<LoadAverage<double>, 3> loads;
#ifdef THREAD_USE_LOOP
// This is all used for a separate thread
// to run the scheduler for the threads.
std::atomic<bool> quitting;
std::thread loadthrd;
#endif
public:
ThreadHandler();
~ThreadHandler();
// Let the threads access members in this class
friend class WorkerThread;
// Waiting jobs, This is global among all threads.
functions_t funcs;
// Defines how many threads the system is designed to use
unsigned int totalConcurrentThreads;
// This makes it easy to specify functions and arguments
// in a readable manner
template< class _Function, class... _Args >
void AddQueue(_Function&& __f, _Args&&... __args)
{
// Create a thread local queue for each thread
// This is what allows us to submit functions asynchronously
// then submit them to the global job queue
// Make a bound std::function for storage
function_t func = std::bind(std::forward<_Function>(__f), std::forward<_Args>(__args)...);
// Push to thread local storage container
#ifndef _WIN32
extern thread_local functions_t threadQueue;
threadQueue.push(func);
#else
extern __declspec(thread) functions_t* threadQueue;
threadQueue->push(func);
#endif
}
// Copy all the data from the thread local queue to the master queue and wake the threads.
bool Submit(bool noStall = false);
// Handle thread scheduling. This function is pretty critical as it allows for the
// spawning of new threads based on work queue but only to a maximum of the current
// actual CPU processor queue length (aka, it is allowed to spawn new threads until
// the kernel states that the CPU is saturated via load times). In addition to this,
// it will terminate threads until only one thread is left to process new items on
// the master queue. The execution of this function should be in it's own real-time
// thread to allow for proper scheduling outside of I/O operations. This scheduler
// is still subject to the operating system scheduler and may not be entirely accurate.
void Schedule();
// Initialize and shutdown functions to start the threads and allocate memory
// as well as to free the memory and shutdown the threads.
void Initialize();
void Shutdown();
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment