Skip to content

Instantly share code, notes, and snippets.

@MartinBodocky
Last active August 29, 2015 13:58
Show Gist options
  • Save MartinBodocky/10013089 to your computer and use it in GitHub Desktop.
Save MartinBodocky/10013089 to your computer and use it in GitHub Desktop.
Asynchronous and Parallel Programming in F#
// Creating new threads
open System
open System.Threading
//What will execute on each thread
let threadBody() =
for i in 1..5 do
//Wait 1/10 of a second
Thread.Sleep(100)
printfn "[Thread %d] %d ..."
Thread.CurrentThread.ManagedThreadId i
let spawnThread() =
let thread = new Thread(threadBody)
thread.Start()
// spawn a few of threads at once
spawnThread()
spawnThread()
// how to use .NET Thread Pool
open System.Threading
ThreadPool.QueueUserWorkItem(fun _ ->
for i=1 to 5 do printfn "[Thread %d] %d ..."
Thread.CurrentThread.ManagedThreadId i)
//Our thread pool task, note that the delegate's
//paramter is of type obj
let printNumbers (max: obj) =
for i=1 to (max :?> int) do
printfn "[Thread %d] %d ..."
Thread.CurrentThread.ManagedThreadId i
ThreadPool.QueueUserWorkItem(new WaitCallback(printNumbers), box 5)
//Sharing data
//Run into race conditions
open System.Threading
let sumArray(arr : int[]) =
//create reference value
let total = ref 0
//Add the first half
let thread1Finished = ref false
ThreadPool.QueueUserWorkItem(
fun _ ->
for i=0 to arr.Length/2-1 do
total := arr.[i] + !total
thread1Finished := true
) |> ignore
// add second half
let thread2Finished = ref false
ThreadPool.QueueUserWorkItem(
fun _ ->
for i = arr.Length/2 to arr.Length - 1 do
total := arr.[i] + !total
thread2Finished := true
) |> ignore
//Wait while the two threads finish their work
while !thread1Finished = false || !thread2Finished = false do
Thread.Sleep(0)
!total
//create a million ones
let millionOnes = Array.create 1000000 1
#time
//right outcome is 1000000
sumArray millionOnes
sumArray millionOnes
// Array summing using lock
let sumArrayLocked (arr : int[]) =
let total = ref 0
//Add the first half
let thread1Finished = ref false
ThreadPool.QueueUserWorkItem(
fun _ ->
for i=0 to arr.Length/2-1 do
lock(total) (fun () -> total := arr.[i] + !total)
thread1Finished := true
) |> ignore
//Add the second half
let thread2Finished = ref false
ThreadPool.QueueUserWorkItem(
fun _ ->
for i=arr.Length/2 to arr.Length-1 do
lock(total) (fun () -> total := arr.[i] + !total)
thread2Finished := true
) |> ignore
//Wait while the two threads finished their work
while !thread1Finished = false || !thread2Finished =false do
Thread.Sleep(0)
!total
#time
sumArray millionOnes
sumArrayLocked millionOnes //it's slower than unlock version
//Run into deadlock in F#
type BackAccount = {AccountID : int; OwnerName : string; mutable Balance : int}
//Transfer money between back accounts
let transferFunds amount (fromAcct : BackAccount) (toAcct : BackAccount) =
printfn "Locking %s's account to deposit funds ..." toAcct.OwnerName
lock fromAcct
(fun () ->
printfn "Locking %s's account to withdraw funds ..." fromAcct.OwnerName
lock toAcct
(fun () ->
fromAcct.Balance <- fromAcct.Balance - amount
toAcct.Balance <- toAcct.Balance + amount
)
)
printfn "Final balance from %d to %d." fromAcct.Balance toAcct.Balance
let john = { AccountID = 1; OwnerName = "John Smith"; Balance = 1000}
let jane = { AccountID = 2; OwnerName = "Jane Doe"; Balance = 2000}
ThreadPool.QueueUserWorkItem(fun _ -> transferFunds 100 john jane)
ThreadPool.QueueUserWorkItem(fun _ -> transferFunds 100 jane john)
//Asynchronous file I/O using F# asycn workflows
open System
open System.IO
let asyncProcessFile (filePath : string) (processBytes : byte[] -> byte[]) =
async {
printfn "Processing file [%s]" (Path.GetFileName(filePath))
use fileStream = new FileStream(filePath, FileMode.Open)
let bytesToRead = int fileStream.Length
let! data = fileStream.AsyncRead(bytesToRead)
printfn "Opened [%s], read [%d] bytes"
(Path.GetFileName(filePath)) data.Length
let data' = processBytes data
use resultFile = new FileStream(filePath + ".results", FileMode.Create)
do! resultFile.AsyncWrite(data', 0, data'.Length)
printfn "Finished processing file [%s]" <| Path.GetFileName(filePath)
} |> Async.Start
//apply with identity function
asyncProcessFile @"C:\Test\ba-soft-skills-20140403.png" id
//Async tasks
open System.IO
open System.Net
let getHtml (url :string) =
async {
let req = WebRequest.Create(url)
let! rsp = req.AsyncGetResponse()
use stream = rsp.GetResponseStream()
use reader = new StreamReader(stream)
return! reader.ReadToEndAsync()
}
let html =
getHtml "http://www.bing.com"
|> Async.RunSynchronously
//execute more tasks
let webPages : string[] =
["http://www.bing.com" ; "http://www.yahoo.com"; "http://www.google.com"]
|> List.map getHtml
|> Async.Parallel
|> Async.RunSynchronously
webPages
//Cancellation async operation
open System
open System.Threading
let cancelableTask =
async {
printfn "Waiting 10 seconds ..."
for i = 1 to 10 do
printfn "%d ..." i
do! Async.Sleep(1000)
printfn "Finished ..."
}
//Callback used when the operation is canceled
let cancelHandler (ex: OperationCanceledException) =
printfn "The task has been cancelled."
Async.TryCancelled(cancelableTask, cancelHandler) |> Async.Start
//this cancel just last asynchronous workflow
Async.CancelDefaultToken()
let computation = Async.TryCancelled(cancelableTask, cancelHandler)
let cancellationSource = new CancellationTokenSource()
Async.Start(computation, cancellationSource.Token)
//cancel by reference
cancellationSource.Cancel()
//Create async primitives
open System
open System.IO
type System.IO.Directory with
//Retrieve all files under a path asynchronously
static member AsyncGetFiles(path : string, searchPattern : string) =
let dele = new Func<string * string, string[]>(Directory.GetFiles)
Async.FromBeginEnd(
(path, searchPattern),
dele.BeginInvoke,
dele.EndInvoke)
type System.IO.File with
//Copy a file asynchronously
static member AsyncCopy(source : string, dest : string) =
let dele = new Func<string * string, unit>(File.Copy)
Async.FromBeginEnd(
(source, dest),
dele.BeginInvoke,
dele.EndInvoke)
let asyncBackup path searchPattern destPath =
async {
let! files = Directory.AsyncGetFiles(path, searchPattern)
for file in files do
let filename = Path.GetFileName(file)
do! File.AsyncCopy(file, Path.Combine(destPath, filename))
}
//backing up my files :)
asyncBackup @"C:\Test" "*.*" @"C:\Backup" |> Async.RunSynchronously
//Parallel Programming with TPL
//Parallel.For
open System
open System.Threading.Tasks
//Multiply two matricies using the TPL
let matrixMultiply (a : float[,]) (b:float[,]) =
let aRow, aCol = Array2D.length1 a, Array2D.length2 b
let bRow, bCol = Array2D.length1 a, Array2D.length2 b
if aCol <> bRow then failwith "Array dimension mismatch"
// Allocate space for the resulting matrix
let c = Array2D.create aCol bRow 0.0
let cRow, cCol = aCol, bRow
//Compute each row of the resulting matrix
let rowTask rowIdx =
for colIdx=0 to cCol-1 do
for x=0 to aRow-1 do
c.[colIdx, rowIdx] <-
c.[colIdx, rowIdx] + a.[x,colIdx] * b.[rowIdx, x]
()
let _ = Parallel.For(0, cRow, new Action<int>(rowTask))
c // return the computed matrix
let x = Array2D.create 10 10 2.0
let y = Array2D.create 10 10 8.0
let result = matrixMultiply x y
//Using the Array.Parallel library
open System.IO
let getSecretData keyword =
let secretFiles = Directory.GetFiles(@"C:\Test", "*.png")
Array.Parallel.iter File.Decrypt secretFiles
let secretData =
Directory.GetFiles(@"C:\Test", "*.png")
|> Array.Parallel.map (fun filePath -> File.ReadAllText(filePath))
|> Array.Parallel.choose (fun contents ->
if contents.Contains(keyword)
then Some(contents)
else None)
Array.Parallel.iter File.Encrypt secretFiles
secretData
//Using parallel primitoves
//using Tasks.WaitAll to wait for taks to complete
open System
open System.IO
open System.Drawing
open System.Drawing.Imaging
open System.Drawing.Drawing2D
open System.Threading.Tasks
//Resize images to a new width, height
let resizeImage (newWidth : int, newHeight : int) (filePath : string) =
let originalImage = Bitmap.FromFile(filePath)
let resizedImage = new Bitmap(newWidth, newHeight)
use g = Graphics.FromImage(resizedImage)
g.InterpolationMode <- InterpolationMode.HighQualityBicubic
g.DrawImage(originalImage, 0, 0, newWidth, newHeight)
let fileName = Path.GetFileNameWithoutExtension(filePath)
let fileFolder = Path.GetDirectoryName(filePath)
resizedImage.Save(
Path.Combine(fileFolder, fileName+".resized.png"),
ImageFormat.Png)
//Spawns a new TPL task to resize an image
let spawnTask filePath =
let taskBody = new Action(fun () -> resizeImage (640, 480) filePath)
Task.Factory.StartNew(taskBody)
let imageFiles = Directory.GetFiles(@"C:\Test\", "*.png")
//Spawning resize tasks
let resizeTasks = imageFiles |> Array.map spawnTask
//Wait for all tasks
Task.WaitAll(resizeTasks)
//Concurrent dictionary
open System
open System.IO
open System.Collections.Concurrent
//Queue of files to process
let filesToProcess =
Directory.GetFiles(@"C:\Test", "*.txt")
|> (fun files -> new ConcurrentQueue<_>(files))
//Dictionary to store the occurence of particular words
let wordUsage = new ConcurrentDictionary<string, int>()
let processFile filePath =
let text = File.ReadAllText(filePath)
let words =
text.Split([| ' '; '\r'; '\n' |], StringSplitOptions.RemoveEmptyEntries)
|> Array.map (fun word -> word.Trim())
//Add the word to our lookup table. Inserts value '1', or if the key
//is already present updates it to be 'count + 1'
Array.Parallel.iter(fun word ->
wordUsage.AddOrUpdate(word, (fun _ -> 1), (fun _ count -> count + 1))
|> ignore
) words
//Begins updating the word Usage dictionary
let fillDictionary() =
let mutable continueWorking = true
while continueWorking do
let dequeueSuccessful, filePath = filesToProcess.TryDequeue()
if not dequeueSuccessful then
//If the queue is empty, then we are done working
if filesToProcess.IsEmpty then
continueWorking <- false
// ... otherwise, two tasks tried to dequeue
// at the same time. Try again!
else
continueWorking <- true
else
//Process the file
processFile filePath
fillDictionary()
fillDictionary()
fillDictionary()
//Using the concurrent bag class to store parallel results
open System
open System.Threading.Tasks
open System.Collections.Concurrent
open System.Collections.Generic
// Check if a member is prime
let isPrime x =
let rec primeCheck count =
//If the counter has reached x, then we know x is prime
if count = x then true
//If x is divisible by the counter, we know x isn't prime
elif x % count = 0 then false
else primeCheck (count + 1)
//Special case
if x = 1 then true
else primeCheck 2
let computePrimes tasksToSpawn maxValue =
let range = maxValue / tasksToSpawn
let primes = new ConcurrentBag<int>()
//Spawn several tasks at once, adding any primes they find
// to the ConcurrentBag
let tasks =
[|
for i in 0 .. tasksToSpawn - 1 do
yield Task.Factory.StartNew(
Action(fun () ->
for x=i * range to (i+1)*range-1 do
if isPrime x then primes.Add(x)
)
)
|]
Task.WaitAll(tasks)
new HashSet<_>(primes :> seq<int>)
#time
computePrimes 10 100000
computePrimes 2 100000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment