open Hopac
open Hopac.Infixes
open Hopac.Job.Infixes
open Hopac.Extensions

type IoCh<'i, 'o> = Ch<'i * IVar<'o>>

module IoCh =

  let create (f:'i -> Job<'o>) : Job<IoCh<'i, 'o>> =
    Ch.create() >>= fun ch ->
    let loop() = Job.delay <| fun () ->
      Ch.take ch >>= fun (i:'i,iv:IVar<'o>) -> f i >>= IVar.fill iv
    loop() |> Job.foreverServer >>% ch

  let sendReceive (ch:IoCh<'i, 'o>) (i:'i) : Job<'o> =
    Job.delay <| fun () ->
      let iv = ivar()
      Ch.send ch (i,iv) >>. iv

  let fifo (f:'i -> Job<'o>) : Job<'i -> Job<'o>> =
    create f |>> sendReceive
That should work. In particular, channels do essentially guarantee FIFO ordering. You can simplify the create function. In particular, you could leave out the Job.delay from the loop.
let create (f:'i -> Job<'o>) : Job<IoCh<'i, 'o>> =
  Ch.create() >>= fun ch ->
  let loop =
    Ch.take ch >>= fun (i:'i,iv:IVar<'o>) -> f i >>= IVar.fill iv
  loop |> Job.foreverServer >>% ch
Depending on what you want, there might be simpler ways. For example, consider the following:
module Serializer =
  let create () = mvarFull ()
  let serialize sz xJ =
    sz >>= fun () ->
    Job.tryFinallyJob xJ (sz <<-= ())
The idea is that one can create a "serializer" and use that to execute arbitrary operations. At most one operation can be live at any time per serializer. MVar also guarantees FIFO ordering.
My use case is a service which consumes messages such that each message is processed by a function Message -> Job<unit>. Some of those messages will correspond to the same entity, and in order to maintain the integrity of the entity, those messages must be processed one at a time and in the order they arrive. I can process each message and wait for the function above to complete before processing the next, but this ends up being too slow: it doesn't exploit the independence among entities. To this end, I have a dispatcher which places messages that match on a specific key in separate "queues".
The following is the current F# MailboxProcessor (MBP) based implementation that I'm porting to Hopac:
type Service<'Input, 'Output> = 'Input -> Async<'Output>

type Filter<'Input, 'Output> = 'Input -> Service<'Input, 'Output> -> Async<'Output>

let serialize (key:'Input -> string) : Filter<'Input, 'Output> =

  // creates an MBP to be associated with each key, which will invoke the service
  // for each input
  let makeAgent (s:Service<'Input, 'Output>) =
    MailboxProcessor.Start <| fun agent ->
      agent.Error.Add(fun ex -> Log.Error(ex.ToString()))
      let rec loop() = async {
        let! (input,ch:AsyncReplyChannel<_>) = agent.Receive()
        let! output = s input
        ch.Reply output
        return! loop()
      }
      loop()

  // an expiring cache to store agents
  let cache = MemoryCache.Default
  let policy = new CacheItemPolicy()
  policy.SlidingExpiration <- TimeSpan.FromSeconds 0.5
  policy.RemovedCallback <- CacheEntryRemovedCallback(fun args ->
    match args.CacheItem.Value with
    | :? IDisposable as x -> x.Dispose()
    | _ -> ()
  )

  // a "filter" which when applied to a service, creates
  // a new service which ensures inputs mapped to the same key are processed
  // one at a time.
  fun (i:'Input) (s:Service<'Input, 'Output>) ->
    let agent =
      let key = key i
      let agent' = makeAgent s
      let x = cache.AddOrGetExisting(key, agent', policy, null)
      if Object.ReferenceEquals(x, null) then agent'
      else x :?> MailboxProcessor<'Input * AsyncReplyChannel<'Output>>
    agent.PostAndAsyncReply(fun ch -> i,ch)

let serializeBy (key:'Input -> string) (s:Service<'Input, 'Output>) : Service<'Input, 'Output> =
  serialize key |> Filter.apply s
This approach can be used to implement a variation of a group-by operation for push-driven sequences as in https://gist.github.com/eulerfx/2bcfd5e0ef67d4ae330e. It would have type ('a -> 'Key) -> Observable<'a> -> Observable<'Key * Observable<'a>>. The difference from a typical group-by is that the same key can appear multiple times due to expiration.
Interesting. If I'm reading the code right, and assuming that the expiration period is long enough that a substantial number of inputs are processed before a cache item expires, then it would seem that the following two lines might be relatively expensive:
let agent' = makeAgent s
let x = cache.AddOrGetExisting(key, agent', policy, null)
If I'm reading the code right, a new MBP is created and started for each item processed, even when the cache already holds one for that key. That means a bunch of memory allocations and hop(s) through the ThreadPool. A Hopac channel and server loop will definitely be more lightweight than an MBP here, but this pattern might be worth changing as well.
What is the purpose of the MemoryCache here? My guess is that the number of keys is large enough (perhaps unbounded) that you want to clean up the MBPs to avoid a space leak and that there is no other essential reason for using the MemoryCache class. The MemoryCache seems to be a poor fit for highly efficient in-memory caching, as it does not make it easy to create the cached items lazily (get the item, or create a new item if the cache does not have it). You could try using Get and then AddOrGetExisting. Or you might want to write your own cache.
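The Get-then-AddOrGetExisting idea could look roughly like the sketch below. The helper name `getOrAdd` and its parameters are hypothetical (they do not appear in the original code), and the sketch assumes the project references System.Runtime.Caching. It only avoids building a new value on the common path where the key is already cached; it does nothing about expiration.

```fsharp
open System
open System.Runtime.Caching

/// Hypothetical helper: look the key up first and only build a value on a miss.
let getOrAdd (cache:MemoryCache) (policy:CacheItemPolicy) (key:string) (make:unit -> 'T) : 'T =
  match cache.Get(key) with
  | :? 'T as existing -> existing        // hit: nothing new is allocated
  | _ ->
    let fresh = make ()
    // Another thread may have added the key since the Get above, so add atomically.
    match cache.AddOrGetExisting(key, box fresh, policy) with
    | :? 'T as winner -> winner          // lost the race: `fresh` was not stored
    | _ -> fresh                         // null result: `fresh` is now the cached value
```

With the agents above, `make` would be `fun () -> makeAgent s`. When the race is lost, the unused `fresh` agent has already been started, so a real implementation would probably want to dispose of it.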
Hmm... BTW, is it guaranteed that an MBP is not processing items when it is being disposed of by the MemoryCache? Say a large number of items is queued for an MBP and then there is a gap longer than the sliding expiration (0.5 seconds in the code above) before the next item for the same key. That could be another good reason to write your own cache.
> What is the purpose of the MemoryCache here? My guess is that the number of keys is large enough (perhaps unbounded) that you want to clean up the MBPs to avoid a space leak and that there is no other essential reason for using the MemoryCache class.
Yes exactly - if all the MBPs are kept in memory, the service would very quickly run out of memory. The timeout is arbitrary to some extent - it is meant to allow inputs read in a batch to process before expiring, but still keeping the lifetime short such that there aren't a large number of these in memory at a given point in time. It would be better to have natural expiration through the GC, but MBP doesn't become automatically eligible for collection. It seems like Hopac would be better suited to this style of expiration.
WRT the expensive call - yes, the API of MemoryCache is a bit awkward - it doesn't allow lazy creation (as ConcurrentDictionary does, for example). But it did solve the memory issue, and the service's workload is largely IO bound anyway, so it didn't incur a performance hit overall. I figured writing my own cache would take more time and be more error prone, so MemoryCache seemed like a good compromise.
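For comparison, the lazy creation that ConcurrentDictionary allows can be sketched as follows. The helper name `getOrCreate` is hypothetical, and the table has no expiration at all, so on its own this would not address the memory problem that MemoryCache was brought in to solve.

```fsharp
open System
open System.Collections.Concurrent

/// Hypothetical helper: create the value for a key at most once, on first use.
let getOrCreate (table:ConcurrentDictionary<string, Lazy<'T>>) (key:string) (make:string -> 'T) : 'T =
  // GetOrAdd may run the factory more than once under contention, but only one
  // Lazy ends up stored, and only the stored Lazy is ever forced.
  let cell = table.GetOrAdd(key, Func<string, Lazy<'T>>(fun k -> lazy (make k)))
  cell.Value
```

Wrapping the value in `Lazy` is what keeps `make` from running for a key that already has an entry, which is the property the `makeAgent` call in the MemoryCache version lacks.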
> BTW, is it guaranteed that a MBP is not processing items when it is being disposed of by the MemoryCache?
That is a good point and I'm not 100% sure - I've been logging the message count during expiration and haven't seen it become an issue, but it is a hole in the design.
Does this make sense? Or is there an easier way to do it? What I'm trying to do is ensure that inputs passed to an effectful function 'i -> Job<'o> are processed one at a time, in FIFO order.