Created
November 26, 2012 18:26
-
-
Save dspezia/4149768 to your computer and use it in GitHub Desktop.
Example of Redis zset polling daemon
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <signal.h> | |
#include <time.h> | |
#include "hiredis.h" | |
#include "async.h" | |
#include "adapters/ae.h" | |
#include "sha1.h" | |
struct Singleton { | |
int n; | |
int *port; | |
redisAsyncContext **servers; | |
aeEventLoop *loop; | |
char luasha1[48]; | |
} singleton; | |
/*
 * Server-side dequeue script. KEYS[1] is the sorted set, ARGV[1] the upper
 * score bound. It fetches up to 10 members scored between 0 and ARGV[1],
 * removes that many lowest-ranked members, and returns the fetched members
 * (or false when nothing matched).
 */
const char *LuaCmd =
    "local res = redis.call('ZRANGEBYSCORE',KEYS[1], 0, ARGV[1], 'LIMIT', 0, 10 ) "
    "if #res > 0 then "
    " redis.call( 'ZREMRANGEBYRANK', KEYS[1], 0, #res-1 ) "
    " return res "
    "else "
    " return false "
    "end ";
void sha1hex(char *digest, const char *script, size_t len) { | |
SHA1_CTX ctx; | |
unsigned char hash[20]; | |
char *cset = "0123456789abcdef"; | |
int j; | |
SHA1Init(&ctx); | |
SHA1Update(&ctx,(unsigned char*)script,len); | |
SHA1Final(hash,&ctx); | |
for (j = 0; j < 20; j++) { | |
digest[j*2] = cset[((hash[j]&0xF0)>>4)]; | |
digest[j*2+1] = cset[(hash[j]&0xF)]; | |
} | |
digest[40] = '\0'; | |
} | |
/*
 * Reply callback for the EVALSHA dequeue command.
 *
 * c        - connection the reply arrived on
 * r        - the redisReply (NULL is ignored)
 * privdata - unused
 *
 * For an array reply, every returned member is printed and deleted with DEL;
 * if the batch was non-empty the dequeue command is issued again right away
 * so a backlog is drained without waiting for the next timer tick.
 * Error/status replies are logged; a nil reply means nothing was due.
 */
void dequeuedItem(redisAsyncContext *c, void *r, void *privdata) {
    redisReply *reply = r;
    (void)privdata;

    if (reply == NULL) return;

    switch (reply->type) {
    case REDIS_REPLY_ARRAY:
        /* elements is a size_t, so index with a size_t as well. */
        for (size_t i = 0; i < reply->elements; ++i) {
            const redisReply *item = reply->element[i];
            /* Never hand a NULL pointer to a %s conversion. */
            if (item == NULL || item->str == NULL) continue;
            printf("Expired: %s\n", item->str);
            redisAsyncCommand(c, NULL, NULL, "DEL %s", item->str);
        }
        if (reply->elements > 0) {
            /* %ld consumes a long; time_t is not guaranteed to be one. */
            redisAsyncCommand(c, dequeuedItem, NULL,
                              "EVALSHA %s 1 to_be_expired %ld",
                              singleton.luasha1, (long)time(NULL));
        }
        break;
    case REDIS_REPLY_ERROR:
    case REDIS_REPLY_STATUS:
        printf("Error: %s\n", reply->str);
        break;
    case REDIS_REPLY_NIL:
        break;
    default:
        printf("Error\n");
        break;
    }
}
int mainLoop( struct aeEventLoop *loop, long long id, void *clientData) { | |
time_t t = time(NULL); | |
for ( int i=0; i<singleton.n; ++i ) { | |
if ( singleton.servers[i] != NULL ) { | |
redisAsyncCommand( singleton.servers[i], dequeuedItem, NULL, "EVALSHA %s 1 to_be_expired %ld", singleton.luasha1, t ); | |
} | |
} | |
fflush(stdout); | |
return 1000+rand()%1000; | |
} | |
void connectCallback(const redisAsyncContext *c, int status) { | |
if ( status != REDIS_OK ) | |
{ | |
printf("Error: %s\n", c->errstr); | |
for (int i=0; i<singleton.n; ++i ) | |
if ( singleton.servers[i] == c ) | |
singleton.servers[i] = NULL; | |
} | |
else | |
printf("connected...\n"); | |
} | |
void disconnectCallback(const redisAsyncContext *c, int status) { | |
if (status != REDIS_OK) { | |
printf("Error: %s\n", c->errstr); | |
} | |
printf("disconnected...\n"); | |
for (int i=0; i<singleton.n; ++i ) | |
if ( singleton.servers[i] == c ) | |
singleton.servers[i] = NULL; | |
} | |
void checkConnections() | |
{ | |
for ( int i=0; i<singleton.n; ++i ) { | |
if ( singleton.servers[i] == NULL ) { | |
printf("Connecting %d...\n",singleton.port[i] ); | |
singleton.servers[i] = redisAsyncConnect("127.0.0.1", singleton.port[i] ); | |
if ( singleton.servers[i]->err ) { perror("redisAsyncConnect"); exit( -1 ); } | |
redisAeAttach( singleton.loop, singleton.servers[i] ); | |
redisAsyncSetConnectCallback( singleton.servers[i],connectCallback); | |
redisAsyncSetDisconnectCallback( singleton.servers[i],disconnectCallback); | |
redisAsyncCommand( singleton.servers[i], NULL, NULL, "SCRIPT LOAD %s", LuaCmd ); | |
} | |
} | |
} | |
/*
 * Timer callback that re-establishes dropped connections.
 *
 * The parameters are unused. Always returns 1000, which ae is expected to
 * treat as the delay in milliseconds before the next invocation.
 */
int reconnectIfNeeded( struct aeEventLoop *loop, long long id, void *clientData) {
    enum { RECHECK_DELAY = 1000 };

    (void)loop;
    (void)id;
    (void)clientData;

    checkConnections();
    return RECHECK_DELAY;
}
int main ( int argc, char *argv[] ) { | |
srand(time(NULL)); | |
signal(SIGPIPE, SIG_IGN); | |
memset( &singleton, '\0', sizeof(struct Singleton) ); | |
singleton.n = argc - 1; | |
singleton.servers = (redisAsyncContext **) malloc( singleton.n*sizeof( redisAsyncContext *) ); | |
if ( !singleton.servers ) { perror("malloc"); exit( -1 ); } | |
singleton.port = (int *) malloc( singleton.n*sizeof(int) ); | |
if ( !singleton.port ) { perror("malloc"); exit( -1 ); } | |
singleton.loop = aeCreateEventLoop(256); | |
memset( singleton.servers, '\0', singleton.n*sizeof( redisAsyncContext *) ); | |
memset( singleton.port, '\0', singleton.n*sizeof(int) ); | |
for ( int i=0; i<singleton.n; ++i ) | |
singleton.port[i] = atoi(argv[i+1]); | |
sha1hex( singleton.luasha1, LuaCmd, strlen(LuaCmd) ); | |
checkConnections(); | |
aeCreateTimeEvent( singleton.loop,5,reconnectIfNeeded, NULL, NULL ); | |
aeCreateTimeEvent( singleton.loop,5,mainLoop, NULL, NULL ); | |
aeMain(singleton.loop); | |
return 0; | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""Print Redis commands that create test keys scheduled for expiration.

For each key a MULTI/EXEC transaction is printed that sets the key and
registers it in the ``to_be_expired`` sorted set with a score of
"now + ttl" seconds. Works on both Python 2 and Python 3.
"""
import time


def generate_commands(count=1000, ttl=20, now=None):
    """Yield the command lines for ``count`` keys.

    count -- number of keys to generate (c000000, c000001, ...)
    ttl   -- seconds added to the current time to form the score
    now   -- fixed timestamp to use instead of time.time(); when None the
             clock is read once per key
    """
    for x in range(count):
        stamp = time.time() if now is None else now
        yield "multi"
        yield "set c%06d dummydata%d" % (x, x)
        # The score is truncated to whole seconds.
        yield "zadd to_be_expired %d c%06d" % (int(stamp + ttl), x)
        yield "exec"


if __name__ == "__main__":
    for line in generate_commands():
        print(line)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment