Skip to content

Instantly share code, notes, and snippets.

@ralfw
Last active August 29, 2015 14:14
Show Gist options
  • Save ralfw/72dcaee5470510c43a3c to your computer and use it in GitHub Desktop.
Save ralfw/72dcaee5470510c43a3c to your computer and use it in GitHub Desktop.
Simple streams based on IEnumerable
/// <summary>
/// A sequence of elements that is either backed by an existing enumerable
/// ("generator mode") or filled manually via <see cref="Enqueue"/> and terminated
/// via <see cref="Close"/> ("queue mode"). In queue mode, enumeration blocks until
/// an element is available or the stream has been closed, so a producer on another
/// thread can feed a consumer that simply uses foreach.
/// </summary>
/// <remarks>
/// In queue mode every call to GetEnumerator returns the same underlying enumerator:
/// elements are consumed, not replayed. The stream is intended for a single consumer.
/// </remarks>
class Stream<T> : IEnumerable<T> {
    // Exactly one of the two fields is non-null, depending on the constructor used.
    private readonly IEnumerable<T> generator;
    private readonly StreamEnumerator enumerator;

    /// <summary>Creates a stream that enumerates the given sequence.</summary>
    /// <param name="generator">Sequence providing the elements; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="generator"/> is null.</exception>
    public Stream(IEnumerable<T> generator) {
        if (generator == null) {
            throw new ArgumentNullException("generator");
        }
        this.generator = generator;
    }

    /// <summary>Creates an empty stream to be filled with Enqueue and finished with Close.</summary>
    public Stream() {
        this.enumerator = new StreamEnumerator();
    }

    /// <summary>Appends an element to a manually filled stream.</summary>
    /// <param name="element">Element to hand to the consumer.</param>
    /// <remarks>Elements enqueued after <see cref="Close"/> are discarded.</remarks>
    /// <exception cref="InvalidOperationException">The stream is backed by a generator.</exception>
    public void Enqueue(T element) {
        this.QueueOrThrow().Enqueue(element);
    }

    /// <summary>Marks a manually filled stream as complete so enumeration can end.</summary>
    /// <exception cref="InvalidOperationException">The stream is backed by a generator.</exception>
    public void Close() {
        this.QueueOrThrow().Close();
    }

    /// <summary>Enumerates the stream and passes each element to <paramref name="process"/>.</summary>
    /// <param name="process">Callback invoked once per element, on the calling thread.</param>
    /// <exception cref="ArgumentNullException"><paramref name="process"/> is null.</exception>
    public void Continue(Action<T> process) {
        if (process == null) {
            throw new ArgumentNullException("process");
        }
        foreach (var e in this) {
            process(e);
        }
    }

    // Returns the queue-mode enumerator or fails with a descriptive error instead of
    // the NullReferenceException a generator-backed stream would otherwise produce.
    private StreamEnumerator QueueOrThrow() {
        if (this.enumerator == null) {
            throw new InvalidOperationException(
                "This stream is backed by a generator; it cannot be filled or closed manually.");
        }
        return this.enumerator;
    }

    #region IEnumerable<T> implementation
    /// <summary>
    /// Returns a fresh enumerator of the generator, or the shared blocking enumerator
    /// of a manually filled stream.
    /// </summary>
    public IEnumerator<T> GetEnumerator() {
        if (this.generator != null) {
            return this.generator.GetEnumerator();
        }
        return this.enumerator;
    }
    #endregion

    #region IEnumerable implementation
    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() {
        return this.GetEnumerator();
    }
    #endregion

    /// <summary>
    /// Blocking enumerator over a queue. Uses the enclosing type's T (no shadowing
    /// type parameter) and a monitor on the queue for both mutual exclusion and
    /// signalling, so there is no OS handle that a consumer's Dispose could tear
    /// down while a producer is still enqueuing.
    /// </summary>
    private sealed class StreamEnumerator : IEnumerator<T> {
        private readonly Queue<T> elements = new Queue<T>();
        private bool closed;
        private T currentElement;

        public void Enqueue(T element) {
            lock (this.elements) {
                if (this.closed) {
                    return; // nothing is delivered after the end of the stream
                }
                this.elements.Enqueue(element);
                System.Threading.Monitor.PulseAll(this.elements);
            }
        }

        public void Close() {
            lock (this.elements) {
                this.closed = true;
                // Wake every waiting consumer so it can observe the end of the stream.
                System.Threading.Monitor.PulseAll(this.elements);
            }
        }

        #region IEnumerator implementation
        /// <summary>
        /// Blocks until an element is available or the stream is closed.
        /// Returns false once the stream is closed and drained, and keeps
        /// returning false on every later call.
        /// </summary>
        public bool MoveNext() {
            lock (this.elements) {
                // Loop guards against wake-ups where another consumer took the element.
                while (this.elements.Count == 0 && !this.closed) {
                    System.Threading.Monitor.Wait(this.elements);
                }
                if (this.elements.Count == 0) {
                    return false;
                }
                this.currentElement = this.elements.Dequeue();
                return true;
            }
        }

        public T Current {
            get {
                lock (this.elements) {
                    return this.currentElement;
                }
            }
        }

        object System.Collections.IEnumerator.Current {
            get { return this.Current; }
        }

        /// <summary>No-op: consumed elements cannot be replayed.</summary>
        public void Reset() {}
        #endregion

        #region IDisposable implementation
        /// <summary>
        /// No-op. foreach disposes the enumerator when it ends or is left early;
        /// because the enumerator is shared by the stream, it must stay usable.
        /// </summary>
        public void Dispose() {}
        #endregion
    }
}
/// <summary>
/// Console demo of Stream&lt;T&gt;: streams built from enumerables, streams filled
/// manually, and streams fed from or consumed on other threads.
/// </summary>
class MainClass
{
    /// <summary>Runs each demo scenario in turn and prints the streamed elements.</summary>
    public static void Main (string[] args)
    {
        Console.WriteLine ("Stream returned based on function returning an IEnum<T>");
        var s = DoSth (3);
        foreach (var e in s) { Console.WriteLine (e); }

        Console.WriteLine ("Streams constructed based on an IEnum<T>");
        s = new Stream<int> (new[] {9,8,7});
        foreach (var e in s) { Console.WriteLine (e); }
        s = new Stream<int> (CreateNumbers (3));
        foreach (var e in s) { Console.WriteLine (e); }

        Console.WriteLine ("Stream filled 'manually'; needs to be closed!");
        var s2 = new Stream<string> ();
        s2.Enqueue ("a");
        s2.Enqueue ("b");
        s2.Enqueue ("c");
        s2.Close ();
        foreach (var e in s2) { Console.WriteLine (e); }

        Console.WriteLine ("Stream filled in background... (delay: 1sec)");
        s = DoInParallel (3);
        foreach (var e in s) { Console.WriteLine (e); }

        Console.WriteLine ("Stream elements from background processed with fluent method...");
        DoInParallel (3).Continue (
            Console.WriteLine);

        Console.WriteLine ("Output to streams on different threads...");
        SplitInParallel (5,
            sEven => {
                foreach(var e in sEven)
                    Console.WriteLine(e);
            },
            sOdd => {
                foreach(var o in sOdd)
                    Console.WriteLine("\t{0}", o);
            });
    }

    /// <summary>Returns a stream over the numbers 0 .. 2n-1.</summary>
    static Stream<int> DoSth(int n) {
        n = 2 * n;
        return new Stream<int> (CreateNumbers(n));
    }

    /// <summary>Lazily yields the numbers 0 .. n-1.</summary>
    static IEnumerable<int> CreateNumbers(int n) {
        for (var i = 0; i < n; i++)
            yield return i;
    }

    /// <summary>
    /// Returns a stream that a background thread fills with 0, 10, 20, ...
    /// (n elements), sleeping one second after each element before closing the stream.
    /// </summary>
    static Stream<int> DoInParallel(int n) {
        var s = new Stream<int> ();
        var th = new System.Threading.Thread (_ => {
            for(var i = 0; i<n; i++) {
                s.Enqueue(10*i);
                System.Threading.Thread.Sleep(1000);
            }
            s.Close();
        });
        th.IsBackground = true;
        th.Start ();
        return s;
    }

    /// <summary>
    /// Produces the numbers 0 .. n-1 on the calling thread, routing even numbers to
    /// one stream and negated odd numbers to another, while <paramref name="onEven"/>
    /// and <paramref name="onOdd"/> consume those streams on their own threads.
    /// Returns only after both consumers have finished, so their output is complete
    /// before the caller (and possibly the process) moves on.
    /// </summary>
    /// <exception cref="ArgumentNullException">A consumer callback is null.</exception>
    static void SplitInParallel(int n, Action<Stream<int>> onEven, Action<Stream<int>> onOdd)
    {
        if (onEven == null) throw new ArgumentNullException ("onEven");
        if (onOdd == null) throw new ArgumentNullException ("onOdd");

        var sEven = new Stream<int> ();
        var sOdd = new Stream<int>();

        // Dedicated threads instead of fire-and-forget pool work items: they can be
        // joined below, which guarantees the consumers have drained their streams.
        var evenConsumer = new System.Threading.Thread (() => onEven (sEven));
        var oddConsumer = new System.Threading.Thread (() => onOdd (sOdd));
        evenConsumer.IsBackground = true;
        oddConsumer.IsBackground = true;
        evenConsumer.Start ();
        oddConsumer.Start ();

        for(var i = 0; i<n; i++) {
            if (i % 2 == 0)
                sEven.Enqueue (i);
            else
                sOdd.Enqueue (-i);
            System.Threading.Thread.Sleep(500);
        }
        sEven.Close();
        sOdd.Close ();

        evenConsumer.Join ();
        oddConsumer.Join ();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment