Skip to content

Instantly share code, notes, and snippets.

@Szer
Created February 4, 2019 15:23
Show Gist options
  • Select an option

  • Save Szer/64c4a10c748734febc81fcc69847e635 to your computer and use it in GitHub Desktop.

Select an option

Save Szer/64c4a10c748734febc81fcc69847e635 to your computer and use it in GitHub Desktop.
Pool of works
open Hopac
open Hopac.Infixes
open Hopac.Stream
/// This is concurrent pool of workers with constant `degree` of parallelism
///
/// Used in `Hopac.Stream.mapPipelinedJob` as temporary workaround before [this](https://github.com/Hopac/Hopac/pull/186) will be released
type Pool<'i,'o>(worker: 'i -> Job<'o>, degree) =
let inputCh, degreeCh, doneCh = Ch(), Ch(), Ch()
let closing = IVar()
let closed = IVar()
let output = Src.create()
let outputStream = Src.tap output
let rec loop usage degree =
Alt.choose [
IVar.read closed ^=>. Alt.once()
degreeCh ^=> loop usage
doneCh ^=> (function
| Choice1Of2 x -> Src.value output x
| Choice2Of2 e -> Src.error output e
>=> fun _ -> loop (usage - 1) degree)
(if usage < degree && not closing.Full then
inputCh ^=> (worker
>> Job.catch
>=> Ch.give doneCh
>> Job.queue
>=> fun _ -> loop (usage + 1) degree)
else Alt.never())
(if usage = 0 then
IVar.read closing ^=>. (Src.close output >>=. closed *<= true)
<|> Alt.never()
else Alt.never())
]
do loop 0 degree |> startIgnore
/// Trying to set `degree` of parallelism to `Pool`
///
/// Will fail if `Pool` closed
member __.SetDegree degree =
(IVar.read closing <|> IVar.read closed)
^-> fun _ -> failwith "pool closed"
<|> degreeCh *<- degree
/// Trying to send new `workItem` to `Pool`
///
/// Will fail if `Pool` closed
member __.Queue workItem =
(IVar.read closing <|> IVar.read closed)
^-> fun _ -> failwith "pool closed"
<|> inputCh *<- workItem
/// Output `Stream` of results
member __.Output = outputStream
/// Closing `Pool`. This operation is idempotent and safe to use many times
member __.Close() =
IVar.tryFill closing true
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment