Skip to content

Instantly share code, notes, and snippets.

@sandeepone
Last active September 21, 2015 04:00
Show Gist options
  • Save sandeepone/7eaae78527cd012a7b7c to your computer and use it in GitHub Desktop.
Save sandeepone/7eaae78527cd012a7b7c to your computer and use it in GitHub Desktop.
Async Redis PubSub using HHVM-EXT-UV

RedisSubscribe

This is a truly async POC Redis client using LibUV. The LibUV extension used here is hhvm-ext-uv by RickySu. That extension has some problems with the latest version of HHVM and has a multiloop branch that I compiled and used.

<?hh
class RedisSubscribe {
public Map<string, Vector<(function(string):void)>> $Callbacks = Map{};
public function __construct(private $socket = null){}
public function connect(UVLoop $loop, $server = "127.0.0.1", $port = "6379") {
$this->socket = new UVTcp($loop);
$this->socket->connect($server, $port, null);
$this->socket->setCallback(function(UVTcp $socket, string $Contents) {
$Contents = static::Parse($Contents)['Value'];
if (is_array($Contents) && $Contents[0] === 'message') {
// It's a channel publish
// 0 => message, 1 => CHANNEL, 2 => CONTENT
if ($this->Callbacks->contains($Contents[1])) {
foreach ($this->Callbacks[$Contents[1]] as $Callback) {
$Callback($Contents[2]);
}
}
}
}, null, null);
}
public function close():void {
$this->socket->close();
}
public function subscribe(string $Channel, (function(string):void) $Callback):void {
if (!$this->Callbacks->contains($Channel)) {
$this->Callbacks->set($Channel, Vector{});
$this->socket->write("*2\r\n$9\r\nSUBSCRIBE\r\n$".strlen($Channel)."\r\n$Channel\r\n");
}
$this->Callbacks->get($Channel)?->add($Callback);
}
public function unsubscribe(string $Channel):void {
if (!$this->Callbacks->contains($Channel)) {
return ;
}
$this->Callbacks->remove($Channel);
$this->socket->write("*2\r\n$11\r\nUNSUBSCRIBE\r\n$".strlen($Channel)."\r\n$Channel\r\n");
}
public static function Parse($Content) {
// From: https://github.com/steelbrain/Redis-Proto
$Type = substr($Content, 0, 1);
$Index = (int) strpos($Content, "\r\n");
$Count = (int) substr($Content, 1, $Index - 1);
$Offset = $Index + 2;
if ($Type === '*') {
$ToReturn = [];
for ($i = 0; $i < $Count; ++$i) {
$Entry = static::Parse(substr($Content, $Offset));
$ToReturn[] = $Entry['Value'];
$Offset += $Entry['Offset'];
}
return ['Value' => $ToReturn, 'Offset' => $Offset];
} else if ($Type === '$') {
return $Count === -1 ?
['Value' => null, 'Offset' => $Offset] :
['Value' => substr($Content, $Index + 2, $Count), 'Offset' => $Offset + $Count + 2];
} else if ($Type === '-') {
// Ignore errors
return ['Value' => null, 'Offset' => $Offset];
} else if ($Type === '+') {
return ['Value' => substr($Content, 1, $Index - 1), 'Offset' => $Offset];
} else if ($Type === ':') {
return ['Value' => (int) substr($Content, 1, $Index - 1), 'Offset' => $Offset];
} else {
return ['Value' => null, 'Offset' => $Offset];
// Ignore unknown types for now
}
}
}
<?php
require(__DIR__.'/Redis.hh');
$loop = new UVLoop();
$Redis = new RedisSubscribe();
$Redis->connect($loop);
$Redis->subscribe("TEST", function($Message) {
echo "Got Message\t", $Message, "\n";
});
$Redis->subscribe("TEST2", function($Message) use ($Redis) {
echo "Another\t", $Message, "\n";
$Redis->unsubscribe('TEST2');
});
$loop->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment