Created September 18, 2008 13:31
        
      - 
      
- 
        Save methodmissing/11429 to your computer and use it in GitHub Desktop. 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | diff --git a/ext/mysql.c b/ext/mysql.c | |
| index b804723..de9d435 100644 | |
| --- a/ext/mysql.c | |
| +++ b/ext/mysql.c | |
| @@ -61,6 +61,7 @@ struct mysql { | |
| char connection; | |
| char query_with_result; | |
| char blocking; | |
| + int async_in_progress; | |
| }; | |
| struct mysql_res { | |
| @@ -231,6 +232,33 @@ static VALUE init(VALUE klass) | |
| return obj; | |
| } | |
| +static void optimize_for_async( VALUE obj ) | |
| +{ | |
| + struct mysql* myp = GetMysqlStruct(obj); | |
| + my_bool was_blocking; | |
| + vio_blocking(myp->handler.net.vio, 0, &was_blocking); | |
| + myp->blocking = vio_is_blocking( myp->handler.net.vio ); | |
| + | |
| + vio_fastsend( myp->handler.net.vio ); | |
| + myp->async_in_progress = 0; | |
| +} | |
| + | |
| +static VALUE schedule_connect(VALUE obj ) | |
| +{ | |
| + MYSQL* m = GetHandler(obj); | |
| + fd_set read; | |
| + | |
| + struct timeval tv = { tv_sec: m->options.connect_timeout, tv_usec: 0 }; | |
| + | |
| + FD_ZERO(&read); | |
| + FD_SET(m->net.fd, &read); | |
| + | |
| + if (rb_thread_select(m->net.fd + 1, &read, NULL, NULL, &tv) < 0) { | |
| + rb_raise(eMysql, "connect: timeout"); | |
| + } | |
| + | |
| +} | |
| + | |
| /* real_connect(host=nil, user=nil, passwd=nil, db=nil, port=nil, sock=nil, flag=nil) */ | |
| static VALUE real_connect(int argc, VALUE* argv, VALUE klass) | |
| { | |
| @@ -266,19 +294,16 @@ static VALUE real_connect(int argc, VALUE* argv, VALUE klass) | |
| if (mysql_real_connect(&myp->handler, h, u, p, pp, s) == NULL) | |
| #endif | |
| mysql_raise(&myp->handler); | |
| - | |
| + | |
| myp->handler.reconnect = 0; | |
| myp->connection = Qtrue; | |
| - my_bool was_blocking; | |
| - | |
| - vio_blocking(myp->handler.net.vio, 0, &was_blocking); | |
| - myp->blocking = vio_is_blocking( myp->handler.net.vio ); | |
| - | |
| - vio_fastsend( myp->handler.net.vio ); | |
| + optimize_for_async(obj); | |
| myp->query_with_result = Qtrue; | |
| rb_obj_call_init(obj, argc, argv); | |
| + | |
| + schedule_connect(obj); | |
| return obj; | |
| } | |
| @@ -341,6 +366,9 @@ static VALUE real_connect2(int argc, VALUE* argv, VALUE obj) | |
| mysql_raise(m); | |
| m->reconnect = 0; | |
| GetMysqlStruct(obj)->connection = Qtrue; | |
| + | |
| + optimize_for_async(obj); | |
| + schedule_connect(obj); | |
| return obj; | |
| } | |
| @@ -764,6 +792,7 @@ static VALUE socket(VALUE obj) | |
| MYSQL* m = GetHandler(obj); | |
| return INT2NUM(m->net.fd); | |
| } | |
| + | |
| /* socket_type */ | |
| static VALUE socket_type(VALUE obj) | |
| { | |
| @@ -793,18 +822,71 @@ static VALUE readable( int argc, VALUE* argv, VALUE obj ) | |
| return ( vio_poll_read( m->net.vio, INT2NUM(timeout) ) == 0 ? Qtrue : Qfalse ); | |
| } | |
| +static void validate_async_query( VALUE obj ) | |
| +{ | |
| + MYSQL* m = GetHandler(obj); | |
| +/* Aman: Tests passing with reconnect = true as a reconnect would also change the thread_id | |
| + if( m->reconnect == 1 ){ | |
| + rb_raise(eMysql, "Do not use the mysql reconnect feature with the async interface!"); | |
| + } | |
| +*/ | |
| + if ( mysql_thread_id( m ) != GetMysqlStruct(obj)->async_in_progress ){ | |
| + GetMysqlStruct(obj)->async_in_progress = 0; | |
| + } | |
| + | |
| + if( GetMysqlStruct(obj)->async_in_progress != 0 ){ | |
| + GetMysqlStruct(obj)->async_in_progress = 0; | |
| + rb_raise(eMysql, "Query out of sequence: Each call to Mysql#send_query requires a successive Mysql#get_result."); | |
| + } | |
| +} | |
| + | |
| +/*Aman: We may not need this anymore ... */ | |
| +static VALUE connection_dropped( VALUE obj ) | |
| +{ | |
| + MYSQL* m = GetHandler(obj); | |
| + if( (mysql_errno(m) == CR_SERVER_LOST ) || ( mysql_errno(m) == CR_SERVER_GONE_ERROR ) || ( mysql_errno(m) == ER_SERVER_SHUTDOWN) ){ | |
| + return Qtrue; | |
| + }else{ | |
| + return Qfalse; | |
| + } | |
| +} | |
| + | |
| +static void async_reconnect( VALUE obj ) | |
| +{ | |
| + connect(obj); | |
| + /*GetMysqlStruct(obj)->async_in_progress = 0;*/ | |
| +} | |
| + | |
| +static VALUE simulate_disconnect( VALUE obj ) | |
| +{ | |
| + MYSQL* m = GetHandler(obj); | |
| + mysql_library_end(); | |
| + /*mysql_close(m);*/ | |
| + return Qnil; | |
| +} | |
| + | |
| /* send_query(sql) */ | |
| static VALUE send_query(VALUE obj, VALUE sql) | |
| { | |
| MYSQL* m = GetHandler(obj); | |
| - | |
| + | |
| Check_Type(sql, T_STRING); | |
| - if (GetMysqlStruct(obj)->connection == Qfalse) { | |
| - rb_raise(eMysql, "query: not connected"); | |
| + | |
| + /*Aman: the connection member is specific to this extension and won't be changed on an auto reconnect, thus we need to factor in async_in_progress as well*/ | |
| + if (GetMysqlStruct(obj)->connection == Qfalse && GetMysqlStruct(obj)->async_in_progress != 0 ) { | |
| + rb_raise(eMysql, "query: not connected"); | |
| } | |
| - if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0) | |
| - mysql_raise(m); | |
| - return Qnil; | |
| + | |
| + validate_async_query(obj); | |
| + | |
| + if (mysql_send_query(m, RSTRING_PTR(sql), RSTRING_LEN(sql)) != 0){ | |
| + mysql_raise(m); | |
| + /* Aman: Not required when reconnect is true ... but may be useful to have this as a fallback when reconnect == false | |
| + ( connection_dropped(obj) == Qtrue ) ? async_reconnect(obj) : mysql_raise(m); | |
| + */ | |
| + } | |
| + GetMysqlStruct(obj)->async_in_progress = mysql_thread_id(m); | |
| + return Qnil; | |
| } | |
| /* get_result */ | |
| @@ -814,16 +896,17 @@ static VALUE get_result(VALUE obj) | |
| if (GetMysqlStruct(obj)->connection == Qfalse) { | |
| rb_raise(eMysql, "query: not connected"); | |
| } | |
| - if (mysql_read_query_result(m) != 0) | |
| - mysql_raise(m); | |
| + if (mysql_read_query_result(m) != 0) | |
| + mysql_raise(m); | |
| if (GetMysqlStruct(obj)->query_with_result == Qfalse) | |
| return obj; | |
| if (mysql_field_count(m) == 0) | |
| - return Qnil; | |
| + return Qnil; | |
| + GetMysqlStruct(obj)->async_in_progress = 0; | |
| return store_result(obj); | |
| } | |
| -static VALUE schedule(VALUE obj, VALUE timeout) | |
| +static VALUE schedule_query(VALUE obj, VALUE timeout) | |
| { | |
| MYSQL* m = GetHandler(obj); | |
| fd_set read; | |
| @@ -851,7 +934,7 @@ static VALUE async_query(int argc, VALUE* argv, VALUE obj) | |
| send_query(obj,sql); | |
| - schedule(obj, timeout); | |
| + schedule_query(obj, timeout); | |
| return get_result(obj); | |
| } | |
| @@ -2145,6 +2228,7 @@ void Init_mysql(void) | |
| rb_define_method(cMysql, "real_query", query, 1); | |
| rb_define_method(cMysql, "c_async_query", async_query, -1); | |
| rb_define_method(cMysql, "send_query", send_query, 1); | |
| + rb_define_method(cMysql, "simulate_disconnect", simulate_disconnect, 0); | |
| rb_define_method(cMysql, "get_result", get_result, 0); | |
| rb_define_method(cMysql, "readable?", readable, -1); | |
| rb_define_method(cMysql, "blocking?", blocking, 0); | |
  
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment