Skip to content

Instantly share code, notes, and snippets.

@c93614
Last active August 29, 2015 14:10
Show Gist options
  • Save c93614/b42a77ea7b3b883fc718 to your computer and use it in GitHub Desktop.
Save c93614/b42a77ea7b3b883fc718 to your computer and use it in GitHub Desktop.
Asynchronous client/server in PHP http://zguide.zeromq.org/php:asyncsrv
<?php
// http://zguide.zeromq.org/php:mdcliapi2
/* =====================================================================
* mdcliapi2.php
*
* Majordomo Protocol Client API (async version)
* Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
*
* ---------------------------------------------------------------------
* Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
* Copyright other contributors as noted in the AUTHORS file.
*
* This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
*
* This is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 3 of the License, or (at
* your option) any later version.
*
* This software is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/>.
* =====================================================================
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include_once 'zmsg.php';
include_once 'mdp.php';
/**
 * Majordomo Protocol client API (asynchronous version).
 *
 * Requests are sent over a DEALER socket with send() and replies are
 * collected separately with recv(), so several requests may be in flight
 * before the first reply is read.
 */
class MDCli
{
    // Structure of our class
    // We access these properties only via class methods
    private $broker;    // Broker endpoint passed to the constructor
    private $context;   // ZMQContext owning the client socket
    private $client;    // Socket to broker
    private $verbose;   // Print activity to stdout
    private $timeout;   // Request timeout (msecs), used by recv()
    private $retries;   // Request retries (not used by any method of this class)

    /**
     * Constructor; connects to the broker immediately.
     *
     * @param string  $broker  Endpoint of the broker to connect to
     * @param boolean $verbose When true, activity is printed to stdout
     */
    public function __construct($broker, $verbose = false)
    {
        $this->broker = $broker;
        $this->context = new ZMQContext();
        $this->verbose = $verbose;
        $this->timeout = 2500; // msecs
        $this->connect_to_broker();
    }

    /**
     * Connect or reconnect to broker.
     *
     * Any existing socket is released before a new DEALER socket is
     * created. Linger is set to 0 on the new socket.
     */
    protected function connect_to_broker()
    {
        if ($this->client) {
            // Drop the old socket before opening a replacement
            $this->client = null;
        }
        $this->client = new ZMQSocket($this->context, ZMQ::SOCKET_DEALER);
        $this->client->setSockOpt(ZMQ::SOCKOPT_LINGER, 0);
        $this->client->connect($this->broker);
        if ($this->verbose) {
            // Terminate the line so later log output starts on its own line
            printf("I: connecting to broker at %s…%s", $this->broker, PHP_EOL);
        }
    }

    /**
     * Set request timeout
     *
     * @param int $timeout (msecs) how long recv() polls for a reply
     */
    public function set_timeout($timeout)
    {
        $this->timeout = $timeout;
    }

    /**
     * Send request to broker
     * Takes ownership of request message and destroys it when sent.
     *
     * @param string $service Name of the service the request is addressed to
     * @param Zmsg   $request Request body; protocol frames are pushed onto it
     */
    public function send($service, Zmsg $request)
    {
        // Prefix request with protocol frames
        // Frame 0: empty (REQ emulation)
        // Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
        // Frame 2: Service name (printable string)
        // Frames are pushed in reverse so that the empty frame ends up first.
        $request->push($service);
        $request->push(MDPC_CLIENT);
        $request->push("");
        if ($this->verbose) {
            printf ("I: send request to '%s' service: %s", $service, PHP_EOL);
            echo $request->__toString();
        }
        $request->set_socket($this->client)->send();
    }

    /**
     * Returns the reply message or NULL if there was no reply. Does not
     * attempt to recover from a broker failure, this is not possible
     * without storing all unanswered requests and resending them all…
     *
     * @return Zmsg|null Reply with the protocol frames stripped, or NULL
     *                   when nothing arrived within the timeout
     */
    public function recv()
    {
        $read = $write = array();
        // Poll socket for a reply, with timeout
        $poll = new ZMQPoll();
        $poll->add($this->client, ZMQ::POLL_IN);
        $events = $poll->poll($read, $write, $this->timeout);

        if (!$events) {
            echo "W: permanent error, abandoning request", PHP_EOL;
            return null; // Give up
        }

        // We got a reply, process it
        $msg = new Zmsg($this->client);
        $msg->recv();
        if ($this->verbose) {
            // Dump the message that was just received (the original code
            // referenced an undefined $request variable here)
            echo "I: received reply:", $msg->__toString(), PHP_EOL;
        }
        // Don't try to handle errors, just assert noisily
        assert ($msg->parts() >= 4);
        $msg->pop(); // empty
        $header = $msg->pop();
        assert($header == MDPC_CLIENT);
        $msg->pop(); // service name of the reply, not needed by the caller
        return $msg; // Success
    }
}
<?php
/*
* Asynchronous client-to-server (DEALER to ROUTER)
*
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each task has its own
* context and conceptually acts as a separate process.
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include 'zmsg.php';
/* ---------------------------------------------------------------------
 * This is our client task
 * It connects to the server, and then sends a request once per second
 * It collects responses as they arrive, and it prints them out. We will
 * run several client tasks in parallel, each with a different random ID.
 *
 * The function loops forever and never returns.
 */
