<?php
/**
 * Command-line consumer for a Redis stream using a consumer group.
 *
 * Options (all optional, long form only):
 *   --host      Redis host                      (default: localhost)
 *   --port      Redis port                      (default: 6379)
 *   --skey      Stream key to read from         (default: stream)
 *   --group     Consumer group name             (default: group)
 *   --consumer  Consumer name within the group  (default: Alice)
 *   --sleep     Pause between polls, in microseconds (default: 100000)
 *
 * The script connects, verifies stream support on both the extension and the
 * server, issues XGROUP CREATE for the group, then loops forever reading new
 * messages with XREADGROUP, printing them and acknowledging them with XACK.
 */

$opt = getopt('', ['host:', 'port:', 'skey:', 'group:', 'consumer:', 'sleep:']);

$host     = isset($opt['host']) ? $opt['host'] : 'localhost';
/* getopt() hands back strings; connect() and usleep() want integers. */
$port     = (int) (isset($opt['port']) ? $opt['port'] : '6379');
$skey     = isset($opt['skey']) ? $opt['skey'] : 'stream';
$group    = isset($opt['group']) ? $opt['group'] : 'group';
$consumer = isset($opt['consumer']) ? $opt['consumer'] : 'Alice';
/* Clamp to zero so a negative --sleep cannot be passed on to usleep(). */
$sleep    = max(0, (int) (isset($opt['sleep']) ? $opt['sleep'] : 100000));

$obj_r = new Redis();

/* Depending on the phpredis version a failed connect() either returns false
 * or throws; handle both so the user always gets the same error message. */
try {
    $obj_r->connect($host, $port);
} catch (Exception $ex) {
    /* Fall through to the isConnected() check below. */
}

if (!$obj_r->isConnected()) {
    fprintf(STDERR, "Error: Cannot connect to redis at $host:$port\n");
    exit(-1);
}

/* Make sure phpredis has stream support */
if (!method_exists($obj_r, 'xAdd')) {
    fprintf(STDERR, "Error: You have to run a phpredis version with stream support\n");
    exit(-1);
}

/* Finally make sure Redis has stream support */
if ($obj_r->xLen('not_a_key: ' . uniqid()) === false) {
    fprintf(STDERR, "Error: Must connect to a redis-server with stream support!\n");
    exit(-1);
}

/* The return value is deliberately ignored: the call is attempted on every
 * start-up regardless of whether the group is already there. */
$obj_r->xGroup('CREATE', $skey, $group, '0');

echo "Consuming messages from '$skey' with (group: $group, consumer: $consumer)\n";

$prefix = "[$group::$consumer] ";

while (true) {
    try {
        /* '>' asks for messages never delivered to any consumer in the group. */
        $ids = $obj_r->xReadGroup($group, $consumer, [$skey => '>']);

        /* We're just looking at one stream */
        if ($ids && !empty($ids[$skey])) {
            $ids = $ids[$skey];

            foreach ($ids as $id => $payload) {
                /* The 'id' and 'payload' fields are presumably written by a
                 * companion producer; tolerate messages that lack them. */
                $msg_id   = isset($payload['id']) ? $payload['id'] : '';
                $msg_body = isset($payload['payload']) ? $payload['payload'] : '';

                $msg = "id: " . $msg_id . ", payload: '" . $msg_body . "'";
                echo "$prefix: $id => [$msg]\n";
            }

            $obj_r->xAck($skey, $group, array_keys($ids));
        }
    } catch (Exception $ex) {
        echo "Exception: " . $ex->getMessage() . "\n";
    }

    /* Sleep outside the try block so a failing read still backs off instead
     * of spinning in a tight loop. */
    usleep($sleep);
}