Created
February 5, 2014 12:43
-
-
Save forcewake/8822822 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MapReduceWords
{
    /// <summary>
    /// Counts word occurrences in a text with a producer/consumer map-reduce pipeline:
    /// <see cref="mapWords"/> splits the text into cleaned words and pushes them into
    /// <see cref="wordChunks"/>, while <see cref="reduceWords"/> drains that collection and
    /// tallies each word in <see cref="wordStore"/>.
    /// </summary>
    public class WordReducer
    {
        /// <summary>Default maximum length, in characters, of a text block handed to one mapper.</summary>
        private const int DefaultBlockSize = 250;

        /// <summary>
        /// Formerly the backing store of <see cref="wordChunks"/>. Because it was static, the
        /// collections of all instances shared (and consumed from) the same bag, so independent
        /// reducers stole each other's words. Kept only for source compatibility; no longer used.
        /// </summary>
        [Obsolete("No longer used: each WordReducer owns a private backing bag for wordChunks.")]
        public static ConcurrentBag<string> wordBag = new ConcurrentBag<string>();

        /// <summary>
        /// Hand-off collection between the map phase (producer) and the reduce phase (consumer).
        /// Replaced with a fresh collection by <see cref="mapReduce"/> once adding has completed.
        /// </summary>
        public BlockingCollection<string> wordChunks = new BlockingCollection<string>(new ConcurrentBag<string>());

        /// <summary>
        /// Word to occurrence count (case-sensitive). Never cleared by this class, so counts
        /// accumulate across every <see cref="mapReduce"/> call on the same instance.
        /// </summary>
        public ConcurrentDictionary<string, int> wordStore = new ConcurrentDictionary<string, int>();

        /// <summary>
        /// Splits <paramref name="fileText"/> into trimmed blocks of at most
        /// <paramref name="blockSize"/> characters, breaking on the last whitespace character that
        /// keeps the block within the limit. A run of non-whitespace longer than the limit is cut
        /// at the limit. Whitespace-only blocks are skipped.
        /// </summary>
        /// <param name="fileText">Text to split; must not be null.</param>
        /// <param name="blockSize">Maximum block length in characters; must be positive.</param>
        /// <returns>A lazily evaluated sequence of non-empty blocks.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="fileText"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="blockSize"/> is not positive.</exception>
        public IEnumerable<string> produceWordBlocks(string fileText, int blockSize = DefaultBlockSize)
        {
            // Validate here rather than in the iterator: iterator bodies only run on first MoveNext.
            if (fileText == null)
            {
                throw new ArgumentNullException("fileText");
            }
            if (blockSize <= 0)
            {
                throw new ArgumentOutOfRangeException("blockSize", blockSize, "Block size must be positive.");
            }
            return EnumerateWordBlocks(fileText, blockSize);
        }

        /// <summary>Iterator behind <see cref="produceWordBlocks"/>; arguments are already validated.</summary>
        private static IEnumerable<string> EnumerateWordBlocks(string text, int blockSize)
        {
            int start = 0;
            while (start < text.Length)
            {
                int end; // exclusive end of the current block
                if (text.Length - start <= blockSize)
                {
                    // The remainder of the text fits in a single block.
                    end = text.Length;
                }
                else
                {
                    // Walk back from start + blockSize to the last whitespace after start, so the
                    // block (excluding that separator) holds at most blockSize characters.
                    int split = start + blockSize;
                    while (split > start && !char.IsWhiteSpace(text[split]))
                    {
                        split--;
                    }
                    // No separator found: one run longer than blockSize, so hard-cut it.
                    end = split > start ? split : start + blockSize;
                }

                string block = text.Substring(start, end - start).Trim();
                if (block.Length > 0)
                {
                    yield return block;
                }
                start = end; // end > start on every path, so the loop always advances
            }
        }

        /// <summary>
        /// Map phase: splits <paramref name="fileText"/> into words in parallel and adds each
        /// cleaned word to <see cref="wordChunks"/>. Any whitespace separates words; inside a word
        /// only letters, digits, apostrophes and hyphens are kept, other characters are dropped.
        /// The collection is always marked complete for adding, even on failure, so a consumer
        /// blocked in <see cref="reduceWords"/> is released.
        /// </summary>
        /// <param name="fileText">Text to map; must not be null.</param>
        public void mapWords(string fileText)
        {
            // Capture once so Add and CompleteAdding target the same collection even if the
            // public field is replaced while mapping is in progress.
            BlockingCollection<string> target = wordChunks;
            try
            {
                Parallel.ForEach(produceWordBlocks(fileText), wordBlock =>
                {
                    StringBuilder wordBuffer = new StringBuilder();
                    foreach (char c in wordBlock)
                    {
                        if (char.IsWhiteSpace(c))
                        {
                            // Spaces, tabs and newlines all end the current word.
                            EmitWord(target, wordBuffer);
                        }
                        else if (IsWordCharacter(c))
                        {
                            wordBuffer.Append(c);
                        }
                        // Punctuation and symbols are removed without ending the word.
                    }
                    EmitWord(target, wordBuffer);
                });
            }
            finally
            {
                target.CompleteAdding();
            }
        }

        /// <summary>Adds the buffered word to <paramref name="target"/>, if any, and clears the buffer.</summary>
        private static void EmitWord(BlockingCollection<string> target, StringBuilder wordBuffer)
        {
            if (wordBuffer.Length > 0)
            {
                target.Add(wordBuffer.ToString());
                wordBuffer.Clear();
            }
        }

        /// <summary>True for characters kept inside a word: letters, digits, apostrophe and hyphen.</summary>
        private static bool IsWordCharacter(char c)
        {
            return char.IsLetterOrDigit(c) || c == '\'' || c == '-';
        }

        /// <summary>
        /// Reduce phase: consumes words from <see cref="wordChunks"/> until it is marked complete
        /// and empty, incrementing each word's count in <see cref="wordStore"/>. Blocks while the
        /// collection is empty but adding has not completed.
        /// </summary>
        public void reduceWords()
        {
            Parallel.ForEach(wordChunks.GetConsumingEnumerable(), word =>
            {
                // AddOrUpdate only commits a value computed from the current entry (retrying the
                // delegate on contention), so a plain "+ 1" is race-free. Interlocked on the
                // delegate's local copy of the value had no effect.
                wordStore.AddOrUpdate(word, 1, (key, count) => count + 1);
            });
        }

        /// <summary>
        /// Runs the whole pipeline on <paramref name="fileText"/>: maps words on a background task
        /// while reducing them on the calling thread, then rethrows any exception the mapper raised.
        /// Counts are added to the existing contents of <see cref="wordStore"/>. Not designed for
        /// concurrent calls on the same instance, since both calls would share <see cref="wordChunks"/>.
        /// </summary>
        /// <param name="fileText">Text to count words in; must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="fileText"/> is null.</exception>
        public void mapReduce(string fileText)
        {
            if (fileText == null)
            {
                throw new ArgumentNullException("fileText");
            }

            // A BlockingCollection cannot be reopened after CompleteAdding; start a fresh one
            // backed by a bag owned by this instance only.
            if (wordChunks.IsAddingCompleted)
            {
                wordChunks = new BlockingCollection<string>(new ConcurrentBag<string>());
            }

            // Unlike ThreadPool.QueueUserWorkItem, a Task captures mapper exceptions instead of
            // crashing the process.
            Task mapping = Task.Run(() => mapWords(fileText));

            reduceWords();

            // Propagate any mapper failure to the caller (unwrapped from the task).
            mapping.GetAwaiter().GetResult();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment