@ateleshev
Forked from krakjoe/pool.md
Last active February 26, 2016 13:11

Easy pthreads Pools

The final solution !!

Since the first version of pthreads, PHP has had the ability to initialize Worker threads for users. Objects of class Stackable are stacked onto those Worker threads for concurrent execution.

The objects stacked onto workers do not have their reference counts changed. pthreads forces the user to maintain the reference counts in userland, for the extremely good reason that this lets the programmer keep control of memory usage, and so execute indefinitely.

This is the cause of much heartache for newcomers to pthreads; if you do not maintain references properly you will, definitely, experience segmentation faults.

To change the base implementation of Worker and/or Stackable would be to remove all power from the user.

Version 1.0.0 of pthreads comes with a Pool class. This Pool class manages references and implements an easy-to-use garbage collection mechanism for the user. It therefore eliminates the need to treat Stackables in a special manner; it should be idiot proof.

I'm not calling anyone an idiot !

The Pool Class

How it works

The Pool class is not thread safe. It cannot be used from multiple contexts, because the underlying Worker class has restrictions that rule this out.

Construction

How to make one!

The Pool constructor has the following prototype:

public function __construct($size, string $class, $ctor = []);
  • $size - the maximum number of Workers that can be started by this Pool.
  • $class - the name of a Worker class implemented by the programmer.
  • $ctor - an array of arguments to pass to the constructor of each Worker upon initialization.

The following code shows how to construct a Pool:

/**
 * An instance of this class shall be passed to Workers on initialization
 */
class WebLogger extends Stackable {
    public function log() {
        /* ... */
    }
}

class WebWorker extends Worker {

    public function __construct(\WebLogger $logger) {
        $this->logger = $logger;
    }
    
    public function run() {
        /* ... */
    }
    
    protected $logger;
}

$pool = new Pool(8, \WebWorker::class, [new \WebLogger()]);

Note: No threads are created at this time; this only initializes the Pool.

Execution

How to use one!

The submit method has the following prototype:

public function submit(Stackable $task)
  • $task - the Stackable to execute in a pooled Worker

The following code shows usage:

class WebWork extends Stackable {
    public function run() {
        /* ... */
    }
}

$pool->submit(new WebWork());

The next available Worker is selected (round robin) or, if it has not been started yet, constructed. $task is then added to $pool->work and stacked onto the selected Worker.
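The selection step can be modelled in plain PHP, without the extension loaded. This is only a sketch of the behaviour described above, not the pthreads source: RoundRobinModel, pick(), started() and the "worker-N" labels are hypothetical names invented for the illustration.

```php
<?php
/**
 * Plain-PHP model of round-robin selection with lazy construction.
 * Hypothetical names throughout; this is NOT the pthreads implementation.
 */
class RoundRobinModel {
    private $size;          // maximum number of workers allowed
    private $workers = [];  // workers constructed so far, in start order
    private $last = 0;      // slot that receives the next task

    public function __construct($size) {
        $this->size = $size;
    }

    /** Return the label of the worker that would receive the next task. */
    public function pick() {
        if ($this->last >= $this->size) {
            $this->last = 0; // wrap around to the first worker
        }
        if (!isset($this->workers[$this->last])) {
            // a worker is only constructed the first time its slot is used
            $this->workers[$this->last] = "worker-{$this->last}";
        }
        return $this->workers[$this->last++];
    }

    /** Number of workers constructed so far. */
    public function started() {
        return count($this->workers);
    }
}

$model = new RoundRobinModel(3);
$picked = [];
for ($i = 0; $i < 5; $i++) {
    $picked[] = $model->pick();
}

// prints: worker-0, worker-1, worker-2, worker-0, worker-1
echo implode(", ", $picked), "\n";
```

Five tasks against a maximum of three start three workers; the fourth and fifth tasks wrap around to workers that already exist.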

Collection

How to keep one!

We live in a universe where only two things are possibly infinite and, much to the dismay of every shared hosting provider ever, neither of those two things is RAM or CPU time.

With this in mind, you will periodically want to collect Stackables that are complete from the Pool, possibly execute some kind of final action on these objects and then allow the Pool to release the resources for that Stackable.

This is achieved by calling the collect method, which has the following prototype:

public function collect(Callable $collector)
  • $collector - a function to detect if a Stackable is done executing

$collector is called for each of the Stackable objects submitted for execution. It should accept a Stackable as its argument and return true if the passed argument has finished executing.

$collector implicitly has the following prototype:

function (Stackable $task)

The following code shows usage:

$pool->collect(function(WebWork $task){
   return $task->isComplete();
});
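To make the contract of $collector concrete, here is a plain-PHP model of the collection pass. It is a sketch under assumptions, not the Pool's real code: FakeTask and collectModel() are hypothetical stand-ins, and the only behaviour modelled is "call the collector for each retained task, drop the ones it reports as done".

```php
<?php
/** Stand-in for a submitted task; hypothetical, not a pthreads class. */
class FakeTask {
    private $complete;

    public function __construct($complete) {
        $this->complete = $complete;
    }

    public function isComplete() {
        return $this->complete;
    }
}

/**
 * Model of a collection pass: call $collector for every retained task
 * and drop the tasks for which it returns true.
 * Returns the tasks that are still retained afterwards.
 */
function collectModel(array $work, callable $collector) {
    foreach ($work as $key => $task) {
        if ($collector($task)) {
            unset($work[$key]); // drop the reference held for this task
        }
    }
    return $work;
}

$work = [new FakeTask(true), new FakeTask(false), new FakeTask(true)];

$remaining = collectModel($work, function (FakeTask $task) {
    return $task->isComplete();
});

// prints: 1 task(s) still retained
echo count($remaining), " task(s) still retained\n";
```

A task whose collector returns false simply stays where it is and is offered to the collector again on the next pass.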

Growing and Shrinking

How to bend one!

The ability to grow and shrink the Pool, that is to say, increase or decrease the number of Workers available, allows the programmer much flexibility to adjust to the environment during execution.

The resize method has the following prototype:

public function resize($size)
  • $size - the new maximum number of Workers to allow

The following code shows usage:

$pool->resize(4);

At this time, if the new $size is smaller than the old, the last Workers started will be shut down; the new maximum is then adjusted.
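The shrinking rule can be pictured with a few lines of plain PHP. Again, this is a model and not the extension's code: resizeModel() and the "wN" labels are hypothetical, and workers are assumed to be kept in the order they were started.

```php
<?php
/**
 * Model of shrinking: workers are kept in start order and the ones
 * started last are the first to be stopped.
 * Hypothetical helper, NOT the pthreads implementation.
 *
 * Returns [workers still running, workers stopped].
 */
function resizeModel(array $workers, $size) {
    $stopped = [];
    while (count($workers) > $size) {
        $stopped[] = array_pop($workers); // last started, first stopped
    }
    return [$workers, $stopped];
}

list($running, $stopped) = resizeModel(["w0", "w1", "w2", "w3"], 2);

echo "running: ", implode(", ", $running), "\n"; // prints: running: w0, w1
echo "stopped: ", implode(", ", $stopped), "\n"; // prints: stopped: w3, w2
```

In this model, a $size equal to or larger than the number of running workers stops nothing.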

Shutting Down

How to stop one!

Shutting down shuts down every Worker, and in so doing, executes all Stackables submitted.

The shutdown method has the following prototype:

public function shutdown()

The following code shows usage:

$pool->shutdown();

The Pool can no longer be used; however, the Stackables submitted are not released at this time.

Destruction

How to destroy one!

Destruction of the Pool will both shut down all Workers and release all Stackables.

Destruction (__destruct) is invoked when the Pool goes out of scope, or when no more references to it remain, just as for a normal variable.

unset invokes __destruct, provided no other references to the Pool remain:

unset($pool);
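One thing worth checking before relying on unset: in PHP, the destructor runs when the last reference to an object goes away, so an unset on one variable does nothing visible while another variable still holds the object. The sketch below shows this with an ordinary class; Probe is a hypothetical stand-in, not part of pthreads, and the same rule is assumed to apply to a Pool.

```php
<?php
/** Hypothetical stand-in that counts how often it has been destroyed. */
class Probe {
    public static $destroyed = 0;

    public function __destruct() {
        self::$destroyed++;
    }
}

$a = new Probe();
$b = $a;                          // second handle to the same object

unset($a);                        // one reference remains: no destruction yet
$afterFirst = Probe::$destroyed;

unset($b);                        // last reference removed: __destruct runs now
$afterSecond = Probe::$destroyed;

// prints: after first unset: 0, after second: 1
echo "after first unset: {$afterFirst}, after second: {$afterSecond}\n";
```

So if the Pool has been passed around or stored in a property, unset($pool) alone is not enough to trigger destruction.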

Code Listing

U can haz codez!

The following is a useless but complete code listing, using the examples in this document:

<?php
class WebWork extends Stackable {

    public function __construct() {
        $this->complete = false;
    }

    public function run() {
        $this->worker->logger->log(
            "%s executing in Thread #%lu", 
            __CLASS__, $this->worker->getThreadId());
        usleep(100);
        $this->complete = true;
    }

    public function isComplete() {
        return $this->complete;
    }

    protected $complete;
}

class WebWorker extends Worker {

    public function __construct(WebLogger $logger) {
        $this->logger = $logger;
    }

    protected $logger;
}

class WebLogger extends Stackable {

    protected function log($message, $args = []) {
        $args = func_get_args();    

        if (($message = array_shift($args))) {
            echo vsprintf(
                "{$message}\n", $args);
        }
    }
}

$logger = new WebLogger();
$pool = new Pool(8, WebWorker::class, [$logger]);

for ($i = 0; $i < 10; $i++) {
    $pool->submit(new WebWork());
}

usleep(2000000);

$logger->log("Shrink !!");

$pool->resize(1);
$pool->collect(function(WebWork $task){
    return $task->isComplete();
});

for ($j = 0; $j < 10; $j++) {
    $pool->submit(new WebWork());
}

$pool->shutdown();	

Will output something like:

[joe@fiji pthreads]$ php-zts src/s.php
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130126518016
WebWork executing in Thread #140130117801728
WebWork executing in Thread #140130108819200
WebWork executing in Thread #140130099836672
WebWork executing in Thread #140129750480640
WebWork executing in Thread #140129742087936
WebWork executing in Thread #140129733695232
WebWork executing in Thread #140130126518016
WebWork executing in Thread #140130134910720
Shrink !!
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720