Skip to content

Instantly share code, notes, and snippets.

@hodzanassredin
Created November 28, 2013 15:06
Show Gist options
  • Save hodzanassredin/7693292 to your computer and use it in GitHub Desktop.
Save hodzanassredin/7693292 to your computer and use it in GitHub Desktop.
trying to understand difference between channel and push and pull
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AsyncTest
{
public class Program
{
public class Option<T>{
public static Option<T> None { get { return new Option<T>(); } }
public static Option<T> ValueC(T val) { return new Option<T>(val); }
public Option()
{
HasValue = false;
}
public Option(T val)
{
HasValue = true;
Value = val;
}
public bool HasValue { get; set; }
public T Value { get; set; }
}
public class AsyncPull<T>
{
public Func<Task<Option<T>>> Pull { get; set; }
}
public class AsyncPush<T>
{
public Action<Task<Option<T>>> Push { get; set; }
}
public class BlockingChannel<T>
{
public BlockingChannel()
{
Push =
}
public AsyncPush<T> Push { get; set; }
public AsyncPull<T> Pull { get; set; }
}
static AsyncPull<int> PullSource()
{
var i = 0;
Func<Task<Option<int>>> res = () =>
{
if (i < 100) {
i = i + 1;
return Task.FromResult((Option<int>.ValueC(i)));
}
else { return Task.FromResult(Option<int>.None); }
};
return new AsyncPull<int> { Pull = res };
}
static Action PushSource(AsyncPush<int> reciever)
{
Action res = () =>
{
for (int i = 1000; i < 1100; i++)
{
reciever.Push(Task.FromResult(Option<int>.ValueC(i)));
}
reciever.Push(Task.FromResult(Option<int>.None));
};
return res;
}
static AsyncPull<U> Map<T, U>(AsyncPull<T> source, Func<T, Task<U>> f)
{
Func<Task<Option<U>>> res = async () =>
{
var t = await source.Pull();
var u = await f(t.Value);
return t.HasValue ? Option<U>.ValueC(u) : Option<U>.None;
};
return new AsyncPull<U> { Pull = res };
}
static async Task PullPrint<U>(AsyncPull<U> source)
{
var it = await source.Pull();
while (it.HasValue)
{
Console.WriteLine(it.Value);
it = await source.Pull();
}
}
static AsyncPush<U> PushPrint<U>()
{
Action<Task<Option<U>>> res = async (x) =>
{
var r = await x;
if (r.HasValue) Console.WriteLine(r.Value);
};
return new AsyncPush<U>{Push = res};
}
static void Main(string[] args)
{
var src = PullSource();
var mapped = Map(src, x => Task.FromResult(x * 2));
PullPrint(mapped).Wait();
var pushPrinter = PushPrint<int>();
var pusher = PushSource(pushPrinter);
pusher();
var channel = new BlockingChannel<int> { };
var chpusher = PushSource(channel.Push);
var cPrint = PullPrint(channel.Pull);
chpusher();
Console.ReadKey();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment