Created
April 16, 2013 21:07
-
-
Save cthom06/5399658 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace ConsoleApplication8 | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
long i = 2; | |
var c = new TaskChan<long>(); | |
Task _ = Worker(c); | |
while (true) | |
c.Send(i++).GetAwaiter().GetResult(); | |
} | |
static async Task foo(TaskChan<int> c, int i) | |
{ | |
Console.WriteLine("Waiting {0}", i); | |
await c.Send(i); | |
Console.WriteLine("Finished {0}", i); | |
} | |
static async Task Worker(TaskChan<long> c) | |
{ | |
long v = await c.Recv(); | |
Console.WriteLine(v); | |
var nc = new TaskChan<long>(); | |
bool subStarted = false; | |
while (true) | |
{ | |
long n = await c.Recv(); | |
if (n % v != 0) | |
{ | |
if (!subStarted) | |
{ | |
Task _ = Worker(nc); | |
subStarted = true; | |
} | |
await nc.Send(n); | |
} | |
} | |
} | |
} | |
public class TaskChan<T> : IEnumerable<T> | |
{ | |
int buffsize; | |
object l; | |
Queue<T> q; | |
Queue<Task<TaskChanTuple>> r; | |
Queue<Tuple<T,Task<bool>>> s; | |
bool closed; | |
public TaskChan() | |
: this(0) | |
{ | |
} | |
public TaskChan(int buff) | |
{ | |
buffsize = buff; | |
if (buff > 0) | |
q = new Queue<T>(buff); | |
else if (buff < 0) | |
q = new Queue<T>(); | |
else | |
q = null; | |
r = new Queue<Task<TaskChanTuple>>(); | |
s = new Queue<Tuple<T, Task<bool>>>(); | |
l = new object(); | |
closed = false; | |
} | |
public async Task Send(T v) | |
{ | |
if (!await TrySend(v)) | |
throw new Exception("Send on a closed TaskChan"); | |
} | |
public async Task<bool> TrySend(T v) | |
{ | |
Task<bool> t; | |
lock (l) | |
{ | |
if (closed) | |
return false; | |
if (r.Count > 0) | |
{ | |
var f = r.Dequeue(); | |
((ctx)f.AsyncState).V = v; | |
((ctx)f.AsyncState).C = true; | |
f.Start(); | |
return true; | |
} | |
else if (q != null && (buffsize < 0 || q.Count < buffsize)) | |
{ | |
q.Enqueue(v); | |
return true; | |
} | |
else | |
{ | |
s.Enqueue(new Tuple<T, Task<bool>>(v, t = new Task<bool>((o) => { | |
return ((ctx)o).C; | |
}, new ctx()))); | |
} | |
} | |
return await t; | |
} | |
public async Task<T> Recv() | |
{ | |
var t = await TryRecv(); | |
if (!t.WasUnclosed) | |
throw new Exception("Recv on a closed TaskChan"); | |
return t.Value; | |
} | |
public async Task<TaskChanTuple> TryRecv() | |
{ | |
Task<TaskChanTuple> t; | |
lock (l) | |
{ | |
if (closed && (q == null || q.Count == 0)) | |
return new TaskChanTuple(default(T), false); | |
if (q != null && q.Count > 0) | |
{ | |
return new TaskChanTuple(q.Dequeue(), true); | |
} | |
else if (s.Count > 0) | |
{ | |
var f = s.Dequeue(); | |
((ctx)f.Item2.AsyncState).C = true; | |
f.Item2.Start(); | |
return new TaskChanTuple(f.Item1, true); | |
} | |
else | |
{ | |
r.Enqueue(t = new Task<TaskChanTuple>((o) => | |
{ | |
return new TaskChanTuple(((ctx)o).V, ((ctx)o).C); | |
}, new ctx())); | |
} | |
} | |
return await t; | |
} | |
public class TaskChanTuple | |
{ | |
public T Value; | |
public bool WasUnclosed; | |
internal TaskChanTuple(T v, bool c) | |
{ | |
Value = v; | |
WasUnclosed = c; | |
} | |
} | |
class ctx | |
{ | |
public T V; | |
public bool C; | |
} | |
class ChanEnumerator : IEnumerator<T> | |
{ | |
T curr; | |
bool currIsSet = false; | |
TaskChan<T> c; | |
internal ChanEnumerator(TaskChan<T> tc) | |
{ | |
c = tc; | |
} | |
public T Current | |
{ | |
get | |
{ | |
if (!currIsSet) | |
throw new InvalidOperationException(); | |
return curr; | |
} | |
} | |
public void Dispose() | |
{ | |
} | |
object System.Collections.IEnumerator.Current | |
{ | |
get | |
{ | |
return Current; | |
} | |
} | |
public bool MoveNext() | |
{ | |
var t = c.TryRecv().GetAwaiter().GetResult(); | |
if (t.WasUnclosed) | |
{ | |
curr = t.Value; | |
currIsSet = true; | |
} | |
return t.WasUnclosed; | |
} | |
public void Reset() | |
{ | |
throw new NotImplementedException(); | |
} | |
} | |
public IEnumerator<T> GetEnumerator() | |
{ | |
return new ChanEnumerator(this); | |
} | |
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() | |
{ | |
return GetEnumerator(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment