Last active
August 29, 2015 13:58
-
-
Save MartinBodocky/10013089 to your computer and use it in GitHub Desktop.
Asynchronous and Parallel Programming in F#
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Creating new threads | |
open System | |
open System.Threading | |
//What will execute on each thread | |
let threadBody() = | |
for i in 1..5 do | |
//Wait 1/10 of a second | |
Thread.Sleep(100) | |
printfn "[Thread %d] %d ..." | |
Thread.CurrentThread.ManagedThreadId i | |
let spawnThread() = | |
let thread = new Thread(threadBody) | |
thread.Start() | |
// spawn a few of threads at once | |
spawnThread() | |
spawnThread() | |
// how to use .NET Thread Pool | |
open System.Threading | |
ThreadPool.QueueUserWorkItem(fun _ -> | |
for i=1 to 5 do printfn "[Thread %d] %d ..." | |
Thread.CurrentThread.ManagedThreadId i) | |
//Our thread pool task, note that the delegate's | |
//paramter is of type obj | |
let printNumbers (max: obj) = | |
for i=1 to (max :?> int) do | |
printfn "[Thread %d] %d ..." | |
Thread.CurrentThread.ManagedThreadId i | |
ThreadPool.QueueUserWorkItem(new WaitCallback(printNumbers), box 5) | |
//Sharing data | |
//Run into race conditions | |
open System.Threading | |
let sumArray(arr : int[]) = | |
//create reference value | |
let total = ref 0 | |
//Add the first half | |
let thread1Finished = ref false | |
ThreadPool.QueueUserWorkItem( | |
fun _ -> | |
for i=0 to arr.Length/2-1 do | |
total := arr.[i] + !total | |
thread1Finished := true | |
) |> ignore | |
// add second half | |
let thread2Finished = ref false | |
ThreadPool.QueueUserWorkItem( | |
fun _ -> | |
for i = arr.Length/2 to arr.Length - 1 do | |
total := arr.[i] + !total | |
thread2Finished := true | |
) |> ignore | |
//Wait while the two threads finish their work | |
while !thread1Finished = false || !thread2Finished = false do | |
Thread.Sleep(0) | |
!total | |
//create a million ones | |
let millionOnes = Array.create 1000000 1 | |
#time | |
//right outcome is 1000000 | |
sumArray millionOnes | |
sumArray millionOnes | |
// Array summing using lock | |
let sumArrayLocked (arr : int[]) = | |
let total = ref 0 | |
//Add the first half | |
let thread1Finished = ref false | |
ThreadPool.QueueUserWorkItem( | |
fun _ -> | |
for i=0 to arr.Length/2-1 do | |
lock(total) (fun () -> total := arr.[i] + !total) | |
thread1Finished := true | |
) |> ignore | |
//Add the second half | |
let thread2Finished = ref false | |
ThreadPool.QueueUserWorkItem( | |
fun _ -> | |
for i=arr.Length/2 to arr.Length-1 do | |
lock(total) (fun () -> total := arr.[i] + !total) | |
thread2Finished := true | |
) |> ignore | |
//Wait while the two threads finished their work | |
while !thread1Finished = false || !thread2Finished =false do | |
Thread.Sleep(0) | |
!total | |
#time | |
sumArray millionOnes | |
sumArrayLocked millionOnes //it's slower than unlock version | |
//Run into deadlock in F# | |
type BackAccount = {AccountID : int; OwnerName : string; mutable Balance : int} | |
//Transfer money between back accounts | |
let transferFunds amount (fromAcct : BackAccount) (toAcct : BackAccount) = | |
printfn "Locking %s's account to deposit funds ..." toAcct.OwnerName | |
lock fromAcct | |
(fun () -> | |
printfn "Locking %s's account to withdraw funds ..." fromAcct.OwnerName | |
lock toAcct | |
(fun () -> | |
fromAcct.Balance <- fromAcct.Balance - amount | |
toAcct.Balance <- toAcct.Balance + amount | |
) | |
) | |
printfn "Final balance from %d to %d." fromAcct.Balance toAcct.Balance | |
let john = { AccountID = 1; OwnerName = "John Smith"; Balance = 1000} | |
let jane = { AccountID = 2; OwnerName = "Jane Doe"; Balance = 2000} | |
ThreadPool.QueueUserWorkItem(fun _ -> transferFunds 100 john jane) | |
ThreadPool.QueueUserWorkItem(fun _ -> transferFunds 100 jane john) | |
//Asynchronous file I/O using F# asycn workflows | |
open System | |
open System.IO | |
let asyncProcessFile (filePath : string) (processBytes : byte[] -> byte[]) = | |
async { | |
printfn "Processing file [%s]" (Path.GetFileName(filePath)) | |
use fileStream = new FileStream(filePath, FileMode.Open) | |
let bytesToRead = int fileStream.Length | |
let! data = fileStream.AsyncRead(bytesToRead) | |
printfn "Opened [%s], read [%d] bytes" | |
(Path.GetFileName(filePath)) data.Length | |
let data' = processBytes data | |
use resultFile = new FileStream(filePath + ".results", FileMode.Create) | |
do! resultFile.AsyncWrite(data', 0, data'.Length) | |
printfn "Finished processing file [%s]" <| Path.GetFileName(filePath) | |
} |> Async.Start | |
//apply with identity function | |
asyncProcessFile @"C:\Test\ba-soft-skills-20140403.png" id | |
//Async tasks | |
open System.IO | |
open System.Net | |
let getHtml (url :string) = | |
async { | |
let req = WebRequest.Create(url) | |
let! rsp = req.AsyncGetResponse() | |
use stream = rsp.GetResponseStream() | |
use reader = new StreamReader(stream) | |
return! reader.ReadToEndAsync() | |
} | |
let html = | |
getHtml "http://www.bing.com" | |
|> Async.RunSynchronously | |
//execute more tasks | |
let webPages : string[] = | |
["http://www.bing.com" ; "http://www.yahoo.com"; "http://www.google.com"] | |
|> List.map getHtml | |
|> Async.Parallel | |
|> Async.RunSynchronously | |
webPages | |
//Cancellation async operation | |
open System | |
open System.Threading | |
let cancelableTask = | |
async { | |
printfn "Waiting 10 seconds ..." | |
for i = 1 to 10 do | |
printfn "%d ..." i | |
do! Async.Sleep(1000) | |
printfn "Finished ..." | |
} | |
//Callback used when the operation is canceled | |
let cancelHandler (ex: OperationCanceledException) = | |
printfn "The task has been cancelled." | |
Async.TryCancelled(cancelableTask, cancelHandler) |> Async.Start | |
//this cancel just last asynchronous workflow | |
Async.CancelDefaultToken() | |
let computation = Async.TryCancelled(cancelableTask, cancelHandler) | |
let cancellationSource = new CancellationTokenSource() | |
Async.Start(computation, cancellationSource.Token) | |
//cancel by reference | |
cancellationSource.Cancel() | |
//Create async primitives | |
open System | |
open System.IO | |
type System.IO.Directory with | |
//Retrieve all files under a path asynchronously | |
static member AsyncGetFiles(path : string, searchPattern : string) = | |
let dele = new Func<string * string, string[]>(Directory.GetFiles) | |
Async.FromBeginEnd( | |
(path, searchPattern), | |
dele.BeginInvoke, | |
dele.EndInvoke) | |
type System.IO.File with | |
//Copy a file asynchronously | |
static member AsyncCopy(source : string, dest : string) = | |
let dele = new Func<string * string, unit>(File.Copy) | |
Async.FromBeginEnd( | |
(source, dest), | |
dele.BeginInvoke, | |
dele.EndInvoke) | |
let asyncBackup path searchPattern destPath = | |
async { | |
let! files = Directory.AsyncGetFiles(path, searchPattern) | |
for file in files do | |
let filename = Path.GetFileName(file) | |
do! File.AsyncCopy(file, Path.Combine(destPath, filename)) | |
} | |
//backing up my files :) | |
asyncBackup @"C:\Test" "*.*" @"C:\Backup" |> Async.RunSynchronously | |
//Parallel Programming with TPL | |
//Parallel.For | |
open System | |
open System.Threading.Tasks | |
//Multiply two matricies using the TPL | |
let matrixMultiply (a : float[,]) (b:float[,]) = | |
let aRow, aCol = Array2D.length1 a, Array2D.length2 b | |
let bRow, bCol = Array2D.length1 a, Array2D.length2 b | |
if aCol <> bRow then failwith "Array dimension mismatch" | |
// Allocate space for the resulting matrix | |
let c = Array2D.create aCol bRow 0.0 | |
let cRow, cCol = aCol, bRow | |
//Compute each row of the resulting matrix | |
let rowTask rowIdx = | |
for colIdx=0 to cCol-1 do | |
for x=0 to aRow-1 do | |
c.[colIdx, rowIdx] <- | |
c.[colIdx, rowIdx] + a.[x,colIdx] * b.[rowIdx, x] | |
() | |
let _ = Parallel.For(0, cRow, new Action<int>(rowTask)) | |
c // return the computed matrix | |
let x = Array2D.create 10 10 2.0 | |
let y = Array2D.create 10 10 8.0 | |
let result = matrixMultiply x y | |
//Using the Array.Parallel library | |
open System.IO | |
let getSecretData keyword = | |
let secretFiles = Directory.GetFiles(@"C:\Test", "*.png") | |
Array.Parallel.iter File.Decrypt secretFiles | |
let secretData = | |
Directory.GetFiles(@"C:\Test", "*.png") | |
|> Array.Parallel.map (fun filePath -> File.ReadAllText(filePath)) | |
|> Array.Parallel.choose (fun contents -> | |
if contents.Contains(keyword) | |
then Some(contents) | |
else None) | |
Array.Parallel.iter File.Encrypt secretFiles | |
secretData | |
//Using parallel primitoves | |
//using Tasks.WaitAll to wait for taks to complete | |
open System | |
open System.IO | |
open System.Drawing | |
open System.Drawing.Imaging | |
open System.Drawing.Drawing2D | |
open System.Threading.Tasks | |
//Resize images to a new width, height | |
let resizeImage (newWidth : int, newHeight : int) (filePath : string) = | |
let originalImage = Bitmap.FromFile(filePath) | |
let resizedImage = new Bitmap(newWidth, newHeight) | |
use g = Graphics.FromImage(resizedImage) | |
g.InterpolationMode <- InterpolationMode.HighQualityBicubic | |
g.DrawImage(originalImage, 0, 0, newWidth, newHeight) | |
let fileName = Path.GetFileNameWithoutExtension(filePath) | |
let fileFolder = Path.GetDirectoryName(filePath) | |
resizedImage.Save( | |
Path.Combine(fileFolder, fileName+".resized.png"), | |
ImageFormat.Png) | |
//Spawns a new TPL task to resize an image | |
let spawnTask filePath = | |
let taskBody = new Action(fun () -> resizeImage (640, 480) filePath) | |
Task.Factory.StartNew(taskBody) | |
let imageFiles = Directory.GetFiles(@"C:\Test\", "*.png") | |
//Spawning resize tasks | |
let resizeTasks = imageFiles |> Array.map spawnTask | |
//Wait for all tasks | |
Task.WaitAll(resizeTasks) | |
//Concurrent dictionary | |
open System | |
open System.IO | |
open System.Collections.Concurrent | |
//Queue of files to process | |
let filesToProcess = | |
Directory.GetFiles(@"C:\Test", "*.txt") | |
|> (fun files -> new ConcurrentQueue<_>(files)) | |
//Dictionary to store the occurence of particular words | |
let wordUsage = new ConcurrentDictionary<string, int>() | |
let processFile filePath = | |
let text = File.ReadAllText(filePath) | |
let words = | |
text.Split([| ' '; '\r'; '\n' |], StringSplitOptions.RemoveEmptyEntries) | |
|> Array.map (fun word -> word.Trim()) | |
//Add the word to our lookup table. Inserts value '1', or if the key | |
//is already present updates it to be 'count + 1' | |
Array.Parallel.iter(fun word -> | |
wordUsage.AddOrUpdate(word, (fun _ -> 1), (fun _ count -> count + 1)) | |
|> ignore | |
) words | |
//Begins updating the word Usage dictionary | |
let fillDictionary() = | |
let mutable continueWorking = true | |
while continueWorking do | |
let dequeueSuccessful, filePath = filesToProcess.TryDequeue() | |
if not dequeueSuccessful then | |
//If the queue is empty, then we are done working | |
if filesToProcess.IsEmpty then | |
continueWorking <- false | |
// ... otherwise, two tasks tried to dequeue | |
// at the same time. Try again! | |
else | |
continueWorking <- true | |
else | |
//Process the file | |
processFile filePath | |
fillDictionary() | |
fillDictionary() | |
fillDictionary() | |
//Using the concurrent bag class to store parallel results | |
open System | |
open System.Threading.Tasks | |
open System.Collections.Concurrent | |
open System.Collections.Generic | |
// Check if a member is prime | |
let isPrime x = | |
let rec primeCheck count = | |
//If the counter has reached x, then we know x is prime | |
if count = x then true | |
//If x is divisible by the counter, we know x isn't prime | |
elif x % count = 0 then false | |
else primeCheck (count + 1) | |
//Special case | |
if x = 1 then true | |
else primeCheck 2 | |
let computePrimes tasksToSpawn maxValue = | |
let range = maxValue / tasksToSpawn | |
let primes = new ConcurrentBag<int>() | |
//Spawn several tasks at once, adding any primes they find | |
// to the ConcurrentBag | |
let tasks = | |
[| | |
for i in 0 .. tasksToSpawn - 1 do | |
yield Task.Factory.StartNew( | |
Action(fun () -> | |
for x=i * range to (i+1)*range-1 do | |
if isPrime x then primes.Add(x) | |
) | |
) | |
|] | |
Task.WaitAll(tasks) | |
new HashSet<_>(primes :> seq<int>) | |
#time | |
computePrimes 10 100000 | |
computePrimes 2 100000 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment