|
/* |
|
This is free and unencumbered software released into the public domain. |
|
|
|
Anyone is free to copy, modify, publish, use, compile, sell, or |
|
distribute this software, either in source code form or as a compiled |
|
binary, for any purpose, commercial or non-commercial, and by any |
|
means. |
|
|
|
In jurisdictions that recognize copyright laws, the author or authors |
|
of this software dedicate any and all copyright interest in the |
|
software to the public domain. We make this dedication for the benefit |
|
of the public at large and to the detriment of our heirs and |
|
successors. We intend this dedication to be an overt act of |
|
relinquishment in perpetuity of all present and future rights to this |
|
software under copyright law. |
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
|
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
|
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
|
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR |
|
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, |
|
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR |
|
OTHER DEALINGS IN THE SOFTWARE. |
|
|
|
For more information, please refer to <http://unlicense.org> |
|
*/ |
|
|
|
#include <freeradius-devel/radiusd.h> |
|
#include <freeradius-devel/rad_assert.h> |
|
|
|
#include <sys/stat.h> |
|
|
|
#include <cassandra.h> |
|
|
|
#include "rlm_sql.h" |
|
|
|
#define RLM_SQL_CASS_LOG_BUFFER 128 |
|
|
|
typedef struct rlm_sql_cassandra_conn { |
|
CassSession *sess; |
|
CassCluster *cluster; |
|
const CassResult *res; |
|
CassIterator *resIter; |
|
char **resBuffer; |
|
} rlm_sql_cassandra_conn_t; |
|
|
|
typedef struct rlm_sql_cassandra_config { |
|
char const *consistency_str; |
|
CassConsistency consistency; |
|
sql_log_entry_t logBuffer[RLM_SQL_CASS_LOG_BUFFER]; |
|
unsigned int logRingPosition; |
|
} rlm_sql_cassandra_config_t; |
|
|
|
static const CONF_PARSER driver_config[] = { |
|
{ "consistency", FR_CONF_OFFSET(PW_TYPE_STRING, rlm_sql_cassandra_config_t, consistency_str), "quorum" }, |
|
{NULL, -1, 0, NULL, NULL} |
|
}; |
|
|
|
static const FR_NAME_NUMBER consistency_levels[] = { |
|
{ "any", CASS_CONSISTENCY_ANY }, |
|
{ "one", CASS_CONSISTENCY_ONE }, |
|
{ "two", CASS_CONSISTENCY_TWO }, |
|
{ "three", CASS_CONSISTENCY_THREE }, |
|
{ "quorum", CASS_CONSISTENCY_QUORUM }, |
|
{ "all", CASS_CONSISTENCY_ALL }, |
|
{ "each_quorum", CASS_CONSISTENCY_EACH_QUORUM }, |
|
{ "local_quorum", CASS_CONSISTENCY_LOCAL_QUORUM }, |
|
{ "local_one", CASS_CONSISTENCY_LOCAL_ONE }, |
|
{ NULL, 0 } |
|
}; |
|
|
|
void casslog(const CassLogMessage* message, void* data) { |
|
rlm_sql_cassandra_config_t *conf = data; |
|
int logPosition = __sync_fetch_and_add(&conf->logRingPosition, 1) % RLM_SQL_CASS_LOG_BUFFER; |
|
switch(message->severity) { |
|
case CASS_LOG_CRITICAL: |
|
case CASS_LOG_ERROR: |
|
conf->logBuffer[logPosition].type = L_ERR; |
|
break; |
|
case CASS_LOG_WARN: |
|
conf->logBuffer[logPosition].type = L_WARN; |
|
break; |
|
case CASS_LOG_INFO: |
|
case CASS_LOG_DISABLED: |
|
case CASS_LOG_LAST_ENTRY: |
|
conf->logBuffer[logPosition].type = L_INFO; |
|
break; |
|
case CASS_LOG_DEBUG: |
|
case CASS_LOG_TRACE: |
|
conf->logBuffer[logPosition].type = L_DBG; |
|
break; |
|
} |
|
if(conf->logBuffer[logPosition].msg != NULL) talloc_free(conf->logBuffer[logPosition].msg); |
|
MEM(conf->logBuffer[logPosition].msg = talloc_asprintf(conf, "rlm_sql_cassandra: [drv][%" PRId64 "][%d] In file %s, function %s, on line %d: %s", (int64_t)message->time_ms, (int)message->severity, message->file, message->function, message->line, message->message)); |
|
} |
|
|
|
static int _mod_destructor(rlm_sql_cassandra_config_t *conf) |
|
{ |
|
DEBUG2("rlm_sql_cassandra: Module shut down"); |
|
cass_log_cleanup(); |
|
return 0; |
|
} |
|
|
|
static int mod_instantiate(CONF_SECTION *conf, rlm_sql_config_t *config) |
|
{ |
|
static bool version_done = false; |
|
|
|
rlm_sql_cassandra_config_t *driver; |
|
int consistency; |
|
|
|
if (!version_done) { |
|
version_done = true; |
|
|
|
INFO("rlm_sql_cassandra: compiled against Cassandra cpp-driver %d.%d.%d%s", CASS_VERSION_MAJOR, CASS_VERSION_MINOR, CASS_VERSION_PATCH, CASS_VERSION_SUFFIX); |
|
} |
|
|
|
MEM(driver = config->driver = talloc_zero(config, rlm_sql_cassandra_config_t)); |
|
talloc_set_destructor(driver, _mod_destructor); |
|
|
|
if (cf_section_parse(conf, driver, driver_config) < 0) { |
|
return -1; |
|
} |
|
|
|
consistency = fr_str2int(consistency_levels, driver->consistency_str, -1); |
|
if (consistency < 0) { |
|
ERROR("rlm_sql_cassandra: Invalid consistency level %s", driver->consistency_str); |
|
return -1; |
|
} |
|
driver->consistency = (CassConsistency)consistency; |
|
driver->logRingPosition = 0; |
|
for(int i = 0; i < RLM_SQL_CASS_LOG_BUFFER; i++) { |
|
driver->logBuffer[i].msg = NULL; |
|
} |
|
cass_log_set_level(CASS_LOG_INFO); |
|
cass_log_set_callback(casslog, driver); |
|
|
|
return 0; |
|
} |
|
|
|
static int _sql_socket_destructor(rlm_sql_cassandra_conn_t *conn) |
|
{ |
|
DEBUG2("rlm_sql_cassandra: Socket destructor called, closing socket"); |
|
|
|
if (conn->resIter) { |
|
cass_iterator_free(conn->resIter); |
|
} |
|
if (conn->res) { |
|
cass_result_free(conn->res); |
|
} |
|
if (conn->sess) { |
|
cass_session_free(conn->sess); |
|
} |
|
if(conn->cluster) { |
|
cass_cluster_free(conn->cluster); |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
static sql_rcode_t sql_socket_init(rlm_sql_handle_t *handle, rlm_sql_config_t *config) |
|
{ |
|
rlm_sql_cassandra_conn_t *conn; |
|
CassCluster *cluster; |
|
CassSession *sess; |
|
CassFuture *future; |
|
CassError error; |
|
|
|
MEM(conn = handle->conn = talloc_zero(handle, rlm_sql_cassandra_conn_t)); |
|
talloc_set_destructor(conn, _sql_socket_destructor); |
|
|
|
DEBUG4("rlm_sql_cassandra: Configuring CassCluster"); |
|
cluster = conn->cluster = cass_cluster_new(); |
|
cass_cluster_set_contact_points(cluster, config->sql_server); |
|
cass_cluster_set_port(cluster, atoi(config->sql_port)); |
|
cass_cluster_set_connect_timeout(cluster, config->connect_timeout_ms); |
|
cass_cluster_set_request_timeout(cluster, config->query_timeout); |
|
cass_cluster_set_credentials(cluster, config->sql_login, config->sql_password); |
|
|
|
DEBUG4("rlm_sql_cassandra: Connecting to Cassandra Cluster"); |
|
sess = conn->sess = cass_session_new(); |
|
future = cass_session_connect_keyspace(sess, cluster, config->sql_db); |
|
error = cass_future_error_code(future); |
|
if(error != CASS_OK) { |
|
const char *msg; size_t msgLen; |
|
cass_future_error_message(future, &msg, &msgLen); |
|
ERROR("rlm_sql_cassandra: Unable to connect: [%x] %s", (int)error, msg); |
|
cass_future_free(future); |
|
return RLM_SQL_ERROR; |
|
} |
|
cass_future_free(future); |
|
|
|
DEBUG4("rlm_sql_cassandra: Connected to Cassandra Cluster"); |
|
|
|
return RLM_SQL_OK; |
|
} |
|
|
|
static sql_rcode_t sql_query(rlm_sql_handle_t *handle, rlm_sql_config_t *config, char const *query) |
|
{ |
|
rlm_sql_cassandra_conn_t *conn = handle->conn; |
|
rlm_sql_cassandra_config_t *conf = config->driver; |
|
CassStatement *stmt; |
|
CassFuture *future; |
|
CassError error; |
|
|
|
if(query[0] == ';') return RLM_SQL_OK; |
|
|
|
stmt = cass_statement_new(query, 0); |
|
cass_statement_set_consistency(stmt, conf->consistency); |
|
|
|
future = cass_session_execute(conn->sess, stmt); |
|
cass_statement_free(stmt); |
|
error = cass_future_error_code(future); |
|
if(error != CASS_OK) { |
|
const char *msg; size_t msgLen; |
|
cass_future_error_message(future, &msg, &msgLen); |
|
WARN("rlm_sql_cassandra: query failed: [%x] %s", (int)error, msg); |
|
DEBUG("rlm_sql_cassandra: the query was: %s", query); |
|
cass_future_free(future); |
|
switch(error) { |
|
case CASS_ERROR_SERVER_SYNTAX_ERROR: |
|
case CASS_ERROR_SERVER_INVALID_QUERY: |
|
return RLM_SQL_QUERY_INVALID; |
|
default: |
|
return RLM_SQL_ERROR; |
|
} |
|
} |
|
conn->res = cass_future_get_result(future); |
|
cass_future_free(future); |
|
if(!conn->res) DEBUG4("rlm_sql_cassandra: query doesn't return a result."); |
|
|
|
return RLM_SQL_OK; |
|
} |
|
|
|
static sql_rcode_t sql_store_result(rlm_sql_handle_t * handle, UNUSED rlm_sql_config_t *config) |
|
{ |
|
return RLM_SQL_OK; |
|
} |
|
|
|
static int sql_num_fields(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config) |
|
{ |
|
rlm_sql_cassandra_conn_t *conn = handle->conn; |
|
if(conn->res) { |
|
return cass_result_column_count(conn->res); |
|
} |
|
WARN("rlm_sql_cassandra: sql_num_fields called without an outstanding query result."); |
|
return 0; |
|
} |
|
|
|
static int sql_num_rows(rlm_sql_handle_t *handle, UNUSED rlm_sql_config_t *config) |
|
{ |
|
rlm_sql_cassandra_conn_t *conn = handle->conn; |
|
if(conn->res) { |
|
return cass_result_row_count(conn->res); |
|
} |
|
WARN("rlm_sql_cassandra: sql_num_rows called without an outstanding query result."); |
|
return 0; |
|
} |
|
|
|
#define RLM_CASS_ERR_DATA_RETRIVE(type) {\ |
|
const char *colName; size_t colNameLen;\ |
|
if(cass_result_column_name(conn->res, i, &colName, &colNameLen) != CASS_OK) colName = "error";\ |
|
WARN("rlm_sql_cassandra: failed to retrive " type " data at column (%d)%s", i, colName);\ |
|
conn->resBuffer[i] = "";\ |
|
break;} |
|
static sql_rcode_t sql_fetch_row(rlm_sql_handle_t *handle, rlm_sql_config_t *config) |
|
{ |
|
rlm_sql_cassandra_conn_t *conn = handle->conn; |
|
handle->row = NULL; |
|
if(conn->res && !conn->resIter) { |
|
conn->resIter = cass_iterator_from_result(conn->res); |
|
} |
|
if(conn->res && conn->resIter) { |
|
if(cass_iterator_next(conn->resIter)) { |
|
if(conn->resBuffer != NULL) talloc_free(conn->resBuffer); |
|
const CassRow* row = cass_iterator_get_row(conn->resIter); |
|
int fields = sql_num_fields(handle, config); |
|
MEM(handle->row = conn->resBuffer = talloc_array(handle, char *, fields)); |
|
for(int i = 0; i < fields; i++) { |
|
const CassValue *value = cass_row_get_column(row, i); |
|
conn->resBuffer[i] = NULL; |
|
if(cass_value_is_null(value) == cass_true) { |
|
continue; |
|
} |
|
CassValueType type = cass_value_type(value); |
|
switch(type) { |
|
case CASS_VALUE_TYPE_ASCII: |
|
case CASS_VALUE_TYPE_TEXT: |
|
case CASS_VALUE_TYPE_VARCHAR: { |
|
const char *str; size_t strLen; |
|
if(cass_value_get_string(value, &str, &strLen) != CASS_OK) { |
|
RLM_CASS_ERR_DATA_RETRIVE("string"); |
|
} |
|
MEM(conn->resBuffer[i] = talloc_array(conn->resBuffer, char, strLen+1)); |
|
memcpy(conn->resBuffer[i], str, strLen); |
|
conn->resBuffer[i][strLen] = '\0'; |
|
break; |
|
} |
|
case CASS_VALUE_TYPE_BOOLEAN: { |
|
cass_bool_t bv; |
|
if(cass_value_get_bool(value, &bv) != CASS_OK) { |
|
RLM_CASS_ERR_DATA_RETRIVE("bool"); |
|
} |
|
MEM(conn->resBuffer[i] = talloc_zero_array(conn->resBuffer, char, 2)); |
|
if(bv == cass_false) { |
|
conn->resBuffer[i][0] = '0'; |
|
} else { |
|
conn->resBuffer[i][0] = '1'; |
|
} |
|
break; |
|
} |
|
case CASS_VALUE_TYPE_INT: { |
|
cass_int32_t i32v; |
|
if(cass_value_get_int32(value, &i32v) != CASS_OK) { |
|
RLM_CASS_ERR_DATA_RETRIVE("int32"); |
|
} |
|
MEM(conn->resBuffer[i] = talloc_asprintf(conn->resBuffer, "%"PRId32, (int32_t)i32v)); |
|
break; |
|
} |
|
case CASS_VALUE_TYPE_TIMESTAMP: |
|
case CASS_VALUE_TYPE_BIGINT: { |
|
cass_int64_t i64v; |
|
if(cass_value_get_int64(value, &i64v) != CASS_OK) { |
|
RLM_CASS_ERR_DATA_RETRIVE("int64"); |
|
} |
|
MEM(conn->resBuffer[i] = talloc_asprintf(conn->resBuffer, "%"PRId64, (int64_t)i64v)); |
|
break; |
|
} |
|
case CASS_VALUE_TYPE_UUID: |
|
case CASS_VALUE_TYPE_TIMEUUID: { |
|
CassUuid uuid; |
|
if(cass_value_get_uuid(value, &uuid) != CASS_OK) { |
|
RLM_CASS_ERR_DATA_RETRIVE("UUID"); |
|
} |
|
MEM(conn->resBuffer[i] = talloc_array(conn->resBuffer, char, CASS_UUID_STRING_LENGTH)); |
|
cass_uuid_string(uuid, conn->resBuffer[i]); |
|
break; |
|
} |
|
default: { |
|
const char *colName; size_t colNameLen; |
|
if(cass_result_column_name(conn->res, i, &colName, &colNameLen) != CASS_OK) colName = "error"; |
|
WARN("rlm_sql_cassandra: column (%d)%s: Unsupported type %d", i, colName, (int)type); |
|
break; |
|
} |
|
} |
|
DEBUG4("rlm_sql_cassandra: row[%d] => %s", i, conn->resBuffer[i] ? conn->resBuffer[i] : "(null)"); |
|
} |
|
} else DEBUG4("rlm_sql_cassandra: no more results."); |
|
} else WARN("rlm_sql_cassandra: sql_fetch_row called without an outstanding query result."); |
|
return RLM_SQL_OK; |
|
} |
|
|
|
static sql_rcode_t sql_free_result(rlm_sql_handle_t * handle, UNUSED rlm_sql_config_t *config) |
|
{ |
|
rlm_sql_cassandra_conn_t *conn = handle->conn; |
|
|
|
if(conn->resBuffer != NULL) talloc_free(conn->resBuffer); |
|
if(conn->resIter != NULL) cass_iterator_free(conn->resIter); |
|
if(conn->res != NULL) cass_result_free(conn->res); |
|
|
|
conn->resBuffer = NULL; |
|
conn->resIter = NULL; |
|
conn->res = NULL; |
|
|
|
return RLM_SQL_OK; |
|
} |
|
|
|
static size_t sql_error(TALLOC_CTX *ctx, sql_log_entry_t out[], size_t outlen, UNUSED rlm_sql_handle_t *handle, rlm_sql_config_t *config) |
|
{ |
|
rlm_sql_cassandra_config_t *driver = config->driver; |
|
int fillSize = outlen > RLM_SQL_CASS_LOG_BUFFER ? RLM_SQL_CASS_LOG_BUFFER : outlen; |
|
int bufferFilled = driver->logRingPosition; |
|
fillSize = fillSize > bufferFilled ? bufferFilled : fillSize; |
|
|
|
for(int i = 0; i < fillSize; i++) { |
|
sql_log_entry_t entry = driver->logBuffer[(bufferFilled - fillSize + i) % RLM_SQL_CASS_LOG_BUFFER]; |
|
size_t msgLen = strlen(entry.msg)+1; |
|
out[i].type = entry.type; |
|
out[i].msg = talloc_array(ctx, char, msgLen); |
|
memcpy(out[i].msg, entry.msg, msgLen); |
|
} |
|
|
|
return fillSize; |
|
} |
|
|
|
static sql_rcode_t sql_finish_query(rlm_sql_handle_t *handle, rlm_sql_config_t *config) |
|
{ |
|
return sql_free_result(handle, config); |
|
} |
|
|
|
//Unsupported operation on Cassandra Cluster, fake one. |
|
static int sql_affected_rows(UNUSED rlm_sql_handle_t * handle, UNUSED rlm_sql_config_t *config) { return 1; } |
|
|
|
/* Exported to rlm_sql */ |
|
extern rlm_sql_module_t rlm_sql_cassandra; |
|
rlm_sql_module_t rlm_sql_cassandra = { |
|
.name = "rlm_sql_cassandra", |
|
.mod_instantiate = mod_instantiate, |
|
.sql_socket_init = sql_socket_init, |
|
.sql_query = sql_query, |
|
.sql_select_query = sql_query, |
|
.sql_store_result = sql_store_result, |
|
.sql_num_fields = sql_num_fields, |
|
.sql_num_rows = sql_num_rows, |
|
.sql_fetch_row = sql_fetch_row, |
|
.sql_free_result = sql_free_result, |
|
.sql_error = sql_error, |
|
.sql_finish_query = sql_finish_query, |
|
.sql_finish_select_query = sql_finish_query, |
|
.sql_affected_rows = sql_affected_rows |
|
}; |
No, there's not, as that'd require FreeRADIUS to support asynchronous request processing, else there's no benefit to using the async IO AP. It's the direction we'll be heading in, but it is a significant amount of work.
If we did add asynchronous request processing, the SQL module would be one of the first to be rewritten to use it, so there'd be no benefit to using perl anyway :)
linnaea's module has now been merged into the v3.1.x branch (with some changes), and seems to work fine (locally at least). So if you want to use cassandra with FreeRADIUS that's your best bet.