Skip to content

Instantly share code, notes, and snippets.

@tmm1
Created August 27, 2008 03:13
Show Gist options
  • Save tmm1/7414 to your computer and use it in GitHub Desktop.
Save tmm1/7414 to your computer and use it in GitHub Desktop.
EM::attach (see github.com/riham/eventmachine and github.com/tmm1/eventmachine/commits/em_detach)
diff --git a/ext/cmain.cpp b/ext/cmain.cpp
index 9890218..c9def4e 100644
--- a/ext/cmain.cpp
+++ b/ext/cmain.cpp
@@ -103,6 +103,32 @@ extern "C" const char *evma_connect_to_unix_server (const char *server)
return EventMachine->ConnectToUnixServer (server);
}
+/**************
+evma_attach_fd
+**************/
+
+extern "C" const char *evma_attach_fd (int file_descriptor, int notify_readable, int notify_writable)
+{
+ if (!EventMachine)
+ throw std::runtime_error ("not initialized");
+ return EventMachine->AttachFD (file_descriptor, (notify_readable ? true : false), (notify_writable ? true : false));
+}
+
+/**************
+evma_detach_fd
+**************/
+
+extern "C" int evma_detach_fd (const char *binding)
+{
+ if (!EventMachine)
+ throw std::runtime_error ("not initialized");
+
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed)
+ return EventMachine->DetachFD (ed);
+ else
+ throw std::runtime_error ("invalid binding to detach");
+}
/**********************
evma_create_tcp_server
diff --git a/ext/ed.cpp b/ext/ed.cpp
index 135cb2d..27e2d11 100644
--- a/ext/ed.cpp
+++ b/ext/ed.cpp
@@ -174,6 +174,8 @@ ConnectionDescriptor::ConnectionDescriptor
ConnectionDescriptor::ConnectionDescriptor (int sd, EventMachine_t *em):
EventableDescriptor (sd, em),
bConnectPending (false),
+ bNotifyReadable (false),
+ bNotifyWritable (false),
bReadAttemptedAfterClose (false),
bWriteAttemptedAfterClose (false),
OutboundDataSize (0),
@@ -368,7 +370,7 @@ bool ConnectionDescriptor::SelectForWrite()
* have outgoing data to send.
*/
- if (bConnectPending)
+ if (bConnectPending || bNotifyWritable)
return true;
else {
return (GetOutboundDataSize() > 0);
@@ -413,6 +415,12 @@ void ConnectionDescriptor::Read()
return;
}
+ if (bNotifyReadable) {
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_READABLE, NULL, 0);
+ return;
+ }
+
LastIo = gCurrentLoopTime;
int total_bytes_read = 0;
@@ -549,6 +557,13 @@ void ConnectionDescriptor::Write()
//bCloseNow = true;
}
else {
+
+ if (bNotifyWritable) {
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_WRITABLE, NULL, 0);
+ return;
+ }
+
_WriteOutboundData();
}
}
diff --git a/ext/ed.h b/ext/ed.h
index 21c8a5f..815149f 100644
--- a/ext/ed.h
+++ b/ext/ed.h
@@ -40,6 +40,7 @@ class EventableDescriptor: public Bindable_t
virtual ~EventableDescriptor();
int GetSocket() {return MySocket;}
+ void SetSocketInvalid() { MySocket = INVALID_SOCKET; }
void Close();
virtual void Read() = 0;
@@ -140,6 +141,9 @@ class ConnectionDescriptor: public EventableDescriptor
void SetConnectPending (bool f) { bConnectPending = f; }
+ void SetNotifyReadable (bool readable) { bNotifyReadable = readable; }
+ void SetNotifyWritable (bool writable) { bNotifyWritable = writable; }
+
virtual void Read();
virtual void Write();
virtual void Heartbeat();
@@ -172,6 +176,10 @@ class ConnectionDescriptor: public EventableDescriptor
protected:
bool bConnectPending;
+
+ bool bNotifyReadable;
+ bool bNotifyWritable;
+
bool bReadAttemptedAfterClose;
bool bWriteAttemptedAfterClose;
diff --git a/ext/em.cpp b/ext/em.cpp
index cce5872..4094755 100644
--- a/ext/em.cpp
+++ b/ext/em.cpp
@@ -1165,6 +1165,113 @@ const char *EventMachine_t::ConnectToUnixServer (const char *server)
#endif
}
+/************************
+EventMachine_t::AttachFD
+************************/
+
+const char *EventMachine_t::AttachFD (int fd, bool notify_readable, bool notify_writable)
+{
+ #ifdef OS_UNIX
+ if (fcntl(fd, F_GETFL, 0) < 0)
+ throw std::runtime_error ("invalid file descriptor");
+ #endif
+
+ #ifdef OS_WIN32
+ // TODO: add better check for invalid file descriptors (see ioctlsocket or getsockopt)
+ if (fd == INVALID_SOCKET)
+ throw std::runtime_error ("invalid file descriptor");
+ #endif
+
+ {// Check for duplicate descriptors
+ for (size_t i = 0; i < Descriptors.size(); i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ if (ed->GetSocket() == fd)
+ throw std::runtime_error ("adding existing descriptor");
+ }
+
+ for (size_t i = 0; i < NewDescriptors.size(); i++) {
+ EventableDescriptor *ed = NewDescriptors[i];
+ assert (ed);
+ if (ed->GetSocket() == fd)
+ throw std::runtime_error ("adding existing new descriptor");
+ }
+ }
+
+ ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
+ if (!cd)
+ throw std::runtime_error ("no connection allocated");
+
+ cd->SetConnectPending (true);
+ cd->SetNotifyReadable (notify_readable);
+ cd->SetNotifyWritable (notify_writable);
+
+ Add (cd);
+
+ const char *out = NULL;
+ out = cd->GetBinding().c_str();
+ if (out == NULL)
+ closesocket (fd);
+ return out;
+}
+
+/************************
+EventMachine_t::DetachFD
+************************/
+
+int EventMachine_t::DetachFD (EventableDescriptor *ed)
+{
+ if (!ed)
+ throw std::runtime_error ("detaching bad descriptor");
+
+ #ifdef HAVE_EPOLL
+ if (bEpoll) {
+ if (ed->GetSocket() != INVALID_SOCKET) {
+ assert (bEpoll); // wouldn't be in this method otherwise.
+ assert (epfd != -1);
+ int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
+ // ENOENT or EBADF are not errors because the socket may be already closed when we get here.
+ if (e && (errno != ENOENT) && (errno != EBADF)) {
+ char buf [200];
+ snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
+ throw std::runtime_error (buf);
+ }
+ }
+ }
+ #endif
+
+ #ifdef HAVE_KQUEUE
+ if (bKqueue) {
+ struct kevent k;
+ EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, ed);
+ int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
+ assert (t == 0);
+ }
+ #endif
+
+ { // remove descriptor from lists
+ int i, j;
+ int nSockets = Descriptors.size();
+ for (i=0, j=0; i < nSockets; i++) {
+ EventableDescriptor *ted = Descriptors[i];
+ assert (ted);
+ if (ted != ed)
+ Descriptors [j++] = ted;
+ }
+ while ((size_t)j < Descriptors.size())
+ Descriptors.pop_back();
+
+ ModifiedDescriptors.erase (ed);
+ }
+
+ int fd = ed->GetSocket();
+
+ // We depend on ~EventableDescriptor not calling close() if the socket is invalid
+ ed->SetSocketInvalid();
+ delete ed;
+
+ return fd;
+}
/************
name2address
diff --git a/ext/em.h b/ext/em.h
index c83fb92..ad555d3 100644
--- a/ext/em.h
+++ b/ext/em.h
@@ -69,6 +69,8 @@ class EventMachine_t
const char *InstallOneshotTimer (int);
const char *ConnectToServer (const char *, int);
const char *ConnectToUnixServer (const char *);
+ const char *AttachFD (int, bool, bool);
+
const char *CreateTcpServer (const char *, int);
const char *OpenDatagramSocket (const char *, int);
const char *CreateUnixDomainServer (const char*);
@@ -79,6 +81,7 @@ class EventMachine_t
void Add (EventableDescriptor*);
void Modify (EventableDescriptor*);
+ int DetachFD (EventableDescriptor*);
void ArmKqueueWriter (EventableDescriptor*);
void ArmKqueueReader (EventableDescriptor*);
diff --git a/ext/eventmachine.h b/ext/eventmachine.h
index 81e500c..c644348 100644
--- a/ext/eventmachine.h
+++ b/ext/eventmachine.h
@@ -30,7 +30,10 @@ extern "C" {
EM_CONNECTION_UNBOUND = 102,
EM_CONNECTION_ACCEPTED = 103,
EM_CONNECTION_COMPLETED = 104,
- EM_LOOPBREAK_SIGNAL = 105
+ EM_LOOPBREAK_SIGNAL = 105,
+ EM_CONNECTION_NOTIFY_READABLE = 106,
+ EM_CONNECTION_NOTIFY_WRITABLE = 107
+
};
void evma_initialize_library (void(*)(const char*, int, const char*, int));
@@ -39,6 +42,10 @@ extern "C" {
const char *evma_install_oneshot_timer (int seconds);
const char *evma_connect_to_server (const char *server, int port);
const char *evma_connect_to_unix_server (const char *server);
+
+ const char *evma_attach_fd (int file_descriptor, int read_mode, int write_mode);
+ int evma_detach_fd (const char *binding);
+
void evma_stop_tcp_server (const char *signature);
const char *evma_create_tcp_server (const char *address, int port);
const char *evma_create_unix_domain_server (const char *filename);
diff --git a/ext/rubymain.cpp b/ext/rubymain.cpp
index 00fb42a..0b809a4 100644
--- a/ext/rubymain.cpp
+++ b/ext/rubymain.cpp
@@ -39,6 +39,8 @@ static VALUE Intern_delete;
static VALUE Intern_call;
static VALUE Intern_receive_data;
+static VALUE Intern_notify_readable;
+static VALUE Intern_notify_writable;
/****************
t_event_callback
@@ -53,6 +55,20 @@ static void event_callback (const char *a1, int a2, const char *a3, int a4)
rb_raise (rb_eRuntimeError, "no connection");
rb_funcall (q, Intern_receive_data, 1, rb_str_new (a3, a4));
}
+ else if (a2 == EM_CONNECTION_NOTIFY_READABLE) {
+ VALUE t = rb_ivar_get (EmModule, Intern_at_conns);
+ VALUE q = rb_hash_aref (t, rb_str_new2(a1));
+ if (q == Qnil)
+ rb_raise (rb_eRuntimeError, "no connection");
+ rb_funcall (q, Intern_notify_readable, 0);
+ }
+ else if (a2 == EM_CONNECTION_NOTIFY_WRITABLE) {
+ VALUE t = rb_ivar_get (EmModule, Intern_at_conns);
+ VALUE q = rb_hash_aref (t, rb_str_new2(a1));
+ if (q == Qnil)
+ rb_raise (rb_eRuntimeError, "no connection");
+ rb_funcall (q, Intern_notify_writable, 0);
+ }
else if (a2 == EM_LOOPBREAK_SIGNAL) {
rb_funcall (EmModule, Intern_run_deferred_callbacks, 0);
}
@@ -320,6 +336,27 @@ static VALUE t_connect_unix_server (VALUE self, VALUE serversocket)
return rb_str_new2 (f);
}
+/***********
+t_attach_fd
+***********/
+
+static VALUE t_attach_fd (VALUE self, VALUE file_descriptor, VALUE read_mode, VALUE write_mode)
+{
+ const char *f = evma_attach_fd (NUM2INT(file_descriptor), (read_mode == Qtrue) ? 1 : 0, (write_mode == Qtrue) ? 1 : 0);
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no connection");
+ return rb_str_new2 (f);
+}
+
+/***********
+t_detach_fd
+***********/
+
+static VALUE t_detach_fd (VALUE self, VALUE signature)
+{
+ return INT2NUM(evma_detach_fd (StringValuePtr(signature)));
+}
+
/*****************
t_open_udp_socket
*****************/
@@ -565,6 +602,9 @@ extern "C" void Init_rubyeventmachine()
Intern_call = rb_intern ("call");
Intern_receive_data = rb_intern ("receive_data");
+ Intern_notify_readable = rb_intern ("notify_readable");
+ Intern_notify_writable = rb_intern ("notify_writable");
+
// INCOMPLETE, we need to define class Connections inside module EventMachine
// run_machine and run_machine_without_threads are now identical.
// Must deprecate the without_threads variant.
@@ -590,6 +630,10 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "report_connection_error_status", (VALUE(*)(...))t_report_connection_error_status, 1);
rb_define_module_function (EmModule, "connect_server", (VALUE(*)(...))t_connect_server, 2);
rb_define_module_function (EmModule, "connect_unix_server", (VALUE(*)(...))t_connect_unix_server, 1);
+
+ rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 3);
+ rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1);
+
rb_define_module_function (EmModule, "open_udp_socket", (VALUE(*)(...))t_open_udp_socket, 2);
rb_define_module_function (EmModule, "read_keyboard", (VALUE(*)(...))t_read_keyboard, 0);
rb_define_module_function (EmModule, "release_machine", (VALUE(*)(...))t_release_machine, 0);
@@ -626,5 +670,9 @@ extern "C" void Init_rubyeventmachine()
rb_define_const (EmModule, "ConnectionAccepted", INT2NUM(103));
rb_define_const (EmModule, "ConnectionCompleted", INT2NUM(104));
rb_define_const (EmModule, "LoopbreakSignalled", INT2NUM(105));
+
+ rb_define_const (EmModule, "ConnectionNotifyReadable", INT2NUM(106));
+ rb_define_const (EmModule, "ConnectionNotifyWritable", INT2NUM(107));
+
}
diff --git a/lib/eventmachine.rb b/lib/eventmachine.rb
index 4f7bd63..c9ad78b 100644
--- a/lib/eventmachine.rb
+++ b/lib/eventmachine.rb
@@ -678,6 +678,71 @@ module EventMachine
c
end
+ # EventMachine::attach registers a given file descriptor or IO object with the eventloop
+ #
+ # If the handler provided has the functions notify_readable or notify_writable defined,
+ # EventMachine will not read or write from the socket, and instead fire the corresponding
+ # callback on the handler.
+ #
+ # To detach the file descriptor, use EventMachine::Connection#detach
+ #
+ # === Usage Example
+ #
+ # module SimpleHttpClient
+ # def initialize sock
+ # @sock = sock
+ # end
+ #
+ # def notify_readable
+ # header = @sock.readline
+ #
+ # if header == "\r\n"
+ # # detach returns the file descriptor number (fd == @sock.fileno)
+ # fd = detach
+ # end
+ # rescue EOFError
+ # detach
+ # end
+ #
+ # def unbind
+ # EM.next_tick do
+ # # socket is detached from the eventloop, but still open
+ # data = @sock.read
+ # end
+ # end
+ # end
+ #
+ # EM.run{
+ # $sock = TCPSocket.new('site.com', 80)
+ # $sock.write("GET / HTTP/1.0\r\n\r\n")
+ # EM.attach $sock, SimpleHttpClient, $sock
+ # }
+ #
+ #--
+ # Thanks to Riham Aldakkak (eSpace Technologies) for the initial patch
+ def EventMachine::attach io, handler=nil, *args
+ klass = if (handler and handler.is_a?(Class))
+ handler
+ else
+ Class.new( Connection ) {handler and include handler}
+ end
+
+ arity = klass.instance_method(:initialize).arity
+ expected = arity >= 0 ? arity : -(arity + 1)
+ if (arity >= 0 and args.size != expected) or (arity < 0 and args.size < expected)
+ raise ArgumentError, "wrong number of arguments for #{klass}#initialize (#{args.size} for #{expected})"
+ end
+
+ readmode = klass.public_instance_methods.any?{|m| m.to_sym == :notify_readable }
+ writemode = klass.public_instance_methods.any?{|m| m.to_sym == :notify_writable }
+
+ s = attach_fd io.respond_to?(:fileno) ? io.fileno : io, readmode, writemode
+
+ c = klass.new s, *args
+ @conns[s] = c
+ block_given? and yield c
+ c
+ end
#--
# EXPERIMENTAL. DO NOT RELY ON THIS METHOD TO BE HERE IN THIS FORM, OR AT ALL.
@@ -1129,6 +1194,12 @@ module EventMachine
c.connection_completed
elsif opcode == LoopbreakSignalled
run_deferred_callbacks
+ elsif opcode == ConnectionNotifyReadable
+ c = @conns[conn_binding] or raise ConnectionNotBound
+ c.notify_readable
+ elsif opcode == ConnectionNotifyWritable
+ c = @conns[conn_binding] or raise ConnectionNotBound
+ c.notify_writable
end
end
@@ -1371,6 +1442,12 @@ class Connection
EventMachine::close_connection @signature, after_writing
end
+ # EventMachine::Connection#detach will remove the given connection from the event loop.
+ # The connection's socket remains open and its file descriptor number is returned
+ def detach
+ EventMachine::detach_fd @signature
+ end
+
# EventMachine::Connection#close_connection_after_writing is a variant of close_connection.
# All of the descriptive comments given for close_connection also apply to
# close_connection_after_writing, <i>with one exception:</i> If the connection has
diff --git a/tests/test_attach.rb b/tests/test_attach.rb
new file mode 100644
index 0000000..6ae2279
--- /dev/null
+++ b/tests/test_attach.rb
@@ -0,0 +1,66 @@
+# $Id$
+#
+#----------------------------------------------------------------------------
+#
+# Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+# Gmail: blackhedd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of either: 1) the GNU General Public License
+# as published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version; or 2) Ruby's License.
+#
+# See the file COPYING for complete licensing information.
+#
+#---------------------------------------------------------------------------
+#
+
+$:.unshift "../lib"
+require 'eventmachine'
+require 'socket'
+require 'test/unit'
+
+
+class TestAttach < Test::Unit::TestCase
+
+ Host = "127.0.0.1"
+ Port = 9550
+
+ class EchoServer < EM::Connection
+ def receive_data data
+ send_data data
+ end
+ end
+
+ class EchoClient < EM::Connection
+ def connection_completed
+ $sock.write("abc\n")
+ end
+
+ def notify_readable
+ $read = $sock.readline
+ $fd = detach
+ end
+
+ def unbind
+ EM.next_tick do
+ $sock.write("def\n")
+ EM.add_timer(0.5){ EM.stop }
+ end
+ end
+ end
+
+ def test_attach
+ EM.run{
+ EM.start_server Host, Port, EchoServer
+ $sock = TCPSocket.new Host, Port
+ EM.attach $sock, EchoClient
+ }
+
+ assert_equal $read, "abc\n"
+ assert_equal $fd, $sock.fileno
+ assert_equal false, $sock.closed?
+ assert_equal $sock.readline, "def\n"
+ end
+
+end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment