Created
December 5, 2012 04:10
-
-
Save compwright/4212160 to your computer and use it in GitHub Desktop.
Demo code used in my "Multitasking in PHP" presentation at the December 2012 Atlanta PHP user group meetup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require dirname(__FILE__).'/Process_Manager.php'; | |
// Create a batch of test messages to send | |
$email = array( | |
'to' => '[email protected]', | |
'subject' => 'This is a test', | |
'body' => 'Hello, world of multi-processing!' | |
); | |
$queue = array_fill(0, 50, $email); | |
// Create a function simulate sending an email message | |
$sender = function($message_id, $message) | |
{ | |
// Pretend to send it, we'll assume a normal latency of 500-1000ms | |
$ms = rand(500, 1000); | |
usleep($ms * 1000); | |
printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $ms); | |
}; | |
// Start the timer | |
$start_time = microtime(TRUE); | |
// Send the emails | |
foreach ($queue as $message_id => $message) | |
{ | |
$sender($message_id, $message); | |
} | |
// Stop the timer | |
$runtime = microtime(TRUE) - $start_time; | |
printf("\nDone! Sent %d messages in %d seconds\n\n", count($queue), $runtime); | |
exit; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require dirname(__FILE__).'/Process_Manager.php'; | |
$pm = new Process_Manager(); | |
declare(ticks = 1); | |
// Setup our signal handlers | |
$callback = array($pm, 'signal_handler'); | |
pcntl_signal(SIGTERM, $callback); | |
pcntl_signal(SIGINT, $callback); | |
pcntl_signal(SIGCHLD, $callback); | |
// Create a batch of test messages to send | |
$email = array( | |
'to' => '[email protected]', | |
'subject' => 'This is a test', | |
'body' => 'Hello, world of multi-processing!' | |
); | |
$queue = array_fill(0, 50, $email); | |
// Create a function simulate sending an email message | |
$sender = function($message_id, $message) | |
{ | |
// Pretend to send it, we'll assume a normal latency of 500-1000ms | |
$ms = rand(500, 1000); | |
usleep($ms * 1000); | |
printf("Process %d: sent message %d (%d ms)\n", posix_getpid(), $message_id, $ms); | |
}; | |
// Start the timer | |
$start_time = microtime(TRUE); | |
// Fork processes to send the emails | |
foreach ($queue as $message_id => $message) | |
{ | |
$args = array( | |
'message_id' => $message_id, | |
'message' => $message, | |
); | |
// Execution will not proceed past this line | |
// except for in the parent process. | |
$pm->fork_child($sender, $args); | |
// Limit concurrency to 5 processes | |
if (count($pm) >= 5) | |
{ | |
while (count($pm) >= 5) | |
{ | |
usleep(500000); // sleep 500 ms | |
} | |
} | |
} | |
// Wait for all processes to end | |
echo "The queue is empty, waiting for all processes to finish\n"; | |
while (count($pm) > 0) | |
{ | |
usleep(500000); // sleep 500 ms | |
} | |
// Stop the timer | |
$runtime = microtime(TRUE) - $start_time; | |
printf("\nDone! Sent %d messages in %d seconds\n\n", count($queue), $runtime); | |
exit; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
class Process_Manager implements Countable | |
{ | |
protected $processes = array(); | |
protected $is_child = FALSE; | |
public function count() | |
{ | |
return count($this->processes); | |
} | |
public function signal_handler($signal) | |
{ | |
// Don't do anything if we're not in the parent process | |
if ($this->is_child) | |
{ | |
return; | |
} | |
switch ($signal) | |
{ | |
case SIGINT: | |
case SIGTERM: | |
echo "\nUser terminated the application\n"; | |
// Kill all child processes before terminating the parent | |
$this->kill_all(); | |
exit(0); | |
case SIGCHLD: | |
// Reap a child process | |
//echo "SIGCHLD received\n"; | |
$this->reap_child(); | |
} | |
} | |
public function kill_all() | |
{ | |
foreach ($this->processes as $pid => $is_running) | |
{ | |
posix_kill($pid, SIGKILL); | |
} | |
} | |
public function fork_child($callback, $data) | |
{ | |
$pid = pcntl_fork(); | |
switch($pid) | |
{ | |
case 0: | |
// Child process | |
$this->is_child = TRUE; | |
call_user_func_array($callback, $data); | |
posix_kill(posix_getppid(), SIGCHLD); | |
exit; | |
case -1: | |
// Parent process, fork failed | |
throw new Exception("Out of memory!"); | |
default: | |
// Parent process, fork succeeded | |
$this->processes[$pid] = TRUE; | |
return $pid; | |
} | |
} | |
public function reap_child() | |
{ | |
// Check if any child process has terminated, | |
// and if so remove it from memory | |
$pid = pcntl_wait($status, WNOHANG); | |
if ($pid < 0) | |
{ | |
throw new Exception("Out of memory"); | |
} | |
elseif ($pid > 0) | |
{ | |
unset($this->processes[$pid]); | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@parsibox check your child process code, and ensure that any resources it needs, it opens and closes, and check how errors are handled. Ensure that resources that were open at the time of the error get closed when an error occurs. You might find it helpful to add thorough debug logging throughout your child process code.