Last active
August 29, 2015 13:59
-
-
Save fresky/10632899 to your computer and use it in GitHub Desktop.
Using Task.ContinueWith and TPL Dataflow to solve the pipeline problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Dataflow; | |
namespace TPLDataFlowExample | |
{ | |
class Program | |
{ | |
/// <summary>
/// Entry point. Runs the TPL Dataflow pipeline, then the Task.ContinueWith pipeline,
/// and prints the combined elapsed time of both runs in milliseconds.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    // One stopwatch spans both runs, so the reported figure is their total.
    var timer = Stopwatch.StartNew();

    getWordFrequencyWithTPLDataflow();
    getWordFrequencyWithTask();

    timer.Stop();
    long elapsedMilliseconds = timer.ElapsedMilliseconds;
    Console.WriteLine("Elapse Time: {0}", elapsedMilliseconds);
}
/// <summary>
/// Processes every file in D:\Temp\test with one chain of task continuations per file
/// (read, split into words, count, print) and blocks until every chain has finished.
/// </summary>
private static void getWordFrequencyWithTask()
{
    string[] files = Directory.GetFiles(@"D:\Temp\test");
    var pipelines = new Task[files.Length];

    for (int index = 0; index < files.Length; index++)
    {
        // Copy to a loop-local so each lambda captures its own file name.
        string path = files[index];

        Task<string> readStage =
            Task<string>.Factory.StartNew(() => readFileText(path));
        Task<string[]> splitStage =
            readStage.ContinueWith<string[]>(previous => createWordList(previous.Result));
        Task<Dictionary<string, int>> countStage =
            splitStage.ContinueWith<Dictionary<string, int>>(previous => fileterWordList(previous.Result));

        // Only the last task of each chain needs to be awaited.
        pipelines[index] = countStage.ContinueWith(previous => printWordFrequency(previous.Result));
    }

    Task.WaitAll(pipelines);
}
/// <summary>
/// Builds a four-stage TPL Dataflow pipeline (read file, split into words, count words,
/// print), posts every file in D:\Temp\test to it, and blocks until the final stage
/// has completed.
/// </summary>
private static void getWordFrequencyWithTPLDataflow()
{
    // 1st, read each file into a string.
    var read = new TransformBlock<string, string>(fileName => readFileText(fileName));
    // 2nd, separate the text into an array of words.
    var create = new TransformBlock<string, string[]>(text => Program.createWordList(text));
    // 3rd, remove short words and count the remaining ones, most frequent first.
    var filter = new TransformBlock<string[], Dictionary<string, int>>(words => fileterWordList(words));
    // 4th, print the most frequent words to the console.
    var print = new ActionBlock<Dictionary<string, int>>(dic => printWordFrequency(dic));

    // Connect the blocks to form a pipeline. With PropagateCompletion set, a block that
    // completes (or faults) completes (or faults) the block it is linked to, so no
    // hand-written Completion.ContinueWith plumbing is needed between stages.
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    read.LinkTo(create, linkOptions);
    create.LinkTo(filter, linkOptions);
    filter.LinkTo(print, linkOptions);

    foreach (var fileName in Directory.GetFiles(@"D:\Temp\test"))
    {
        read.Post(fileName);
    }

    // Signal that no more input is coming. A completed block still processes what it has
    // buffered; completion then flows down the links to the print block.
    read.Complete();

    // Wait for the last stage; a fault in any stage surfaces here as an AggregateException.
    print.Completion.Wait();
}
/// <summary>
/// Writes a progress line naming the file to the console, then reads the whole file.
/// </summary>
/// <param name="fileName">Path of the file to read.</param>
/// <returns>The full text content of the file.</returns>
private static string readFileText(string fileName)
{
    Console.WriteLine("Reading '{0}'...", fileName);

    string contents = File.ReadAllText(fileName);
    return contents;
}
/// <summary>
/// Splits text into words. Every character that is not a letter is treated as a
/// separator, so punctuation, digits and whitespace never appear in the result.
/// </summary>
/// <param name="text">The text to split.</param>
/// <returns>The words in their original order, with empty entries removed.</returns>
private static string[] createWordList(string text)
{
    Console.WriteLine("Creating word list...");

    // Replace every non-letter character with a space so that a single
    // split on ' ' is enough to isolate the words.
    char[] lettersAndSpaces = text
        .Select(ch => char.IsLetter(ch) ? ch : ' ')
        .ToArray();

    var separators = new[] { ' ' };
    return new string(lettersAndSpaces).Split(separators, StringSplitOptions.RemoveEmptyEntries);
}
/// <summary>
/// Counts how often each word longer than three characters occurs.
/// </summary>
/// <param name="words">The words to count; comparison is case-sensitive.</param>
/// <returns>
/// A dictionary mapping each kept word to its number of occurrences. Entries are
/// inserted from most to least frequent.
/// </returns>
private static Dictionary<string, int> fileterWordList(string[] words)
{
    Console.WriteLine("Group and ordering word list...");

    var rankedGroups =
        from word in words
        where word.Length > 3
        group word by word into occurrences
        orderby occurrences.Count() descending
        select occurrences;

    // Fill the dictionary in ranked order, one entry per distinct word.
    var frequencies = new Dictionary<string, int>();
    foreach (var occurrences in rankedGroups)
    {
        frequencies.Add(occurrences.Key, occurrences.Count());
    }

    return frequencies;
}
/// <summary>
/// Prints a "Top 5:" header followed by at most five "word : count" lines for the
/// most frequent words in the dictionary.
/// </summary>
/// <param name="dic">Word frequencies, keyed by word.</param>
private static void printWordFrequency(Dictionary<string, int> dic)
{
    const int TopCount = 5;

    Console.WriteLine("Top {0}:", TopCount);

    // Rank here rather than trusting the dictionary's enumeration order, which is
    // unspecified. Take(TopCount) caps the output at exactly five entries; the previous
    // "++top > 5" check let a sixth entry through.
    foreach (var pair in dic.OrderByDescending(entry => entry.Value).Take(TopCount))
    {
        Console.WriteLine("{0} : {1}", pair.Key, pair.Value);
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment