Controlling the flow of asynchronous code in PHP works much like it does in Java, and looks pretty similar too. Colloquially referred to as "synchronization", pthreads supports both synchronized blocks and synchronized methods:
<?php
class My extends Thread {
public function run() {
/*
Synchronizing with an object causes the caller to acquire what is
referred to as the objects "monitor", which is infact a mutex associated
with a condition variable.
*/
$this->synchronized(function(){
/*
Setting the data here, inside the synchronized block is the safest thing to do
*/
$this->data = "set";
/*
Notify causes all contexts waiting on this object to awaken
*/
$this->notify();
});
}
protected $data;
}
$my = new My();
$my->start();
$my->synchronized(function() use ($my) {
if (!$my->data) {
/*
Waiting on a condition variable causes the system to atomically
unlock and lock the mutex associated with the condition variable
allowing other contexts to acquire the objects monitor and so enter
a synchronized state.
*/
$my->wait();
}
});
?>
Explicit synchronization is powerful, but it is sometimes the case that you just want to ensure that a single method is not executed by >1 contexts concurrently, a good example of where this might be useful would be logging - if >1 threads are writing a stream concurrently they will likely corrupt the contents.
Any method in a class descending from pthreads that is declared using the protected modifier will become a synchronized method:
<?php
class Logger extends Stackable {
/*
This method can only be executed by a single context at a time
*/
protected function log($message, $args = []) {
$args = func_get_args();
if (($message = array_shift($args))) {
echo vsprintf("{$message}\n", $args);
}
}
public function run(){}
}
class My extends Thread {
public function __construct(Logger $logger) {
$this->logger = $logger;
}
public function run() {
while (@$i++<1000) {
$this->logger->log(
"Thread #%lu iteration %d",
$this->getThreadId(), $i);
/* just for effect */
usleep(100);
}
}
protected $logger;
}
$threads = [];
$thread = 0;
$logger = new Logger();
while ($thread < 8) {
$threads[$thread] = new My($logger);
$threads[$thread]->start();
$thread++;
}
foreach ($threads as $thread)
$thread->join();
?>