function client_task()
{
    $context = new ZMQContext();
    $client = new ZMQSocket($context, ZMQ::SOCKET_DEALER);

    // Generate printable identity for the client.
    // rand() is inclusive at both ends, so cap at 0xFFFF to stay within
    // the four hex digits of the "%04X" format.
    $identity = sprintf ("%04X", rand(0, 0xFFFF));
    $client->setSockOpt(ZMQ::SOCKOPT_IDENTITY, $identity);
    $client->connect("tcp://localhost:5570");

    $read = $write = array();
    $poll = new ZMQPoll();
    $poll->add($client, ZMQ::POLL_IN);

    $request_nbr = 0;
    while (true) {
        // Tick once per second, pulling in arriving messages:
        // 100 polls of 10 msec each. The php-zmq documentation states the
        // poll timeout is in milliseconds since extension version 1.0.0.
        for ($centitick = 0; $centitick < 100; $centitick++) {
            $events = $poll->poll($read, $write, 10);
            if ($events) {
                // Only build a message object when there is something to read
                $zmsg = new Zmsg($client);
                $zmsg->recv();
                printf ("%s: %s%s", $identity, $zmsg->body(), PHP_EOL);
            }
        }

        // One request per tick, numbered from 1
        $zmsg = new Zmsg($client);
        $zmsg->body_fmt("request #%d", ++$request_nbr)->send();
    }
}
/* ---------------------------------------------------------------------
 * This is our server task
 * It uses the multithreaded server model to deal requests out to a pool
 * of workers and route replies back to clients. One worker can handle
 * one request at a time but one client can talk to multiple workers at
 * once.
 *
 * Workers are forked as child processes running server_worker(). The
 * calling process then shuttles messages between the frontend and the
 * backend forever and never returns.
 */
function server_task()
{
    // Launch pool of worker processes, precise number is not critical
    for ($thread_nbr = 0; $thread_nbr < 5; $thread_nbr++) {
        $pid = pcntl_fork();
        if ($pid === 0) {
            // Child: become a worker and never fall through to the proxy
            server_worker();
            exit();
        }
        if ($pid === -1) {
            // Fork failed; carry on with however many workers we did get
            trigger_error("Unable to fork server worker", E_USER_WARNING);
        }
    }

    $context = new ZMQContext();

    // Frontend socket talks to clients over TCP
    $frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
    $frontend->bind("tcp://*:5570");

    // Backend socket talks to workers over ipc
    $backend = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
    $backend->bind("ipc://backend");

    // Connect backend to frontend via a queue device
    // We could do this:
    // $device = new ZMQDevice($frontend, $backend);
    // But doing it ourselves means we can debug this more easily
    $read = $write = array();

    // The poll set never changes, so build it once outside the loop
    $poll = new ZMQPoll();
    $poll->add($frontend, ZMQ::POLL_IN);
    $poll->add($backend, ZMQ::POLL_IN);

    // Switch messages between frontend and backend
    while (true) {
        $poll->poll($read, $write);
        foreach ($read as $socket) {
            $zmsg = new Zmsg($socket);
            $zmsg->recv();
            if ($socket === $frontend) {
                // Request from client: hand it to the worker pool
                //echo "Request from client:";
                //echo $zmsg->__toString();
                $zmsg->set_socket($backend)->send();
            } elseif ($socket === $backend) {
                // Reply from worker: route it back to the client
                //echo "Request from worker:";
                //echo $zmsg->__toString();
                $zmsg->set_socket($frontend)->send();
            }
        }
    }
}
/**
 * Worker task: connects to the server backend and, for every request
 * received, sends the same message back between 0 and 4 times with a
 * short random pause before each reply.
 *
 * The function loops forever and never returns.
 */
function server_worker()
{
    $context = new ZMQContext();
    $worker = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
    $worker->connect("ipc://backend");
    $zmsg = new Zmsg($worker);
    while (true) {
        // The DEALER socket gives us the address envelope and message
        $zmsg->recv();
        assert($zmsg->parts() == 2);

        // Send 0..4 replies back
        $replies = rand(0,4);
        for ($reply = 0; $reply < $replies; $reply++) {
            // Sleep for some fraction of a second (1..1000 msec).
            // usleep() takes microseconds, hence the factor of 1000.
            usleep(rand(1, 1000) * 1000);
            // NOCLEAR is passed so the same message object can be sent again
            $zmsg->send(Zmsg::NOCLEAR);
        }
    }
}
/* This main thread simply starts several clients, and a server, and then
 * waits for the server to finish.
 *
 * Each task runs in its own forked child process. A failed fork is
 * reported as a warning instead of being silently ignored.
 */
function main()
{
    for ($num_clients = 0; $num_clients < 3; $num_clients++) {
        $pid = pcntl_fork();
        if ($pid === 0) {
            // Child: run a client and never return into the parent's loop
            client_task();
            exit();
        }
        if ($pid === -1) {
            trigger_error("Unable to fork client task", E_USER_WARNING);
        }
    }

    $pid = pcntl_fork();
    if ($pid === 0) {
        server_task();
        exit();
    }
    if ($pid === -1) {
        trigger_error("Unable to fork server task", E_USER_WARNING);
        return;
    }

    // Block until the server process terminates, so the parent stays in
    // the foreground for as long as the example is running
    pcntl_waitpid($pid, $status);
}
main();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment