Skip to content

Instantly share code, notes, and snippets.

@danielecr
Created July 31, 2019 19:31
Show Gist options
  • Save danielecr/8edcbe33d6b7ab394edb90f9fb9e4149 to your computer and use it in GitHub Desktop.
Save danielecr/8edcbe33d6b7ab394edb90f9fb9e4149 to your computer and use it in GitHub Desktop.
Compare node with reactphp
const zmq = require('zeromq-stable');
/**
 * Parse a raw message frame into a JavaScript value.
 *
 * @param {Buffer|string} request - Frame whose string form is expected to be JSON.
 * @returns {*} The parsed value, or `{ error: { msg, detail } }` when parsing fails.
 */
const messageToJson = (request) => {
  try {
    return JSON.parse(request.toString());
  } catch (err) {
    // Call toString(): a bare `err.toString` reference is a function, which
    // JSON.stringify drops, so the detail would never reach a peer or a log.
    return { error: { msg: 'error while parsing', detail: err.toString() } };
  }
};
/** Serialize a value to the JSON text that is sent over the socket. */
const objToMessage = (payload) => {
  return JSON.stringify(payload);
};
/**
 * Build a request function bound to one endpoint.
 *
 * Every call of the returned function opens a fresh REQ socket, sends `obj`
 * as JSON, and settles on the first reply or socket error.
 *
 * @param {string} target - Endpoint passed to `connect`.
 * @returns {function(*): Promise<*>} Function whose promise resolves with the
 *   parsed reply, rejects with the stringified error on a socket 'error'
 *   event, and rejects with the thrown error if connect/send throws.
 */
const requester = (target) => (obj) => new Promise((resolve, reject) => {
  // Named `socket` so it does not shadow the enclosing `requester`.
  const socket = zmq.socket('req');
  socket.on('message', (reply) => {
    socket.close();
    resolve(messageToJson(reply));
  });
  socket.on('error', (err) => {
    socket.close();
    reject(err.toString());
  });
  try {
    socket.connect(target);
    socket.send(objToMessage(obj));
  } catch (err) {
    // A synchronous failure used to reject the promise but leave the socket
    // open. Reject first so the original error wins even if close() throws.
    reject(err);
    socket.close();
  }
});
/**
 * Send a `/ping` command carrying `value` to the service on localhost:5559.
 *
 * @param {*} value - Placed in the message's `data` field.
 * @returns {Promise<*>} The promise produced by `requester` for this message.
 */
const MessageIt = (value) => {
  const sendToService = requester(`tcp://localhost:5559`);
  return sendToService({ cmd: '/ping', data: value });
};
// Values to send; one request per entry, all started before any reply arrives.
const to_map = [ 1,2,3,4,5,9,14,21,36,8,1,2,3,76,41,1,1,1,1,2];
const promises = to_map.map(MessageIt);
Promise.all(promises)
  .then((results) => {
    console.log(results);
  })
  .catch((err) => {
    // Promise.all rejects as soon as one request rejects; report it instead
    // of leaving an unhandled rejection, and keep a failing exit status.
    console.error(err);
    process.exitCode = 1;
  });
<?php
require "vendor/autoload.php";
// Event loop for this script; it is started by $loop->run() at the end.
$loop = React\EventLoop\Factory::create();
// ZMQ context attached to that loop; prequest() takes its sockets from it.
$context = new React\ZMQ\Context($loop);
/**
 * Send one request over a new REQ socket and return a promise for the reply.
 *
 * @param string $serviceaddr Endpoint to connect to.
 * @param mixed  $message     Arrays and objects are JSON-encoded; any other
 *                            value is sent as given.
 * @return mixed Promise from the Deferred: resolved with the json_decode()d
 *               reply, rejected with the exception message or, for
 *               non-exception errors, the error's type/class name.
 */
function prequest($serviceaddr, $message)
{
    global $context;

    $pending = new \React\Promise\Deferred();
    $socket = $context->getSocket(\ZMQ::SOCKET_REQ);
    $socket->setSockOpt(\ZMQ::SOCKOPT_TCP_KEEPALIVE,0);
    $socket->setSockOpt(\ZMQ::SOCKOPT_LINGER, 60000);

    $payload = (is_object($message) || is_array($message))
        ? json_encode($message)
        : $message;

    $socket->connect($serviceaddr);
    $socket->send($payload);

    // First reply settles the promise, then the socket is closed.
    $socket->on('message', function ($reply) use ($pending, $socket) {
        $pending->resolve(json_decode($reply));
        $socket->close();
    });

    // Any error rejects the promise, then the socket is closed.
    $socket->on('error', function ($failure) use ($pending, $socket) {
        if ($failure instanceof \Exception) {
            $reason = $failure->getMessage();
            print "ERROR: ".$reason ."\n";
        } else {
            $reason = is_object($failure) ? get_class($failure) : gettype($failure);
        }
        $pending->reject($reason);
        $socket->close();
    });

    return $pending->promise();
}
// Read a service definition from disk and wrap it in a /service_rep_add command.
// NOTE(review): $setup_msg is not sent anywhere in the code shown here — confirm it is still needed.
$setup_data = json_decode(file_get_contents('./zsqlservice_def.json'));
$setup_msg = [
"cmd" => "/service_rep_add",
"data" => $setup_data
];
// NOTE(review): $requester is not referenced again in the code shown here; \ZSSW\Requester is
// presumably supplied by the autoloader — confirm, since a missing class would stop the script here.
$requester = new \ZSSW\Requester();
// Values to send, one /ping request per entry.
$to_map = [ 1,2,3,4,5,9,14,21,36,8,1,2,3,76,41,1,1,1,1,2];
// Build a /ping request for one value and return the promise of its reply.
$sendMsg = function($i) {
    $msg = [ "cmd" => "/ping", "data" => $i];
    // The promise must be returned: without it array_map() collects nulls and
    // \React\Promise\all() resolves at once instead of waiting for the replies.
    return prequest('tcp://localhost:5559', json_encode($msg));
};
// Start every request, then print all results once they have all settled.
$promises = array_map($sendMsg, $to_map);
print "BEFORE: " .time(). "\n";
\React\Promise\all($promises)->then(function ($results) {
    print_r($results);
    print "TIME: " .time(). "\n";
}, function ($reason) {
    // all() rejects as soon as one request rejects; without this handler the
    // failure would be dropped silently and the script would print nothing.
    print "FAILED: ";
    print_r($reason);
    print "\n";
});
// Nothing is sent or received until the loop runs.
$loop->run();
const zmq = require('zeromq-stable');
// ROUTER socket serving requests; its 'message' handler below receives the
// frames (identity, delimiter, request) and replies with the same identity.
const responder = zmq.socket('router');
// TCP port the service listens on, on all interfaces.
const port = 5559;
responder.bind(`tcp://*:${port}`)
const { messageToJson, objToMessage, makeErrorMessage } = require('../message_to_json');
/**
 * Random integer in [min, max): `min` is inclusive, `max` is exclusive.
 * Both bounds are coerced to numbers with unary plus.
 */
const rand = (min, max) => {
  const low = +min;
  const span = +max - low;
  return Math.floor(Math.random() * span) + low;
};
/**
 * Request handler: parse the request, wait `rand(1,4)` seconds, then reply to
 * the same identity with `{ error: null, data: msg.data * 2 }`. Failures are
 * reported to the requester as `{ error: <message> }`.
 */
responder.on('message', (identity, delimiter, request) => {
  // JSON.stringify(new Error('x')) is "{}", so send the error text rather
  // than the Error object, otherwise the requester gets an empty error.
  const replyWithError = (err) => {
    const response = { error: err instanceof Error ? err.message : String(err) };
    console.log('last catch', response);
    responder.send([identity, '', objToMessage(response)]);
  };

  try {
    const msg = messageToJson(request);
    console.log('msg', msg);
    const aRand = rand(1, 4) * 1000;
    console.log('aRand', aRand);
    setTimeout(() => {
      // The timer callback runs after the outer try has exited, so it needs
      // its own guard (e.g. a request whose JSON is `null`).
      try {
        const doubleIt = { error: null, data: msg.data * 2 };
        responder.send([identity, '', objToMessage(doubleIt)]);
      } catch (err) {
        replyWithError(err);
      }
    }, aRand);
  } catch (err) {
    replyWithError(err);
  }
});
// On SIGTERM: log, close the service socket, and exit with status 0.
process.on('SIGTERM', function onSigterm() {
  console.log('Received SIGTERM. Termination.');
  responder.close();
  process.exit(0);
});
<?php
require "vendor/autoload.php";
// Event loop for the service; it is started by $loop->run() at the end.
$loop = React\EventLoop\Factory::create();
$context = new React\ZMQ\Context($loop);
// ROUTER socket: requests arrive as [identity, empty, body] frames.
$service = $context->getSocket(ZMQ::SOCKET_ROUTER);
// Set the option before bind(): per zmq_setsockopt, most options (TCP
// keepalive included) only apply to bind/connect calls made afterwards.
$service->setSockOpt(\ZMQ::SOCKOPT_TCP_KEEPALIVE,0);
$service->bind('tcp://0.0.0.0:5559');
// Socket error: dump the error message, then stop the event loop.
$service->on('error', function ($e) use ($loop) {
    var_dump($e->getMessage());
    // eventually exit ? this way:
    $loop->stop();
});
/**
 * Simulate slow work: after rand(1,4) seconds, resolve with $request->data * 2.
 *
 * @param object $request Decoded request; its `data` property is doubled.
 * @return mixed Promise from the Deferred, resolved by a loop timer.
 */
function random_interval($request)
{
    $pending = new \React\Promise\Deferred();
    global $loop;

    $delaySeconds = rand(1,4);
    $loop->addTimer($delaySeconds, function () use ($pending, $request) {
        $doubled = $request->data*2;
        $pending->resolve($doubled);
    });

    return $pending->promise();
}
// Request handler: decode the body frame, run random_interval() on it, and
// reply to the sender's identity with the JSON-encoded result or an error.
$service->on('messages', function ($message) use ($service) {
    list($identity, $empty, $msg) = $message;
    print "$identity . $empty . $msg\n";
    try {
        $request = json_decode($msg);
        print_r($request);
        if (!is_object($request)) {
            // json_decode() returns null instead of throwing on malformed
            // input, so raise explicitly to reach the error reply below.
            throw new \InvalidArgumentException('request is not a JSON object');
        }
        // do the work
        random_interval($request)->then(function($response) use($service, $identity) {
            $service->send([$identity, '', json_encode($response)]);
        })->otherwise(function ($reason) use($service, $identity) {
            // $service and $identity must be captured: closures do not see
            // the enclosing scope, so both were undefined here before.
            print_r($reason);
            $service->send([$identity, '', json_encode(["error"=>"some error"])]);
        });
    } catch (\Exception $e) {
        $error = new \stdClass;
        $error->message = $e->getMessage();
        $error->file = $e->getFile();
        $error->line = $e->getLine();
        // A ROUTER socket needs the [identity, empty, body] envelope to route
        // the reply; a bare body frame would not reach the requester.
        $service->send([$identity, '', json_encode($error)]);
    }
});
// Start the event loop; the handlers registered above only fire while it runs.
$loop->run();
@danielecr
Copy link
Author

composer.json require:

    "react/zmq": "^0.4.0",
    "react/promise": "^2.7",

Also enable the uv extension for PHP to get the same performance as Node.js; some hints are in:
https://hub.docker.com/r/starsellersworld/reactphpzeromq/dockerfile

nodejs:

npm i zeromq-stable

(the installation takes a very long time)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment