<?php

/*
 * Command-line options (long form only, all optional):
 *   --host      host passed to Redis::connect()              (default: localhost)
 *   --port      port passed to Redis::connect()              (default: 6379)
 *   --skey      key of the stream to read from               (default: stream)
 *   --group     consumer group name                          (default: group)
 *   --consumer  consumer name inside the group               (default: Alice)
 *   --sleep     pause between polls, in microseconds         (default: 100000)
 */
$opt = getopt('', ['host:', 'port:', 'skey:', 'group:', 'consumer:', 'sleep:']);

$host = isset($opt['host']) ? $opt['host'] : 'localhost';
$skey = isset($opt['skey']) ? $opt['skey'] : 'stream';
$group = isset($opt['group']) ? $opt['group'] : 'group';
$consumer = isset($opt['consumer']) ? $opt['consumer'] : 'Alice';

/*
 * getopt() returns option values as strings (or an array when an option is
 * repeated), so the numeric options are validated and converted up front
 * instead of letting a bad value surface later inside the polling loop.
 */
$port = isset($opt['port']) ? $opt['port'] : '6379';
if (!is_numeric($port)) {
    fprintf(STDERR, "Error:  --port must be an integer\n");
    exit(-1);
}
$port = (int) $port;

$sleep = isset($opt['sleep']) ? $opt['sleep'] : 100000;
if (!is_numeric($sleep) || (int) $sleep < 0) {
    fprintf(STDERR, "Error:  --sleep must be a non-negative number of microseconds\n");
    exit(-1);
}
$sleep = (int) $sleep;

/*
 * Open the connection. A failed connect() may be reported either through a
 * false return value or through an exception (this differs between phpredis
 * releases), so both paths end in the same error message.
 */
$obj_r = new Redis();
try {
    $connected = $obj_r->connect($host, $port);
} catch (Exception $ex) {
    $connected = false;
}

if (!$connected || !$obj_r->isConnected()) {
    /* STDERR must be upper case: lower-case `stderr` is an undefined constant */
    fprintf(STDERR, "Error:  Cannot connect to redis at $host:$port\n");
    exit(-1);
}

/* Client-side check: the loaded phpredis class must expose the stream API (xAdd) */
$client_has_streams = method_exists($obj_r, 'xAdd');
if ($client_has_streams === false) {
    fprintf(STDERR, "Error:  You have to run a phpredis version with stream support\n");
    exit(-1);
}

/*
 * Server-side check: ask for the length of a freshly generated, throw-away
 * key. A reply of exactly false is taken to mean the server does not
 * support streams.
 */
$probe_key = 'not_a_key: ' . uniqid();
$probe_len = $obj_r->xLen($probe_key);
if ($probe_len === false) {
    fprintf(STDERR, "Error:  Must connect to a redis-server with stream support!\n");
    exit(-1);
}

/*
 * Create the consumer group, starting from id '0'. A failure because the
 * group already exists (BUSYGROUP) is expected on every run after the first
 * and is ignored; any other failure is reported, because the read loop below
 * cannot receive anything without the group.
 */
$obj_r->clearLastError();
if ($obj_r->xGroup('CREATE', $skey, $group, '0') === false) {
    $group_err = $obj_r->getLastError();
    $already_exists = is_string($group_err) && strpos($group_err, 'BUSYGROUP') !== false;
    if (!$already_exists) {
        fprintf(
            STDERR,
            "Warning:  Could not create group '%s' on stream '%s': %s\n",
            $group,
            $skey,
            is_string($group_err) ? $group_err : 'unknown error'
        );
    }
}

echo "Consuming messages from '$skey' with (group: $group, consumer: $consumer)\n";
$prefix = "[$group::$consumer] ";

/*
 * Main polling loop; runs until the process is terminated.
 *
 * Each iteration reads the messages not yet delivered to this group ('>'),
 * prints them, acknowledges them, and then pauses for $sleep microseconds.
 */
while (true) {
    try {
        $reply = $obj_r->xReadGroup($group, $consumer, [$skey => '>']);

        /* We're just looking at one stream */
        $entries = [];
        if (is_array($reply) && isset($reply[$skey]) && is_array($reply[$skey])) {
            $entries = $reply[$skey];
        }

        if ($entries) {
            foreach ($entries as $id => $payload) {
                /*
                 * Presumably the producer writes 'id' and 'payload' fields --
                 * confirm against the producer. Missing fields print as empty
                 * strings instead of raising undefined-index warnings.
                 */
                $fields = is_array($payload) ? $payload : [];
                $msg_id = isset($fields['id']) ? $fields['id'] : '';
                $msg_body = isset($fields['payload']) ? $fields['payload'] : '';

                $msg = "id: " . $msg_id . ", payload: '" . $msg_body . "'";
                echo "$prefix: $id => [$msg]\n";
            }
            $obj_r->xAck($skey, $group, array_keys($entries));
        }
    } catch (Exception $ex) {
        echo "Exception: " . $ex->getMessage() . "\n";
    }

    /*
     * Pause outside the try block so that a failing iteration is throttled
     * too; otherwise a persistent error would spin the loop at full speed.
     * The cast/clamp keeps usleep() from being handed a non-integer or
     * negative value.
     */
    usleep(max(0, (int) $sleep));
}