-
-
Save clojj/1e1ec241d1a01d114cf4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import core.sync.mutex : Mutex; | |
import core.thread : Thread, Fiber; | |
/**
 * chan allows messaging between threads without having to deal with locks,
 * similar to how chan works in golang.
 *
 * Values are kept in a singly linked FIFO guarded by `lock`. Sending and
 * receiving go through the `_` property:
 * ---
 * ch._ = value;   // send
 * auto v = ch._;  // receive
 * ---
 * Waiting is done by polling: inside a Fiber the caller yields, otherwise the
 * calling thread sleeps for 5 ms between attempts.
 */
shared
class chan(T) {
    /// Mutex guarding every field of the channel.
    Mutex lock;

    private bool closed_;

    /// Returns: true once Close() has been called.
    bool closed() {
        synchronized (lock) {
            return closed_;
        }
    }

    /// Marks the channel as closed. Later sends and receives throw
    /// ChannelClosedException, even if values are still queued.
    void Close() {
        synchronized (lock) {
            closed_ = true;
        }
    }

    /// Node of the internal singly linked queue.
    struct Container(T) {
        T value;
        Container!T* next;
    }

    Container!T* current; /// head of the queue (next value to be received), null when empty
    Container!T* last;    /// tail of the queue (most recently queued value), null when empty
    size_t length;        /// number of queued values

    /**
     * Appends `v` to the tail of the queue.
     *
     * This is the raw queue operation: it checks neither `maxItems` nor
     * `closed`. Use the `_` setter for a regular send.
     */
    void insert(T v) {
        Container!T* newItem = new Container!T();
        newItem.value = v;
        synchronized (lock) {
            if (current is null) {
                current = cast(shared)newItem;
            } else {
                last.next = cast(shared)newItem;
            }
            last = cast(shared)newItem;
            // Plain assignment instead of `length++`: current compilers reject
            // read-modify-write operators on shared variables, and the update
            // is already protected by `lock`.
            length = length + 1;
        }
    }

    /**
     * Removes and returns the value at the head of the queue.
     *
     * The queue must not be empty; the `_` getter only calls this after
     * checking `length` under the same lock.
     */
    T popFront() {
        T ret;
        synchronized (lock) {
            assert(current !is null, "popFront called on an empty channel");
            ret = cast(T)current.value;
            current = current.next;
            if (current is null) {
                // Queue drained: drop the tail pointer too so the removed node
                // is not kept reachable through `last`.
                last = null;
            }
            length = length - 1;
        }
        return ret;
    }

    size_t maxItems;          /// capacity used by the `_` setter
    bool blockOnFull = false; /// true: a send waits for room; false: a send throws ChannelFullException

    /**
     * Params:
     *   maxItems    = capacity of the channel
     *   blockOnFull = whether a send on a full channel waits (true) or throws (false)
     */
    this(int maxItems = 1024, bool blockOnFull = true) {
        lock = cast(shared)new Mutex;
        length = 0;
        this.maxItems = maxItems;
        this.blockOnFull = blockOnFull;
    }

    /**
     * Sends `value` on the channel.
     *
     * Blocking mode (`blockOnFull == true`): waits until there is room, queues
     * the value, then keeps waiting until the channel is below capacity again
     * before returning. A sender that is still waiting when the channel gets
     * closed receives ChannelClosedException even if its value was queued.
     *
     * Non-blocking mode: queues the value and returns at once, or throws
     * ChannelFullException when there is no room.
     *
     * With `maxItems == 0` no value can ever be queued: a blocking send waits
     * until the channel is closed, a non-blocking send throws.
     *
     * Throws: ChannelClosedException, ChannelFullException
     */
    @property
    void _(T value) {
        bool done;
        while (true) {
            synchronized (lock) {
                if (closed) {
                    throw new ChannelClosedException();
                }
                if (!done) {
                    if (length < maxItems) {
                        insert(value);
                        done = true;
                    } else if (!blockOnFull) {
                        throw new ChannelFullException("Channel Full");
                    }
                }
                // Only leave once the value is actually queued. In non-blocking
                // mode that is enough; in blocking mode additionally wait until
                // the channel has room again. `length < maxItems` is used
                // instead of `length <= maxItems-1`, which underflows when
                // maxItems is 0.
                if (done && (!blockOnFull || length < maxItems)) {
                    return;
                }
            }
            pause();
        }
    }

    /**
     * Receives the oldest queued value, waiting until one is available.
     *
     * Throws: ChannelClosedException if the channel is closed, including when
     *         values are still queued.
     */
    @property
    T _() {
        while (true) {
            // Check and pop under one lock acquisition so no other receiver can
            // take the value between the length check and the pop.
            synchronized (lock) {
                if (closed) {
                    throw new ChannelClosedException();
                }
                if (length > 0) {
                    return popFront();
                }
            }
            pause();
        }
    }

    /// Gives other work a chance to run between polling attempts: yields when
    /// called from inside a Fiber, otherwise sleeps the thread for 5 ms.
    private static void pause() {
        // Scoped import: the file's selective core.thread import does not
        // bring `dur` into scope.
        import core.time : dur;

        if (Fiber.getThis() !is null) {
            Fiber.yield();
        } else {
            Thread.sleep(dur!"msecs"(5));
        }
    }
}
/**
 * Convenience factory for a shared channel of element type T.
 *
 * Params:
 *   n           = capacity handed to the chan constructor
 *   blockOnFull = handed to the chan constructor unchanged
 * Returns: the newly constructed channel, cast to shared
 */
auto makeChan(T)(int n, bool blockOnFull = true) {
    auto channel = new chan!T(n, blockOnFull);
    return cast(shared)channel;
}
/// Common base class of the errors raised by channel operations, so callers
/// can catch every channel failure with a single handler.
class ChannelException : Exception {
    // `line` is size_t to match Exception's constructor; a ulong argument does
    // not convert implicitly to size_t on 32-bit targets.
    this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
        super(msg, file, line, next);
    }
}

/// Thrown by a send on a channel that has no room for another value.
class ChannelFullException : ChannelException {
    this(string msg = "Channel Full", string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
        super(msg, file, line, next);
    }
}

/// Thrown by a send or receive on a channel that has been closed.
class ChannelClosedException : ChannelException {
    this(string msg = "Channel Closed", string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
        super(msg, file, line, next);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import std.stdio; | |
import go; | |
import chan; | |
/// Demo: a fiber started with go!() sends the integers 22..43 over a channel
/// of capacity 1 while the main thread receives and prints the first ten.
void main() {
    writeln("=-=========-=");

    auto numbers = makeChan!int(1);

    // Producer: runs as a fiber on the scheduler's worker threads.
    go!({
        foreach (value; 22..44) {
            numbers._ = value;
        }
        writeln("done");
    });

    // Consumer: takes only ten of the values the producer offers.
    for (int taken = 0; taken < 10; ++taken) {
        writeln("pop: ", numbers._);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import core.thread : Thread, Fiber, thread_isMainThread; | |
import std.datetime; | |
import std.conv; | |
import std.process; | |
import chan; | |
/// Wraps F in a new Fiber and queues it on the scheduler channel, where a
/// worker thread picks it up and runs it.
void go(alias F)() {
    auto fiber = new Fiber(F);
    scheduler._ = fiber;
}
/// Run queue used by go() and the worker threads: the channel contains the
/// Fibers waiting for their time slice. It is created in the shared module
/// constructor and closed by the main thread's module destructor.
shared chan!Fiber scheduler;
/**
 * Module constructor: creates the scheduler channel and starts the worker
 * threads that execute queued fibers.
 *
 * The number of workers is read from the GOPROCS environment variable. When
 * the variable is unset, empty, not an integer, or smaller than 1, a single
 * worker is started, so queued fibers always have a thread to run on and a
 * malformed value cannot abort program start-up.
 */
shared static this () {
    scheduler = makeChan!Fiber(100);

    int num_threads = 1;
    auto goprocs = environment.get("GOPROCS");
    if (goprocs.length != 0) {
        try {
            num_threads = to!int(goprocs);
        } catch (ConvException e) {
            // Not a valid integer (or out of range): keep the default.
            num_threads = 1;
        }
        if (num_threads < 1) {
            // Zero or negative would start no workers at all.
            num_threads = 1;
        }
    }

    foreach (i; 0..num_threads) {
        // create threads that process the live fibers
        auto t = new Thread(() {
            for (;;) {
                Fiber fiber;
                try {
                    fiber = scheduler._;
                } catch (ChannelClosedException cce) {
                    // Scheduler closed: the worker is done.
                    break;
                }
                // don't catch any exceptions from the user code
                fiber.call();
                if (fiber.state != Fiber.State.TERM) {
                    // The fiber yielded without finishing: queue it again so it
                    // gets another time slice.
                    try {
                        scheduler._ (fiber);
                    } catch (ChannelClosedException cce) {
                        break;
                    }
                }
            }
        });
        t.start();
    }
}
/// Module destructor: when it runs on the main thread it closes the scheduler
/// channel, which makes the worker loops leave via ChannelClosedException.
/// On any other thread it does nothing.
static ~this() {
    if (!thread_isMainThread()) {
        return;
    }
    scheduler.Close();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment