Skip to content

Instantly share code, notes, and snippets.

@hamaluik
Last active December 19, 2023 09:59
Show Gist options
  • Save hamaluik/80fb81f84ecedbe2a6af to your computer and use it in GitHub Desktop.
Save hamaluik/80fb81f84ecedbe2a6af to your computer and use it in GitHub Desktop.
Platform-agnostic thread pool for Haxe / OpenFL
package com.blazingmammothgames.util;
#if neko
import neko.vm.Thread;
import neko.vm.Mutex;
#elseif cpp
import cpp.vm.Thread;
import cpp.vm.Mutex;
#end
#if (neko || cpp)
/**
 * Runs one task function on a newly created native thread and exposes the
 * completion flag and return value through mutex-guarded accessors.
 *
 * A new thread is created on every call to `start()`; the argument is handed
 * to it through the thread's message queue.
 */
private class PoolThread
{
    private var thread:Thread;
    private var task:Dynamic->Dynamic;
    private var mutex:Mutex;

    /**
     * Set to true by `start()`. It is never cleared by this class: the owner
     * resets it to false once it has consumed the result. Not guarded by the
     * mutex; `doWork()` never touches it.
     */
    public var started:Bool = false;

    private var _done:Bool = false;

    /** True once the worker has stored its result (read under the mutex). */
    public var done(get, never):Bool;
    private function get_done():Bool
    {
        mutex.acquire();
        var d:Bool = _done;
        mutex.release();
        return d;
    }

    private var _result:Dynamic;

    /**
     * Value returned by the task (read under the mutex). Only meaningful
     * once `done` is true; null if the task threw.
     */
    public var result(get, never):Dynamic;
    private function get_result():Dynamic
    {
        mutex.acquire();
        var r:Dynamic = _result;
        mutex.release();
        return r;
    }

    public function new()
    {
        mutex = new Mutex();
    }

    /**
     * Launches `task(arg)` on a new thread.
     * @param task function to execute on the worker thread
     * @param arg  single argument forwarded to `task`
     */
    public function start(task:Dynamic->Dynamic, arg:Dynamic):Void
    {
        this.task = task;
        started = true;
        _done = false;
        thread = Thread.create(doWork);
        // the worker blocks in readMessage() until this arrives
        thread.sendMessage(arg);
    }

    /** Thread entry point: receive the argument, run the task, publish the result. */
    private function doWork():Void
    {
        var arg:Dynamic = Thread.readMessage(true);
        var ret:Dynamic = null;
        try
        {
            ret = task(arg);
        }
        catch (e:Dynamic)
        {
            // Without this, an exception would end the thread before _done
            // is set and anything polling `done` would wait forever.
            trace("PoolThread: task threw: " + Std.string(e));
        }
        mutex.acquire();
        _result = ret;
        _done = true;
        mutex.release();
    }
}
#end
/**
 * Bookkeeping record for one unit of work queued on a ThreadPool.
 */
typedef Task =
{
// sequence number taken from ThreadPool.nextID when the task is queued
var id:Int;
// the work itself: called with `arg`, its return value is passed to `onFinish`
var task:Dynamic->Dynamic;
// false when queued; set to true by the pool once the result has been collected
var done:Bool;
// the single argument handed to `task`
var arg:Dynamic;
#if (neko || cpp)
// worker assigned to this task by the pool; null until one is assigned
var thread:PoolThread;
#end
// callback receiving the task's return value; the pool skips it when null
var onFinish:Dynamic->Void;
}
/**
 * Queues tasks and runs them all in one blocking call.
 *
 * On neko and cpp the tasks are spread over up to `numThreads` worker
 * threads; on every other target they are executed one after another on the
 * calling thread, in the order they were added.
 *
 * @author Kenton Hamaluik
 */
class ThreadPool
{
    #if (neko || cpp)
    private var numThreads:Int = 1;
    private var threads:Array<PoolThread>;
    #end
    private var tasks:Array<Task>;
    private var nextID:Int = 0;

    /**
     * @param numThreads number of workers to use on neko/cpp; values below 1
     *                   are treated as 1. Ignored on other targets.
     */
    public function new(numThreads:Int)
    {
        tasks = new Array<Task>();
        #if (neko || cpp)
        // an empty pool could never run anything and blockRunAllTasks() would spin forever
        this.numThreads = (numThreads < 1) ? 1 : numThreads;
        threads = new Array<PoolThread>();
        for (i in 0...this.numThreads)
        {
            threads.push(new PoolThread());
        }
        #end
    }

    /**
     * Queues a task; nothing runs until `blockRunAllTasks()` is called.
     * @param task     function to execute
     * @param arg      argument passed to `task`
     * @param onFinish called with the task's return value; may be null
     */
    public function addTask(task:Dynamic->Dynamic, arg:Dynamic, onFinish:Dynamic->Void):Void
    {
        tasks.push( { id: nextID, task: task, done: false, arg: arg, #if (neko || cpp) thread: null, #end onFinish: onFinish } );
        nextID++;
    }

    #if (neko || cpp)
    /** True when every queued task has had its result collected. */
    private function allTasksAreDone():Bool
    {
        for (task in tasks)
            if (!task.done)
                return false;
        return true;
    }

    /** Returns a worker that is not currently running a task, or null if all are busy. */
    private function getNextFreeThread():PoolThread
    {
        for (thread in threads)
            if (!thread.started)
                return thread;
        return null;
    }
    #end

    /**
     * Runs every queued task and returns once all of them have finished and
     * their `onFinish` callbacks (if any) have been invoked on the calling
     * thread. The queue is emptied afterwards.
     */
    public function blockRunAllTasks():Void
    {
        #if (neko || cpp)
        while (!allTasksAreDone())
        {
            var progressed:Bool = false;
            for (task in tasks)
            {
                // skip any tasks that are done
                if (task.done)
                    continue;

                // Already handed to a worker: collect the result if it is
                // ready. This must happen even when no worker is free,
                // otherwise busy workers are never released and the loop
                // deadlocks as soon as tasks >= threads.
                if (task.thread != null)
                {
                    if (task.thread.done)
                    {
                        task.done = true;
                        // release the worker for reuse
                        task.thread.started = false;
                        if (task.onFinish != null)
                            task.onFinish(task.thread.result);
                        progressed = true;
                    }
                    continue;
                }

                // Not started yet: give it a worker if one is available.
                var thread:PoolThread = getNextFreeThread();
                if (thread != null)
                {
                    task.thread = thread;
                    thread.start(task.task, task.arg);
                    progressed = true;
                }
            }

            // Nothing finished and nothing could be started: yield briefly
            // instead of spinning at full CPU while the workers run.
            if (!progressed)
                Sys.sleep(0.001);
        }
        #else
        for (task in tasks)
        {
            // Always run the task, as the threaded path does; only the
            // callback is optional.
            var result:Dynamic = task.task(task.arg);
            if (task.onFinish != null)
                task.onFinish(result);
        }
        #end
        // clear the old tasks
        tasks = new Array<Task>();
    }
}
// Usage example for ThreadPool; fragments marked "..." are elided.
package ;
// ...
import com.blazingmammothgames.util.ThreadPool;
// ...
// this will create a thread pool with 8 threads on neko and cpp platforms
// on all other platforms, no threads will be created
// and the pool will use the main thread
var threadPool:ThreadPool = new ThreadPool(8);
// add a task that does some busy work: it sums 0..9 (= 45),
// running an empty inner loop on each step to burn time
threadPool.addTask(function(x:Dynamic):Int {
var li:Int = 0;
for (i in 0...10)
{
li += i;
for(n in 0...10000) {}
}
return li;
}, null, onFinish);
// add a task that returns right away
threadPool.addTask(function(x:Dynamic):String {
return "herp derp";
}, null, onFinish);
// this is a blocking call that will run all the tasks
// across the pool's threads
// or just in the main thread if not on neko or cpp
threadPool.blockRunAllTasks();
// ...
// report the results of the above tasks
private function onFinish(x:Dynamic):Void
{
trace(x);
}
// on neko or cpp, the order of the output depends on which task
// finishes (and is noticed by the pool) first; the intended output is:
// herp derp
// 45
// because the "herp derp" task is expected to finish sooner
// than the summation task when they're running in parallel
// (NOTE: not guaranteed, since the summation task is also very short)
// on all other platforms, this will output:
// 45
// herp derp
// because the tasks will be executed in the order they're added
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment