Last active
August 29, 2016 07:18
-
-
Save haf/b13d3b09953e252f6bea4fe40c03be0f to your computer and use it in GitHub Desktop.
Supervision Hopac
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Logary.Supervisor | |
open System | |
open Hopac | |
open Hopac.Infixes | |
open NodaTime | |
type PolicyStep = | |
| Sleep of dur:Duration | |
| RestartOne | |
| RestartTree | |
type Policy = | |
| Restart | |
| AllowedFailureRate of rate:float | |
type PointName = PointName of string [] | |
type private Logger = string -> unit | |
type private JobId = PointName | |
type LastWillCh = Ch<obj> | |
type private NamedJob = JobId * LastWillCh * (LastWillCh -> Job<unit>) | |
type Runnable = | |
LastWillCh -> Job<unit> | |
type Running = | |
private { shutdown : Ch<unit> } | |
type Initial = | |
private { | |
start : IVar<unit> | |
supervise : Ch<Policy * Runnable * IVar<JobId>> | |
running : Running } | |
let private name = PointName [| "Logary"; "Supervisor" |] | |
let create (logger : Logger) : Initial = | |
// HACK to inject logging for PoC: | |
let log msg next x = logger msg ; next x | |
let startCh, shutdownCh, superviseCh, erroredCh = | |
IVar (), Ch (), Ch (), Ch () | |
let lastWill : Map<JobId, obj> ref = ref Map.empty | |
let rec ready (jobs : NamedJob list) = | |
Alt.choose [ | |
startCh ^=> fun () -> | |
log "started" running jobs | |
superviseCh ^=> fun (policy, fxJ, idRepl) -> | |
let jobId = PointName [| jobs.Length.ToString() |] | |
let lastWillCh = Ch () | |
idRepl *<= jobId >>=. ready ((jobId, lastWillCh, fxJ) :: jobs) | |
] | |
and running jobs = | |
for jobId, lastWillCh, fxJ in jobs do | |
queue ( | |
Job.tryIn (fxJ lastWillCh) | |
Job.result | |
(fun ex -> erroredCh *<- (jobId, ex)) | |
) | |
Alt.choose [ | |
erroredCh ^=> fun (jobId, ex) -> | |
logger (sprintf "%O errored" jobId) | |
// TODO: | |
// match policy jobId ex with | |
// | ... | |
running jobs | |
upcast shutdownCh | |
] | |
start (ready []) | |
{ start = startCh | |
supervise = superviseCh | |
running = { shutdown = shutdownCh } | |
} | |
/// Start the non-running supervisor | |
let start (sup : Initial) = | |
sup.start *<= () >>-. sup.running | |
/// Register a named job and its policy | |
let register (p : Policy, fxJ) (sup : Initial) : Alt<JobId> = | |
sup.supervise *<-=>- fun ackCh -> p, fxJ, ackCh | |
/// Shutdown the running supervisor | |
let shutdown (sup : Running) = | |
sup.shutdown *<- () | |
// usage: | |
let runit lastWillCh = | |
Job.iterateServer 1 (fun i -> | |
timeOutMillis 1000 >>=. | |
lastWillCh *<- box (i + 1) | |
>>-. i + 1 | |
) | |
[<EntryPoint>] | |
let main argv = | |
let sup = create (printfn "%s") | |
let jobId = sup |> register (Restart, runit) |> run | |
printfn "Got job id: %A" jobId | |
let running = sup |> start |> run | |
printfn "Shutting down" | |
running |> shutdown |> run | |
0 |
BTW, the Proc mechanism in Hopac is partly for supervision. Currently it gives no information on the reason why a running job was terminated, but that could be changed (e.g. giving exn
instance in case of job being terminated due to raising an exception). It uses finalizers, so it adds some overhead, but it can also account for jobs that are garbage collected (e.g. block forever).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The idea is that the lastWillCh lets Logary Targets send state that they get re-initialised with should they fail. E.g. the RabbitMQ target could send its Map<RequestId, Message> to avoid being befuddled if we get an IOException. We can also make the RingBuffer persist across invocations to the named job, to avoid dropping messages. Next iteration of the PoC I'll add in policies. I should probably look at the F# actor frameworks for inspiration of policies, too.