Skip to content

Instantly share code, notes, and snippets.

@naraga
Created September 20, 2012 19:31
Show Gist options
  • Save naraga/3757871 to your computer and use it in GitHub Desktop.
Save naraga/3757871 to your computer and use it in GitHub Desktop.
Linear stream proccessing with PLINQ
class Program
{
private const int RecordsCount = 3000;
private static readonly Stopwatch Stopwatch = new Stopwatch();
private const string InputDataFile =
"C:\\Users\\Boris\\Documents\\Visual Studio 2012\\Projects\\LinearStreamProccessingWithPlinq\\PlinqTests\\DataFile1.txt";
static void Main(string[] args)
{
GenerateFakeDataFile();
Console.WriteLine("Press ENTER to finish start");
Console.ReadLine();
Stopwatch.Start();
var processedRecords = GetRecordsFromFile().AsParallel().AsOrdered()
.Select(ProccessRecord);
foreach (var r in processedRecords)
{
Console.WriteLine(r.Result);
}
Console.WriteLine("Press ENTER to finish {0}", Stopwatch.Elapsed);
Console.ReadLine();
}
private async static Task<string> ProccessRecord(string s)
{
// perform "compute-boud" operations
var val = Regex.Match(s, "x(\\d+)x").Groups[1].Value;
if (val == "666666666")
{
return "EXXXX-" + s;
}
// perform "io bound operation" (async makes sense only here)
await StoreIntoDbAsync(val);
return "OK-" + s;
}
private async static Task StoreIntoDbAsync(string dataToStore)
{
await Task.Delay(1000); // call DB (1s = network latency + proccessing time)
}
static IEnumerable<string> GetRecordsFromFile()
{
using (var streamReader = new StreamReader(InputDataFile, Encoding.ASCII))
{
string str;
while ((str = streamReader.ReadLine()) != null)
{
yield return str;
}
}
}
static void GenerateFakeDataFile()
{
File.WriteAllLines(InputDataFile, Enumerable.Range(0, RecordsCount).Select(x => string.Format("x{0}x", x)));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment