Skip to content

Instantly share code, notes, and snippets.

@jpf91
Created December 25, 2011 19:45
Show Gist options
  • Save jpf91/1519658 to your computer and use it in GitHub Desktop.
Save jpf91/1519658 to your computer and use it in GitHub Desktop.
libev D example
///
module libev;
import deimos.ev;
import std.socket;
import std.typecons;
import std.exception;
import std.stdio;
/**
* Reference-counted
*/
struct EventLoop
{
private:
struct Payload
{
ev_loop_t* _nativeLoop;
this(ev_loop_t* ptr)
{
_nativeLoop = ptr;
}
~this()
{
if (_nativeLoop !is null) // Refcounted bug: dtor is being called for no reason before the ctor is called.
{
dispose();
}
}
void dispose()
{
ev_loop_destroy(_nativeLoop);
//_nativeLoop = null;
}
this(this) { assert(false); }
void opAssign(EventLoop.Payload rhs) { assert(false); }
}
alias RefCounted!(Payload, RefCountedAutoInitialize.yes) Data;
Data _data;
this(ev_loop_t* ptr)
{
enforce(ptr, "Couldn't create EventLoop!");
_data = Data(ptr);
}
public:
this(uint flags)
{
auto ptr = ev_loop_new(flags);
enforce(ptr, "Couldn't create EventLoop!");
_data = Data(ptr);
}
/**
* TODO: Should only be called from one thread! How to verify
* that?
*/
@property static EventLoop defaultLoop()
{
auto ptr = ev_default_loop(0);
enforce(ptr, "Couldn't create EventLoop!");
return EventLoop(ptr);
}
void fork()
{
ev_loop_fork(_data._nativeLoop);
}
@property bool isDefault()
{
return ev_is_default_loop(_data._nativeLoop) ? true : false;
}
@property ev_tstamp now()
{
return ev_now(_data._nativeLoop);
}
void suspend()
{
ev_suspend(_data._nativeLoop);
}
void resume()
{
ev_resume(_data._nativeLoop);
}
void run(uint flags = 0)
{
ev_run(_data._nativeLoop, flags);
}
/**
* how = EVBREAK_ONE / EVBREAK_ALL
*/
void doBreak(uint how)
{
ev_break(_data._nativeLoop, how);
}
void dispose()
{
_data.dispose();
}
@property ev_loop_t* nativePointer()
{
return _data._nativeLoop;
}
}
private extern(C) void _nativeCallback(ev_loop_t* cLoop, ev_timer* timer,
int revent)
{
Timer dTimer = cast(Timer)timer.data;
assert(dTimer._loop.nativePointer == cLoop);
dTimer.onTimer();
}
class Timer
{
private:
EventLoop _loop;
ev_timer _nativeTimer;
TimerCallback _callback;
protected:
void onTimer()
{
_callback(this);
}
public:
alias void delegate(Timer) TimerCallback;
this(EventLoop loop, ev_tstamp repeat, TimerCallback cb)
{
_loop = loop;
_callback = cb;
ev_timer_init(&_nativeTimer, &_nativeCallback, repeat, repeat);
_nativeTimer.data = cast(void*)this;
}
void start()
{
ev_timer_start(_loop.nativePointer, &_nativeTimer);
}
void stop()
{
ev_timer_stop(_loop.nativePointer, &_nativeTimer);
}
@property EventLoop loop()
{
return _loop;
}
/**
* TODO: Timer keeps a reference to EventLoop,
* this method should remove this reference. Not sure if this works
*/
void dispose()
{
if(_loop != EventLoop.init)
{
_loop = EventLoop.init;
}
}
}
private extern(C) void _nativeIORead(ev_loop_t* cLoop, ev_io* watcher,
int revent)
{
EventIO evIO = cast(EventIO)watcher.data;
assert(evIO._loop.nativePointer == cLoop);
evIO.onReadable();
}
private extern(C) void _nativeIOWrite(ev_loop_t* cLoop, ev_io* watcher,
int revent)
{
EventIO evIO = cast(EventIO)watcher.data;
assert(evIO._loop.nativePointer == cLoop);
evIO.onWriteable();
}
abstract class EventIO
{
private:
EventLoop _loop;
ev_io _nativeRead, _nativeWrite;
Timer _timer;
Socket _socket;
void _onTimeout(Timer time)
{
onTimeout();
}
protected:
abstract void onReadable();
abstract void onWriteable();
abstract void onTimeout();
bool _reading = true;
bool _writing = true;
public:
this(EventLoop loop, Socket socket, ev_tstamp timeout)
{
_socket = socket;
_socket.blocking = false;
_loop = loop;
_timer = new Timer(loop, timeout, &_onTimeout);
ev_io_init(&_nativeRead, &_nativeIORead, _socket.handle, EV_READ);
ev_io_init(&_nativeWrite, &_nativeIOWrite, _socket.handle, EV_WRITE);
_nativeRead.data = cast(void*)this;
_nativeWrite.data = cast(void*)this;
}
void start()
{
if(_reading)
ev_io_start(_loop.nativePointer, &_nativeRead);
if(_writing)
ev_io_start(_loop.nativePointer, &_nativeWrite);
_timer.start();
}
void stop()
{
ev_io_stop(_loop.nativePointer, &_nativeRead);
ev_io_stop(_loop.nativePointer, &_nativeWrite);
_timer.stop();
}
@property EventLoop loop()
{
return _loop;
}
/**
* TODO: Timer keeps a reference to EventLoop,
* this method should remove this reference. Not sure if this works
*/
void dispose()
{
if(_loop != EventLoop.init)
{
_loop = EventLoop.init;
}
}
}
import core.stdc.errno;
/*Nonblocking Read/Write functions*/
bool writeNonBlock(Socket fd, ref ubyte[] buffer)
{
while(true)
{
auto ret = fd.send(buffer);
if(ret == Socket.ERROR)
{
auto code = errno;
if(code == EAGAIN || code == EWOULDBLOCK)
return false;
else if(code == EINTR)
continue; //Interrupted by signal, try again
else
{
throw new ErrnoException("Socket.write failed!",
__FILE__, __LINE__);
}
}
else if(ret != buffer.length)
{
assert(ret < buffer.length, "WTF: Wrote more data than in the _buffer?");
buffer = buffer[ret .. $];
continue;
}
else
{
buffer = [];
return true;
}
}
}
bool readNonBlock(Socket fd, ref ubyte[] buffer)
{
while(true)
{
auto ret = fd.receive(buffer);
if(ret == Socket.ERROR)
{
auto code = errno;
if(code == EAGAIN || code == EWOULDBLOCK)
return false; //Read would block
else if(code == EINTR)
continue; //Interrupted by signal, try again
else
{
throw new ErrnoException("Socket.read failed!",
__FILE__, __LINE__);
}
}
else
{
assert(ret < buffer.length, "WTF: Read more data than buffer size?");
buffer = buffer[0 .. ret];
return true;
}
}
}
import std.stdio;
class HTTPIO : EventIO
{
protected:
override void onReadable()
{
ubyte[1024*64] buf;
auto slice = buf[];
if(readNonBlock(_socket, slice))
{
if(slice.length == 0)
this.stop();
else
{
write(cast(char[])slice); //TODO: UTF support!
}
}
}
override void onWriteable()
{
//Not the correct way to do nonblocking reads/writes!
string buf = "GET /\r\n"
"Host: www.google.com\r\n"
"\r\n";
auto slice = cast(ubyte[])buf[];
if(writeNonBlock(_socket, slice))
{
if(slice.length == 0)
{
//All data written
//Should disable write watcher
_writing = false; //TODO: stopWriting();
}
else
{
//Should write rest of slice in next call, not supported!
}
}
}
override void onTimeout()
{
writeln("HTTPIO: Timeout occured!");
stop();
}
public:
this(EventLoop loop, in char[] host, ushort port, ev_tstamp timeout = 30)
{
auto socket = new Socket(AddressFamily.INET, SocketType.STREAM);
socket.connect(new InternetAddress(host, port));
super(loop, socket, timeout);
}
}
import std.stdio;
void main()
{
int count = 0;
void onTimer(Timer timer)
{
count++;
writefln("Hello from onTimer! Called %s times", count);
if(count == 10)
timer.stop();
}
auto loop = EventLoop.defaultLoop;
Timer timer = new Timer(loop, 1, &onTimer);
timer.start();
auto io = new HTTPIO(loop, "www.google.com", 80);
io.start();
loop.run();
}
@sha0coder
Copy link

Nice EventLoop and EventIO classes, I whish to use them to develop TurboHTTP, a fast http lib, I will fork it to bitbucket:
https://bitbucket.org/sha0coder/turbohttp/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment