Created
January 15, 2014 17:45
-
-
Save mndrake/8440854 to your computer and use it in GitHub Desktop.
Agent-based calculation engine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
open System.Collections.Generic | |
open System.Collections.ObjectModel | |
open System.ComponentModel | |
open System.Threading | |
open Microsoft.FSharp.Control | |
open Microsoft.FSharp.Reflection | |
/// State of a calculation node as seen by callers and by dependent nodes.
type Value =
    /// The node's inputs changed and it has not been recalculated yet.
    | Dirty
    /// The node is recalculating (or has just been asked to) and has no result to hand out.
    | Processing
    /// The node holds a result; the payload is the boxed value.
    | Valid of obj
/// Messages understood by the node agents.
type Message =
    /// An upstream node reported a new state: (node name, reported state).
    | Changed of string * Value
    /// Request for the node's current state; the answer goes to the reply channel.
    | Eval of AsyncReplyChannel<Value>
    /// A value has been stored and subscribers should be notified.
    | Processed
    /// The automatic-calculation switch was set; carries the new setting.
    | AutoCalculation of bool
//#region helpers | |
/// Returns true when `value` is an F# tuple.
/// Anything that boxes to null (for example a null reference) is reported as not a tuple.
let isTuple value =
    let boxed = box value
    if obj.ReferenceEquals(boxed, null) then
        false
    else
        Microsoft.FSharp.Reflection.FSharpType.IsTuple(boxed.GetType())
/// Basic message logger for calculation nodes.
///
/// `Post` hands the entry to a MailboxProcessor, which formats it and appends
/// it to an in-memory list. `Get` prints the collected entries to stdout.
/// The list is written on the agent's thread and read on the caller's thread,
/// so every access goes through `lock messages`; `Get` prints from a snapshot
/// to avoid enumerating the list while the agent is still appending to it.
type Log() =
    // (node name, formatted entry) pairs in arrival order
    let messages = List<_>()

    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec loop () =
                async {
                    let! (name, state, message) = inbox.Receive()
                    let entry = sprintf "STATE: %s MESSAGE: %A" state message
                    lock messages (fun () -> messages.Add((name, entry)))
                    return! loop ()
                }
            loop ())

    /// Copies the entries collected so far while holding the lock.
    let snapshot () = lock messages (fun () -> messages.ToArray())

    /// Queues a log entry for the node `name`; returns without waiting for it to be stored.
    member this.Post(name, state, message) = agent.Post((name, state, message))

    /// Prints every entry recorded so far for the node called `name`.
    member this.Get(name) =
        snapshot ()
        |> Seq.filter (fun (n, _) -> name = n)
        |> Seq.map (fun (_, m) -> m)
        |> Seq.iter (printfn "%s")

    /// Prints every entry recorded so far, prefixed with the node name.
    member this.Get() =
        snapshot ()
        |> Seq.iter (fun (name, message) -> printfn "NODE: %s %s" name message)
//#endregion | |
/// Surface shared by every node in the calculation graph.
type INode =
    /// Asynchronously asks the node for its current state.
    abstract member Eval : unit -> Async<Value>
    /// Name of the node.
    abstract member Name : string
    /// Registers a callback that receives (node name, state) notifications.
    abstract member OnChanged : (string * Value -> unit) -> unit
    /// The node's currently stored value, boxed.
    abstract member Value : obj
/// Holds the automatic-calculation switch and raises `Changed` every time the
/// switch is assigned (even when the assigned value equals the current one).
type CalculationHandler() =
    let changed = Event<unit>()
    let mutable automatic = false

    /// Whether automatic calculation is on. Starts as false.
    member this.Automatic
        with get () = automatic
        and set enabled =
            automatic <- enabled
            changed.Trigger()

    /// Fires after each assignment to `Automatic`.
    member this.Changed = changed.Publish
/// A source node holding a value that is set from outside the graph.
///
/// The node is always valid: `Eval` answers with the stored value, and
/// `SetValue` stores a new value and then has the agent notify every
/// registered callback with `Valid` of the stored value.
/// `calc` is accepted but not used by this type.
type Input<'U>(name, log : Log, calc : CalculationHandler, initialValue : 'U) =
    // callbacks registered through OnChanged
    let actions = List<_>()
    let mutable current = initialValue

    let agent =
        MailboxProcessor.Start(fun inbox ->
            let rec valid () =
                async {
                    let! msg = inbox.Receive()
                    log.Post(name, "valid", msg)
                    match msg with
                    | Eval reply -> reply.Reply(Valid (box current))
                    | Processed ->
                        for notify in actions do
                            notify (name, Valid (box current))
                    | Changed _
                    | AutoCalculation _ -> ()
                    return! valid ()
                }
            valid ())

    /// Builds the async request that asks the agent for the current state.
    let requestEval () = agent.PostAndAsyncReply(fun reply -> Eval reply)

    /// Stores `v` and queues a notification to all subscribers.
    member this.SetValue v =
        current <- v
        agent.Post(Processed)

    member this.Eval() = requestEval ()
    member this.Name = name
    member this.OnChanged(action) = actions.Add(action)
    member this.Value = box current

    interface INode with
        member this.Eval() = requestEval ()
        member this.Name = name
        member this.OnChanged(action) = actions.Add(action)
        member this.Value = box current
/// A calculated node: applies `nodeFunction` to the values of its input nodes.
///
/// `nodeInputs` is either a single INode or a tuple of INodes; `nodeFunction`
/// receives the input values as a single value or as a tuple of type 'T.
/// The agent moves between three states:
///   dirty      - inputs changed and nothing has been computed; an `Eval`
///                request or turning automatic calculation on starts a run.
///   processing - a run was started (or is waiting for inputs to become
///                valid); `Eval` is answered with `Processing`.
///   valid      - a result is stored; `Eval` is answered with `Valid result`.
type Output<'N, 'T, 'U>(name, log : Log, calc : CalculationHandler, nodeInputs : 'N, nodeFunction : 'T -> 'U) =
    let inputsAreTuple = isTuple nodeInputs

    // upstream nodes, unpacked from the tuple when there is more than one
    let nodes =
        if inputsAreTuple then
            FSharpValue.GetTupleFields(nodeInputs) |> Array.map (fun field -> field :?> INode)
        else
            [| (box nodeInputs) :?> INode |]

    // adapts the boxed input values to the argument shape nodeFunction expects
    let func =
        if inputsAreTuple then
            fun (args : obj[]) -> nodeFunction (FSharpValue.MakeTuple(args, typeof<'T>) :?> 'T)
        else
            fun (args : obj[]) -> nodeFunction (args.[0] :?> 'T)

    let mutable result = Unchecked.defaultof<'U>
    // callbacks registered through OnChanged
    let actions = List<_>()

    let agent =
        MailboxProcessor.Start(fun inbox ->
            /// Sends (name, state) to every registered callback.
            let broadcast (state : Value) =
                for handler in actions do
                    handler (name, state)

            let rec calculate () =
                async {
                    let! states = nodes |> Seq.map (fun node -> node.Eval()) |> Async.Parallel
                    let ready = states |> Array.forall (function Valid _ -> true | _ -> false)
                    // only compute when every input is valid; otherwise wait in
                    // `processing` for the next Changed message
                    if ready then
                        let args = states |> Array.map (function Valid v -> v | _ -> null)
                        async {
                            result <- func args
                            inbox.Post(Processed)
                        }
                        |> Async.Start
                    return! processing ()
                }

            and valid () =
                async {
                    let! msg = inbox.Receive()
                    log.Post(name, "valid", msg)
                    match msg with
                    | Changed _ when calc.Automatic ->
                        broadcast Processing
                        return! calculate ()
                    | Changed _ ->
                        broadcast Dirty
                        return! dirty ()
                    | Eval reply ->
                        reply.Reply(Valid (box result))
                        return! valid ()
                    | Processed
                    | AutoCalculation _ ->
                        return! valid ()
                }

            and processing () =
                async {
                    let! msg = inbox.Receive()
                    log.Post(name, "processing", msg)
                    match msg with
                    | Changed _ ->
                        return! calculate ()
                    | Eval reply ->
                        reply.Reply(Processing)
                        return! processing ()
                    | Processed ->
                        for handler in actions do
                            handler (name, Valid (box result))
                        return! valid ()
                    | AutoCalculation _ ->
                        return! processing ()
                }

            and dirty () =
                async {
                    let! msg = inbox.Receive()
                    log.Post(name, "dirty", msg)
                    match msg with
                    | Eval reply ->
                        reply.Reply(Processing)
                        return! calculate ()
                    | AutoCalculation true ->
                        return! calculate ()
                    | Changed _
                    | Processed
                    | AutoCalculation false ->
                        return! dirty ()
                }

            // initial state
            if calc.Automatic then calculate () else dirty ())

    /// Builds the async request that asks the agent for the current state.
    let requestEval () = agent.PostAndAsyncReply(fun reply -> Eval reply)

    do
        // forward upstream notifications and switch changes into the mailbox
        for node in nodes do
            node.OnChanged(fun args -> agent.Post(Changed args))
        calc.Changed.Add(fun () -> agent.Post(AutoCalculation(calc.Automatic)))

    member this.Eval() = requestEval ()
    member this.Name = name
    member this.OnChanged(action) = actions.Add(action)
    member this.Value = box result

    interface INode with
        member this.Eval() = requestEval ()
        member this.Name = name
        member this.OnChanged(action) = actions.Add(action)
        member this.Value = box result
// --------------------------------------------------------------------------------- | |
// example | |
// shared logger and calculation switch for the example graph
let log = Log()
// start in manual mode; the property initialiser runs the Automatic setter
let calc = CalculationHandler(Automatic = false)

/// Creates an input node wired to the shared logger and calculation switch.
let input name initialValue = Input(name, log, calc, initialValue)
/// Creates an output node that adds the values of its two inputs.
/// Each evaluation logs the executing thread and sleeps for one second to
/// simulate a slow calculation.
let addNode name nodes =
    let slowSum (x, y) =
        log.Post(name, sprintf "eval %s, thread %i" name Thread.CurrentThread.ManagedThreadId, Processed)
        Thread.Sleep 1000
        x + y
    Output(name, log, calc, nodes, slowSum)
/// Fire-and-forget evaluation request: posts an `Eval` message to `node` and
/// discards the reply.
/// `node.Eval()` only builds the request; nothing is sent until the async is
/// started, so it has to be run rather than merely created and ignored.
let evalAsync (node : INode) = node.Eval() |> Async.Ignore |> Async.Start
// three inputs feeding two sums, which in turn feed a final sum:
//   n1 = i1 + i2, n2 = i2 + i3, n3 = n1 + n2
let i1, i2, i3 = input "i1" 1, input "i2" 3, input "i3" 5
let n1, n2 = addNode "n1" (i1, i2), addNode "n2" (i2, i3)
let n3 = addNode "n3" (n1, n2)
// run the following a line at a time | |
// enable automatic calculations | |
// calc.Automatic <- true | |
// print the messages log | |
// log.Get() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment