Skip to content

Instantly share code, notes, and snippets.

@methodmissing
Created September 15, 2008 18:15
Show Gist options
  • Save methodmissing/10901 to your computer and use it in GitHub Desktop.
Save methodmissing/10901 to your computer and use it in GitHub Desktop.
diff --git a/ext/mysql.c b/ext/mysql.c
index b804723..3f1a908 100644
--- a/ext/mysql.c
+++ b/ext/mysql.c
@@ -61,6 +61,7 @@ struct mysql {
char connection;
char query_with_result;
char blocking;
+ int async_in_progress;
};
struct mysql_res {
@@ -231,6 +232,17 @@ static VALUE init(VALUE klass)
return obj;
}
+/*
+ * Prepare a connected handle for the async interface.
+ *
+ * Calls vio_blocking() on the connection's VIO with a mode flag of 0
+ * (presumably selecting non-blocking I/O - confirm against the VIO API),
+ * caches the resulting vio_is_blocking() state in the wrapper struct,
+ * calls vio_fastsend() on the same VIO and clears the marker that
+ * tracks an in-flight async query.
+ */
+static void optimize_for_async( VALUE obj )
+{
+  struct mysql* conn = GetMysqlStruct(obj);
+  my_bool previous_mode; /* out-parameter required by vio_blocking(); not read afterwards */
+
+  vio_blocking( conn->handler.net.vio, 0, &previous_mode );
+  conn->blocking = vio_is_blocking( conn->handler.net.vio );
+  vio_fastsend( conn->handler.net.vio );
+
+  /* No query has been sent on this connection yet. */
+  conn->async_in_progress = 0;
+}
+
/* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */
static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
{
@@ -270,12 +282,7 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass)
myp->handler.reconnect = 0;
myp->connection = Qtrue;
- my_bool was_blocking;
-
- vio_blocking(myp->handler.net.vio, 0, &was_blocking);
- myp->blocking = vio_is_blocking( myp->handler.net.vio );
-
- vio_fastsend( myp->handler.net.vio );
+ optimize_for_async(obj);
myp->query_with_result = Qtrue;
rb_obj_call_init(obj, argc, argv);
@@ -341,7 +348,8 @@ static VALUE real_connect2(int argc, VALUE* argv, VALUE obj)
mysql_raise(m);
m->reconnect = 0;
GetMysqlStruct(obj)->connection = Qtrue;
-
+
+ optimize_for_async(obj);
return obj;
}
@@ -764,6 +772,7 @@ static VALUE socket(VALUE obj)
MYSQL* m = GetHandler(obj);
return INT2NUM(m->net.fd);
}
+
/* socket_type */
static VALUE socket_type(VALUE obj)
{
@@ -793,18 +802,71 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj )
return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse );
}
+/*
+ * Sequencing guard for the async interface.
+ *
+ * async_in_progress holds either 0 (nothing pending) or the connection
+ * thread id of a query that has been sent.  This function always clears
+ * the marker.  If the marker was non-zero and still equals the handle's
+ * current mysql_thread_id(), a Mysql error is raised to report that the
+ * earlier query was never collected.  A marker that no longer matches the
+ * current thread id is treated as stale and dropped silently.
+ *
+ * A stricter check that rejected handles with m->reconnect == 1 was
+ * disabled by the author (Aman), who noted that the tests pass with
+ * reconnect enabled because a reconnect also changes the thread id.
+ */
+static void validate_async_query( VALUE obj )
+{
+  MYSQL* m = GetHandler(obj);
+  struct mysql* conn = GetMysqlStruct(obj);
+  int pending = conn->async_in_progress;
+  int same_connection = ( mysql_thread_id( m ) == pending );
+
+  /* The marker is reset on every path, including the one that raises. */
+  conn->async_in_progress = 0;
+
+  if( pending != 0 && same_connection ){
+    rb_raise(eMysql, "Query out of sequence: Each call to Mysql#send_query requires a successive Mysql#get_result.");
+  }
+}
+
+/*
+ * Tell whether the error code currently reported by mysql_errno() for this
+ * handle is one of CR_SERVER_LOST, CR_SERVER_GONE_ERROR or
+ * ER_SERVER_SHUTDOWN.  Returns Qtrue in that case, Qfalse otherwise.
+ *
+ * Aman: we may not need this anymore.
+ */
+static VALUE connection_dropped( VALUE obj )
+{
+  MYSQL* m = GetHandler(obj);
+
+  switch( mysql_errno(m) ){
+    case CR_SERVER_LOST:
+    case CR_SERVER_GONE_ERROR:
+    case ER_SERVER_SHUTDOWN:
+      return Qtrue;
+    default:
+      return Qfalse;
+  }
+}
+
+/*
+ * Reconnect helper for the async interface: forwards obj to connect().
+ * NOTE(review): no definition of connect() is visible in this patch -
+ * confirm it resolves to the extension's own connect routine and not to
+ * the sockets API function of the same name, which takes three arguments.
+ * NOTE(review): the only visible caller is inside a commented-out line in
+ * send_query, so this static function may currently be unused.
+ */
+static void async_reconnect( VALUE obj )
+{
+ connect(obj);
+ /* Disabled: clearing the in-flight marker after the reconnect. */
+ /*GetMysqlStruct(obj)->async_in_progress = 0;*/
+}
+
+/*
+ * Test hook registered as Mysql#simulate_disconnect.  Calls
+ * mysql_library_end() and always returns nil.
+ *
+ * NOTE(review): mysql_library_end() takes no connection argument, so it is
+ * presumably process-wide rather than limited to this handle - confirm
+ * that is acceptable before using it outside of tests.
+ * An alternative that called mysql_close() on this object's handle was
+ * left disabled by the author; the handle lookup that only served that
+ * disabled call has been dropped because it was otherwise unused.
+ */
+static VALUE simulate_disconnect( VALUE obj )
+{
+  (void)obj; /* receiver is required by the method signature but not needed */
+  mysql_library_end();
+  return Qnil;
+}
+
/* send_query(sql) */
+/*
+ * Implements Mysql#send_query: hands the SQL text to mysql_send_query()
+ * and returns nil; no result is read here.
+ * On success the handle's current mysql_thread_id() is stored in
+ * async_in_progress, which validate_async_query() inspects on the next
+ * call to detect a query that was sent but never collected.
+ * Failures are reported through rb_raise() / mysql_raise().
+ */
static VALUE send_query(VALUE obj, VALUE sql)
{
MYSQL* m = GetHandler(obj);
-
+
Check_Type(sql, T_STRING);
- if (GetMysqlStruct(obj)->connection == Qfalse) {
- rb_raise(eMysql, "query: not connected");
+
+ /* Aman: the connection member is specific to this extension and is not
+ updated by an automatic reconnect, so async_in_progress is taken into
+ account as well.
+ NOTE(review): because the two tests are joined with &&, a handle with
+ connection == Qfalse and async_in_progress == 0 passes this guard and
+ goes on to mysql_send_query() - confirm that this is intended. */
+ if (GetMysqlStruct(obj)->connection == Qfalse && GetMysqlStruct(obj)->async_in_progress != 0 ) {
+ rb_raise(eMysql, "query: not connected");
}
- if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0)
- mysql_raise(m);
- return Qnil;
+
+ /* Raises when an earlier query on this connection is still marked as in flight. */
+ validate_async_query(obj);
+
+ if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){
+ mysql_raise(m);
+ /* Aman: not required when reconnect is true, but may be useful as a
+ fallback when reconnect == false:
+ ( connection_dropped(obj) == Qtrue ) ? async_reconnect(obj) : mysql_raise(m);
+ */
+ }
+ /* Record which connection (by thread id) now has a query in flight.
+ NOTE(review): async_in_progress is declared int in this patch, while
+ the MySQL C API documents mysql_thread_id() as returning unsigned long -
+ confirm the narrowing is acceptable. */
+ GetMysqlStruct(obj)->async_in_progress = mysql_thread_id(m);
+ return Qnil;
}
/* get_result */
@@ -814,12 +876,13 @@ static VALUE get_result(VALUE obj)
if (GetMysqlStruct(obj)->connection == Qfalse) {
rb_raise(eMysql, "query: not connected");
}
- if (mysql_read_query_result(m) != 0)
- mysql_raise(m);
+ if (mysql_read_query_result(m) != 0)
+ mysql_raise(m);
if (GetMysqlStruct(obj)->query_with_result == Qfalse)
return obj;
if (mysql_field_count(m) == 0)
- return Qnil;
+ return Qnil;
+ GetMysqlStruct(obj)->async_in_progress = 0;
return store_result(obj);
}
@@ -2145,6 +2208,7 @@ void Init_mysql(void)
rb_define_method(cMysql, "real_query", query, 1);
rb_define_method(cMysql, "c_async_query", async_query, -1);
rb_define_method(cMysql, "send_query", send_query, 1);
+ rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0);
rb_define_method(cMysql, "get_result", get_result, 0);
rb_define_method(cMysql, "readable?", readable, -1);
rb_define_method(cMysql, "blocking?", blocking, 0);
# Manual driver for the send_query sequencing guard.
#
# Each round sends the same query twice in a row without calling get_result
# in between; the original annotations mark the second call of every round
# as the one that raises.  The rounds run on a fresh connection, after
# simulate_disconnect, and after an explicit close/connect cycle.
require File.dirname(__FILE__) + '/test_helper'

conn = Mysql.real_connect('localhost', 'root')
conn.reconnect = true

# Singleton helper: forwards to send_query and prints the message of any
# StandardError instead of letting it abort the script.
def conn.safe_query(query)
  send_query(query)
rescue => e
  puts e.message
end

# Round 1: freshly connected handle.
conn.safe_query('select sleep(1)')
conn.safe_query('select sleep(1)') # raises

# Round 2: after the test hook that fires mysql_library_end.
conn.simulate_disconnect
conn.safe_query('select sleep(1)')
conn.safe_query('select sleep(1)') # raises

# Round 3: after an explicit close and reconnect.
conn.close
conn.connect('localhost', 'root')
conn.safe_query('select sleep(1)')
conn.safe_query('select sleep(1)') # raises
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment