Created
June 20, 2018 00:38
-
-
Save michael-grunder/910b4a7328f55750d023fc928179494e to your computer and use it in GitHub Desktop.
Example producer and consumer for the new Redis 5.0 stream API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/*
 * Example consumer for the Redis 5.0 stream API, using phpredis.
 *
 * Creates a consumer group on a stream, then loops forever: reads messages
 * that have not yet been delivered to this group, prints them, and
 * acknowledges them.
 *
 * Options (all optional):
 *   --host      Redis host                       (default: localhost)
 *   --port      Redis port                       (default: 6379)
 *   --skey      Stream key to consume from       (default: stream)
 *   --group     Consumer group name              (default: group)
 *   --consumer  Consumer name within the group   (default: Alice)
 *   --sleep     Microseconds to pause per loop   (default: 100000)
 */

$opt = getopt('', ['host:', 'port:', 'skey:', 'group:', 'consumer:', 'sleep:']);

$host     = isset($opt['host']) ? $opt['host'] : 'localhost';
/* getopt() hands back strings; connect() and usleep() want integers */
$port     = isset($opt['port']) ? (int) $opt['port'] : 6379;
$skey     = isset($opt['skey']) ? $opt['skey'] : 'stream';
$group    = isset($opt['group']) ? $opt['group'] : 'group';
$consumer = isset($opt['consumer']) ? $opt['consumer'] : 'Alice';
/* Clamp to zero so a negative --sleep can never reach usleep() */
$sleep    = isset($opt['sleep']) ? max(0, (int) $opt['sleep']) : 100000;

$obj_r = new Redis();
$obj_r->connect($host, $port);

if (!$obj_r->isConnected()) {
    /* STDERR is a case-sensitive constant; lowercase `stderr` is undefined */
    fprintf(STDERR, "Error: Cannot connect to redis at $host:$port\n");
    exit(-1);
}

/* Make sure phpredis has stream support */
if (!method_exists($obj_r, 'xAdd')) {
    fprintf(STDERR, "Error: You have to run a phpredis version with stream support\n");
    exit(-1);
}

/* Finally make sure Redis has stream support */
if ($obj_r->xLen('not_a_key: ' . uniqid()) === false) {
    fprintf(STDERR, "Error: Must connect to a redis-server with stream support!\n");
    exit(-1);
}

/*
 * Create the consumer group, starting from the beginning of the stream ('0').
 * The return value is deliberately ignored so re-running the consumer against
 * an existing group is not treated as fatal.
 * NOTE(review): Redis also rejects XGROUP CREATE when the stream key does not
 * exist yet (no MKSTREAM is requested here) -- start the producer first, or
 * confirm whether the installed phpredis supports the mkstream argument.
 */
$obj_r->xGroup('CREATE', $skey, $group, '0');

echo "Consuming messages from '$skey' with (group: $group, consumer: $consumer)\n";

$prefix = "[$group::$consumer] ";

while (true) {
    try {
        /* '>' asks for messages never delivered to any consumer in the group */
        $reply = $obj_r->xReadGroup($group, $consumer, [$skey => '>']);

        /* We're just looking at one stream; the reply is keyed by stream name */
        if (is_array($reply) && !empty($reply[$skey])) {
            $messages = $reply[$skey];

            /* Each entry maps a stream message id to its field => value pairs */
            foreach ($messages as $id => $payload) {
                $msg = "id: " . $payload['id'] . ", payload: '" . $payload['payload'] . "'";
                echo "$prefix: $id => [$msg]\n";
            }

            /* Acknowledge everything we just printed in a single call */
            $obj_r->xAck($skey, $group, array_keys($messages));
        }
    } catch (Exception $ex) {
        echo "Exception: " . $ex->getMessage() . "\n";
    }

    /*
     * Pause outside the try block so that a persistently failing read is
     * throttled too, rather than spinning and flooding the output.
     */
    usleep($sleep);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/*
 * Example producer for the Redis 5.0 stream API, using phpredis.
 *
 * Loops forever appending contrived messages to a stream. Each message has
 * an 'id' field taken from the '__stream_next_id' counter key and a
 * 'payload' field built from uniqid().
 *
 * Options (all optional):
 *   --host   Redis host                       (default: localhost)
 *   --port   Redis port                       (default: 6379)
 *   --skey   Stream key to append to          (default: stream)
 *   --sleep  Microseconds to pause per loop   (default: 100000)
 */

$opt = getopt('', ['host:', 'port:', 'skey:', 'sleep:']);

$host  = isset($opt['host']) ? $opt['host'] : 'localhost';
/* getopt() hands back strings; connect() and usleep() want integers */
$port  = isset($opt['port']) ? (int) $opt['port'] : 6379;
$skey  = isset($opt['skey']) ? $opt['skey'] : 'stream';
/* Clamp to zero so a negative --sleep can never reach usleep() */
$sleep = isset($opt['sleep']) ? max(0, (int) $opt['sleep']) : 100000;

$obj_r = new Redis();
$obj_r->connect($host, $port);

if (!$obj_r->isConnected()) {
    fprintf(STDERR, "Error: Cannot connect to redis at $host:$port\n");
    exit(-1);
}

/* Make sure phpredis has stream support */
if (!method_exists($obj_r, 'xAdd')) {
    fprintf(STDERR, "Error: You have to run a phpredis version with stream support\n");
    exit(-1);
}

/* Finally make sure Redis has stream support */
if ($obj_r->xLen('not_a_key: ' . uniqid()) === false) {
    fprintf(STDERR, "Error: Must connect to a redis-server with stream support!\n");
    exit(-1);
}

/* Let the user know where we'll be sending data */
echo "Producing contrived data to key: '$skey'\n";

/* Keep track of messages produced */
$n = 0;

/* Just loop forever */
while (true) {
    $message = [
        'id'      => $obj_r->incr('__stream_next_id'),
        'payload' => 'payload:' . uniqid()
    ];

    /* '*' lets the server assign the stream message id */
    if ($obj_r->xAdd($skey, '*', $message) === false) {
        /* Report the failure and do not count the message as produced */
        fprintf(STDERR, "Error: Failed to add message to stream '$skey'\n");
    } elseif (++$n % 1000 == 0) {
        echo "Added " . number_format($n) . " messages to stream '$skey' so far\n";
    }

    /* Don't totally spam CPU */
    usleep($sleep);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment