Created
September 10, 2012 15:17
-
-
Save keithbloom/3691448 to your computer and use it in GitHub Desktop.
0MQ and threads
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var fileList = EnumerateDirectory(@"C:\example\directory", "*.*", SearchOption.AllDirectories); | |
ventilator.Run(fileList); | |
var result = sink.Run(fileList.Length); | |
Console.WriteLine("Found the length of {0} files in {1} milliseconds.\nDirectory size is {2}", | |
fileList.Length, stopWatch.ElapsedMilliseconds, result); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var context = ZmqContext.Create() | |
var ventilator = new Ventilator(context); | |
var sink = new Sink(context); | |
ventilator.Start(); | |
sink.Start(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const int workersCount = 4; | |
var workers = new Thread[workersCount]; | |
for (int i = 0; i < workersCount; i++) | |
{ | |
(workers[i] = new Thread(() => new TaskWorker(context).Run())).Start(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void Start() | |
{ | |
_receiver = _context.CreateSocket(SocketType.PULL); | |
_receiver.Bind("inproc://sink"); | |
} | |
public Int64 Run(int length) | |
{ | |
Int64 sizeOfDirectory = 0; | |
for (var i = 0; i < length; i++) | |
{ | |
var size = _receiver.Receive(Encoding.Unicode); | |
Int64 temp; | |
if(Int64.TryParse(size, out temp)) | |
{ | |
sizeOfDirectory += temp; | |
} | |
} | |
return sizeOfDirectory; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void Start() | |
{ | |
_ventilator = _context.CreateSocket(SocketType.PUSH); | |
_ventilator.Bind("inproc://ventilator"); | |
} | |
public void Run(string[] fileList) | |
{ | |
foreach (var fileName in fileList) | |
{ | |
_ventilator.Send(fileName, Encoding.Unicode); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void Run() | |
{ | |
using (ZmqSocket ventilator = _context.CreateSocket(SocketType.PULL), | |
sink = _context.CreateSocket(SocketType.PUSH)) | |
{ | |
ventilator.Connect("inproc://ventilator"); | |
sink.Connect("inproc://sink"); | |
ventilator.ReceiveReady += (socket, events) => | |
{ | |
RecieverPollInHandler(ventilator, sink); | |
}; | |
var poller = new Poller(new[] {ventilator, controller}); | |
while (true) | |
{ | |
poller.Poll(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private void RecieverPollInHandler(ZmqSocket reciever, ZmqSocket) | |
{ | |
Thread.Sleep(100); | |
// Pull the job from the Ventilator | |
var fileToMeasure = reciever.Receive(Encoding.Unicode); | |
Int64 fileLength = 0; | |
FileStream fs = null; | |
try | |
{ | |
fs = File.OpenRead(fileToMeasure); | |
fileLength = fs.Length; | |
} | |
catch (IOException) { } | |
finally | |
{ | |
if (fs != null) fs.Dispose(); | |
} | |
Console.Write("."); | |
// Push the result to the Sink | |
sender.Send(fileLength.ToString(), Encoding.Unicode); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment