Created
June 17, 2013 15:05
-
-
Save dawnbreaks/5797591 to your computer and use it in GitHub Desktop.
redis-udf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
** Copyright (c) 2012, Alex Fok [email protected] | |
** All rights reserved. | |
** | |
** This program is free software: you can redistribute it and/or modify | |
** it under the terms of the Lesser GNU General Public License as published by | |
** the Free Software Foundation, either version 3 of the License, or | |
** (at your option) any later version. | |
** | |
** This program is distributed in the hope that it will be useful, | |
** but WITHOUT ANY WARRANTY; without even the implied warranty of | |
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
** Lesser GNU General Public License for more details. | |
** | |
** You should have received a copy of the Lesser GNU General Public License | |
** along with this program. If not, see <http://www.gnu.org/licenses/>. | |
** | |
** | |
** | |
** Syntax for the new commands are: | |
** create function <function_name> returns {string|real|integer} | |
** soname <name_of_shared_library> | |
** drop function <function_name> | |
** | |
** Each defined function may have a xxxx_init function and a xxxx_deinit | |
** function. The init function should alloc memory for the function | |
** and tell the main function about the max length of the result | |
** (for string functions), number of decimals (for double functions) and | |
** if the result may be a null value. | |
** Compile: | |
** gcc -shared -o redis_udf.so redis_udf.c -I "/root/mysql-src/mysql-5.5.21/include" -I "/usr/include/mysql" -fPIC libhiredis.a | |
** copy the lib to MySQL plugin directory | |
** cp redis_udf.so /usr/lib64/mysql/plugin | |
** After the library is made one must notify mysqld about the new | |
** functions with the commands: | |
** CREATE FUNCTION redis_set RETURNS INTEGER SONAME 'redis_udf.so'; | |
** CREATE FUNCTION redis_servers_set RETURNS INTEGER SONAME 'redis_udf.so'; | |
** | |
** to drop the functions, do the following: | |
** DROP FUNCTION redis_set; | |
** DROP FUNCTION redis_servers_set; | |
** | |
** The CREATE FUNCTION and DROP FUNCTION update the func@mysql table. | |
** | |
** | |
** Usage examples: | |
** select redis_servers_set('192.168.60.10',6379,'password'); | |
** select redis_servers_set('192.168.60.10',6379); | |
** select redis_set('ro222','1121235'); | |
** select redis_sadd('ro222','1121235'); | |
** select redis_srem('ro222','1121235'); | |
** | |
** Thanks to Salvatore Sanfilippo <antirez at gmail dot com> and Pieter Noordhuis <pcnoordhuis at gmail dot com> for used hiredis C client | |
** | |
** | |
*/ | |
#include <pthread.h> | |
#include <strings.h> | |
#include <sys/time.h> | |
#include <assert.h> | |
#include <unistd.h> | |
#include <signal.h> | |
#include <errno.h> | |
#include <stdlib.h> | |
#include <stdio.h> | |
#include <string.h> | |
//#include <my_global.h> | |
#include <my_sys.h> | |
#include <mysql.h> | |
#include "fmacros.h" | |
#include "hiredis.h" | |
/* Redis context structures and types */ | |
enum connection_type { | |
CONN_TCP, | |
CONN_UNIX | |
}; | |
struct config { | |
enum connection_type type; | |
struct { | |
char host[256]; | |
int port; | |
} tcp; | |
struct { | |
const char *path; | |
} unix1; | |
char password[256]; | |
int auth; | |
char log_file[256]; | |
int bdebug; | |
}; | |
static struct config cfg = { | |
.tcp = { | |
.host = "192.168.60.10", | |
.port = 6380 | |
}, | |
.unix1 = { | |
.path = "/tmp/redis.sock" | |
}, | |
.password = "go2oovoo", | |
.auth = 0, | |
.log_file = "/tmp/redis_udf.log", | |
.bdebug = 1, | |
.type = CONN_TCP | |
}; | |
/* Redis Connection pool creation mutex */ | |
pthread_mutex_t sredisContext_mutex = PTHREAD_MUTEX_INITIALIZER; | |
/* Redis Connection pool object */ | |
static redisContext *sredisContext = NULL; | |
/* end of Redis context structures and types */ | |
/* These must be right or mysqld will not find the symbol! */ | |
/* Redis Functions prototypes */ | |
my_bool redis_servers_set_init(UDF_INIT *initid, UDF_ARGS *args, char *message); | |
void redis_servers_set_deinit(UDF_INIT *initid); | |
long long redis_servers_set(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error); | |
/* SET Operation */ | |
my_bool redis_set_init(UDF_INIT *initid, UDF_ARGS *args, char *message); | |
void redis_set_deinit(UDF_INIT *initid); | |
long long redis_set(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error); | |
/* SADD */ | |
my_bool redis_sadd_init(UDF_INIT *initid, UDF_ARGS *args, char *message); | |
void redis_sadd_deinit(UDF_INIT *initid); | |
long long redis_sadd(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error); | |
/* SREM */ | |
my_bool redis_srem_init(UDF_INIT *initid, UDF_ARGS *args, char *message); | |
void redis_sadd_deinit(UDF_INIT *initid); | |
long long redis_srem(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error); | |
/* Helpers */ | |
my_bool _init_redis_command(UDF_INIT *initid, UDF_ARGS *args, char *message); | |
long long _do_redis_command(char* cmd, UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error); | |
redisContext *_myredisConnect(struct config config); | |
void _myredisDconnect(redisContext *c); | |
redisContext *_redis_context_init(); | |
void _redis_context_deinit(); | |
/* SADD */ | |
my_bool redis_sadd_init(UDF_INIT *initid, UDF_ARGS *args, char *message) | |
{ | |
return _init_redis_command(initid, args, message); | |
} | |
void redis_sadd_deinit(UDF_INIT *initid) | |
{ | |
} | |
/*************************************************************************** | |
** UDF long long function. | |
** Arguments: | |
** initid Return value from xxxx_init | |
** args The same structure as to xxx_init. This structure | |
** contains values for all parameters. | |
** Note that the functions MUST check and convert all | |
** to the type it wants! Null values are represented by | |
** a NULL pointer | |
** is_null If the result is null, one should store 1 here. | |
** error If something goes fatally wrong one should store 1 here. | |
** | |
** This function should return the result as a long long | |
***************************************************************************/ | |
long long redis_sadd(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) | |
{ | |
return _do_redis_command("SADD", initid, args, is_null, error); | |
} | |
/* SREM */ | |
my_bool redis_srem_init(UDF_INIT *initid, UDF_ARGS *args, char *message) | |
{ | |
return _init_redis_command(initid, args, message); | |
} | |
void redis_srem_deinit(UDF_INIT *initid) | |
{ | |
} | |
long long redis_srem(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) | |
{ | |
return _do_redis_command("SREM", initid, args, is_null, error); | |
} | |
my_bool redis_set_init(UDF_INIT *initid, UDF_ARGS *args, char *message) | |
{ | |
return _init_redis_command(initid, args, message); | |
} | |
void redis_set_deinit(UDF_INIT *initid) | |
{ | |
} | |
long long redis_set(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) | |
{ | |
return _do_redis_command("SET", initid, args, is_null, error); | |
} | |
my_bool redis_lpush_init(UDF_INIT *initid, UDF_ARGS *args, char *message) | |
{ | |
return _init_redis_command(initid, args, message); | |
} | |
void redis_lpush_deinit(UDF_INIT *initid) | |
{ | |
} | |
long long redis_lpush(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) | |
{ | |
return _do_redis_command("lpush", initid, args, is_null, error); | |
} | |
/************************************************************************* | |
** Example of init function | |
** Arguments: | |
** initid Points to a structure that the init function should fill. | |
** This argument is given to all other functions. | |
** my_bool maybe_null 1 if function can return NULL | |
** Default value is 1 if any of the arguments | |
** is declared maybe_null. | |
** unsigned int decimals Number of decimals. | |
** Default value is max decimals in any of the | |
** arguments. | |
** unsigned int max_length Length of string result. | |
** The default value for integer functions is 21 | |
** The default value for real functions is 13+ | |
** default number of decimals. | |
** The default value for string functions is | |
** the longest string argument. | |
** char *ptr; A pointer that the function can use. | |
** | |
** args Points to a structure which contains: | |
** unsigned int arg_count Number of arguments | |
** enum Item_result *arg_type Types for each argument. | |
** Types are STRING_RESULT, REAL_RESULT | |
** and INT_RESULT. | |
** char **args Pointer to constant arguments. | |
** Contains 0 for not constant argument. | |
** unsigned long *lengths; max string length for each argument | |
** char *maybe_null Information of which arguments | |
** may be NULL | |
** | |
** message Error message that should be passed to the user on fail. | |
** The message buffer is MYSQL_ERRMSG_SIZE big, but one should | |
** try to keep the error message less than 80 bytes long! | |
** | |
** This function should return 1 if something goes wrong. In this case | |
** message should contain something usefull! | |
**************************************************************************/ | |
/**************************************************************************** | |
** Deinit function. This should free all resources allocated by | |
** this function. | |
** Arguments: | |
** initid Return value from xxxx_init | |
****************************************************************************/ | |
/*************************************************************************** | |
** UDF long long function. | |
** Arguments: | |
** initid Return value from xxxx_init | |
** args The same structure as to xxx_init. This structure | |
** contains values for all parameters. | |
** Note that the functions MUST check and convert all | |
** to the type it wants! Null values are represented by | |
** a NULL pointer | |
** is_null If the result is null, one should store 1 here. | |
** error If something goes fatally wrong one should store 1 here. | |
** | |
** This function should return the result as a long long | |
***************************************************************************/ | |
my_bool redis_servers_set_init(UDF_INIT *initid, UDF_ARGS *args, char *message) | |
{ | |
redisContext *c = NULL; | |
if (args->arg_count < 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != INT_RESULT) | |
{ | |
strncpy(message,"Wrong arguments to Redis function. Usage: 'tcp.host' (string) 'tcp.port' (string)", MYSQL_ERRMSG_SIZE); | |
return -1; | |
} | |
if (args->arg_count == 3 && args->arg_type[2] != STRING_RESULT) | |
{ | |
strncpy(message,"Wrong arguments to Redis function1. Usage: 'password' (string)", MYSQL_ERRMSG_SIZE); | |
return -2; | |
} | |
strncpy(cfg.tcp.host, (char*)args->args[0], 256); | |
cfg.tcp.port = *((longlong*)args->args[1]); | |
if (args->arg_count == 3) | |
{ | |
cfg.auth = 1; | |
strncpy(cfg.password, (char*)args->args[2], 256); | |
} | |
_redis_context_deinit(); | |
c = (redisContext*)_redis_context_init();; | |
if (!c) | |
{ | |
strncpy(message, "Failed to connect to Redis", MYSQL_ERRMSG_SIZE); | |
return 2; | |
} | |
return 0; | |
} | |
void redis_servers_set_deinit(UDF_INIT *initid) | |
{ | |
} | |
long long redis_servers_set(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) | |
{ | |
return 0; | |
// return _do_redis_command("SET", initid, args, is_null, error); | |
} | |
/* Internal Helper Functions */ | |
redisContext *_redis_context_init() | |
{ | |
pthread_mutex_lock(&sredisContext_mutex); | |
if (!sredisContext) | |
sredisContext = _myredisConnect(cfg); | |
if (!sredisContext) | |
{ | |
pthread_mutex_unlock(&sredisContext_mutex); | |
return NULL; | |
} | |
pthread_mutex_unlock(&sredisContext_mutex); | |
return sredisContext; | |
} | |
/* Internal Helper Functions */ | |
redisContext *_redis_context_reinit() | |
{ | |
pthread_mutex_lock(&sredisContext_mutex); | |
if (!sredisContext) | |
{ | |
sredisContext = _myredisConnect(cfg); | |
} | |
else | |
{ | |
redisFree(sredisContext); | |
sredisContext = NULL; | |
sredisContext = _myredisConnect(cfg); | |
} | |
if (!sredisContext) | |
{ | |
pthread_mutex_unlock(&sredisContext_mutex); | |
return NULL; | |
} | |
pthread_mutex_unlock(&sredisContext_mutex); | |
return sredisContext; | |
} | |
void _redis_context_deinit() | |
{ | |
pthread_mutex_lock(&sredisContext_mutex); | |
if (!sredisContext) | |
_myredisDconnect(sredisContext); | |
sredisContext = NULL; | |
pthread_mutex_unlock(&sredisContext_mutex); | |
return; | |
} | |
my_bool _init_redis_command(UDF_INIT *initid, UDF_ARGS *args, char *message) | |
{ | |
redisContext *c = NULL; | |
if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT) | |
{ | |
strncpy(message,"Wrong arguments to Redis function. Usage: 'key' (string) 'value' (string)", MYSQL_ERRMSG_SIZE); | |
return 1; | |
} | |
c = (redisContext*)_redis_context_init();; | |
if (!c) | |
{ | |
strncpy(message, "Failed to connect to Redis", MYSQL_ERRMSG_SIZE); | |
return 2; | |
} | |
return 0; | |
} | |
long long _do_redis_command(char* cmd, UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error) | |
{ | |
redisContext *c = NULL; | |
redisReply *reply; | |
FILE * pFile; | |
*is_null = 0; | |
*error = 0; | |
c = (redisContext*)_redis_context_init(); | |
if (!c) | |
{ | |
*error = 1; | |
return -1; | |
} | |
if (cfg.bdebug) | |
{ | |
pFile = fopen(cfg.log_file,"a"); | |
if (!pFile) | |
return -2; | |
fprintf(pFile,"%s %s %s\n",cmd ,args->args[0] ,args->args[1]); | |
fclose(pFile); | |
} | |
// reply = redisCommand(c,"%s %s %s",cmd , args->args[0] ,args->args[1]); | |
char * commandArgs[3]; | |
commandArgs[0] = cmd; | |
commandArgs[1] = args->args[0]; | |
commandArgs[2] = args->args[1]; | |
reply = redisCommandArgv(c,3,commandArgs,NULL); | |
if(!reply) | |
{ | |
c = (redisContext*)_redis_context_reinit(); | |
if (!c) | |
{ | |
*error = 1; | |
if (cfg.bdebug) | |
{ | |
pFile = fopen(cfg.log_file,"a"); | |
if (!pFile) | |
return -2; | |
fprintf(pFile,"Cannot reconnect to redis\n "); | |
fclose(pFile); | |
} | |
return -1; | |
} | |
reply = redisCommandArgv(c,3,commandArgs,NULL); | |
if (!reply) | |
{ | |
*error = 1; | |
if (cfg.bdebug) | |
{ | |
pFile = fopen(cfg.log_file,"a"); | |
if (!pFile) | |
return -2; | |
fprintf(pFile,"execute redisCommandArgv failed\n "); | |
fclose(pFile); | |
} | |
return -1; | |
} | |
} | |
// test_cond(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1) | |
freeReplyObject(reply); | |
return 0; | |
} | |
static long long usec(void) { | |
struct timeval tv; | |
gettimeofday(&tv,NULL); | |
return (((long long)tv.tv_sec)*1000000)+tv.tv_usec; | |
} | |
static redisContext *select_database(redisContext *c) { | |
redisReply *reply; | |
/* Switch to DB 9 for testing, now that we know we can chat. */ | |
reply = redisCommand(c,"SELECT 9"); | |
assert(reply != NULL); | |
freeReplyObject(reply); | |
/* Make sure the DB is emtpy */ | |
reply = redisCommand(c,"DBSIZE"); | |
assert(reply != NULL); | |
if (reply->type == REDIS_REPLY_INTEGER && reply->integer == 0) { | |
/* Awesome, DB 9 is empty and we can continue. */ | |
freeReplyObject(reply); | |
} else { | |
printf("Database #9 is not empty: type: %d, %d, test can not continue\n", reply->type, reply->integer); | |
exit(1); | |
} | |
return c; | |
} | |
void _myredisDconnect(redisContext *c) | |
{ | |
/* Free the context */ | |
redisFree(c); | |
} | |
redisContext *_myredisConnect(struct config config) | |
{ | |
redisContext *c = NULL; | |
redisReply *reply; | |
char cmd[256]; | |
int len; | |
if (config.type == CONN_TCP) { | |
c = redisConnect(config.tcp.host, config.tcp.port); | |
} else if (config.type == CONN_UNIX) { | |
c = redisConnectUnix(config.unix1.path); | |
} else { | |
assert(NULL); | |
} | |
if (c->err) { | |
printf("Connection error: %s\n", c->errstr); | |
return c; | |
} | |
/* Authenticate */ | |
if (config.auth) | |
{ | |
reply = redisCommand(c,"AUTH %s",config.password); | |
freeReplyObject(reply); | |
} | |
return c; | |
// return select_database(c); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment