Created
December 5, 2010 12:59
-
-
Save ianbarber/729057 to your computer and use it in GitHub Desktop.
Distribute work across subprocesses using 0MQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
$num_workers = 5; | |
$pid = 1; | |
for($i = 0; $i < $num_workers; $i++) { | |
$pid = pcntl_fork(); | |
if($pid == 0) { | |
break; | |
} | |
} | |
if($pid == 0) { | |
work($i); | |
} else { | |
serve($num_workers); | |
} | |
function work($id) { | |
$context = new ZMQContext(); | |
$workpipe = new ZMQSocket($context, ZMQ::SOCKET_PULL); | |
$workpipe->connect("ipc://work.ipc"); | |
$poll = new ZMQPoll(); | |
$poll->add($workpipe, ZMQ::POLL_IN); | |
$controlpipe = new ZMQSocket($context, ZMQ::SOCKET_SUB); | |
$controlpipe->connect("ipc://control.ipc"); | |
$controlpipe->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, ""); | |
$controlpipe->setSockOpt(ZMQ::SOCKOPT_IDENTITY, "worker" . $id); | |
$controlpoll = new ZMQPoll(); | |
$controlpoll->add($controlpipe, ZMQ::POLL_IN); | |
$syncpipe = new ZMQSOcket($context, ZMQ::SOCKET_PUSH); | |
$syncpipe->connect("ipc://sync.ipc"); | |
$syncpipe->send($id); | |
$readable = array(); | |
$worked = 0; | |
while(true) { | |
$events = $poll->poll($readable, null, 5000); | |
if($events > 0) { | |
foreach($readable as $r) { | |
$message = $r->recv(); | |
//echo "Worker $id got message " . $message . "\n"; | |
usleep(250); | |
$worked++; | |
} | |
} else { | |
$events = $controlpoll->poll($readable, null, 100); | |
if($events > 0) { | |
$message = $readable[0]->recv(); | |
echo "$id Ending!\n"; | |
break; | |
} | |
} | |
} | |
echo "Worker $id worked $worked times \n"; | |
} | |
function serve($workers) { | |
$context = new ZMQContext(); | |
$controlpipe = new ZMQSocket($context, ZMQ::SOCKET_PUB); | |
$controlpipe->bind("ipc://control.ipc"); | |
$workpipe = new ZMQSocket($context, ZMQ::SOCKET_PUSH); | |
$workpipe->bind("ipc://work.ipc"); | |
// Sync up with workers | |
$syncpipe = new ZMQSocket($context, ZMQ::SOCKET_PULL); | |
$syncpipe->bind("ipc://sync.ipc"); | |
$found = 0; | |
$poll = new ZMQPoll(); | |
$poll->add($syncpipe, ZMQ::POLL_IN); | |
$readable = array(); | |
while(true) { | |
$found += $poll->poll($readable, null);; | |
foreach($readable as $r) { | |
$message = $r->recv(); | |
echo "Hello $message!\n"; | |
} | |
if($found == $workers) { | |
echo "All workers checked in\n"; | |
break; | |
} | |
} | |
foreach(range(1, 10000) as $message) { | |
//echo "Sending $message\n"; | |
$workpipe->send($message); | |
} | |
echo "Sending END\n"; | |
$controlpipe->send("END"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment