Skip to content

Instantly share code, notes, and snippets.

@cthom06
Created April 16, 2013 21:07
Show Gist options
  • Save cthom06/5399658 to your computer and use it in GitHub Desktop.
Save cthom06/5399658 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleApplication8
{
/// <summary>
/// Demo driver: a concurrent prime sieve built from a chain of channel-connected
/// filter stages. Each stage prints the first number it receives and forwards
/// only the numbers that are not multiples of it to the next stage.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the first sieve stage and then feeds it 2, 3, 4, ... without end.
    /// Each send is waited on synchronously, so the producer only advances once
    /// the channel has accepted the previous number.
    /// </summary>
    static void Main(string[] args)
    {
        var source = new TaskChan<long>();
        // Fire-and-forget: the stage runs for as long as numbers keep arriving.
        Task head = Worker(source);
        for (long candidate = 2; ; candidate++)
        {
            source.Send(candidate).GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Sends a single value on the channel, logging before the send is issued
    /// and after it has completed.
    /// </summary>
    static async Task foo(TaskChan<int> c, int i)
    {
        object boxed = i;
        Console.WriteLine("Waiting {0}", boxed);
        Task sending = c.Send(i);
        await sending;
        Console.WriteLine("Finished {0}", boxed);
    }

    /// <summary>
    /// One sieve stage. The first value received is printed and becomes this
    /// stage's divisor; every later value that is not divisible by it is passed
    /// on to a downstream stage, which is started lazily on the first such value.
    /// </summary>
    static async Task Worker(TaskChan<long> c)
    {
        long divisor = await c.Recv();
        Console.WriteLine(divisor);

        var downstream = new TaskChan<long>();
        Task nextStage = null;

        for (; ; )
        {
            long incoming = await c.Recv();
            if (incoming % divisor == 0)
            {
                // Multiple of this stage's divisor: filtered out.
                continue;
            }

            if (nextStage == null)
            {
                // First survivor: spin up the stage that will consume it.
                nextStage = Worker(downstream);
            }

            await downstream.Send(incoming);
        }
    }
}
/// <summary>
/// A Go-style channel whose send and receive operations are exposed as tasks.
/// A capacity of 0 gives an unbuffered channel (a send completes only when a
/// receiver takes the value), a positive capacity gives a bounded buffer, and a
/// negative capacity gives an unbounded buffer. All internal state is guarded
/// by a single private lock.
/// </summary>
public class TaskChan<T> : IEnumerable<T>
{
    // 0 = unbuffered, > 0 = bounded buffer size, < 0 = unbounded buffer.
    private readonly int _capacity;
    private readonly object _gate = new object();
    // Buffered values; null when the channel is unbuffered.
    private readonly Queue<T> _buffer;
    // Receivers waiting for a value. Each entry is an unstarted task whose
    // WaitState is filled in by the sender (or left untouched by Close).
    private readonly Queue<Task<TaskChanTuple>> _receivers = new Queue<Task<TaskChanTuple>>();
    // Senders waiting for room, paired with the value they want to send.
    private readonly Queue<Tuple<T, Task<bool>>> _senders = new Queue<Tuple<T, Task<bool>>>();
    private bool _closed;

    /// <summary>Creates an unbuffered channel.</summary>
    public TaskChan()
        : this(0)
    {
    }

    /// <summary>Creates a channel with the given buffer capacity.</summary>
    /// <param name="buff">
    /// 0 for unbuffered, a positive number for a bounded buffer of that size,
    /// or a negative number for an unbounded buffer.
    /// </param>
    public TaskChan(int buff)
    {
        _capacity = buff;
        if (buff > 0)
        {
            _buffer = new Queue<T>(buff);
        }
        else if (buff < 0)
        {
            _buffer = new Queue<T>();
        }
        // buff == 0: _buffer stays null, which marks the channel as unbuffered.
    }

    /// <summary>
    /// Closes the channel. Later sends are rejected; receives keep draining any
    /// buffered values and then report the channel as closed. Receivers and
    /// senders that are currently waiting are released: receivers observe a
    /// closed channel, senders observe a failed send. Calling it again is a no-op.
    /// </summary>
    public void Close()
    {
        lock (_gate)
        {
            if (_closed)
            {
                return;
            }
            _closed = true;

            // Waiter tasks are started with their WaitState untouched, so
            // Delivered is still false and each waiter reports "closed".
            while (_receivers.Count > 0)
            {
                _receivers.Dequeue().Start();
            }
            while (_senders.Count > 0)
            {
                _senders.Dequeue().Item2.Start();
            }
        }
    }

    /// <summary>Sends a value, completing once the channel has accepted it.</summary>
    /// <param name="v">The value to send.</param>
    /// <exception cref="InvalidOperationException">The channel is closed.</exception>
    public async Task Send(T v)
    {
        if (!await TrySend(v))
        {
            throw new InvalidOperationException("Send on a closed TaskChan");
        }
    }

    /// <summary>
    /// Sends a value without throwing when the channel is closed.
    /// </summary>
    /// <param name="v">The value to send.</param>
    /// <returns>
    /// true once the value has been handed to a receiver or stored in the
    /// buffer; false if the channel is (or becomes, while waiting) closed.
    /// </returns>
    public async Task<bool> TrySend(T v)
    {
        Task<bool> pending;
        lock (_gate)
        {
            if (_closed)
            {
                return false;
            }

            if (_receivers.Count > 0)
            {
                // A receiver is already waiting: hand the value over directly.
                var receiver = _receivers.Dequeue();
                var handoff = (WaitState)receiver.AsyncState;
                handoff.Value = v;
                handoff.Delivered = true;
                receiver.Start();
                return true;
            }

            if (_buffer != null && (_capacity < 0 || _buffer.Count < _capacity))
            {
                _buffer.Enqueue(v);
                return true;
            }

            // No receiver and no room: park until a receiver takes the value
            // (or moves it into the buffer), or until the channel is closed.
            pending = new Task<bool>(o => ((WaitState)o).Delivered, new WaitState());
            _senders.Enqueue(new Tuple<T, Task<bool>>(v, pending));
        }
        return await pending;
    }

    /// <summary>Receives the next value.</summary>
    /// <returns>The received value.</returns>
    /// <exception cref="InvalidOperationException">
    /// The channel is closed and has no buffered values left.
    /// </exception>
    public async Task<T> Recv()
    {
        var received = await TryRecv();
        if (!received.WasUnclosed)
        {
            throw new InvalidOperationException("Recv on a closed TaskChan");
        }
        return received.Value;
    }

    /// <summary>
    /// Receives the next value without throwing when the channel is closed.
    /// </summary>
    /// <returns>
    /// A <see cref="TaskChanTuple"/> whose WasUnclosed is true and whose Value
    /// is the received item, or whose WasUnclosed is false (with a default
    /// Value) when the channel is closed and drained.
    /// </returns>
    public async Task<TaskChanTuple> TryRecv()
    {
        Task<TaskChanTuple> pending;
        lock (_gate)
        {
            bool hasBuffered = _buffer != null && _buffer.Count > 0;
            if (_closed && !hasBuffered)
            {
                return new TaskChanTuple(default(T), false);
            }

            if (hasBuffered)
            {
                T head = _buffer.Dequeue();
                if (_senders.Count > 0)
                {
                    // A slot just opened: move the oldest blocked sender's value
                    // into the buffer and release that sender. Without this a
                    // later Send could slip into the free slot ahead of it and
                    // break FIFO ordering.
                    var promoted = _senders.Dequeue();
                    _buffer.Enqueue(promoted.Item1);
                    ((WaitState)promoted.Item2.AsyncState).Delivered = true;
                    promoted.Item2.Start();
                }
                return new TaskChanTuple(head, true);
            }

            if (_senders.Count > 0)
            {
                // Nothing buffered: take the value straight from the oldest
                // waiting sender and let it continue.
                var sender = _senders.Dequeue();
                ((WaitState)sender.Item2.AsyncState).Delivered = true;
                sender.Item2.Start();
                return new TaskChanTuple(sender.Item1, true);
            }

            // Nothing available: park until a sender fills in the WaitState,
            // or until Close starts the task with Delivered still false.
            pending = new Task<TaskChanTuple>(o =>
            {
                var state = (WaitState)o;
                return new TaskChanTuple(state.Value, state.Delivered);
            }, new WaitState());
            _receivers.Enqueue(pending);
        }
        return await pending;
    }

    /// <summary>
    /// Result of <see cref="TryRecv"/>: the received value together with a flag
    /// telling whether it really came from the channel.
    /// </summary>
    public class TaskChanTuple
    {
        /// <summary>The received value; default(T) when WasUnclosed is false.</summary>
        public T Value;
        /// <summary>true if a value was received; false if the channel was closed.</summary>
        public bool WasUnclosed;

        internal TaskChanTuple(T v, bool c)
        {
            Value = v;
            WasUnclosed = c;
        }
    }

    // Mutable state shared between a parked waiter task and whoever wakes it.
    private class WaitState
    {
        // Value handed to a waiting receiver (unused for waiting senders).
        public T Value;
        // Set to true by the counterpart before starting the task; stays false
        // when the task is started by Close.
        public bool Delivered;
    }

    // Blocking enumerator: each MoveNext waits synchronously for the next value
    // and ends the sequence once the channel is closed and drained.
    private class ChanEnumerator : IEnumerator<T>
    {
        private T _current;
        private bool _hasCurrent = false;
        private readonly TaskChan<T> _channel;

        internal ChanEnumerator(TaskChan<T> tc)
        {
            _channel = tc;
        }

        public T Current
        {
            get
            {
                if (!_hasCurrent)
                {
                    throw new InvalidOperationException();
                }
                return _current;
            }
        }

        public void Dispose()
        {
        }

        object System.Collections.IEnumerator.Current
        {
            get
            {
                return Current;
            }
        }

        public bool MoveNext()
        {
            var received = _channel.TryRecv().GetAwaiter().GetResult();
            if (received.WasUnclosed)
            {
                _current = received.Value;
                _hasCurrent = true;
            }
            return received.WasUnclosed;
        }

        public void Reset()
        {
            throw new NotImplementedException();
        }
    }

    /// <summary>
    /// Returns an enumerator that receives from the channel, blocking the
    /// calling thread on each step, until the channel is closed and drained.
    /// </summary>
    public IEnumerator<T> GetEnumerator()
    {
        return new ChanEnumerator(this);
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment