Skip to content

Instantly share code, notes, and snippets.

@methodmissing
Created February 3, 2011 01:46
Show Gist options
  • Save methodmissing/808892 to your computer and use it in GitHub Desktop.
Save methodmissing/808892 to your computer and use it in GitHub Desktop.
diff --git a/src/db.c b/src/db.c
index 669fba1..6ae83d3 100644
--- a/src/db.c
+++ b/src/db.c
@@ -514,7 +514,8 @@ void propagateExpire(redisDb *db, robj *key) {
argv[0] = createStringObject("DEL",3);
argv[1] = key;
incrRefCount(key);
-
+ if (db->expire_channel)
+ pubsubPublishMessage(db->expire_channel,key);
if (server.appendonly)
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
if (listLength(server.slaves))
@@ -615,3 +616,27 @@ void persistCommand(redisClient *c) {
}
}
}
+
+/* Dead Letter Channel commands.
+ *
+ * db->expire_channel names the pubsub channel that propagateExpire()
+ * publishes expired keys to. The db holds a reference of its own on that
+ * object: RECYCLE takes it, DISPOSE (or a later RECYCLE) drops it. */
+
+/* RECYCLE channel
+ *
+ * Register argv[1] as the dead letter channel of the selected db and
+ * subscribe the calling client to it. A channel registered earlier is
+ * replaced. */
+void recycleCommand(redisClient *c) {
+    robj *previous = c->db->expire_channel;
+
+    /* The pointer is kept past this call, so the db needs a reference of
+     * its own instead of borrowing the client's argument. Retain the new
+     * object before releasing the old one in case they are the same. */
+    incrRefCount(c->argv[1]);
+    c->db->expire_channel = c->argv[1];
+    if (previous != NULL) decrRefCount(previous);
+    subscribeCommand(c);
+}
+
+/* RECYCLETO
+ *
+ * Reply with the dead letter channel of the selected db as a bulk string,
+ * or with shared.ok when none is registered. */
+void recycletoCommand(redisClient *c) {
+    robj *channel = c->db->expire_channel;
+
+    if (channel == NULL) {
+        addReply(c,shared.ok);
+        return;
+    }
+    /* The channel name is payload data, so frame it as a bulk reply
+     * instead of sending the bare object. */
+    addReplyBulk(c,channel);
+}
+
+/* DISPOSE
+ *
+ * Unregister the dead letter channel of the selected db and unsubscribe the
+ * calling client from it. Replies with shared.err when no channel is
+ * registered. */
+void disposeCommand(redisClient *c) {
+    robj *channel = c->db->expire_channel;
+    robj *unsub_argv[2];
+    robj **saved_argv = c->argv;
+    int saved_argc = c->argc;
+
+    if (channel == NULL) {
+        addReply(c,shared.err);
+        return;
+    }
+    c->db->expire_channel = NULL;
+
+    /* DISPOSE is registered with arity 1, so c->argv has no slot 1 to
+     * write to. Hand the channel to unsubscribeCommand() through a
+     * temporary two-element vector and restore the client's own vector
+     * afterwards. */
+    unsub_argv[0] = saved_argv[0];
+    unsub_argv[1] = channel;
+    c->argv = unsub_argv;
+    c->argc = 2;
+    unsubscribeCommand(c);
+    c->argv = saved_argv;
+    c->argc = saved_argc;
+
+    /* Drop the reference RECYCLE took on behalf of the db. */
+    decrRefCount(channel);
+}
diff --git a/src/help.h b/src/help.h
index 51613c9..ba1df07 100644
--- a/src/help.h
+++ b/src/help.h
@@ -632,7 +632,22 @@ struct commandHelp {
"destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX]",
"Add multiple sorted sets and store the resulting sorted set in a new key",
4,
- "1.3.10" }
+ "1.3.10" },
+ { "RECYCLE",
+ "channel",
+ "Define a Dead Letter Channel to publish expired keys to",
+ 6,
+ "0.1" },
+ { "RECYCLETO",
+ "-",
+ "Return the Dead Letter Channel expired keys is published to",
+ 6,
+ "0.1" },
+ { "DISPOSE",
+ "-",
+ "Remove a registered Dead Letter Channel and stop broadcasting key expiry",
+ 6,
+ "0.1" }
};
#endif
diff --git a/src/redis.c b/src/redis.c
index 5548d1c..52fc749 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -187,7 +187,10 @@ struct redisCommand redisCommandTable[] = {
{"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0,0,0},
{"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,0,NULL,0,0,0,0,0},
- {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0}
+ {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0},
+ {"recycle",recycleCommand,2,0,NULL,1,1,1,0,0},
+ {"recycleto",recycletoCommand,1,0,NULL,0,0,0,0,0},
+ {"dispose",disposeCommand,1,0,NULL,0,0,0,0,0}
};
/*============================ Utility functions ============================ */
@@ -886,6 +889,7 @@ void initServer() {
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
+ server.db[j].expire_channel = NULL;
if (server.ds_enabled) {
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].io_negcache = dictCreate(&setDictType,NULL);
@@ -1052,8 +1056,9 @@ int processCommand(redisClient *c) {
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
&&
cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
- cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
- addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
+ cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand &&
+ cmd->proc != disposeCommand) {
+ addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT / DISPOSE allowed in this context");
return REDIS_OK;
}
diff --git a/src/redis.h b/src/redis.h
index 5ae9cc1..886bb40 100644
--- a/src/redis.h
+++ b/src/redis.h
@@ -279,6 +279,7 @@ typedef struct redisDb {
dict *io_negcache; /* Negative caching for disk store */
dict *io_queued; /* Queued IO operations hash table */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
+ robj *expire_channel; /* Dead Letter Channel for expired keys */
int id;
} redisDb;
@@ -873,6 +874,7 @@ int pubsubUnsubscribeAllChannels(redisClient *c, int notify);
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify);
void freePubsubPattern(void *p);
int listMatchPubsubPattern(void *a, void *b);
+int pubsubPublishMessage(robj *channel, robj *message);
/* Utility functions */
int stringmatchlen(const char *pattern, int patternLen,
@@ -1032,6 +1034,9 @@ void punsubscribeCommand(redisClient *c);
void publishCommand(redisClient *c);
void watchCommand(redisClient *c);
void unwatchCommand(redisClient *c);
+void recycleCommand(redisClient *c);
+void recycletoCommand(redisClient *c);
+void disposeCommand(redisClient *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl
index c8b547b..c22952e 100644
--- a/tests/unit/pubsub.tcl
+++ b/tests/unit/pubsub.tcl
@@ -192,4 +192,30 @@ start_server {tags {"pubsub"}} {
# clean up clients
$rd1 close
}
-}
\ No newline at end of file
+
+ test "RECYCLE on expire" {
+ # Set keys with a TTL and recycle them to a dead letter channel
+ assert_equal "OK" [r set x foo]
+ assert_equal "OK" [r set y bar]
+ assert_equal 1 [r expire x 1]
+ assert_equal 1 [r expire y 2]
+ assert_equal "subscribe expired 1" [r recycle expired]
+
+ after 3000
+ assert_equal {message expired x} [r read]
+ assert_equal {message expired y} [r read]
+ }
+
+ test "DISPOSE with a dead letter channel set" {
+ assert_equal "unsubscribe expired 0" [r dispose]
+ }
+
+ test "DISPOSE without a dead letter channel set" {
+ catch {r dispose} err
+ set _ $err
+ } {ERR}
+
+ test {RECYCLETO without a dead letter channel set} {
+ r recycleto
+ } {OK}
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment