Skip to content

Instantly share code, notes, and snippets.

@wodCZ
Created October 3, 2017 08:41
Show Gist options
  • Save wodCZ/14890ad255dc736e8e1a2600cc379601 to your computer and use it in GitHub Desktop.
Save wodCZ/14890ad255dc736e8e1a2600cc379601 to your computer and use it in GitHub Desktop.
<?php
/**
* Simple message queue implementation.
*/
class Extra_MessageQueue
{
private $maxRunTime;
private $startTime;
private $queue;
/** @var int */
private $messageQueueKey;
/**
* @param int $messageQueueKey
* @param int $maxRunTime in seconds, after this time, if there are no messages to process, exit loop
*/
public function __construct($messageQueueKey, $maxRunTime = 3600)
{
$this->startTime = time();
$this->maxRunTime = $maxRunTime;
$this->messageQueueKey = $messageQueueKey;
$this->connect();
}
/**
* @param $message
* @param int $priority from range 1 = highest to 100 = lowest
* @return bool
* @throws \RuntimeException on macOS, there is limit of 40 msgs / 2048 bytes per message queue and it can't be changed.
* On Linux run `ipcs -l` to see current limits.
*/
public function publish($message, $priority = 1)
{
if (@msg_send($this->queue, $priority, $message, true, false, $errorCode)) { // @ - intentional
return true;
}
// sending failed, try recreating it, if it's empty
if ($this->getMessageCount() === 0) {
$this->restartQueue();
if ( ! @msg_send($this->queue, $priority, $message, true, false, $errorCode)) {
$lastError = error_get_last();
$errmessage = @$lastError['message'] ?: 'No error message';
throw new RuntimeException("Queue {$this->messageQueueKey} is broken, " .
'error returned by msg_send:' . $errmessage);
}
return true;
}
throw new RuntimeException("{$this->messageQueueKey} is broken, and is still processing messages.");
}
public function subscribe(callable $callback)
{
while(time() < $this->startTime + $this->maxRunTime) {
while ($this->hasMessages()) { // check if there are some messages
try {
$message = $this->receiveMessage();
} catch (RuntimeException $e){
if($e->getCode() === MSG_ENOMSG) { // no messages, someone fetched it before us
continue; // that's fine, go on and wait for more messages
}
throw $e; // anything else is not fine
}
$callback($message);
}
sleep(1);
}
}
public function getMessageCount()
{
return $this->getQueueInfo()['msg_qnum'];
}
public function getQueueInfo()
{
return msg_stat_queue($this->queue);
}
public function destroyQueue()
{
msg_remove_queue($this->queue);
}
private function restartQueue()
{
$this->destroyQueue();
$this->connect();
}
private function hasMessages()
{
return $this->getMessageCount() > 0;
}
private function receiveMessage()
{
$msgType = $message = $errcode = null;
if ( ! msg_receive($this->queue, -100, $msgType, 10000, $message, true, MSG_IPC_NOWAIT, $errcode)) {
throw new RuntimeException("Failed to receive message from queue with code {$errcode}", $errcode);
}
return $message;
}
private function connect()
{
$this->queue = msg_get_queue($this->messageQueueKey);
}
}
<?php
$queueKey = 123;
$maxSubscribeTime = 3600;
$messageQueue = new Extra_MessageQueue($queueKey, $maxSubscribeTime);
$message = time();
$messageQueue->publish($message);
<?php
$queueKey = 123;
$maxSubscribeTime = 3600;
$messageQueue = new Extra_MessageQueue($queueKey, $maxSubscribeTime);
$onMessage = function ($message) {
var_dump($message);
};
// this call is synchronous, nothing after that will run for $maxSubscribeTime seconds
$messageQueue->subscribe($onMessage);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment