Last active
April 25, 2016 04:13
-
-
Save binji/16b9bc4a1e875b6a64663ff797456a5b to your computer and use it in GitHub Desktop.
simple thread pool w/ bump allocator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
doing stuff... | |
doing stuff... | |
chunk 0: used: 50000 size: 65536 | |
chunk 1: used: 37020 size: 65536 | |
chunk 2: used: 51000 size: 65536 | |
chunk 3: used: 44000 size: 65536 | |
chunk 4: used: 58000 size: 65536 | |
chunk 5: used: 63000 size: 65536 | |
chunk 6: used: 41000 size: 65536 | |
chunk 7: used: 56000 size: 65536 | |
TOTAL: used: 400020 size: 524288 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <assert.h> | |
#include <stdint.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <chrono> | |
#include <condition_variable> | |
#include <functional> | |
#include <memory> | |
#include <mutex> | |
#include <queue> | |
#include <thread> | |
#define LOG 0 | |
#if LOG | |
#define LOGF(...) fprintf(stderr, __VA_ARGS__) | |
#else | |
#define LOGF(...) (void)0 | |
#endif | |
struct Chunk { | |
explicit Chunk(size_t size) { | |
data = new uint8_t[size]; | |
current = data; | |
end = data + size; | |
next = nullptr; | |
} | |
~Chunk() { delete[] data; } | |
void* Alloc(size_t size) { | |
if (end - current < size) | |
return nullptr; | |
void* result = current; | |
current += size; | |
return result; | |
} | |
uint8_t* data; | |
uint8_t* current; | |
uint8_t* end; | |
Chunk* next; | |
}; | |
const size_t kDefaultChunkSize = 64 * 1024; | |
struct Allocator { | |
Allocator() : first(nullptr), last(nullptr), chunk_count(0) {} | |
~Allocator() { | |
while (first) { | |
Chunk* next = first->next; | |
delete first; | |
first = next; | |
} | |
} | |
void* Alloc(size_t size) { | |
void* result = nullptr; | |
if (first) { | |
result = first->Alloc(size); | |
} | |
if (!result) { | |
Chunk* chunk = new Chunk(std::max(size, kDefaultChunkSize)); | |
chunk_count++; | |
if (first) { | |
chunk->next = first; | |
} else { | |
last = chunk; | |
} | |
first = chunk; | |
result = chunk->Alloc(size); | |
assert(result); | |
} | |
return result; | |
} | |
void StealFirstAndPrepend(Allocator* their) { | |
Chunk* stolen = their->first; | |
if (!stolen) { | |
return; | |
} | |
their->first = stolen->next; | |
if (!their->first) { | |
their->last = nullptr; | |
} | |
stolen->next = first; | |
if (!first) { | |
last = stolen; | |
} | |
first = stolen; | |
chunk_count++; | |
their->chunk_count--; | |
} | |
void StealAllAndAppend(Allocator* their) { | |
if (!their->first) { | |
// nothing to steal | |
return; | |
} | |
if (last) { | |
last->next = their->first; | |
} else { | |
first = their->first; | |
} | |
last = their->last; | |
their->first = their->last = nullptr; | |
chunk_count += their->chunk_count; | |
their->chunk_count = 0; | |
} | |
void Dump() { | |
size_t total_used = 0; | |
size_t total_size = 0; | |
int i = 0; | |
for (Chunk* chunk = first; chunk; chunk = chunk->next, ++i) { | |
size_t chunk_used = chunk->current - chunk->data; | |
size_t chunk_size = chunk->end - chunk->data; | |
fprintf(stderr, "chunk %d: used: %zd size: %zd\n", i, chunk_used, | |
chunk_size); | |
total_used += chunk_used; | |
total_size += chunk_size; | |
} | |
fprintf(stderr, "TOTAL: used: %zd size: %zd\n", total_used, total_size); | |
} | |
Chunk* first; | |
Chunk* last; | |
int chunk_count; | |
}; | |
struct Task { | |
virtual ~Task() {} | |
virtual void Before() {} | |
virtual void Run() = 0; | |
virtual void After() {} | |
}; | |
using TaskPtr = std::unique_ptr<Task>; | |
struct ThreadPool; | |
struct Thread { | |
explicit Thread(ThreadPool* pool) : thread(&Thread::Run, this, pool) {} | |
~Thread() { thread.join(); } | |
void Run(ThreadPool* pool); | |
std::thread thread; | |
}; | |
using ThreadPtr = std::unique_ptr<Thread>; | |
struct TaskQueue { | |
TaskQueue() : exiting(false) {} | |
void Enqueue(TaskPtr task) { | |
std::lock_guard<std::mutex> lock(mut); | |
tasks.push(std::move(task)); | |
has_task.notify_one(); | |
} | |
TaskPtr Dequeue() { | |
std::unique_lock<std::mutex> lock(mut); | |
has_task.wait(lock, [this] { return exiting || !tasks.empty(); }); | |
if (exiting) | |
return nullptr; | |
TaskPtr task = std::move(tasks.front()); | |
tasks.pop(); | |
return task; | |
} | |
void Exit() { | |
std::lock_guard<std::mutex> lock(mut); | |
exiting = true; | |
has_task.notify_all(); | |
} | |
private: | |
std::mutex mut; | |
std::condition_variable has_task; | |
std::queue<TaskPtr> tasks; | |
bool exiting; | |
}; | |
struct ThreadPool { | |
explicit ThreadPool(int count) { | |
for (int i = 0; i < count; ++i) { | |
threads.emplace_back(new Thread(this)); | |
} | |
} | |
~ThreadPool() { | |
tasks_to_run.Exit(); | |
} | |
template <typename Container> | |
void Run(Container& tasks) { | |
const size_t kMaxRunning = threads.size() * 2; | |
size_t total = tasks.size(); | |
size_t started = 0; | |
size_t finished = 0; | |
while (started < kMaxRunning) { | |
TaskPtr task = std::move(tasks[started++]); | |
task->Before(); | |
tasks_to_run.Enqueue(std::move(task)); | |
} | |
while (finished < total) { | |
TaskPtr task = done_tasks.Dequeue(); | |
task->After(); | |
finished++; | |
if (started < total) { | |
TaskPtr task = std::move(tasks[started++]); | |
task->Before(); | |
tasks_to_run.Enqueue(std::move(task)); | |
} | |
} | |
tasks.clear(); | |
} | |
private: | |
TaskQueue tasks_to_run; | |
TaskQueue done_tasks; | |
std::vector<ThreadPtr> threads; | |
friend struct Thread; | |
}; | |
void Thread::Run(ThreadPool* pool) { | |
while (true) { | |
TaskPtr task = pool->tasks_to_run.Dequeue(); | |
if (!task) | |
break; | |
task->Run(); | |
pool->done_tasks.Enqueue(std::move(task)); | |
} | |
} | |
struct Function { | |
explicit Function(int dummy) : dummy(dummy) {} | |
int dummy; | |
}; | |
struct Module { | |
Allocator allocator; | |
std::vector<Function> functions; | |
}; | |
struct ModuleTask : Task { | |
explicit ModuleTask(Module* module) : module(module) {} | |
virtual void Before() { allocator.StealFirstAndPrepend(&module->allocator); } | |
virtual void After() { | |
module->allocator.StealFirstAndPrepend(&allocator); | |
module->allocator.StealAllAndAppend(&allocator); | |
} | |
Module* module; | |
Allocator allocator; | |
}; | |
int fib(int n) { | |
if (n < 2) return 1; | |
return fib(n - 1) + fib(n - 2); | |
} | |
struct MyTask : ModuleTask { | |
MyTask(Module* module, Function* f) : ModuleTask(module), f(f) { | |
id = s_counter++; | |
} | |
virtual void Run() { | |
LOGF("%d: started\n", id); | |
// silly allocations | |
LOGF("%d: allocating 1000 bytes\n", id); | |
allocator.Alloc(1000); | |
// busy work | |
f->dummy = fib(id % 40); | |
LOGF("%d: finished\n", id); | |
} | |
static int s_counter; | |
int id; | |
Function* f; | |
}; | |
int MyTask::s_counter = 0; | |
void DoSomeStuff(ThreadPool* pool, Module* module) { | |
fprintf(stderr, "doing stuff...\n"); | |
std::vector<TaskPtr> tasks; | |
for (Function& f : module->functions) { | |
tasks.emplace_back(new MyTask(module, &f)); | |
} | |
pool->Run(tasks); | |
} | |
int main() { | |
const int kNumFunctions = 200; | |
ThreadPool pool(4); | |
Module module; | |
module.allocator.Alloc(20); | |
for (int i = 0; i < kNumFunctions; ++i) { | |
module.functions.emplace_back(i); | |
} | |
DoSomeStuff(&pool, &module); | |
DoSomeStuff(&pool, &module); | |
module.allocator.Dump(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment