Created
April 5, 2018 13:51
-
-
Save run-dlang/58ba0e698ee3e5580b6a063206b2dd9f to your computer and use it in GitHub Desktop.
Code shared from run.dlang.io.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import std.concurrency : receiveOnly, send, | |
| spawn, Tid, thisTid; | |
| import core.atomic : atomicOp, atomicLoad; | |
| import core.sync.mutex; | |
| shared Mutex mtx; | |
| /* | |
| Queue that can be used safely among | |
| different threads. All access to an | |
| instance is automatically locked thanks to | |
| synchronized keyword. | |
| */ | |
| synchronized class SafeQueue(T) | |
| { | |
| // Note: must be private in synchronized | |
| // classes otherwise D complains. | |
| private T[] elements; | |
| void push(T value) { | |
| elements ~= value; | |
| } | |
| /// Return T.init if queue empty | |
| T pop() { | |
| import std.array : empty; | |
| T value; | |
| if (elements.empty) | |
| return value; | |
| value = elements[0]; | |
| elements = elements[1 .. $]; | |
| return value; | |
| } | |
| auto length() { return elements.length; } | |
| } | |
| /* | |
| Safely print messages independent of | |
| number of concurrent threads. | |
| Note that variadic parameters are used | |
| for args! That is args might be 0 .. N | |
| parameters. | |
| */ | |
| void safePrint(T...)(T args) | |
| { | |
| // Just executed by one concurrently | |
| synchronized { | |
| import std.stdio : writeln; | |
| writeln(args); | |
| } | |
| } | |
| void threadProducer(shared(SafeQueue!int) queue, | |
| shared(int)* queueCounter) | |
| { | |
| import std.range : iota; | |
| // Push values 1 to 11 | |
| foreach (i; iota(1,11)) { | |
| mtx.lock(); | |
| atomicOp!"+="(*queueCounter, 1); | |
| queue.push(i); | |
| safePrint("Pushed ", i); | |
| mtx.unlock; | |
| } | |
| } | |
| void threadConsumer(Tid owner, | |
| shared(SafeQueue!int) queue, | |
| shared(int)* queueCounter) | |
| { | |
| int popped = 0; | |
| while (popped != 10) { | |
| if( queue.length == 0 ) | |
| continue; | |
| mtx.lock; | |
| auto i = queue.pop(); | |
| safePrint("Popped ", i, | |
| " (Producer pushed ", | |
| // safely fetch current value of | |
| // queueCounter using atomicLoad | |
| atomicLoad(*queueCounter), ")"); | |
| mtx.unlock; | |
| ++popped; | |
| } | |
| // I'm done! | |
| owner.send(true); | |
| } | |
| void main() | |
| { | |
| auto queue = new shared(SafeQueue!int); | |
| shared int counter = 0; | |
| mtx = new shared(Mutex)(); | |
| spawn(&threadProducer, queue, &counter); | |
| spawn(&threadConsumer, thisTid, queue, | |
| &counter); | |
| auto stopped = receiveOnly!bool; | |
| assert(stopped); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment