Skip to content

Instantly share code, notes, and snippets.

@efenderbosch
Last active February 28, 2017 16:18
Show Gist options
  • Save efenderbosch/7e15ff7cc93d3a45dbbf24c2233313fb to your computer and use it in GitHub Desktop.
Save efenderbosch/7e15ff7cc93d3a45dbbf24c2233313fb to your computer and use it in GitHub Desktop.
Fetch up to N entries from a Redis sorted set
/**
 * Mutable wrapper that pairs a message body with a timestamp and a retry counter.
 *
 * <p>The no-argument constructor and the setters are kept so that the class can be
 * populated field by field (presumably by a serialization framework; confirm
 * against the deserializer in use).
 */
public class Envelope {

    /** The wrapped payload; may be {@code null} when built via the no-arg constructor. */
    private Object body;

    /** Milliseconds value associated with this envelope (see the constructors for defaults). */
    private long timestamp;

    /** Retry counter carried with the envelope. */
    private int retries;

    /** Creates an empty envelope: {@code body} is {@code null}, {@code timestamp} and {@code retries} are 0. */
    public Envelope() { }

    /**
     * Creates an envelope stamped with the current time and zero retries.
     *
     * @param body the payload to wrap
     */
    public Envelope(Object body) {
        this(body, System.currentTimeMillis());
    }

    /**
     * Creates an envelope with an explicit timestamp and zero retries.
     *
     * @param body      the payload to wrap
     * @param timestamp the timestamp to record
     */
    public Envelope(Object body, long timestamp) {
        this(body, timestamp, 0);
    }

    /**
     * Creates a fully specified envelope.
     *
     * @param body      the payload to wrap
     * @param timestamp the timestamp to record
     * @param retries   the initial retry count
     */
    public Envelope(Object body, long timestamp, int retries) {
        this.body = body;
        this.timestamp = timestamp;
        this.retries = retries;
    }

    /** @return the wrapped payload */
    public Object getBody() {
        return body;
    }

    /** @param body the payload to wrap */
    public void setBody(Object body) {
        this.body = body;
    }

    /** @return the recorded timestamp */
    public long getTimestamp() {
        return timestamp;
    }

    /** @param timestamp the timestamp to record */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /** @return the retry count */
    public int getRetries() {
        return retries;
    }

    /** @param retries the retry count to store */
    public void setRetries(int retries) {
        this.retries = retries;
    }

    /**
     * Builds the string form by delegating to {@code ReflectionToStringBuilder.toString(this)}.
     *
     * @return the reflection-generated description of this envelope
     */
    @Override
    public String toString() {
        return ReflectionToStringBuilder.toString(this);
    }
}
/**
 * Claims up to {@code n} due entries from the sorted set stored at {@code key}.
 *
 * <p>Members whose score is at most the current time in milliseconds are read first,
 * then one pipelined ZREM is issued per member. Only members whose ZREM reply is 1
 * are deserialized and returned. A member this call did not remove is assumed to have
 * been removed, and to be processed, by another instance.
 *
 * @param n maximum number of members to read (passed as the count of the range query)
 * @return the deserialized envelopes removed by this call; an empty list when the
 *         range query yields nothing
 */
public List<Envelope> head(int n) {
    List<Tuple> due;
    List<Response<Long>> removals;
    try (Jedis jedis = jedisPool.getResource()) {
        Set<Tuple> found =
                jedis.zrangeByScoreWithScores(key, NEGATIVE_INFINITY, System.currentTimeMillis(), 0, n);
        if (found == null || found.isEmpty()) {
            return Collections.emptyList();
        }
        // Snapshot the set so each member can later be paired with its reply by index.
        due = new ArrayList<>(found);
        removals = new ArrayList<>(due.size());
        try (Pipeline pipeline = jedis.pipelined()) {
            for (Tuple candidate : due) {
                Response<Long> pending = pipeline.zrem(key, candidate.getBinaryElement());
                removals.add(pending);
            }
            pipeline.sync();
        } catch (IOException e) {
            LOG.warn("error closing pipeline", e);
        }
    }

    // Replies are inspected only after the connection has been released.
    List<Envelope> claimed = new ArrayList<>(due.size());
    for (int idx = 0; idx < due.size(); idx++) {
        Response<Long> reply = removals.get(idx);
        if (reply.get() != 1L) {
            // Not removed by this call: another instance must have deleted it and will process it.
            continue;
        }
        // Removed by this call, so this instance owns the member and processes it.
        byte[] raw = due.get(idx).getBinaryElement();
        claimed.add(deserializer.deserialize(raw));
    }
    return claimed;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment