Last active
October 20, 2017 10:19
-
-
Save Justasic/8452518 to your computer and use it in GitHub Desktop.
A dynamic threading engine I built for a game I never made. You should only interact with the ThreadHandler class. Requires C++14 or C++17.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Copyright (c) 2013-2017, Justin Crawford <Justin@stacksmash.net> | |
| * | |
| * Permission to use, copy, modify, and/or distribute this software for any purpose | |
| * with or without fee is hereby granted, provided that the above copyright notice | |
| * and this permission notice appear in all copies. | |
| * | |
| * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO | |
| * THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO | |
| * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL | |
| * DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER | |
| * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN | |
| * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
| */ | |
#include "ThreadEngine.h"
#include <chrono>
#include <cstdio>
#include <cstring>
#include <utility>
// Debug logging helper: d_printf() forwards to printf() in debug builds and
// expands to nothing (arguments are not evaluated) when NDEBUG is defined.
#ifdef NDEBUG
# define d_printf(...)
#else
# include <cstdio>
// std::thread::id values are printed with %p below, which GCC/Clang reject in
// their format checks. There is no portable printf conversion for a thread id,
// so format diagnostics are switched off for the rest of this file. Note that
// this silences *every* -Wformat warning, not only the thread id ones.
# pragma GCC diagnostic ignored "-Wformat"
# define d_printf(...) printf(__VA_ARGS__)
#endif
| // Create a thread local queue for each thread | |
| // This is what allows us to submit functions asynchronously | |
| // then submit them to the global job queue | |
| #ifndef _WIN32 | |
| thread_local functions_t threadQueue; | |
// Read the system load averages from /proc/loadavg.
//
// Returns a pointer to a function-local static array of three floats holding
// the 1, 5 and 15 minute load averages, or NULL when /proc/loadavg cannot be
// opened. The array is shared by every caller, so concurrent calls race on it.
// If the file opens but the three averages cannot be parsed, the array keeps
// the values of the last successful read (all zero if there never was one).
float *GetLoadAvg()
{
	static float loads[3];

	FILE *f = fopen("/proc/loadavg", "r");
	if (!f)
		return NULL;

	// Only the three averages are consumed by callers, so the trailing
	// "runnable/existing lastpid" fields are left unread. Scanning the pid
	// with %lu into a pid_t would write past the object on platforms where
	// pid_t is narrower than unsigned long.
	float sample[3];
	const int parsed = fscanf(f, "%f %f %f", &sample[0], &sample[1], &sample[2]);
	fclose(f);

	// Publish the sample only when it is complete so callers never see a
	// half-updated set of values.
	if (parsed == 3)
	{
		for (int i = 0; i < 3; ++i)
			loads[i] = sample[i];
	}

	return loads;
}
| #else | |
| __declspec(thread) functions_t* threadQueue; | |
// Windows placeholder for the system load averages: always reports 0 for the
// 1, 5 and 15 minute slots. The returned pointer refers to function-local
// static storage that is re-zeroed on every call.
float *GetLoadAvg()
{
	// TODO: Calculate load averages on windows
	// See: http://blog.sflow.com/2011/02/windows-load-average.html
	// The processor queue length needed for the calculation is exposed by the
	// WMI class 'Win32_PerfFormattedData_PerfNet_ServerWorkQueues'.
	static float ret[3];
	for (float &slot : ret)
		slot = 0.0f;
	return ret;
}
| #endif | |
| // For chrono literals. | |
| using namespace std::chrono_literals; | |
| // A mutex to avoid a race condition when checking the queue. | |
| std::mutex queueLock; | |
| // another for accessing the thread list. | |
| std::mutex ThreadListLock; | |
| // Values used to calculate the 1 minute, 5 minute, and 15 minute load times. | |
| static const double EXP_1 = LoadAverage<double>::CalculateMagic(5.0f, 60.0f); | |
| static const double EXP_5 = LoadAverage<double>::CalculateMagic(5.0f, 300.0f); | |
| static const double EXP_15 = LoadAverage<double>::CalculateMagic(5.0f, 900.0f); | |
| ThreadHandler::ThreadHandler() : loads({{LoadAverage<double>(EXP_1), LoadAverage<double>(EXP_5), LoadAverage<double>(EXP_15)}}), quitting(false), totalConcurrentThreads(0) | |
| { | |
| // NOTE: Windows thread_local stuff is not complete currently, we must work around | |
| // this with pointers. This will initialize a queue for the main thread. | |
| // Each thread will also have their own queues. | |
| #ifdef _WIN32 | |
| functions_t *queue = new functions_t; | |
| threadQueue = queue; | |
| #endif | |
| } | |
| ThreadHandler::~ThreadHandler() | |
| { | |
| this->Shutdown(); | |
| #ifdef _WIN32 | |
| delete threadQueue; | |
| #endif | |
| } | |
| void ThreadHandler::Initialize() | |
| { | |
| d_printf("Thread engine initializing\n"); | |
| // Initialize some of our stuff we use in the class. | |
| this->funcs = functions_t(); | |
| this->totalConcurrentThreads = std::thread::hardware_concurrency(); | |
| d_printf("Total supported threads: %d\n", this->totalConcurrentThreads); | |
| // Create a new thread to get things started. | |
| new WorkerThread(this); | |
| // Since we're now working with two threads, we acquire a lock while we're | |
| // still setting some stuff up. | |
| std::unique_lock<std::mutex> lck(ThreadListLock); | |
| d_printf("Spawned %zu threads\n", this->Threads.size()); | |
| #ifdef THREAD_USE_LOOP | |
| // Send our scheduler into it's own idle thread. | |
| this->loadthrd = std::thread(&ThreadHandler::Schedule, this); | |
| #endif | |
| } | |
| void ThreadHandler::Shutdown() | |
| { | |
| d_printf("Thread engine shutting down...\n"); | |
| // Wake all threads, make this->funcs read only, finish work, shutdown threads | |
| this->JoinThreads(); | |
| // We avoid a race condition because all threads have been joined at this point. | |
| d_printf("%lu jobs left and will not be processed.\n", this->funcs.size()); | |
| for (auto it = this->Threads.begin(), it_end = this->Threads.end(); it != it_end;) | |
| delete *(it++); | |
| } | |
| void ThreadHandler::Schedule() | |
| { | |
| #ifdef THREAD_USE_LOOP | |
| while (!this->quitting.load()) | |
| { | |
| #endif | |
| // First we start out by measuring the queue length of our own queue | |
| // and calculating load times from that. | |
| queueLock.lock(); | |
| unsigned long workqueue = this->funcs.size(); | |
| queueLock.unlock(); | |
| // Now get how many threads have been running beyond their run limit of 1 usec | |
| // This will help prevent "Dead Threading", a condition where one thread is consumed | |
| // for an unreasonable amount of time (eg, we're waiting for user input) and therefore | |
| // will be added to the load average. | |
| std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); | |
| ThreadListLock.lock(); | |
| for (auto it : this->Threads) | |
| { | |
| if (it->IsRunning.load() && (it->start.load() + 5us) < now) | |
| { | |
| // If a thread is busy (even if the work queue is empty) we increment it | |
| // so that we don't stall from dead threads. | |
| workqueue++; | |
| } | |
| // Since we're iterating, also check for zombies and deallocate them. | |
| if (it->quitting.load() && !it->IsRunning.load()) | |
| { | |
| d_printf("[ThreadEngine][Scheduler] Found zombie thread %p, deallocating...\n", it->GetID()); | |
| delete it; | |
| } | |
| } | |
| ThreadListLock.unlock(); | |
| // Now we calculate our load time and see if we need more threads or not. | |
| this->loads[0] += workqueue; | |
| this->loads[1] += workqueue; | |
| this->loads[2] += workqueue; | |
| // Due to how floating points work, we need to convert this from double to int in the most | |
| // "natural" way possible so we don't prematurely spawn a thread simply because we passed the 0.5 | |
| // threshold. We must change our floating point environment to ensure we're using nearest rounding. | |
| int CurrentRound = fegetround(); | |
| fesetround(FE_TONEAREST); | |
| uint32_t load5 = lrint(this->loads[1]()); | |
| fesetround(CurrentRound); | |
| // Debug :D | |
| d_printf("[ThreadEngine][Scheduler] Current thread engine load: %.2f (1 min), %.2f (5 min), %.2f (15 min) with %ld item(s) in queue with %ld workers\n", this->loads[0](), this->loads[1](), | |
| this->loads[2](), workqueue, this->Threads.size()); | |
| // We have our load time for 5 minutes for this thread. Now we calculate | |
| // whether or not we need a new thread or dump existing threads. | |
| ThreadListLock.lock(); | |
| // If we need more threads (eg, the load is greater than the threads currently working) | |
| if (load5 >= this->Threads.size()) | |
| { | |
| // If we want to request a new thread, check the system load times and ensure we don't | |
| // have a large amount of processor congestion by measuring the 5 minute load time | |
| // of the processor. | |
| float *avg = GetLoadAvg(); | |
| if (lrint(avg[1]) >= this->totalConcurrentThreads) | |
| { | |
| // We cannot spawn another thread, the CPU is already saturated with work. Don't bother continuing. | |
| d_printf("[ThreadEngine][Scheduler] WARNING: System CPU is saturated, will not spawn new threads as desired due to high load averages. (%2f 5-minute load, %d max load threshold)\n", | |
| avg[1], this->totalConcurrentThreads); | |
| goto end; | |
| } | |
| // We need more threads, all our threads are saturated. | |
| d_printf("[ThreadEngine][Scheduler] Thread engine requires more threads, %.2f >= %zu\n", this->loads[1](), this->Threads.size()); | |
| // Spawn another thread. We do this one at a time to allow for more of an "evolution" factor | |
| // otherwise we may spawn like 5000 threads to handle a sudden influx of data which | |
| // most kernels will not like you much if you do that. | |
| ThreadListLock.unlock(); | |
| new WorkerThread(this); | |
| } | |
| // if we should despawn threads (eg, the threads are idle and little work is being done.) | |
| if (load5 + THREAD_MINIMUM < this->Threads.size()) | |
| { | |
| // we need less threads, but make sure we're not gonna be less than the minimum threads desired. | |
| if (this->Threads.size() - THREAD_MINIMUM != THREAD_MINIMUM-1) | |
| { | |
| d_printf("[ThreadEngine][Scheduler] Thread engine can release some threads, %.2f < %zu\n", this->loads[1](), this->Threads.size()); | |
| WorkerThread *th = this->Threads.back(); | |
| ThreadListLock.unlock(); | |
| // Sanity check, sometimes we might have screwed up somewhere and gotten to this case. | |
| if (!th) | |
| { | |
| d_printf("[ThreadEngine][Scheduler] ERROR: there are no more threads to quit??\n"); | |
| goto end; | |
| } | |
| // If the thread is running, we cannot join it (and we cannot forcefully terminate it either due | |
| // to the implementation of std::thread) so we can only ask it to leave once it's finished with | |
| // it's job. Unfortunately this leaves a dead thread in the thread list so the scheduler will | |
| // deallocate it's class on the next iteration of the class (so long as the thread is no longer | |
| // running anymore) | |
| if (th->IsRunning.load() == true) | |
| { | |
| d_printf("[ThreadEngine][Scheduler] Thread %p is currently busy but asked to terminate.\n", th->GetID()); | |
| th->quitting = true; | |
| goto end; | |
| } | |
| th->Join(); | |
| delete th; | |
| } | |
| } | |
| end: | |
| // Ensure we're unlocked. | |
| ThreadListLock.unlock(); | |
| #ifdef THREAD_USE_LOOP | |
| // Suspend this thread for 5 seconds. | |
| d_printf("[ThreadEngine][Scheduler] Entering 5 second sleep...\n"); | |
| std::this_thread::sleep_for(5s); | |
| } // end of while (!this->quitting.load()) | |
| d_printf("[ThreadEngine][Scheduler] Exiting scheduler thread\n"); | |
| this->loadthrd.join(); | |
| #endif | |
| } | |
| void ThreadHandler::JoinThreads() | |
| { | |
| std::unique_lock<std::mutex> lck(ThreadListLock); | |
| for (const auto &t : this->Threads) | |
| t->Join(); | |
| // Wait for confirmation that threads have joined. | |
| for (const auto &t : this->Threads) | |
| { | |
| while (t->IsRunning.load() && t->quitting.load()) | |
| { | |
| d_printf("[ThreadEngine] Thread %p refusing to join, waiting 2 seconds for thread to join...\n", t->GetID()); | |
| std::this_thread::sleep_for(2s); | |
| // In the future, allow for non-interruptable threads to force-terminate? | |
| } | |
| } | |
| // quit the scheduler thread. | |
| this->quitting = true; | |
| this->loadthrd.join(); | |
| } | |
| void ThreadHandler::WakeThreads() | |
| { | |
| std::unique_lock<std::mutex> lck(ThreadListLock); | |
| for (const auto &t : this->Threads) | |
| t->Wake(); | |
| } | |
| bool ThreadHandler::Submit(bool noStall) | |
| { | |
| // Acquire a lock. if noStall, return -1 so the function can continue, otherwise wait until the lock is acquired | |
| if (noStall) | |
| { | |
| if (!queueLock.try_lock()) | |
| return false; | |
| } | |
| else | |
| queueLock.lock(); | |
| #ifndef _WIN32 | |
| // Submit the functions pending | |
| while (!threadQueue.empty()) | |
| { | |
| // Remove function from thread local stack | |
| function_t func = threadQueue.front(); | |
| threadQueue.pop(); | |
| // Move to global stack so threads can pick what is needed | |
| this->funcs.push(func); | |
| } | |
| #else | |
| // Submit the functions pending | |
| while (!threadQueue->empty()) | |
| { | |
| // Remove function from thread local stack | |
| function_t func = threadQueue->front(); | |
| threadQueue->pop(); | |
| // Move to global stack so threads can pick what is needed | |
| this->funcs.push(func); | |
| } | |
| #endif | |
| // release our lock | |
| queueLock.unlock(); | |
| // Notify the threads that there is work | |
| this->WakeThreads(); | |
| return true; | |
| } | |
| /******************************************************************/ | |
| WorkerThread::WorkerThread(ThreadHandler *thr) : wakeThread(false), | |
| thr(thr), IsRunning(false), quitting(false) | |
| { | |
| std::unique_lock<std::mutex> lck(ThreadListLock); | |
| // NOTICE: We must wait for the class to initialize before we can start our thread | |
| // otherwise we get uninitialized values which can cause a race condition | |
| // and undefined behavior when the class | |
| this->th = std::thread(&WorkerThread::Main, this); | |
| // Add the thread to the thread list | |
| thr->Threads.push_back(this); | |
| d_printf("Thread ID: %p\n", this->th.get_id()); | |
| } | |
| WorkerThread::~WorkerThread() | |
| { | |
| std::unique_lock<std::mutex> lck(ThreadListLock); | |
| // Remove the thread from the thread list | |
| this->thr->Threads.remove(this); | |
| } | |
| void WorkerThread::Sleep() | |
| { | |
| std::unique_lock<std::mutex> lk(this->m); | |
| this->cv.wait(lk, [this] (){return this->wakeThread.load(); }); | |
| this->wakeThread = false; | |
| } | |
| void WorkerThread::Wake() | |
| { | |
| if (this->IsRunning.load()) | |
| return; | |
| this->wakeThread = true; | |
| this->cv.notify_all(); | |
| } | |
| void WorkerThread::Join() | |
| { | |
| d_printf("Thread %p joining to next thread...\n", this->GetID()); | |
| this->quitting = true; | |
| if (this->IsRunning.load()) | |
| d_printf("Cannot join thread %p because it's currently running...\n", this->GetID()); | |
| else | |
| { | |
| this->Wake(); | |
| this->th.join(); | |
| } | |
| } | |
| void WorkerThread::Main() | |
| { | |
| #ifdef _WIN32 | |
| functions_t *queue = new functions_t; | |
| threadQueue = queue; | |
| #endif | |
| this->ThreadID = std::this_thread::get_id(); | |
| while (!this->quitting) | |
| { | |
| // Acquire a lock on the queue and get the data | |
| queueLock.lock(); | |
| // Check the funcs are in the queue and ready to be processed | |
| if (!this->thr->funcs.empty()) | |
| { | |
| // Get a local copy of the function | |
| function_t func = this->thr->funcs.front(); | |
| this->thr->funcs.pop(); | |
| // Release our lock so other threads can use it before we run our function. | |
| queueLock.unlock(); | |
| // So we don't need an atomic, we set our runtime before our isrunning flag. | |
| this->start = std::chrono::steady_clock::now(); | |
| // Set that we're busy. | |
| this->IsRunning.store(true); | |
| // Run the function (we released the lock in case the function takes a long time to run) | |
| func(); | |
| //d_printf("[Thread %p] Finished running function. Took %.6f seconds\n", this->GetID(), std::chrono::duration_cast<std::chrono::duration<double>>(this->start.load() - std::chrono::steady_clock::now()).count()); | |
| // We're not busy anymore. | |
| this->IsRunning.store(false); | |
| this->start = std::chrono::steady_clock::time_point(); | |
| } | |
| else | |
| { | |
| queueLock.unlock(); | |
| this->Sleep(); | |
| } | |
| } | |
| #ifdef _WIN32 | |
| // Clean up our shit. | |
| delete queue; | |
| #endif | |
| d_printf("Thread %p exiting...\n", this->GetID()); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Copyright (c) 2013-2017, Justin Crawford <Justin@stacksmash.net> | |
| * | |
| * Permission to use, copy, modify, and/or distribute this software for any purpose | |
| * with or without fee is hereby granted, provided that the above copyright notice | |
| * and this permission notice appear in all copies. | |
| * | |
| * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO | |
| * THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO | |
| * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL | |
| * DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER | |
| * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN | |
| * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
| */ | |
| #pragma once | |
| #include <functional> | |
| #include <queue> | |
| #include <thread> | |
| #include <mutex> | |
| #include <atomic> | |
| #include <list> | |
| #include <cmath> | |
| #include <cfenv> | |
| #include <array> | |
| #include <condition_variable> | |
| // Undef this if you want to manually call ThreadHandler::Schedule() every 5 seconds. | |
| // If you want to call less often or more often than that, then you need to recalculate | |
| // the load time magic values for the scheduler to work correctly. | |
| #define THREAD_USE_LOOP 1 | |
| // Define this to tell the threading engine how many minimum threads you wish to keep | |
| // at all times. the thread engine will only kill threads to this number. Default | |
| // is 1 thread minimum. Setting this to zero will mean the engine will take a large | |
| // amount of time to spawn a thread to do the work unless there is a lot of work to | |
| // be done (lots of things submitted to the engine). | |
| #define THREAD_MINIMUM 1 | |
| ///////////////////////////////////////////////////////////////// | |
// A job: any callable taking no arguments and returning nothing.
using function_t = std::function<void ()>;
// FIFO of jobs; used for the shared queue and for the per-thread queues.
using functions_t = std::queue<function_t>;

class ThreadHandler;
| class WorkerThread | |
| { | |
| // Mutex used for sleeping the thread | |
| std::mutex m; | |
| std::condition_variable cv; | |
| std::atomic<bool> wakeThread; | |
| std::thread th; // Inherit std::thread instead? | |
| ThreadHandler *thr; | |
| protected: | |
| // Used for the scheduler, this is set when the thread is *actively* doing something | |
| std::atomic<bool> IsRunning; | |
| // When above is true, this is set to a monotonic start time. | |
| // Used by scheduler to determine if this thread is too "overloaded" | |
| // When IsRunning = false, this is reset to 0. | |
| std::atomic<std::chrono::steady_clock::time_point> start; | |
| // This is the thread ID we get. | |
| std::atomic<std::thread::id> ThreadID; | |
| public: | |
| WorkerThread(ThreadHandler*); | |
| ~WorkerThread(); | |
| friend class ThreadHandler; | |
| // Put the thread into an idle state | |
| void Sleep(); | |
| // Wake the thread from idle state | |
| void Wake(); | |
| // Execute jobs - This is called by the Job Handler directly. | |
| void Main(); | |
| // Join the thread to the main thread (used for shutdown) | |
| void Join(); | |
| // Get our thread ID | |
| inline std::thread::id GetID() const | |
| { | |
| return this->ThreadID.load(); | |
| } | |
| // Check whether the thread is quitting | |
| std::atomic<bool> quitting; | |
| }; | |
// Class: LoadAverage
//
// Basic: An exponentially decaying, linux-style load average.
//
// Description:
// The class keeps one running average. Every call to Add() (or +=) feeds
// in one sample n, eg, the current length of a work queue, and updates
// the average as:
//
//     load(t) = load(t-1) * m + n * (1 - m)
//
// where m is the "magic" decay factor handed to the constructor:
//
//     m = e ^ (-s / t)
//         s = sample interval (time between two Add() calls)
//         t = timeframe the average should cover
//
// CalculateMagic(s, t) computes exactly that value. For the classic 1 minute
// average sampled every 5 seconds m = e ^ (-5/60). Samples must be fed in at
// the interval the magic was calculated for, otherwise the timeframe is off.
//
// When the same n is fed in repeatedly the average approaches n without ever
// reaching it exactly; users that need an integer have to round.
//
// The magic only has to be calculated once. It is computed at runtime because
// std::exp is not constexpr in C++17.
//
// T is the floating point type used for the calculation. The current value
// can be read through Get(), operator() or an implicit conversion to T.
template<typename T>
class LoadAverage
{
	// The running average.
	T loadavg;
	// Decay factor applied to the previous average on every sample.
	T magic;

public:
	// The value type this average is calculated in.
	using datatype = T;

	// Decay factor for samples taken every SampleRate time units and averaged
	// over TimeFrame time units (both in the same unit).
	static inline T CalculateMagic(T SampleRate, T TimeFrame)
	{
		return std::exp(-SampleRate/TimeFrame);
	}

	// Start at a load of 0 with the given decay factor.
	LoadAverage(T magic) : loadavg(0.0), magic(magic) {}

	// Start at a load of 0 with a decay factor of 0, ie, the average simply
	// equals the most recent sample.
	LoadAverage() : loadavg(0.0), magic(0.0) {}

	// Current value of the average.
	T Get() const { return this->loadavg; }

	// Same as Get(), as a call operator.
	T operator()() const { return this->Get(); }

	// Same as Get(), as an implicit conversion.
	operator T() const { return this->Get(); }

	// Feed in one sample: decay the previous average, then blend in n.
	void Add(size_t n)
	{
		this->loadavg *= this->magic;
		this->loadavg += n * (1 - this->magic);
	}

	// Same as Add(n); returns *this so calls can be chained.
	LoadAverage &operator +=(size_t n)
	{
		this->Add(n);
		return *this;
	}
};
| class ThreadHandler | |
| { | |
| protected: | |
| void JoinThreads(); | |
| void WakeThreads(); | |
| std::list<WorkerThread*> Threads; | |
| // Our local job queue load times: 1 minute, 5 minute, and 15 minutes. | |
| std::array<LoadAverage<double>, 3> loads; | |
| #ifdef THREAD_USE_LOOP | |
| // This is all used for a separate thread | |
| // to run the scheduler for the threads. | |
| std::atomic<bool> quitting; | |
| std::thread loadthrd; | |
| #endif | |
| public: | |
| ThreadHandler(); | |
| ~ThreadHandler(); | |
| // Let the threads access members in this class | |
| friend class WorkerThread; | |
| // Waiting jobs, This is global among all threads. | |
| functions_t funcs; | |
| // Defines how many threads the system is designed to use | |
| unsigned int totalConcurrentThreads; | |
| // This makes it easy to specify functions and arguments | |
| // in a readable manner | |
| template< class _Function, class... _Args > | |
| void AddQueue(_Function&& __f, _Args&&... __args) | |
| { | |
| // Create a thread local queue for each thread | |
| // This is what allows us to submit functions asynchronously | |
| // then submit them to the global job queue | |
| // Make a bound std::function for storage | |
| function_t func = std::bind(std::forward<_Function>(__f), std::forward<_Args>(__args)...); | |
| // Push to thread local storage container | |
| #ifndef _WIN32 | |
| extern thread_local functions_t threadQueue; | |
| threadQueue.push(func); | |
| #else | |
| extern __declspec(thread) functions_t* threadQueue; | |
| threadQueue->push(func); | |
| #endif | |
| } | |
| // Copy all the data from the thread local queue to the master queue and wake the threads. | |
| bool Submit(bool noStall = false); | |
| // Handle thread scheduling. This function is pretty critical as it allows for the | |
| // spawning of new threads based on work queue but only to a maximum of the current | |
| // actual CPU processor queue length (aka, it is allowed to spawn new threads until | |
| // the kernel states that the CPU is saturated via load times). In addition to this, | |
| // it will terminate threads until only one thread is left to process new items on | |
| // the master queue. The execution of this function should be in it's own real-time | |
| // thread to allow for proper scheduling outside of I/O operations. This scheduler | |
| // is still subject to the operating system scheduler and may not be entirely accurate. | |
| void Schedule(); | |
| // Initialize and shutdown functions to start the threads and allocate memory | |
| // as well as to free the memory and shutdown the threads. | |
| void Initialize(); | |
| void Shutdown(); | |
| }; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment