|
#include <redismodule.h> |
|
#include <hiredis.h> |
|
#include <unistd.h> |
|
#include <pthread.h> |
|
|
|
typedef struct { |
|
pthread_t tid; |
|
redisContext *rc; |
|
} cron_t; |
|
|
|
// global cron stuff |
|
cron_t cron = { |
|
.tid = 0, |
|
.rc = NULL |
|
}; |
|
|
|
// Main cron thread event loop |
|
void *Cron_ThreadMain(void *argv) |
|
{ |
|
REDISMODULE_NOT_USED(argv); |
|
|
|
// TODO: persist the connection, reconnect on disconnect |
|
while(1) { |
|
sleep(1); |
|
|
|
// TODO: discover local settings or use arguments |
|
cron.rc = redisConnect("127.0.0.1", 6379); |
|
if (cron.rc == NULL || cron.rc->err) { |
|
if (cron.rc) { |
|
printf("cron thread error: %s\n", cron.rc->errstr); |
|
// handle error |
|
goto cleanup; |
|
} else { |
|
printf("cron thread can't allocate redis context\n"); |
|
goto cleanup; |
|
} |
|
return NULL; |
|
} |
|
|
|
redisReply *reply = redisCommand(cron.rc, "PING"); |
|
if (!reply) { |
|
printf("cron thread PING reply missing\n"); |
|
goto cleanup; |
|
//handle error |
|
} |
|
|
|
if (reply->type != REDIS_REPLY_STATUS) { |
|
printf("cron thread PING reply is wrong\n"); |
|
goto cleanup; |
|
// handle error |
|
} |
|
|
|
printf("cron thread: %s\n", reply->str); |
|
freeReplyObject(reply); |
|
redisFree(cron.rc); |
|
} |
|
|
|
cleanup: |
|
printf("cron thread exited abnormally\n"); |
|
cron.tid = 0; |
|
return NULL; |
|
} |
|
|
|
// Starts the cron thread |
|
int RedisCommand_CronStart(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) |
|
{ |
|
REDISMODULE_NOT_USED(argv); |
|
if (argc != 1) |
|
return RedisModule_WrongArity(ctx); |
|
|
|
if (cron.tid) { |
|
RedisModule_ReplyWithError(ctx, "cron thread already running"); |
|
return REDISMODULE_ERR; |
|
} |
|
|
|
// TODO: prettier error reporting |
|
if (pthread_create(&cron.tid, NULL, Cron_ThreadMain, NULL)) { |
|
return RedisModule_ReplyWithError(ctx, "could not start cron thread"); |
|
return REDISMODULE_ERR; |
|
} |
|
|
|
RedisModule_ReplyWithSimpleString(ctx, "OK"); |
|
return REDISMODULE_OK; |
|
} |
|
|
|
// Stops the cron thread |
|
int RedisCommand_CronStop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) |
|
{ |
|
REDISMODULE_NOT_USED(argv); |
|
if (argc != 1) |
|
return RedisModule_WrongArity(ctx); |
|
|
|
if (!cron.tid) { |
|
RedisModule_ReplyWithError(ctx, "cron not running"); |
|
return REDISMODULE_ERR; |
|
} |
|
|
|
// TODO: prettier error reporting |
|
if (pthread_cancel(cron.tid)) { |
|
return RedisModule_ReplyWithError(ctx, "could not cancel cron thread"); |
|
return REDISMODULE_ERR; |
|
} |
|
|
|
cron.tid = 0; |
|
RedisModule_ReplyWithSimpleString(ctx, "OK"); |
|
return REDISMODULE_OK; |
|
} |
|
|
|
/*
 * Module entry point: initialises the module as "cron" (version 1) and
 * registers the cron.start and cron.stop commands, in that order.
 *
 * Returns REDISMODULE_OK when everything was registered, REDISMODULE_ERR
 * as soon as initialisation or any command registration fails.
 */
int RedisModule_OnLoad(RedisModuleCtx *ctx)
{
    /* Command name -> handler, registered in table order. */
    static const struct {
        const char *name;
        int (*handler)(RedisModuleCtx *, RedisModuleString **, int);
    } commands[] = {
        { "cron.start", RedisCommand_CronStart },
        { "cron.stop",  RedisCommand_CronStop  },
    };
    const size_t command_count = sizeof commands / sizeof commands[0];

    int init_status = RedisModule_Init(ctx, "cron", 1, REDISMODULE_APIVER_1);
    if (init_status == REDISMODULE_ERR) {
        return REDISMODULE_ERR;
    }

    for (size_t idx = 0; idx < command_count; idx++) {
        int status = RedisModule_CreateCommand(ctx, commands[idx].name,
                                               commands[idx].handler,
                                               "readonly", 0, 0, 0);
        if (status == REDISMODULE_ERR) {
            return REDISMODULE_ERR;
        }
    }

    return REDISMODULE_OK;
}
|
|