Skip to content

Instantly share code, notes, and snippets.

@fresky
Last active August 29, 2015 13:59
Show Gist options
  • Save fresky/10632899 to your computer and use it in GitHub Desktop.
Save fresky/10632899 to your computer and use it in GitHub Desktop.
Using Task.ContinueWith and TPL Dataflow to solve the pipeline problem
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace TPLDataFlowExample
{
class Program
{
// Entry point: runs the TPL Dataflow pipeline, then the Task.ContinueWith
// pipeline, and prints the combined wall-clock time in milliseconds.
static void Main(string[] args)
{
    // StartNew creates the stopwatch and starts it in one step.
    var timer = Stopwatch.StartNew();

    getWordFrequencyWithTPLDataflow();
    getWordFrequencyWithTask();

    timer.Stop();
    long elapsedMs = timer.ElapsedMilliseconds;
    Console.WriteLine("Elapse Time: {0}", elapsedMs);
}
// Builds one four-stage task chain per file (read -> split into words ->
// filter/count -> print) using Task.ContinueWith, then blocks until every
// chain has finished.
private static void getWordFrequencyWithTask()
{
    string[] files = Directory.GetFiles(@"D:\Temp\test");
    var pipelines = new Task[files.Length];

    for (int i = 0; i < files.Length; i++)
    {
        string path = files[i];

        // Each stage consumes the Result of the stage before it.
        Task<string> readStage =
            Task<string>.Factory.StartNew(() => readFileText(path));
        Task<string[]> splitStage =
            readStage.ContinueWith<string[]>(t => createWordList(t.Result));
        Task<Dictionary<string, int>> countStage =
            splitStage.ContinueWith<Dictionary<string, int>>(t => fileterWordList(t.Result));

        // Only the last task of each chain needs to be awaited.
        pipelines[i] = countStage.ContinueWith(t => printWordFrequency(t.Result));
    }

    Task.WaitAll(pipelines);
}
// Processes every file in the input directory through a four-block TPL
// Dataflow pipeline (read -> split into words -> filter/count -> print) and
// blocks until the last block has drained.
private static void getWordFrequencyWithTPLDataflow()
{
    // 1st, read each file into a string.
    var read = new TransformBlock<string, string>(fileName => readFileText(fileName));
    // 2nd, separate the text into an array of words.
    var create = new TransformBlock<string, string[]>(text => Program.createWordList(text));
    // 3rd, remove short words and order the remaining words by frequency.
    var filter = new TransformBlock<string[], Dictionary<string, int>>(words => fileterWordList(words));
    // 4th, print the most frequent words to the console.
    var print = new ActionBlock<Dictionary<string, int>>(dic => printWordFrequency(dic));

    //
    // Connect the dataflow blocks to form a pipeline.
    // PropagateCompletion makes each link forward the source block's
    // completion (or fault) to its target, so no hand-written
    // Completion.ContinueWith plumbing is needed between the stages.
    //
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    read.LinkTo(create, linkOptions);
    create.LinkTo(filter, linkOptions);
    filter.LinkTo(print, linkOptions);

    foreach (var fileName in Directory.GetFiles(@"D:\Temp\test"))
    {
        read.Post(fileName);
    }

    // Signal that no more input is coming; completion then flows down the
    // pipeline once each block has processed its buffered items.
    read.Complete();
    print.Completion.Wait();
}
// Logs the file being read to the console and returns its entire contents
// as a single string.
private static string readFileText(string fileName)
{
    Console.WriteLine("Reading '{0}'...", fileName);

    string contents = File.ReadAllText(fileName);
    return contents;
}
// Splits text into words. Every character that is not a letter
// (punctuation, digits, whitespace, ...) is treated as a separator, and
// empty entries are dropped from the result.
private static string[] createWordList(string text)
{
    Console.WriteLine("Creating word list...");

    // Replace each non-letter character with a space so that a single
    // split on ' ' yields the words.
    char[] lettersAndSpaces = text
        .Select(c => char.IsLetter(c) ? c : ' ')
        .ToArray();
    string normalized = new string(lettersAndSpaces);

    char[] separators = { ' ' };
    return normalized.Split(separators, StringSplitOptions.RemoveEmptyEntries);
}
// Counts how often each word longer than three characters occurs and
// returns the counts keyed by word. Entries are added to the dictionary in
// descending order of frequency.
private static Dictionary<string, int> fileterWordList(string[] words)
{
    Console.WriteLine("Group and ordering word list...");

    var rankedGroups =
        from word in words
        where word.Length > 3
        group word by word into occurrences
        orderby occurrences.Count() descending
        select occurrences;

    var frequencies = new Dictionary<string, int>();
    foreach (var occurrences in rankedGroups)
    {
        frequencies.Add(occurrences.Key, occurrences.Count());
    }
    return frequencies;
}
// Prints a "Top 5:" header followed by at most five "word : count" lines,
// highest count first.
private static void printWordFrequency(Dictionary<string, int> dic)
{
    const int topCount = 5;

    Console.WriteLine("Top 5:");

    // Sort here rather than trusting the dictionary's enumeration order,
    // which Dictionary<TKey, TValue> does not guarantee. Take(topCount)
    // caps the output at exactly five entries (the previous counter check
    // ran after the print and therefore let a sixth entry through).
    foreach (var pair in dic.OrderByDescending(entry => entry.Value).Take(topCount))
    {
        Console.WriteLine("{0} : {1}", pair.Key, pair.Value);
    }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment