@c93614
Created April 25, 2014 06:16

Asynchronous Flow Control in PHP (pthreads)

Controlling the flow of asynchronous code in PHP with pthreads works much like it does in Java, and looks pretty similar too. This kind of control is colloquially referred to as "synchronization", and pthreads supports both synchronized blocks and synchronized methods:

Blocks

<?php
class My extends Thread {
    public function run() {
    
      /*
       Synchronizing with an object causes the caller to acquire what is
       referred to as the object's "monitor", which is in fact a mutex
       associated with a condition variable.
      */
      $this->synchronized(function(){
        /*
         Setting the data here, inside the synchronized block, is the safest thing to do
        */
        $this->data = "set";
      
        /*
         Notify causes all contexts waiting on this object to awaken
        */
        $this->notify();
      });
      
      
    }
    
    protected $data;
}

$my = new My();
$my->start();

$my->synchronized(function() use ($my) {
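  /*
   Re-check the condition in a loop rather than a single if, so that a
   wakeup that arrives before the data is set does not end the wait early.
  */
  while (!$my->data) {
    /*
     Waiting on a condition variable causes the system to atomically
     unlock the associated mutex and block; the mutex is locked again
     before wait() returns. While this context is blocked, other contexts
     can acquire the object's monitor and so enter a synchronized state.
    */
    $my->wait();
  }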
});
?>
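
The same handshake can also be written with an explicit "ready" flag and a bounded wait. What follows is only a sketch, assuming the pthreads extension is loaded: the Producer class and its $ready and $value properties are made up for illustration, the thread is passed to the closure as an argument instead of through use, and wait() is given its optional timeout in microseconds so the waiter re-checks the flag periodically.

```php
<?php
/* Hypothetical example class, not part of pthreads itself */
class Producer extends Thread {
    public $ready;
    public $value;

    public function run() {
        /* Do the work outside the lock, publish the result inside it */
        $result = 6 * 7;

        $this->synchronized(function($thread) use ($result) {
            $thread->value = $result;
            $thread->ready = true;
            $thread->notify();
        }, $this);
    }
}

$p = new Producer();
$p->start();

$p->synchronized(function($thread) {
    /* Loop on the flag: wake at least every 0.5s and check again */
    while (!$thread->ready) {
        $thread->wait(500000);
    }
}, $p);

$p->join();

echo $p->value, "\n";
?>
```

Because the flag is only read and written while the monitor is held, the waiter cannot miss the notification: either it sees the flag already set and never waits, or it is waiting when notify() is called.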

Methods

Explicit synchronization is powerful, but sometimes you just want to ensure that a single method is never executed by more than one context at a time. Logging is a good example: if more than one thread writes to a stream concurrently, the threads will likely corrupt its contents.

Any method that is declared with the protected modifier, in a class descending from one of the pthreads classes (such as Thread or Stackable), becomes a synchronized method:

<?php
class Logger extends Stackable {
	
	/*
	 This method can only be executed by a single context at a time
	*/
	protected function log($message, $args = []) {
		$args = func_get_args();
		if (($message = array_shift($args))) {
			echo vsprintf("{$message}\n", $args);
		}
	}
	
	public function run(){}
}

class My extends Thread {
	public function __construct(Logger $logger) {
		$this->logger = $logger;
	}
	
	public function run() {
		for ($i = 1; $i <= 1000; $i++) {
			$this->logger->log(
				"Thread #%lu iteration %d",
				$this->getThreadId(), $i);
			/* just for effect */
			usleep(100);
		}
	}
	
	protected $logger;
}

$threads = [];
$thread  = 0;
$logger  = new Logger();

while ($thread < 8) {
	$threads[$thread] = new My($logger);
	$threads[$thread]->start();
	$thread++;
}

foreach ($threads as $thread)
	$thread->join();
?>
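
For comparison, a similar mutual-exclusion effect can be sketched with an explicit synchronized block inside an ordinary public method. This is an illustration only, assuming the pthreads extension is loaded: BlockLogger and Writer are hypothetical names, and the sketch does not claim that pthreads implements protected methods this way internally.

```php
<?php
/* Hypothetical logger: same job as Logger, but the lock is taken explicitly */
class BlockLogger extends Stackable {

    public function log($message) {
        $args = func_get_args();

        /* Only one context at a time can be inside this block for this object */
        $this->synchronized(function() use ($args) {
            $format = array_shift($args);
            vprintf("{$format}\n", $args);
        });
    }

    public function run(){}
}

/* Hypothetical thread that shares one BlockLogger with its siblings */
class Writer extends Thread {
    public function __construct(BlockLogger $logger) {
        $this->logger = $logger;
    }

    public function run() {
        for ($i = 1; $i <= 3; $i++) {
            $this->logger->log(
                "Thread #%lu iteration %d",
                $this->getThreadId(), $i);
        }
    }

    protected $logger;
}

$logger  = new BlockLogger();
$writers = [new Writer($logger), new Writer($logger)];

foreach ($writers as $writer)
    $writer->start();

foreach ($writers as $writer)
    $writer->join();
?>
```

The explicit form is more verbose, but it lets you hold the lock for only part of a method, which the protected modifier cannot express.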