Created
February 3, 2011 01:13
-
-
Save methodmissing/808859 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 67da63b1bec8ba5fe0c71e019f1d287e447ca566 Mon Sep 17 00:00:00 2001 | |
From: =?UTF-8?q?Lourens=20Naud=C3=A9?= <[email protected]> | |
Date: Thu, 3 Feb 2011 01:05:41 +0000 | |
Subject: [PATCH] RECYCLE, RECYCLETO and DISPOSE commands supporting a Dead Letter Channel infrastructure for expired keys | |
--- | |
src/db.c | 27 ++++++++++++++++++++++++++- | |
src/help.h | 17 ++++++++++++++++- | |
src/redis.c | 11 ++++++++--- | |
src/redis.h | 5 +++++ | |
tests/unit/pubsub.tcl | 28 +++++++++++++++++++++++++++- | |
5 files changed, 82 insertions(+), 6 deletions(-) | |
diff --git a/src/db.c b/src/db.c | |
index 669fba1..6ae83d3 100644 | |
--- a/src/db.c | |
+++ b/src/db.c | |
@@ -514,7 +514,8 @@ void propagateExpire(redisDb *db, robj *key) { | |
argv[0] = createStringObject("DEL",3); | |
argv[1] = key; | |
incrRefCount(key); | |
- | |
+ if (db->expire_channel) | |
+ pubsubPublishMessage(db->expire_channel,key); | |
if (server.appendonly) | |
feedAppendOnlyFile(server.delCommand,db->id,argv,2); | |
if (listLength(server.slaves)) | |
@@ -615,3 +616,27 @@ void persistCommand(redisClient *c) { | |
} | |
} | |
} | |
+ | |
+void recycleCommand(redisClient *c) { | |
+ c->db->expire_channel = c->argv[1]; | |
+ subscribeCommand(c); | |
+} | |
+ | |
+void recycletoCommand(redisClient *c) { | |
+ if (c->db->expire_channel == NULL) { | |
+ addReply(c,shared.ok); | |
+ } else { | |
+ addReply(c,c->db->expire_channel); | |
+ } | |
+} | |
+ | |
+void disposeCommand(redisClient *c) { | |
+ if (c->db->expire_channel == NULL) { | |
+ addReply(c,shared.err); | |
+ } else { | |
+ c->argc = 1; | |
+ c->argv[1] = c->db->expire_channel; | |
+ c->db->expire_channel = NULL; | |
+ unsubscribeCommand(c); | |
+ } | |
+} | |
diff --git a/src/help.h b/src/help.h | |
index 51613c9..ba1df07 100644 | |
--- a/src/help.h | |
+++ b/src/help.h | |
@@ -632,7 +632,22 @@ struct commandHelp { | |
"destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX]", | |
"Add multiple sorted sets and store the resulting sorted set in a new key", | |
4, | |
- "1.3.10" } | |
+ "1.3.10" }, | |
+ { "RECYCLE", | |
+ "channel", | |
+ "Define a Dead Letter Channel to publish expired keys to", | |
+ 6, | |
+ "0.1" }, | |
+ { "RECYCLETO", | |
+ "-", | |
+ "Return the Dead Letter Channel expired keys is published to", | |
+ 6, | |
+ "0.1" }, | |
+ { "DISPOSE", | |
+ "-", | |
+ "Remove a registered Dead Letter Channel and stop broadcasting key expiry", | |
+ 6, | |
+ "0.1" } | |
}; | |
#endif | |
diff --git a/src/redis.c b/src/redis.c | |
index af956e9..be6e12c 100644 | |
--- a/src/redis.c | |
+++ b/src/redis.c | |
@@ -187,7 +187,10 @@ struct redisCommand readonlyCommandTable[] = { | |
{"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0}, | |
{"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, | |
{"watch",watchCommand,-2,0,NULL,0,0,0}, | |
- {"unwatch",unwatchCommand,1,0,NULL,0,0,0} | |
+ {"unwatch",unwatchCommand,1,0,NULL,0,0,0}, | |
+ {"recycle",recycleCommand,2,0,NULL,1,1,1}, | |
+ {"recycleto",recycletoCommand,1,0,NULL,0,0,0}, | |
+ {"dispose",disposeCommand,1,0,NULL,0,0,0} | |
}; | |
/*============================ Utility functions ============================ */ | |
@@ -874,6 +877,7 @@ void initServer() { | |
server.db[j].expires = dictCreate(&keyptrDictType,NULL); | |
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); | |
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); | |
+ server.db[j].expire_channel = NULL; | |
if (server.ds_enabled) { | |
server.db[j].io_keys = dictCreate(&keylistDictType,NULL); | |
server.db[j].io_negcache = dictCreate(&setDictType,NULL); | |
@@ -1026,8 +1030,9 @@ int processCommand(redisClient *c) { | |
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) | |
&& | |
cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && | |
- cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { | |
- addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); | |
+ cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand && | |
+ cmd->proc != disposeCommand) { | |
+ addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT / DISPOSE allowed in this context"); | |
return REDIS_OK; | |
} | |
diff --git a/src/redis.h b/src/redis.h | |
index aa42e2c..8ec9765 100644 | |
--- a/src/redis.h | |
+++ b/src/redis.h | |
@@ -279,6 +279,7 @@ typedef struct redisDb { | |
dict *io_negcache; /* Negative caching for disk store */ | |
dict *io_queued; /* Queued IO operations hash table */ | |
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ | |
+ robj *expire_channel; /* Dead Letter Channel for expired keys */ | |
int id; | |
} redisDb; | |
@@ -871,6 +872,7 @@ int pubsubUnsubscribeAllChannels(redisClient *c, int notify); | |
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); | |
void freePubsubPattern(void *p); | |
int listMatchPubsubPattern(void *a, void *b); | |
+int pubsubPublishMessage(robj *channel, robj *message); | |
/* Utility functions */ | |
int stringmatchlen(const char *pattern, int patternLen, | |
@@ -1030,6 +1032,9 @@ void punsubscribeCommand(redisClient *c); | |
void publishCommand(redisClient *c); | |
void watchCommand(redisClient *c); | |
void unwatchCommand(redisClient *c); | |
+void recycleCommand(redisClient *c); | |
+void recycletoCommand(redisClient *c); | |
+void disposeCommand(redisClient *c); | |
#if defined(__GNUC__) | |
void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); | |
diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl | |
index c8b547b..c22952e 100644 | |
--- a/tests/unit/pubsub.tcl | |
+++ b/tests/unit/pubsub.tcl | |
@@ -192,4 +192,30 @@ start_server {tags {"pubsub"}} { | |
# clean up clients | |
$rd1 close | |
} | |
-} | |
\ No newline at end of file | |
+ | |
+ test "RECYCLE on expire" { | |
+ # Set keys with a TTL and recycle them to a dead letter channel | |
+ assert_equal "OK" [r set x foo] | |
+ assert_equal "OK" [r set y bar] | |
+ assert_equal 1 [r expire x 1] | |
+ assert_equal 1 [r expire y 2] | |
+ assert_equal "subscribe expired 1" [r recycle expired] | |
+ | |
+ after 3000 | |
+ assert_equal {message expired x} [r read] | |
+ assert_equal {message expired y} [r read] | |
+ } | |
+ | |
+ test "DISPOSE with a dead letter channel set" { | |
+ assert_equal "unsubscribe expired 0" [r dispose] | |
+ } | |
+ | |
+ test "DISPOSE without a dead letter channel set" { | |
+ catch {r dispose} err | |
+ set _ $err | |
+ } {ERR} | |
+ | |
+ test {RECYCLETO without a dead letter channel set} { | |
+ r recycleto | |
+ } {OK} | |
+} | |
-- | |
1.7.2.1 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment