Created
October 3, 2017 08:41
-
-
Save wodCZ/14890ad255dc736e8e1a2600cc379601 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/**
 * Simple message queue implementation on top of PHP's System V message
 * queue functions (msg_get_queue(), msg_send(), msg_receive(), ...).
 *
 * The priority passed to publish() is used as the System V message type.
 * receiveMessage() asks for the pending message with the lowest type, so a
 * numerically smaller priority is delivered first.
 */
class Extra_MessageQueue
{
    /** Largest priority value accepted; receiveMessage() only fetches types up to this value. */
    const LOWEST_PRIORITY = 100;

    /** Maximum size in bytes of a serialized message that receiveMessage() accepts. */
    const MAX_MESSAGE_SIZE = 10000;

    /** @var int number of seconds, counted from construction, during which subscribe() keeps polling */
    private $maxRunTime;

    /** @var int Unix timestamp taken in the constructor */
    private $startTime;

    /** @var mixed queue handle returned by msg_get_queue() */
    private $queue;

    /** @var int */
    private $messageQueueKey;

    /**
     * @param int $messageQueueKey
     * @param int $maxRunTime in seconds, after this time, if there are no messages to process, exit loop
     * @throws \RuntimeException when the queue cannot be opened
     */
    public function __construct($messageQueueKey, $maxRunTime = 3600)
    {
        $this->startTime = time();
        $this->maxRunTime = $maxRunTime;
        $this->messageQueueKey = $messageQueueKey;
        $this->connect();
    }

    /**
     * Send a message without blocking; the message is serialized by msg_send().
     *
     * If sending fails and the queue reports no pending messages (or cannot be
     * inspected at all), the queue is removed, reopened and the send is retried once.
     *
     * @param mixed $message
     * @param int $priority from range 1 = highest to 100 = lowest
     * @return bool true when the message was queued
     * @throws \RangeException when $priority is outside 1..100; such a message could
     *      never be fetched by receiveMessage() and would stay in the queue forever
     * @throws \RuntimeException when the message cannot be sent. The exception code is the
     *      error code reported by msg_send().
     *      On macOS, there is limit of 40 msgs / 2048 bytes per message queue and it can't be changed.
     *      On Linux run `ipcs -l` to see current limits.
     */
    public function publish($message, $priority = 1)
    {
        if (!is_numeric($priority) || (int) $priority < 1 || (int) $priority > self::LOWEST_PRIORITY) {
            throw new RangeException(
                'Priority must be between 1 and ' . self::LOWEST_PRIORITY . '.'
            );
        }
        $priority = (int) $priority;

        $errorCode = 0;
        // @ - intentional: msg_send() raises a warning on failure, the failure is handled below
        if (@msg_send($this->queue, $priority, $message, true, false, $errorCode)) {
            return true;
        }

        // Never recreate a queue that still holds messages, they would be lost.
        if ($this->getMessageCount() !== 0) {
            throw new RuntimeException(
                "{$this->messageQueueKey} is broken, and is still processing messages.",
                (int) $errorCode
            );
        }

        // Sending failed and nothing is pending (or the handle is stale): recreate and retry once.
        $this->restartQueue();
        if (@msg_send($this->queue, $priority, $message, true, false, $errorCode)) {
            return true;
        }

        $lastError = error_get_last();
        $errmessage = !empty($lastError['message']) ? $lastError['message'] : 'No error message';
        throw new RuntimeException(
            "Queue {$this->messageQueueKey} is broken, " . 'error returned by msg_send:' . $errmessage,
            (int) $errorCode
        );
    }

    /**
     * Poll the queue and hand every received message to $callback.
     *
     * Polling stops once maxRunTime seconds have passed since construction and the
     * pending messages have been drained. The call is synchronous.
     *
     * @param callable $callback invoked with the unserialized message as its only argument
     * @throws \RuntimeException when receiving fails for any reason other than "no message",
     *      or when a stale queue handle cannot be reopened
     */
    public function subscribe(callable $callback)
    {
        $deadline = $this->startTime + $this->maxRunTime;

        while (time() < $deadline) {
            if ($this->getQueueInfo() === false) {
                // The handle no longer works (e.g. the queue was removed and recreated
                // by a publisher); open the queue again by its key.
                $this->connect();
            }

            while ($this->hasMessages()) { // check if there are some messages
                try {
                    $message = $this->receiveMessage();
                } catch (RuntimeException $e) {
                    if ($e->getCode() === MSG_ENOMSG) {
                        // Nothing receivable: someone fetched the message before us, or the
                        // pending messages have a type we do not read. Leave the inner loop
                        // so we sleep and re-check the deadline instead of spinning.
                        break;
                    }
                    throw $e; // anything else is not fine
                }
                $callback($message);
            }

            sleep(1);
        }
    }

    /**
     * @return int number of messages currently in the queue; 0 when the queue
     *      cannot be inspected (msg_stat_queue() failed)
     */
    public function getMessageCount()
    {
        $info = $this->getQueueInfo();

        return is_array($info) ? (int) $info['msg_qnum'] : 0;
    }

    /**
     * @return array|false the result of msg_stat_queue() for the current handle
     */
    public function getQueueInfo()
    {
        return msg_stat_queue($this->queue);
    }

    /**
     * Remove the queue from the system.
     *
     * @return bool the result of msg_remove_queue()
     */
    public function destroyQueue()
    {
        return msg_remove_queue($this->queue);
    }

    /**
     * Remove the queue and open it again under the same key.
     */
    private function restartQueue()
    {
        $this->destroyQueue();
        $this->connect();
    }

    /**
     * @return bool whether at least one message is pending
     */
    private function hasMessages()
    {
        return $this->getMessageCount() > 0;
    }

    /**
     * Fetch one message without blocking, lowest message type (= highest priority) first.
     *
     * @return mixed the unserialized message
     * @throws \RuntimeException when nothing could be received; the exception code is the
     *      error code reported by msg_receive() (MSG_ENOMSG when no message matched)
     */
    private function receiveMessage()
    {
        $msgType = $message = null;
        $errcode = 0;

        // A negative desired type asks for the lowest type <= its absolute value.
        $received = msg_receive(
            $this->queue,
            -self::LOWEST_PRIORITY,
            $msgType,
            self::MAX_MESSAGE_SIZE,
            $message,
            true,
            MSG_IPC_NOWAIT,
            $errcode
        );
        if (!$received) {
            throw new RuntimeException("Failed to receive message from queue with code {$errcode}", (int) $errcode);
        }

        return $message;
    }

    /**
     * Open the queue identified by the configured key and store the handle.
     *
     * @throws \RuntimeException when msg_get_queue() fails
     */
    private function connect()
    {
        $queue = msg_get_queue($this->messageQueueKey);
        if ($queue === false) {
            throw new RuntimeException("Unable to open message queue {$this->messageQueueKey}.");
        }

        $this->queue = $queue;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// Publisher example: open queue 123 and push the current Unix timestamp onto it.
$key = 123;
$runLimit = 3600; // only relevant for subscribe(), passed for symmetry with the consumer
$publisher = new Extra_MessageQueue($key, $runLimit);

$payload = time();
$publisher->publish($payload);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// Consumer example: open queue 123 and dump every message that arrives.
$key = 123;
$runLimit = 3600;
$consumer = new Extra_MessageQueue($key, $runLimit);

// subscribe() is synchronous: nothing placed after it runs for $runLimit seconds
$consumer->subscribe(function ($payload) {
    var_dump($payload);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment