Created
February 3, 2011 01:46
-
-
Save methodmissing/808892 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/db.c b/src/db.c | |
index 669fba1..6ae83d3 100644 | |
--- a/src/db.c | |
+++ b/src/db.c | |
@@ -514,7 +514,8 @@ void propagateExpire(redisDb *db, robj *key) { | |
argv[0] = createStringObject("DEL",3); | |
argv[1] = key; | |
incrRefCount(key); | |
- | |
+ if (db->expire_channel) | |
+ pubsubPublishMessage(db->expire_channel,key); | |
if (server.appendonly) | |
feedAppendOnlyFile(server.delCommand,db->id,argv,2); | |
if (listLength(server.slaves)) | |
@@ -615,3 +616,27 @@ void persistCommand(redisClient *c) { | |
} | |
} | |
} | |
+ | |
+void recycleCommand(redisClient *c) { | |
+ c->db->expire_channel = c->argv[1]; | |
+ subscribeCommand(c); | |
+} | |
+ | |
+void recycletoCommand(redisClient *c) { | |
+ if (c->db->expire_channel == NULL) { | |
+ addReply(c,shared.ok); | |
+ } else { | |
+ addReply(c,c->db->expire_channel); | |
+ } | |
+} | |
+ | |
+void disposeCommand(redisClient *c) { | |
+ if (c->db->expire_channel == NULL) { | |
+ addReply(c,shared.err); | |
+ } else { | |
+ c->argc = 1; | |
+ c->argv[1] = c->db->expire_channel; | |
+ c->db->expire_channel = NULL; | |
+ unsubscribeCommand(c); | |
+ } | |
+} | |
diff --git a/src/help.h b/src/help.h | |
index 51613c9..ba1df07 100644 | |
--- a/src/help.h | |
+++ b/src/help.h | |
@@ -632,7 +632,22 @@ struct commandHelp { | |
"destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX]", | |
"Add multiple sorted sets and store the resulting sorted set in a new key", | |
4, | |
- "1.3.10" } | |
+ "1.3.10" }, | |
+ { "RECYCLE", | |
+ "channel", | |
+ "Define a Dead Letter Channel to publish expired keys to", | |
+ 6, | |
+ "0.1" }, | |
+ { "RECYCLETO", | |
+ "-", | |
+ "Return the Dead Letter Channel expired keys is published to", | |
+ 6, | |
+ "0.1" }, | |
+ { "DISPOSE", | |
+ "-", | |
+ "Remove a registered Dead Letter Channel and stop broadcasting key expiry", | |
+ 6, | |
+ "0.1" } | |
}; | |
#endif | |
diff --git a/src/redis.c b/src/redis.c | |
index 5548d1c..52fc749 100644 | |
--- a/src/redis.c | |
+++ b/src/redis.c | |
@@ -187,7 +187,10 @@ struct redisCommand redisCommandTable[] = { | |
{"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0,0,0}, | |
{"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0,0,0}, | |
{"watch",watchCommand,-2,0,NULL,0,0,0,0,0}, | |
- {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0} | |
+ {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0}, | |
+ {"recycle",recycleCommand,2,0,NULL,1,1,1,0,0}, | |
+ {"recycleto",recycletoCommand,1,0,NULL,0,0,0,0,0}, | |
+ {"dispose",disposeCommand,1,0,NULL,0,0,0,0,0} | |
}; | |
/*============================ Utility functions ============================ */ | |
@@ -886,6 +889,7 @@ void initServer() { | |
server.db[j].expires = dictCreate(&keyptrDictType,NULL); | |
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); | |
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); | |
+ server.db[j].expire_channel = NULL; | |
if (server.ds_enabled) { | |
server.db[j].io_keys = dictCreate(&keylistDictType,NULL); | |
server.db[j].io_negcache = dictCreate(&setDictType,NULL); | |
@@ -1052,8 +1056,9 @@ int processCommand(redisClient *c) { | |
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) | |
&& | |
cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && | |
- cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { | |
- addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); | |
+ cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand && | |
+ cmd->proc != disposeCommand) { | |
+ addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT / DISPOSE allowed in this context"); | |
return REDIS_OK; | |
} | |
diff --git a/src/redis.h b/src/redis.h | |
index 5ae9cc1..886bb40 100644 | |
--- a/src/redis.h | |
+++ b/src/redis.h | |
@@ -279,6 +279,7 @@ typedef struct redisDb { | |
dict *io_negcache; /* Negative caching for disk store */ | |
dict *io_queued; /* Queued IO operations hash table */ | |
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ | |
+ robj *expire_channel; /* Dead Letter Channel for expired keys */ | |
int id; | |
} redisDb; | |
@@ -873,6 +874,7 @@ int pubsubUnsubscribeAllChannels(redisClient *c, int notify); | |
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); | |
void freePubsubPattern(void *p); | |
int listMatchPubsubPattern(void *a, void *b); | |
+int pubsubPublishMessage(robj *channel, robj *message); | |
/* Utility functions */ | |
int stringmatchlen(const char *pattern, int patternLen, | |
@@ -1032,6 +1034,9 @@ void punsubscribeCommand(redisClient *c); | |
void publishCommand(redisClient *c); | |
void watchCommand(redisClient *c); | |
void unwatchCommand(redisClient *c); | |
+void recycleCommand(redisClient *c); | |
+void recycletoCommand(redisClient *c); | |
+void disposeCommand(redisClient *c); | |
#if defined(__GNUC__) | |
void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); | |
diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl | |
index c8b547b..c22952e 100644 | |
--- a/tests/unit/pubsub.tcl | |
+++ b/tests/unit/pubsub.tcl | |
@@ -192,4 +192,30 @@ start_server {tags {"pubsub"}} { | |
# clean up clients | |
$rd1 close | |
} | |
-} | |
\ No newline at end of file | |
+ | |
+ test "RECYCLE on expire" { | |
+ # Set keys with a TTL and recycle them to a dead letter channel | |
+ assert_equal "OK" [r set x foo] | |
+ assert_equal "OK" [r set y bar] | |
+ assert_equal 1 [r expire x 1] | |
+ assert_equal 1 [r expire y 2] | |
+ assert_equal "subscribe expired 1" [r recycle expired] | |
+ | |
+ after 3000 | |
+ assert_equal {message expired x} [r read] | |
+ assert_equal {message expired y} [r read] | |
+ } | |
+ | |
+ test "DISPOSE with a dead letter channel set" { | |
+ assert_equal "unsubscribe expired 0" [r dispose] | |
+ } | |
+ | |
+ test "DISPOSE without a dead letter channel set" { | |
+ catch {r dispose} err | |
+ set _ $err | |
+ } {ERR} | |
+ | |
+ test {RECYCLETO without a dead letter channel set} { | |
+ r recycleto | |
+ } {OK} | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment