Skip to content

Instantly share code, notes, and snippets.

@ryu1
Forked from ifandelse/FileCombiner.cs
Last active August 29, 2015 14:17
Show Gist options
  • Save ryu1/a73d1cde044fb3e45931 to your computer and use it in GitHub Desktop.
Save ryu1/a73d1cde044fb3e45931 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace FileCombiner
{
/// <summary>
/// Console entry point. Reads every file directly under <c>_dirPath</c> and appends each
/// line, prefixed with the path of the file it came from, to <c>_outFile</c>.
/// </summary>
class Program
{
    private static readonly string _dirPath = @"C:\GIT\FileCombiner\FileCombiner\Files";
    private static readonly string _outFile = Path.Combine(_dirPath, @"Output\output.txt");

    /// <summary>
    /// Clears any previous output file, starts the event loop, queues the read/write work
    /// and then blocks until the user presses Enter.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // File.Delete and File.AppendText both throw DirectoryNotFoundException when the
        // parent directory is missing, so make sure it exists before touching the file.
        // CreateDirectory is a no-op when the directory is already there.
        Directory.CreateDirectory(Path.GetDirectoryName(_outFile));
        File.Delete(_outFile);

        var output = new OutputWriter(_outFile);
        var eventLoop = new EventLoop();
        eventLoop.Start();

        // A single queued action fans out over the files, and for each file over its lines;
        // every line becomes its own queued write.
        eventLoop.Enqueue(() => Directory
            .GetFiles(_dirPath)
            .AsParallel()
            .ForAll(file =>
            {
                File.ReadAllLines(file)
                    .AsParallel()
                    .ForAll(line => eventLoop.Enqueue(
                        () => output.WriteOutput(String.Format("{0}\t{1}", file, line))));
            }));

        // not worried about completion notice at the moment - just waiting for sample file to hit 4290k.
        Console.WriteLine("Processing....press Enter to exit when complete");
        Console.ReadLine();

        // Let the dispatcher loop leave its wait and finish instead of abandoning it.
        eventLoop.Stop();
    }
}
/// <summary>
/// Appends lines of text to one file. Calls made through the same instance are
/// serialized by a private lock.
/// </summary>
public class OutputWriter
{
    private readonly string _path;
    private readonly Object _gate = new Object();

    /// <summary>Creates a writer that appends to the file at <paramref name="path"/>.</summary>
    /// <param name="path">Path of the file to append to.</param>
    public OutputWriter(string path)
    {
        _path = path;
    }

    /// <summary>
    /// Appends <paramref name="line"/> followed by a line terminator to the target file.
    /// </summary>
    /// <param name="line">Text to append.</param>
    public void WriteOutput(string line)
    {
        // The file is opened and closed on every call while the lock is held;
        // simple rather than fast, which is all this example needs.
        lock (_gate)
        {
            using (StreamWriter appender = File.AppendText(_path))
            {
                appender.WriteLine(line);
            }
        }
    }
}
/// <summary>
/// A minimal dispatcher: actions are queued from any thread and a single loop
/// dequeues them and hands each one to the task factory for execution.
/// </summary>
public class EventLoop
{
    // Volatile so that a Stop() issued on another thread is seen by the loop thread.
    private volatile bool _running;

    /// <summary>True while <see cref="Loop"/> should keep dispatching.</summary>
    public bool Running
    {
        get { return _running; }
        set { _running = value; }
    }

    /// <summary>Pending actions, in the order they were queued.</summary>
    public ConcurrentQueue<Action> ActionQueue { get; set; }

    /// <summary>Signalled whenever there may be work to do or the loop should stop.</summary>
    public ManualResetEventSlim Wait { get; set; }

    /// <summary>Creates a stopped loop with an empty queue and an unsignalled wait handle.</summary>
    public EventLoop()
    {
        ActionQueue = new ConcurrentQueue<Action>();
        Wait = new ManualResetEventSlim(false);
    }

    /// <summary>
    /// Dispatches queued actions until <see cref="Running"/> becomes false, blocking on
    /// <see cref="Wait"/> while the queue is empty.
    /// </summary>
    public void Loop()
    {
        while (Running)
        {
            Action action;
            if (ActionQueue.TryDequeue(out action))
            {
                Dispatch(action);
                continue;
            }

            // Reset first, then re-check. An Enqueue() or Stop() that lands after the failed
            // dequeue either is visible to the check below or signals after the reset, so
            // its wake-up can no longer be erased by the reset.
            Wait.Reset();
            if (Running && ActionQueue.IsEmpty)
            {
                Wait.Wait();
            }
        }
    }

    /// <summary>Queues <paramref name="action"/> and wakes the loop.</summary>
    /// <param name="action">The work to run.</param>
    public void Enqueue(Action action)
    {
        ActionQueue.Enqueue(action);
        Wait.Set();
    }

    /// <summary>Marks the loop as running and starts <see cref="Loop"/> on a task.</summary>
    public void Start()
    {
        Running = true;

        // The loop blocks for as long as the dispatcher lives, so hint the scheduler
        // that this is not a short-lived work item.
        Task.Factory.StartNew(Loop, TaskCreationOptions.LongRunning);
    }

    /// <summary>Asks the loop to exit and wakes it if it is waiting.</summary>
    public void Stop()
    {
        Running = false;
        Wait.Set();
    }

    // Starts the action as a task and logs any failure to the console.
    private static void Dispatch(Action action)
    {
        try
        {
            // An exception thrown inside the action surfaces on the task, not here, so a
            // faulted-only continuation is what actually reports it.
            Task.Factory
                .StartNew(action)
                .ContinueWith(
                    faulted => Console.WriteLine(faulted.Exception),
                    TaskContinuationOptions.OnlyOnFaulted);
        }
        catch (Exception ex)
        {
            // Reached only when the task could not be created, e.g. a null action.
            Console.WriteLine(ex);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment