Last active
August 29, 2015 14:14
-
-
Save ralfw/72dcaee5470510c43a3c to your computer and use it in GitHub Desktop.
Simple streams based on IEnumerable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A simple stream of <typeparamref name="T"/> values built on <see cref="IEnumerable{T}"/>.
/// A stream is either backed by an existing sequence (see <see cref="Stream(IEnumerable{T})"/>)
/// or filled manually via <see cref="Enqueue"/> and terminated with <see cref="Close"/>.
/// A manually filled stream blocks its consumer until an element arrives or the stream is closed.
/// </summary>
class Stream<T> : IEnumerable<T> {
    // Exactly one of these two fields is non-null, depending on the constructor used.
    private readonly IEnumerable<T> generator;
    private readonly StreamEnumerator enumerator;

    /// <summary>Creates a stream that simply enumerates <paramref name="generator"/>.</summary>
    /// <exception cref="System.ArgumentNullException"><paramref name="generator"/> is null.</exception>
    public Stream(IEnumerable<T> generator) {
        if (generator == null)
            throw new System.ArgumentNullException("generator");
        this.generator = generator;
    }

    /// <summary>Creates an empty, open stream to be filled with <see cref="Enqueue"/>.</summary>
    public Stream() { this.enumerator = new StreamEnumerator(); }

    /// <summary>Appends an element to a manually filled stream and wakes a waiting consumer.</summary>
    /// <remarks>Elements enqueued after <see cref="Close"/> are dropped; they are never delivered.</remarks>
    /// <exception cref="System.InvalidOperationException">The stream wraps an existing sequence.</exception>
    public void Enqueue(T element) {
        this.RequireManualStream().Enqueue(element);
    }

    /// <summary>Marks the end of a manually filled stream so that enumeration can finish.</summary>
    /// <exception cref="System.InvalidOperationException">The stream wraps an existing sequence.</exception>
    public void Close() {
        this.RequireManualStream().Close();
    }

    /// <summary>Invokes <paramref name="process"/> for every element until the stream ends.</summary>
    /// <exception cref="System.ArgumentNullException"><paramref name="process"/> is null.</exception>
    public void Continue(Action<T> process) {
        if (process == null)
            throw new System.ArgumentNullException("process");
        foreach (var e in this)
            process(e);
    }

    /// <summary>
    /// Returns the wrapped sequence's enumerator, or the single shared enumerator
    /// of a manually filled stream.
    /// </summary>
    public IEnumerator<T> GetEnumerator() {
        if (this.generator != null)
            return this.generator.GetEnumerator();
        return this.enumerator;
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() {
        return this.GetEnumerator();
    }

    // Turns misuse of Enqueue/Close on a sequence-backed stream into a descriptive error
    // instead of a NullReferenceException.
    private StreamEnumerator RequireManualStream() {
        if (this.enumerator == null)
            throw new System.InvalidOperationException(
                "This stream wraps an existing sequence and cannot be filled manually.");
        return this.enumerator;
    }

    /// <summary>
    /// Blocking queue that doubles as the enumerator of a manually filled stream.
    /// Uses the enclosing type's <typeparamref name="T"/> rather than re-declaring it.
    /// </summary>
    private sealed class StreamEnumerator : IEnumerator<T> {
        // Guards elements/closed and is the monitor consumers wait on.
        private readonly object gate = new object();
        private readonly Queue<T> elements = new Queue<T>();
        private bool closed;
        private T currentElement;

        public void Enqueue(T element) {
            lock (this.gate) {
                if (this.closed)
                    return; // nothing behind the end-of-stream point is ever delivered
                this.elements.Enqueue(element);
                System.Threading.Monitor.Pulse(this.gate);
            }
        }

        public void Close() {
            lock (this.gate) {
                this.closed = true;
                // Wake every waiter so each one can observe the closed flag.
                System.Threading.Monitor.PulseAll(this.gate);
            }
        }

        /// <summary>
        /// Blocks until an element is available or the stream is closed.
        /// Returns false once the stream is closed and drained, and keeps returning
        /// false on every later call.
        /// </summary>
        public bool MoveNext() {
            lock (this.gate) {
                // Waiting and dequeuing happen under the same lock, so a wake-up can
                // never be observed with an empty queue.
                while (this.elements.Count == 0 && !this.closed)
                    System.Threading.Monitor.Wait(this.gate);
                if (this.elements.Count == 0)
                    return false;
                this.currentElement = this.elements.Dequeue();
                return true;
            }
        }

        public T Current { get { return this.currentElement; } }

        object System.Collections.IEnumerator.Current { get { return this.currentElement; } }

        // Resetting is not supported; kept as a no-op as before.
        public void Reset() {}

        // foreach disposes the enumerator when its loop ends, and the stream hands out
        // this same instance every time, so Dispose must not tear anything down.
        public void Dispose() {}
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Console demo of the different ways a Stream&lt;T&gt; can be produced and consumed:
/// wrapped sequences, manual filling, a background producer, and fan-out to two consumers.
/// </summary>
class MainClass
{
    /// <summary>Runs every demo scenario in order, printing each stream's elements.</summary>
    public static void Main (string[] args)
    {
        Console.WriteLine ("Stream returned based on function returning an IEnum<T>");
        var fromFunction = DoSth (3);
        foreach (var number in fromFunction) { Console.WriteLine (number); }

        Console.WriteLine ("Streams constructed based on an IEnum<T>");
        var fromArray = new Stream<int> (new[] {9,8,7});
        foreach (var number in fromArray) { Console.WriteLine (number); }
        var fromIterator = new Stream<int> (CreateNumbers (3));
        foreach (var number in fromIterator) { Console.WriteLine (number); }

        Console.WriteLine ("Stream filled 'manually'; needs to be closed!");
        var letters = new Stream<string> ();
        letters.Enqueue ("a");
        letters.Enqueue ("b");
        letters.Enqueue ("c");
        letters.Close ();
        foreach (var letter in letters) { Console.WriteLine (letter); }

        Console.WriteLine ("Stream filled in background... (delay: 1sec)");
        var fromBackground = DoInParallel (3);
        foreach (var number in fromBackground) { Console.WriteLine (number); }

        Console.WriteLine ("Stream elements from background processed with fluent method...");
        DoInParallel (3).Continue (Console.WriteLine);

        Console.WriteLine ("Output to streams on different threads...");
        SplitInParallel (5,
            evens => {
                foreach (var even in evens)
                    Console.WriteLine (even);
            },
            odds => {
                foreach (var odd in odds)
                    Console.WriteLine ("\t{0}", odd);
            });
    }

    /// <summary>Returns a stream over the numbers 0 .. 2*n-1.</summary>
    static Stream<int> DoSth(int n) {
        var doubled = 2 * n;
        return new Stream<int> (CreateNumbers (doubled));
    }

    /// <summary>Lazily yields 0, 1, ..., n-1.</summary>
    static IEnumerable<int> CreateNumbers(int n) {
        var next = 0;
        while (next < n) {
            yield return next;
            next++;
        }
    }

    /// <summary>
    /// Returns an open stream that a background thread fills with 0, 10, ..., 10*(n-1),
    /// sleeping one second after each element, and then closes.
    /// </summary>
    static Stream<int> DoInParallel(int n) {
        var stream = new Stream<int> ();
        var producer = new System.Threading.Thread (_ => {
            var produced = 0;
            while (produced < n) {
                stream.Enqueue (10 * produced);
                System.Threading.Thread.Sleep (1000);
                produced++;
            }
            stream.Close ();
        }) { IsBackground = true };
        producer.Start ();
        return stream;
    }

    /// <summary>
    /// Starts <paramref name="onEven"/> and <paramref name="onOdd"/> on thread-pool threads,
    /// each with its own stream, then feeds 0 .. n-1 from the calling thread: even values go
    /// to the first stream as-is, odd values go to the second stream negated. Sleeps 500 ms
    /// after each value and closes both streams at the end.
    /// </summary>
    static void SplitInParallel(int n, Action<Stream<int>> onEven, Action<Stream<int>> onOdd)
    {
        var evenStream = new Stream<int> ();
        var oddStream = new Stream<int> ();

        System.Threading.ThreadPool.QueueUserWorkItem (_ => onEven (evenStream));
        System.Threading.ThreadPool.QueueUserWorkItem (_ => onOdd (oddStream));

        var value = 0;
        while (value < n) {
            var isEven = value % 2 == 0;
            if (isEven) {
                evenStream.Enqueue (value);
            } else {
                oddStream.Enqueue (-value);
            }
            System.Threading.Thread.Sleep (500);
            value++;
        }

        evenStream.Close ();
        oddStream.Close ();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment