Skip to content

Instantly share code, notes, and snippets.

@kevinz
Last active November 13, 2015 08:21
Show Gist options
  • Save kevinz/b027d4c0656a29ea472b to your computer and use it in GitHub Desktop.
Save kevinz/b027d4c0656a29ea472b to your computer and use it in GitHub Desktop.
codis 2.8.21 diff
diff -bur /home/vagrant/redis-2.8.21/redis.conf /home/vagrant/redis-2.8.21_codis/redis.conf
--- /home/vagrant/redis-2.8.21/redis.conf 2015-06-04 09:32:24.000000000 +0000
+++ /home/vagrant/redis-2.8.21_codis/redis.conf 2015-11-11 02:45:49.550199788 +0000
@@ -34,7 +34,7 @@
# By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
-daemonize no
+daemonize yes
# When running daemonized, Redis writes a pid file in /var/run/redis.pid by
# default. You can specify a custom pid file location here.
Only in /home/vagrant/redis-2.8.21_codis/src: crc32.c
diff -bur /home/vagrant/redis-2.8.21/src/db.c /home/vagrant/redis-2.8.21_codis/src/db.c
--- /home/vagrant/redis-2.8.21/src/db.c 2015-06-04 09:32:24.000000000 +0000
+++ /home/vagrant/redis-2.8.21_codis/src/db.c 2015-11-11 02:45:49.557196678 +0000
@@ -32,9 +32,6 @@
#include <signal.h>
#include <ctype.h>
-void SlotToKeyAdd(robj *key);
-void SlotToKeyDel(robj *key);
-
/*-----------------------------------------------------------------------------
* C-level DB API
*----------------------------------------------------------------------------*/
@@ -90,11 +87,23 @@
* The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
+
int retval = dictAdd(db->dict, copy, val);
//after inserting the key/value pair, also store the key/crc pair in hash_slots[slot], where slot comes from slots_num() (1024 slots)
+ do {
+ uint32_t crc;
+ int hastag;
+ int slot = slots_num(key->ptr, &crc, &hastag);
+ dictAdd(db->hash_slots[slot], copy, (void *)(long)crc);
+ if (hastag) {
+ incrRefCount(key);
// keys that carry a hash tag are also stored in the tagged_keys skip list, with their crc as the score
+ zslInsert(db->tagged_keys, (double)crc, key);
+ }
+ } while (0);
+
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
if (val->type == REDIS_LIST) signalListAsReady(db, key);
- }
+}
/* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller.
@@ -160,6 +169,18 @@
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
+
+ do {
+ uint32_t crc;
+ int hastag;
+ int slot = slots_num(key->ptr, &crc, &hastag);
+ if (dictDelete(db->hash_slots[slot], key->ptr) == DICT_OK) {
+ if (hastag) {
+ zslDelete(db->tagged_keys, (double)crc, key);
+ }
+ }
+ } while (0);
+
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
return 1;
} else {
@@ -206,11 +227,18 @@
}
long long emptyDb(void(callback)(void*)) {
- int j;
+ int i, j;
long long removed = 0;
for (j = 0; j < server.dbnum; j++) {
removed += dictSize(server.db[j].dict);
//also clear the per-slot dicts and the tagged_keys skip list
+ for (i = 0; i < HASH_SLOTS_SIZE; i ++) {
+ dictEmpty(server.db[j].hash_slots[i], NULL);
+ }
+ if (server.db[j].tagged_keys->length != 0) {
+ zslFree(server.db[j].tagged_keys);
+ server.db[j].tagged_keys = zslCreate();
+ }
dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires,callback);
}
@@ -246,8 +274,16 @@
*----------------------------------------------------------------------------*/
void flushdbCommand(redisClient *c) {
+ int i;
server.dirty += dictSize(c->db->dict);
signalFlushedDb(c->db->id);
//also clear the per-slot dicts and the tagged_keys skip list
+ for (i = 0; i < HASH_SLOTS_SIZE; i ++) {
+ dictEmpty(c->db->hash_slots[i], NULL);
+ }
+ if (c->db->tagged_keys->length != 0) {
+ zslFree(c->db->tagged_keys);
+ c->db->tagged_keys = zslCreate();
+ }
dictEmpty(c->db->dict,NULL);
dictEmpty(c->db->expires,NULL);
addReply(c,shared.ok);
@@ -646,7 +682,15 @@
* Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
if (server.loading || server.sentinel_mode)
flags = (flags & ~REDIS_SHUTDOWN_SAVE) | REDIS_SHUTDOWN_NOSAVE;
- if (prepareForShutdown(flags) == REDIS_OK) exit(0);
//release the per-slot dicts and the tagged_keys skip list of every db before exiting
+ if (prepareForShutdown(flags) == REDIS_OK) {
+ for (int j = 0; j < server.dbnum; j ++) {
+ for (int i = 0; i < HASH_SLOTS_SIZE; i ++) {
+ dictRelease(server.db[j].hash_slots[i]);
+ }
+ zslFree(server.db[j].tagged_keys);
+ }
+ exit(0);
+ }
addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
diff -bur /home/vagrant/redis-2.8.21/src/Makefile /home/vagrant/redis-2.8.21_codis/src/Makefile
--- /home/vagrant/redis-2.8.21/src/Makefile 2015-06-04 09:32:24.000000000 +0000
+++ /home/vagrant/redis-2.8.21_codis/src/Makefile 2015-11-11 02:45:49.558196228 +0000
@@ -113,7 +113,7 @@
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
-REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o hyperloglog.o latency.o sparkline.o
+REDIS_SERVER_OBJ=adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o crc32.o bitops.o sentinel.o notify.o setproctitle.o hyperloglog.o latency.o sparkline.o slots.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
REDIS_BENCHMARK_NAME=redis-benchmark
diff -bur /home/vagrant/redis-2.8.21/src/Makefile.dep /home/vagrant/redis-2.8.21_codis/src/Makefile.dep
--- /home/vagrant/redis-2.8.21/src/Makefile.dep 2015-06-04 09:32:24.000000000 +0000
+++ /home/vagrant/redis-2.8.21_codis/src/Makefile.dep 2015-11-11 02:45:49.557196678 +0000
@@ -20,6 +20,7 @@
../deps/lua/src/luaconf.h ae.h sds.h dict.h adlist.h zmalloc.h anet.h \
ziplist.h intset.h version.h util.h latency.h sparkline.h rdb.h rio.h
crc64.o: crc64.c
+crc32.o: crc32.c
db.o: db.c redis.h fmacros.h config.h ../deps/lua/src/lua.h \
../deps/lua/src/luaconf.h ae.h sds.h dict.h adlist.h zmalloc.h anet.h \
ziplist.h intset.h version.h util.h latency.h sparkline.h rdb.h rio.h
@@ -40,6 +41,7 @@
lzf_c.o: lzf_c.c lzfP.h
lzf_d.o: lzf_d.c lzfP.h
memtest.o: memtest.c config.h
+slots.o: slots.c redis.h
migrate.o: migrate.c redis.h fmacros.h config.h ../deps/lua/src/lua.h \
../deps/lua/src/luaconf.h ae.h sds.h dict.h adlist.h zmalloc.h anet.h \
ziplist.h intset.h version.h util.h latency.h sparkline.h rdb.h rio.h \
diff -bur /home/vagrant/redis-2.8.21/src/migrate.c /home/vagrant/redis-2.8.21_codis/src/migrate.c
--- /home/vagrant/redis-2.8.21/src/migrate.c 2015-06-04 09:32:24.000000000 +0000
+++ /home/vagrant/redis-2.8.21_codis/src/migrate.c 2015-11-11 02:45:49.556197128 +0000
@@ -140,7 +140,7 @@
return;
if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
return;
- if (timeout <= 0) timeout = 1;
+ if (timeout <= 0) timeout = 1000;
/* Check if the key is here. If not we reply with success as there is
* nothing to migrate (for instance the key expired in the meantime), but
@@ -158,8 +158,11 @@
server.neterr);
return;
}
//enable TCP_NODELAY on the socket so packets are sent without delay
- if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
+ anetEnableTcpNoDelay(server.neterr,fd);
+
+ if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
+ close(fd);
return;
}
diff -bur /home/vagrant/redis-2.8.21/src/redis.c /home/vagrant/redis-2.8.21_codis/src/redis.c
--- /home/vagrant/redis-2.8.21/src/redis.c 2015-06-04 09:32:24.000000000 +0000
+++ /home/vagrant/redis-2.8.21_codis/src/redis.c 2015-11-11 02:45:49.558196228 +0000
@@ -274,7 +274,16 @@
{"pfcount",pfcountCommand,-2,"r",0,NULL,1,1,1,0,0},
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
- {"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0}
//add the new slots* commands to the command table
+ {"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0},
+ {"slotsinfo",slotsinfoCommand,-1,"rF",0,NULL,0,0,0,0,0},
+ {"slotsdel",slotsdelCommand,-2,"w",0,NULL,1,-1,1,0,0},
+ {"slotsmgrtslot",slotsmgrtslotCommand,5,"aw",0,NULL,0,0,0,0,0},
+ {"slotsmgrtone",slotsmgrtoneCommand,5,"aw",0,NULL,0,0,0,0,0},
+ {"slotsmgrttagslot",slotsmgrttagslotCommand,5,"aw",0,NULL,0,0,0,0,0},
+ {"slotsmgrttagone",slotsmgrttagoneCommand,5,"aw",0,NULL,0,0,0,0,0},
+ {"slotshashkey",slotshashkeyCommand,-1,"rF",0,NULL,0,0,0,0,0},
+ {"slotscheck",slotscheckCommand,0,"r",0,NULL,0,0,0,0,0},
+ {"slotsrestore",slotsrestoreCommand,-4,"awm",0,NULL,1,1,1,0,0},
};
/*============================ Utility functions ============================ */
@@ -531,6 +540,15 @@
dictRedisObjectDestructor /* val destructor */
};
//define the hashSlotType
+dictType hashSlotType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ NULL /* val destructor */
+};
+
/* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
dictType shaScriptObjectDictType = {
dictSdsCaseHash, /* hash function */
@@ -595,6 +613,15 @@
NULL /* val destructor */
};
//define the dict type used for the cache of migration socket fds
+dictType migrateCacheDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ NULL /* val destructor */
+};
+
int htNeedsResize(dict *dict) {
long long size, used;
@@ -607,8 +634,15 @@
/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
* we resize the hash table to save memory */
void tryResizeHashTables(int dbid) {
- if (htNeedsResize(server.db[dbid].dict))
//when the db dict needs resizing, also resize every slot dict that needs it
+ if (htNeedsResize(server.db[dbid].dict)) {
dictResize(server.db[dbid].dict);
+ for (int i = 0; i < HASH_SLOTS_SIZE; i ++) {
+ dict *d = server.db[dbid].hash_slots[i];
+ if (htNeedsResize(d)) {
+ dictResize(d);
+ }
+ }
+ }
if (htNeedsResize(server.db[dbid].expires))
dictResize(server.db[dbid].expires);
}
@@ -624,6 +658,12 @@
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
//while incrementally rehashing the db dict, also rehash any slot dict that is rehashing
+ for (int i = 0; i < HASH_SLOTS_SIZE; i ++) {
+ dict *d = server.db[dbid].hash_slots[i];
+ if (dictIsRehashing(d)) {
+ dictRehashMilliseconds(d, 1);
+ }
+ }
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
@@ -1179,6 +1219,10 @@
if (server.sentinel_mode) sentinelTimer();
}
//periodically run the slotsmgrt_cleanup task from serverCron
+ run_with_period(1000) {
+ slotsmgrt_cleanup();
+ }
+
server.cronloops++;
return 1000/server.hz;
}
@@ -1385,6 +1429,8 @@
server.next_client_id = 1; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (1024*1024*2);
//initialize the dict that caches socket fds used for migration
+ server.slotsmgrt_cached_sockfds = dictCreate(&migrateCacheDictType, NULL);
+
updateLRUClock();
resetServerSaveParams();
@@ -1648,12 +1694,14 @@
}
void initServer(void) {
- int j;
+ int i, j;
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
//initialize crc32
+ crc32_init();
+
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
@@ -1705,6 +1753,10 @@
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
+ for (i = 0; i < HASH_SLOTS_SIZE; i ++) {
+ server.db[j].hash_slots[i] = dictCreate(&hashSlotType, NULL);
+ }
+ server.db[j].tagged_keys = zslCreate();
}
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
diff -bur /home/vagrant/redis-2.8.21/src/redis.h /home/vagrant/redis-2.8.21_codis/src/redis.h
--- /home/vagrant/redis-2.8.21/src/redis.h 2015-06-04 09:32:24.000000000 +0000
+++ /home/vagrant/redis-2.8.21_codis/src/redis.h 2015-11-11 02:45:49.556197128 +0000
@@ -421,12 +421,22 @@
_var.ptr = _ptr; \
} while(0);
//add crc32 related functions
+void crc32_init();
+uint32_t crc32_checksum(const char *buf, int len);
+
//define 1024 slots
+#define HASH_SLOTS_MASK 0x000003ff
+#define HASH_SLOTS_SIZE (HASH_SLOTS_MASK + 1)
+
//forward-declare struct zskiplist so redisDb can hold a pointer to it
+struct zskiplist;
+
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
// array of per-slot dict pointers, and the skip list of tagged keys
+ dict *hash_slots[HASH_SLOTS_SIZE];
+ struct zskiplist *tagged_keys;
int id;
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
@@ -634,6 +644,7 @@
list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
//pointer to the dict of cached migration socket fds in server
+ dict *slotsmgrt_cached_sockfds;
uint64_t next_client_id; /* Next client unique ID. Incremental. */
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
@@ -1422,6 +1433,18 @@
void pfmergeCommand(redisClient *c);
void pfdebugCommand(redisClient *c);
void latencyCommand(redisClient *c);
//prototypes for the new slots* commands and helpers
+void slotsinfoCommand(redisClient *c);
+void slotsdelCommand(redisClient *c);
+void slotsmgrtslotCommand(redisClient *c);
+void slotsmgrtoneCommand(redisClient *c);
+void slotsmgrttagslotCommand(redisClient *c);
+void slotsmgrttagoneCommand(redisClient *c);
+void slotshashkeyCommand(redisClient *c);
+void slotscheckCommand(redisClient *c);
+void slotsrestoreCommand(redisClient *c);
+
+void slotsmgrt_cleanup();
+int slots_num(const sds s, uint32_t *pcrc, int *phastag);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
Only in /home/vagrant/redis-2.8.21_codis/src: slots.c
Only in /home/vagrant/redis-2.8.21/tests/assets: encodings.rdb
Only in /home/vagrant/redis-2.8.21/tests/assets: hash-zipmap.rdb
Only in /home/vagrant/redis-2.8.21/tests: tmp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment