Skip to content

Instantly share code, notes, and snippets.

@clojj
Forked from rjmcguire/chan.d
Last active August 29, 2015 14:07
Show Gist options
  • Save clojj/1e1ec241d1a01d114cf4 to your computer and use it in GitHub Desktop.
Save clojj/1e1ec241d1a01d114cf4 to your computer and use it in GitHub Desktop.
import core.sync.mutex : Mutex;
import core.thread : Thread, Fiber;
/**
 * chan allows messaging between threads without having to deal with locks,
 * similar to how chan works in golang.
 *
 * Values are stored in a singly linked FIFO list. Every access to the list,
 * the length counter and the closed flag happens inside `synchronized (lock)`.
 * Some methods take the lock while it is already held by the caller (for
 * example `insert` called from the sending `_`), which relies on
 * core.sync.mutex.Mutex being re-entrant for the owning thread.
 *
 * Sending is done with `ch._ = value`, receiving with `value = ch._`.
 * Both operations wait by yielding the current Fiber when running inside one,
 * and by sleeping the thread for 5 ms otherwise.
 */
shared
class chan(T) {
    /// Mutex guarding all mutable state of the channel.
    Mutex lock;

    private bool closed_;

    /// Returns: true once Close() has been called.
    bool closed() {
        synchronized (lock) {
            return closed_;
        }
    }

    /// Marks the channel as closed; later sends and receives throw ChannelClosedException.
    void Close() {
        synchronized (lock) {
            closed_ = true;
        }
    }

    /// Node of the internal linked list.
    struct Container(T) {
        T value;
        Container!T* next;
    }

    Container!T* current; /// head of the queue (next value to be received), null when empty
    Container!T* last;    /// tail of the queue (most recently sent value), null when empty
    size_t length;        /// number of queued values

    /**
     * Appends v to the tail of the queue without checking capacity or the
     * closed flag; the sending `_` performs those checks before calling this.
     */
    void insert(T v) {
        Container!T* newItem = new Container!T();
        newItem.value = v;
        synchronized (lock) {
            if (current is null) {
                current = cast(shared)newItem;
                last = cast(shared)newItem;
            } else {
                last.next = cast(shared)newItem;
                last = cast(shared)newItem;
            }
            length++;
        }
    }

    /**
     * Removes and returns the value at the head of the queue.
     *
     * Throws: ChannelException when the queue is empty (previously this
     *         dereferenced a null pointer).
     */
    T popFront() {
        T ret;
        synchronized (lock) {
            if (current is null) {
                throw new ChannelException("popFront called on an empty channel");
            }
            ret = cast(T)current.value;
            current = current.next;
            if (current is null) {
                // Queue drained: drop the tail reference too so the consumed
                // node is not kept reachable through `last`.
                last = null;
            }
            length--;
        }
        return ret;
    }

    size_t maxItems;          /// capacity; a send waits or throws once length reaches it
    bool blockOnFull = false; /// true: a send waits for room, false: it throws ChannelFullException

    /**
     * Params:
     *   maxItems    = capacity of the channel. NOTE: a negative value is
     *                 converted to a very large size_t, and 0 yields a channel
     *                 that never has room for a value.
     *   blockOnFull = whether a send on a full channel waits (true) or throws (false)
     */
    this(int maxItems = 1024, bool blockOnFull = true) {
        lock = cast(shared)new Mutex;
        length = 0;
        this.maxItems = maxItems;
        this.blockOnFull = blockOnFull;
    }

    /// Gives up the time slice: yields when running inside a Fiber, otherwise sleeps 5 ms.
    private void pause() {
        // Local import: the module only imports Thread and Fiber selectively,
        // so `dur` has to be brought into scope here.
        import core.time : dur;
        if (Fiber.getThis() !is null) {
            Fiber.yield();
        } else {
            Thread.sleep(dur!"msecs"(5));
        }
    }

    /**
     * Sends value on the channel (`ch._ = value`).
     *
     * The call returns as soon as the value has been queued. While the
     * channel is full it either waits (blockOnFull) or throws.
     *
     * Throws: ChannelClosedException if the channel is closed before the
     *         value could be queued; ChannelFullException if the channel is
     *         full and blockOnFull is false.
     */
    @property
    void _(T value) {
        while (true) {
            synchronized (lock) {
                if (closed) {
                    throw new ChannelClosedException();
                }
                if (length < maxItems) {
                    insert(value);
                    // Done: never report "full" or "closed" for a value that
                    // was actually delivered to the queue.
                    return;
                }
                if (!blockOnFull) {
                    throw new ChannelFullException("Channel Full");
                }
            }
            pause();
        }
    }

    /**
     * Receives the next value from the channel (`value = ch._`), waiting
     * until one is available.
     *
     * Throws: ChannelClosedException if the channel is closed, even when
     *         values are still queued.
     */
    @property
    T _() {
        while (true) {
            synchronized (lock) {
                if (closed) {
                    throw new ChannelClosedException();
                }
                // Check and pop under the same lock so another receiver
                // cannot take the value in between.
                if (length > 0) {
                    return popFront();
                }
            }
            pause();
        }
    }
}
/**
 * Convenience factory: builds a chan!T holding at most n values and returns
 * it cast to shared.
 *
 * Params:
 *   n           = capacity handed to the chan constructor
 *   blockOnFull = forwarded to the chan constructor
 */
auto makeChan(T)(int n, bool blockOnFull = true) {
    auto channel = new chan!T(n, blockOnFull);
    return cast(shared)channel;
}
/**
 * Base class for every error raised by a channel, so callers can catch all
 * channel failures with a single `catch (ChannelException)`.
 */
class ChannelException : Exception {
    // `line` is size_t to match Exception's own constructor; a ulong would
    // not convert implicitly on 32-bit targets.
    this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
        super(msg, file, line, next);
    }
}

/// Thrown by a send on a full channel that was created with blockOnFull = false.
class ChannelFullException : ChannelException {
    this(string msg = "Channel Full", string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
        super(msg, file, line, next);
    }
}

/// Thrown by a send or receive on a channel that has been closed.
class ChannelClosedException : ChannelException {
    this(string msg = "Channel Closed", string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
        super(msg, file, line, next);
    }
}
import std.stdio;
import go;
import chan;
/**
 * Demo: a fiber started with go!() sends the integers 22 up to (not
 * including) 44 into a channel of capacity 1, while this function receives
 * and prints the first ten values.
 */
void main() {
    writeln("=-=========-=");
    auto channel = makeChan!int(1);

    // Producer: runs as a fiber on the scheduler's worker threads.
    go!({
        for (int value = 22; value < 44; ++value) {
            channel._ = value;
        }
        writeln("done");
    });

    // Consumer: only ten values are taken.
    for (int received = 0; received < 10; ++received) {
        writeln("pop: ", channel._);
    }
}
import core.thread : Thread, Fiber, thread_isMainThread;
import std.datetime;
import std.conv;
import std.process;
import chan;
/**
 * Wraps F in a new Fiber and sends that fiber to the scheduler channel,
 * from which the worker threads take fibers and call them.
 */
void go(alias F)() {
    auto fiber = new Fiber(F);
    scheduler._ = fiber;
}
shared chan!Fiber scheduler; // channel contains Fibers waiting for their time slice

/**
 * Creates the scheduler channel (capacity 100) and starts the worker threads
 * that run the queued fibers.
 *
 * The number of workers is read from the GOPROCS environment variable and
 * defaults to 1 when the variable is unset or empty. A value below 1 is
 * raised to 1: with no worker at all no fiber would ever run and every
 * receiver waiting on one would hang. A value that is not an integer still
 * makes to!int throw, as before.
 */
shared static this () {
    scheduler = makeChan!Fiber(100);

    // determine the number of workers
    auto goprocs = environment.get("GOPROCS");
    int num_threads = 1;
    if (goprocs.length != 0) {
        num_threads = to!int(goprocs);
    }
    if (num_threads < 1) {
        num_threads = 1;
    }

    foreach (i; 0..num_threads) {
        // create threads that process the live fibers
        auto t = new Thread(() {
            for (;;) {
                Fiber fiber;
                try {
                    fiber = scheduler._;
                } catch (ChannelClosedException cce) {
                    // scheduler was closed: the worker is done
                    break;
                }
                // don't catch any exceptions from the user code
                fiber.call();
                if (fiber.state != Fiber.State.TERM) {
                    // the fiber yielded without finishing: queue it again
                    try {
                        scheduler._ (fiber);
                    } catch (ChannelClosedException cce) {
                        break;
                    }
                }
            }
        });
        t.start();
    }
}
/**
 * Thread-local module destructor: when it runs on the main thread it closes
 * the scheduler channel, which makes the worker loops receive a
 * ChannelClosedException and exit. On any other thread it does nothing.
 */
static ~this() {
    if (!thread_isMainThread()) {
        return;
    }
    scheduler.Close();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment