Forked from valm/TPL DataFlow and Reactive Extensions Examples
Created
February 7, 2017 23:55
-
-
Save elranu/693a6b0bd5802d84ada886b41572b55d to your computer and use it in GitHub Desktop.
TPL DataFlow and Reactive Extensions Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Globalization; | |
using System.IO; | |
using System.Linq; | |
using System.Net.Http; | |
using System.Reactive.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
using System.Xml.Linq; | |
namespace TPLDataFlowTest | |
{ | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
//TPLDemo1(); | |
RxMix5(); | |
//RxMix4(); | |
Console.ReadKey(); | |
} | |
private static void RxMix5() | |
{ | |
var blockOptions = new ExecutionDataflowBlockOptions | |
{ | |
MaxDegreeOfParallelism = 1, | |
BoundedCapacity = 1 | |
}; | |
ActionBlock<int> warmupBlock = new ActionBlock<int>(async i => | |
{ | |
await Task.Delay(1000); | |
Console.WriteLine(i); | |
}, blockOptions); | |
ActionBlock<int> postBlock = new ActionBlock<int>(async i => | |
{ | |
await Task.Delay(1000); | |
Console.WriteLine(i); | |
}, blockOptions); | |
IObservable<int> warmUpSource = Observable.Range(1, 100).TakeUntil(DateTimeOffset.UtcNow.AddSeconds(5)); | |
warmUpSource.Subscribe(warmupBlock.AsObserver()); | |
IObservable<int> testSource = Observable.Range(1000, 1000).TakeUntil(DateTimeOffset.UtcNow.AddSeconds(10)); | |
testSource.Subscribe(postBlock.AsObserver()); | |
} | |
private static void RxMix1() | |
{ | |
IPropagatorBlock<int, string> source = new TransformBlock<int, string>(i => i.ToString()); | |
IObservable<int> observable = source.AsObservable().Select(Int32.Parse); | |
IDisposable subscription = observable.Subscribe(i => Console.WriteLine(i)); | |
// send some data into TDF | |
source.Post(138); | |
} | |
private static void RxMix2() | |
{ | |
IPropagatorBlock<string, int> target = new TransformBlock<string, int>(s => Int32.Parse(s)); | |
IDisposable link = target.LinkTo(new ActionBlock<int>(i => Console.WriteLine(i))); | |
IObserver<string> observer = target.AsObserver(); | |
IObservable<string> observable = Observable.Range(1, 10).Select(i => i.ToString()); | |
observable.Subscribe(observer); | |
} | |
private static void RxMix4() | |
{ | |
var inputBlock = new BufferBlock<string>(); | |
var transformInputBlock = new TransformBlock<string, XDocument>(s => XDocument.Parse(s)); | |
var processBlock = new TransformBlock<XDocument, Tuple<string, int>>( | |
x => | |
{ | |
var person = x.Element("person"); | |
return Tuple.Create((string)person.Element("name"), (int)person.Element("age")); | |
}); | |
var transformOutputBlock = | |
new TransformBlock<Tuple<string, int>, string>( | |
t => string.Format(CultureInfo.CurrentCulture, "{0} is {1} years old", t.Item1, t.Item2)); | |
var outputBlock = new ActionBlock<string>(m => Console.Out.WriteLine(m)); | |
using (inputBlock.LinkTo(transformInputBlock)) | |
using (transformInputBlock.LinkTo(processBlock)) | |
using (processBlock.LinkTo(transformOutputBlock)) | |
using (transformOutputBlock.LinkTo(outputBlock)) | |
{ | |
inputBlock.Completion.ContinueWith(t => transformInputBlock.Complete()); | |
transformInputBlock.Completion.ContinueWith(t => processBlock.Complete()); | |
processBlock.Completion.ContinueWith(t => transformOutputBlock.Complete()); | |
transformOutputBlock.Completion.ContinueWith(t => outputBlock.Complete()); | |
var records = new[] | |
{ | |
"<person><name>Michael Collins</name><age>38</age></person>", | |
"<person><name>George Washington</name><age>281</age></person>", | |
"<person><name>Abraham Lincoln</name><age>204</age></person>" | |
}; | |
foreach (var record in records) | |
{ | |
inputBlock.Post(record); | |
} | |
inputBlock.Complete(); | |
outputBlock.Completion.Wait(); | |
} | |
} | |
private static void RxMix3() | |
{ | |
IObservable<int> originalInts = Observable.Range(1, 10); | |
IPropagatorBlock<int, int[]> batch = new BatchBlock<int>(2); | |
IObservable<int[]> batched = batch.AsObservable(); | |
originalInts.Subscribe(batch.AsObserver()); | |
IObservable<int> added = batched.Timeout(TimeSpan.FromMilliseconds(50)).Select(a => a.Sum()); | |
IPropagatorBlock<int, string> toString = new TransformBlock<int, string>(i => i.ToString()); | |
added.Subscribe(toString.AsObserver()); | |
JoinBlock<string, int> join = new JoinBlock<string, int>(); | |
toString.LinkTo(join.Target1); | |
IObserver<int> joinIn2 = join.Target2.AsObserver(); | |
originalInts.Subscribe(joinIn2); | |
IObservable<Tuple<string, int>> joined = join.AsObservable(); | |
joined.Subscribe(t => Console.WriteLine("{0};{1}", t.Item1, t.Item2)); | |
} | |
private static void TPLDemo1() | |
{ | |
// Create the cancellation source. | |
var cancellationSource = new CancellationTokenSource(); | |
var inputWorkBufferBlock = new BufferBlock<Uri>(); | |
// Input - Uri - seed address | |
// Output - Uri - key, content, content-type | |
var downloaderBlock = new TransformBlock<Uri, string>(address => | |
{ | |
var httpClient = new HttpClient(); | |
// Downloads the requested resource as a string. | |
Console.WriteLine("Downloading '{0}'... Thread id {1}", address.OriginalString, Thread.CurrentThread.ManagedThreadId); | |
var contentType = string.Empty; | |
var content = httpClient.GetAsync(address).ContinueWith(task => | |
{ | |
HttpResponseMessage response = task.Result; | |
if (task.Result.IsSuccessStatusCode) | |
{ | |
return task.Result.Content.ReadAsStringAsync(); | |
} | |
return new Task<string>(() => null); | |
}).Unwrap(); | |
return content.Result; | |
}, new ExecutionDataflowBlockOptions | |
{ | |
CancellationToken = cancellationSource.Token, | |
MaxDegreeOfParallelism = 5 | |
}); | |
var outputBufferBlock = new BufferBlock<string>(); | |
var saverBlock = new ActionBlock<string>(content => | |
{ | |
if (content != null) | |
{ | |
const string targetPath = "c:\\temp\\TPLtest"; | |
const string extension = ".html"; | |
var fileName = Path.ChangeExtension(Path.Combine(targetPath, Path.GetRandomFileName()), extension); | |
Console.WriteLine("Saving {0} ...Thread: {1}", fileName, Thread.CurrentThread.ManagedThreadId); | |
using (var stream = new StreamWriter(fileName)) | |
{ | |
stream.Write(content); | |
} | |
} | |
}, new ExecutionDataflowBlockOptions | |
{ | |
MaxDegreeOfParallelism = 1 | |
}); | |
// Blocks linking | |
inputWorkBufferBlock.LinkTo(downloaderBlock); | |
// Filtering, skips empty response | |
downloaderBlock.LinkTo(outputBufferBlock, s => !string.IsNullOrWhiteSpace(s)); | |
outputBufferBlock.LinkTo(saverBlock); | |
// Propagating completition | |
inputWorkBufferBlock.Completion.ContinueWith(t => | |
{ | |
if (t.IsFaulted) | |
{ | |
((IDataflowBlock)downloaderBlock).Fault(t.Exception); | |
} | |
else | |
{ | |
downloaderBlock.Complete(); | |
} | |
}); | |
downloaderBlock.Completion.ContinueWith(t => | |
{ | |
if (t.IsFaulted) | |
{ | |
((IDataflowBlock)outputBufferBlock).Fault(t.Exception); | |
} | |
else | |
{ | |
outputBufferBlock.Complete(); | |
} | |
}); | |
outputBufferBlock.Completion.ContinueWith(t => | |
{ | |
if (t.IsFaulted) | |
{ | |
((IDataflowBlock)outputBufferBlock).Fault(t.Exception); | |
} | |
else | |
{ | |
outputBufferBlock.Complete(); | |
} | |
}); | |
outputBufferBlock.Completion.ContinueWith(t => | |
{ | |
if (t.IsFaulted) | |
{ | |
((IDataflowBlock)saverBlock).Fault(t.Exception); | |
} | |
else | |
{ | |
saverBlock.Complete(); | |
} | |
}); | |
// Message passing | |
inputWorkBufferBlock.Post(new Uri("http://svnbook.red-bean.com/nightly/ru/svn-book.html")); | |
inputWorkBufferBlock.Post(new Uri("http://bash.im")); | |
inputWorkBufferBlock.Post(new Uri("http://habrahabr.ru")); | |
inputWorkBufferBlock.Post(new Uri("http://lb.ua")); | |
inputWorkBufferBlock.Post(new Uri("http://blogs.msdn.com/b/pfxteam/")); | |
inputWorkBufferBlock.Post(new Uri("http://hgbook.red-bean.com/read/a-tour-of-mercurial-merging-work.html")); | |
inputWorkBufferBlock.Complete(); | |
saverBlock.Completion.Wait(); | |
Console.WriteLine("Job is DONE..."); | |
Console.WriteLine("Hit ANY KEY to exit..."); | |
Console.ReadKey(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment