Last active
August 29, 2015 14:10
-
-
Save c93614/b42a77ea7b3b883fc718 to your computer and use it in GitHub Desktop.
Asynchronous client/server in PHP http://zguide.zeromq.org/php:asyncsrv
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// http://zguide.zeromq.org/php:mdcliapi2 | |
/* ===================================================================== | |
* mdcliapi2.c | |
* | |
* Majordomo Protocol Client API (async version) | |
* Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7. | |
* | |
* --------------------------------------------------------------------- | |
* Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com> | |
* Copyright other contributors as noted in the AUTHORS file. | |
* | |
* This file is part of the ZeroMQ Guide: http://zguide.zeromq.org | |
* | |
* This is free software; you can redistribute it and/or modify it under | |
* the terms of the GNU Lesser General Public License as published by | |
* the Free Software Foundation; either version 3 of the License, or (at | |
* your option) any later version. | |
* | |
* This software is distributed in the hope that it will be useful, but | |
* WITHOUT ANY WARRANTY; without even the implied warranty of | |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
* Lesser General Public License for more details. | |
* | |
* You should have received a copy of the GNU Lesser General Public | |
* License along with this program. If not, see | |
* <http://www.gnu.org/licenses/>. | |
* ===================================================================== | |
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com> | |
*/ | |
include_once 'zmsg.php'; | |
include_once 'mdp.php'; | |
class MDCli | |
{ | |
// Structure of our class | |
// We access these properties only via class methods | |
private $broker; | |
private $context; | |
private $client; // Socket to broker | |
private $verbose; // Print activity to stdout | |
private $timeout; // Request timeout | |
private $retries; // Request retries | |
/** | |
* Constructor | |
* | |
* @param string $broker | |
* @param boolean $verbose | |
*/ | |
public function __construct($broker, $verbose = false) | |
{ | |
$this->broker = $broker; | |
$this->context = new ZMQContext(); | |
$this->verbose = $verbose; | |
$this->timeout = 2500; // msecs | |
$this->connect_to_broker(); | |
} | |
/** | |
* Connect or reconnect to broker | |
*/ | |
protected function connect_to_broker() | |
{ | |
if ($this->client) { | |
unset($this->client); | |
} | |
$this->client = new ZMQSocket($this->context, ZMQ::SOCKET_DEALER); | |
$this->client->setSockOpt(ZMQ::SOCKOPT_LINGER, 0); | |
$this->client->connect($this->broker); | |
if ($this->verbose) { | |
printf("I: connecting to broker at %s…", $this->broker); | |
} | |
} | |
/** | |
* Set request timeout | |
* | |
* @param int $timeout (msecs) | |
*/ | |
public function set_timeout($timeout) | |
{ | |
$this->timeout = $timeout; | |
} | |
/** | |
* Send request to broker | |
* Takes ownership of request message and destroys it when sent. | |
* | |
* @param string $service | |
* @param Zmsg $request | |
*/ | |
public function send($service, Zmsg $request) | |
{ | |
// Prefix request with protocol frames | |
// Frame 0: empty (REQ emulation) | |
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y) | |
// Frame 2: Service name (printable string) | |
$request->push($service); | |
$request->push(MDPC_CLIENT); | |
$request->push(""); | |
if ($this->verbose) { | |
printf ("I: send request to '%s' service: %s", $service, PHP_EOL); | |
echo $request->__toString(); | |
} | |
$request->set_socket($this->client)->send(); | |
} | |
/** | |
* Returns the reply message or NULL if there was no reply. Does not | |
* attempt to recover from a broker failure, this is not possible | |
* without storing all unanswered requests and resending them all… | |
* | |
*/ | |
public function recv() | |
{ | |
$read = $write = array(); | |
// Poll socket for a reply, with timeout | |
$poll = new ZMQPoll(); | |
$poll->add($this->client, ZMQ::POLL_IN); | |
$events = $poll->poll($read, $write, $this->timeout); | |
// If we got a reply, process it | |
if ($events) { | |
$msg = new Zmsg($this->client); | |
$msg->recv(); | |
if ($this->verbose) { | |
echo "I: received reply:", $request->__toString(), PHP_EOL; | |
} | |
// Don't try to handle errors, just assert noisily | |
assert ($msg->parts() >= 4); | |
$msg->pop(); // empty | |
$header = $msg->pop(); | |
assert($header == MDPC_CLIENT); | |
$reply_service = $msg->pop(); | |
return $msg; // Success | |
} else { | |
echo "W: permanent error, abandoning request", PHP_EOL; | |
return; // Give up | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/* | |
* Asynchronous client-to-server (DEALER to ROUTER) | |
* | |
* While this example runs in a single process, that is just to make | |
* it easier to start and stop the example. Each task has its own | |
* context and conceptually acts as a separate process. | |
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com> | |
*/ | |
include 'zmsg.php'; | |
/* --------------------------------------------------------------------- | |
* This is our client task | |
* It connects to the server, and then sends a request once per second | |
* It collects responses as they arrive, and it prints them out. We will | |
* run several client tasks in parallel, each with a different random ID. | |
*/ | |
function client_task() | |
{ | |
$context = new ZMQContext(); | |
$client = new ZMQSocket($context, ZMQ::SOCKET_DEALER); | |
// Generate printable identity for the client | |
$identity = sprintf ("%04X", rand(0, 0x10000)); | |
$client->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $identity); | |
$client->connect("tcp://localhost:5570"); | |
$read = $write = array(); | |
$poll = new ZMQPoll(); | |
$poll->add($client, ZMQ::POLL_IN); | |
$request_nbr = 0; | |
while (true) { | |
// Tick once per second, pulling in arriving messages | |
for ($centitick = 0; $centitick < 100; $centitick++) { | |
$events = $poll->poll($read, $write, 1000); | |
$zmsg = new Zmsg($client); | |
if ($events) { | |
$zmsg->recv(); | |
printf ("%s: %s%s", $identity, $zmsg->body(), PHP_EOL); | |
} | |
} | |
$zmsg = new Zmsg($client); | |
$zmsg->body_fmt("request #%d", ++$request_nbr)->send(); | |
} | |
} | |
/* --------------------------------------------------------------------- | |
* This is our server task | |
* It uses the multithreaded server model to deal requests out to a pool | |
* of workers and route replies back to clients. One worker can handle | |
* one request at a time but one client can talk to multiple workers at | |
* once. | |
*/ | |
function server_task() | |
{ | |
// Launch pool of worker threads, precise number is not critical | |
for ($thread_nbr = 0; $thread_nbr < 5; $thread_nbr++) { | |
$pid = pcntl_fork(); | |
if ($pid == 0) { | |
server_worker(); | |
exit(); | |
} | |
} | |
$context = new ZMQContext(); | |
// Frontend socket talks to clients over TCP | |
$frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER); | |
$frontend->bind("tcp://*:5570"); | |
// Backend socket talks to workers over ipc | |
$backend = new ZMQSocket($context, ZMQ::SOCKET_DEALER); | |
$backend->bind("ipc://backend"); | |
// Connect backend to frontend via a queue device | |
// We could do this: | |
// $device = new ZMQDevice($frontend, $backend); | |
// But doing it ourselves means we can debug this more easily | |
$read = $write = array(); | |
// Switch messages between frontend and backend | |
while (true) { | |
$poll = new ZMQPoll(); | |
$poll->add($frontend, ZMQ::POLL_IN); | |
$poll->add($backend, ZMQ::POLL_IN); | |
$poll->poll($read, $write); | |
foreach ($read as $socket) { | |
$zmsg = new Zmsg($socket); | |
$zmsg->recv(); | |
if ($socket === $frontend) { | |
//echo "Request from client:"; | |
//echo $zmsg->__toString(); | |
$zmsg->set_socket($backend)->send(); | |
} elseif ($socket === $backend) { | |
//echo "Request from worker:"; | |
//echo $zmsg->__toString(); | |
$zmsg->set_socket($frontend)->send(); | |
} | |
} | |
} | |
} | |
function server_worker() | |
{ | |
$context = new ZMQContext(); | |
$worker = new ZMQSocket($context, ZMQ::SOCKET_DEALER); | |
$worker->connect("ipc://backend"); | |
$zmsg = new Zmsg($worker); | |
while (true) { | |
// The DEALER socket gives us the address envelope and message | |
$zmsg->recv(); | |
assert($zmsg->parts() == 2); | |
// Send 0..4 replies back | |
$replies = rand(0,4); | |
for ($reply = 0; $reply < $replies; $reply++) { | |
// Sleep for some fraction of a second | |
usleep(rand(0,1000) + 1); | |
$zmsg->send(Zmsg::NOCLEAR); | |
} | |
} | |
} | |
/* This main thread simply starts several clients, and a server, and then | |
* waits for the server to finish. | |
*/ | |
function main() | |
{ | |
for ($num_clients = 0; $num_clients < 3; $num_clients++) { | |
$pid = pcntl_fork(); | |
if ($pid == 0) { | |
client_task(); | |
exit(); | |
} | |
} | |
$pid = pcntl_fork(); | |
if ($pid == 0) { | |
server_task(); | |
exit(); | |
} | |
} | |
main(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment