@eulerfx
Created December 10, 2014 16:11
An F# Hopac input-output channel to ensure that inputs passed to a function are processed one at a time, in FIFO order
open Hopac
open Hopac.Infixes
open Hopac.Job.Infixes
open Hopac.Extensions

/// A request channel carrying an input paired with an IVar for the reply.
type IoCh<'i, 'o> = Ch<'i * IVar<'o>>

module IoCh =

  /// Creates a channel and starts a server loop that takes one request at a
  /// time, runs f on the input and fills the reply IVar with the output.
  let create (f:'i -> Job<'o>) : Job<IoCh<'i, 'o>> =
    Ch.create() >>= fun ch ->
      let loop() = Job.delay <| fun () ->
        Ch.take ch >>= fun (i:'i, iv:IVar<'o>) -> f i >>= IVar.fill iv
      loop() |> Job.foreverServer >>% ch

  /// Sends an input to the channel and waits for the corresponding output.
  let sendReceive (ch:IoCh<'i, 'o>) (i:'i) : Job<'o> =
    Job.delay <| fun () ->
      let iv = ivar()
      Ch.send ch (i, iv) >>. iv

  /// Wraps f so that inputs are processed one at a time, in FIFO order.
  let fifo (f:'i -> Job<'o>) : Job<'i -> Job<'o>> =
    create f |>> sendReceive
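
As a usage illustration (a hypothetical sketch, not part of the gist; demo and slowIncrement are made-up names), the wrapper returned by IoCh.fifo can be called from many concurrent jobs while the server loop still runs the underlying function on one input at a time:

    // Hypothetical sketch: wrap an effectful function with IoCh.fifo.
    let demo : Job<unit> = job {
      // slowIncrement stands in for any effectful 'i -> Job<'o>.
      let slowIncrement (i: int) : Job<int> =
        Job.delay <| fun () ->
          printfn "processing %d" i
          Job.result (i + 1)
      // fifo yields the serialized wrapper function.
      let! serialized = IoCh.fifo slowIncrement
      // Even if these requests are started concurrently, the bodies of
      // slowIncrement never overlap: the server loop takes requests from
      // the channel one at a time.
      let! results = Job.conCollect [ for i in 1 .. 5 -> serialized i ]
      printfn "%A" results
    }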
@eulerfx (author) commented Dec 10, 2014

Does this make sense? Or is there an easier way to do it? What I'm trying to do is ensure that inputs passed to an effectful function 'i -> Job<'o> are processed one at a time, in FIFO order.

@polytypic commented:

That should work. In particular, channels do essentially guarantee FIFO ordering. You can simplify the create function; for example, you could leave out the Job.delay from the loop.

    let create (f:'i -> Job<'o>) : Job<IoCh<'i, 'o>> =
        Ch.create() >>= fun ch ->                
            let loop =
                Ch.take ch >>= fun (i:'i,iv:IVar<'o>) -> f i >>= IVar.fill iv             
            loop |> Job.foreverServer >>% ch

Depending on what you want, there might be simpler ways. For example, consider the following:

module Serializer =
  let create () = mvarFull ()

  let serialize sz xJ =
    sz >>= fun () ->
    Job.tryFinallyJob xJ (sz <<-= ())

The idea is that one can create a "serializer" and use it to execute arbitrary operations. At most one operation can be live at any time per serializer. MVar also guarantees FIFO ordering.
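
As a minimal sketch of how this could answer the original question (fifoViaSerializer is a made-up name, not something from the thread), the same 'i -> Job<'o> wrapper can be built on top of a serializer:

    // Hypothetical sketch: route every call to an effectful f through one
    // serializer, so at most one call is live at a time and waiting callers
    // are served in FIFO order (via the MVar).
    let fifoViaSerializer (f: 'i -> Job<'o>) : 'i -> Job<'o> =
      let sz = Serializer.create ()
      fun i -> Serializer.serialize sz (Job.delay <| fun () -> f i)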

@eulerfx (author) commented Dec 10, 2014

My use case is a service which consumes messages such that each message is processed by a function Message -> Job<unit>. Some of those messages will correspond to the same entity, and to maintain the integrity of the entity, such messages must be processed one at a time and in the order they arrive. I could process each message and wait for the function above to complete before processing the next, but that ends up being too slow - it doesn't exploit the independence among entities. To this end, I have a dispatcher which places messages that match on a specific key into separate "queues".

The following is the current F# MailboxProcessor (MBP) based implementation that I'm porting to Hopac:

// Note: Log and Filter.apply come from the surrounding codebase and are not shown here.
open System
open System.Runtime.Caching

type Service<'Input, 'Output> = 'Input -> Async<'Output>

type Filter<'Input, 'Output> = 'Input -> Service<'Input, 'Output> -> Async<'Output>


let serialize (key:'Input -> string) : Filter<'Input, 'Output> =

    // creates an MBP to be associated with each key, which will invoke the service
    // for each input
    let makeAgent (s:Service<'Input, 'Output>) =
        MailboxProcessor.Start <| fun agent ->
            agent.Error.Add(fun ex -> Log.Error(ex.ToString()))
            let rec loop() = async {
                let! (input,ch:AsyncReplyChannel<_>) = agent.Receive()
                let! output = s input
                ch.Reply output
                return! loop()                                            
            }
            loop()

    // an expiring cache to store agents
    let cache = MemoryCache.Default

    let policy = new CacheItemPolicy()
    policy.SlidingExpiration <- TimeSpan.FromSeconds 0.5
    policy.RemovedCallback <- CacheEntryRemovedCallback(fun args ->
        match args.CacheItem.Value with
        | :? IDisposable as x -> x.Dispose()
        | _ -> ()
    )

    // a "filter" which when applied to a service, creates 
    // a new service which ensures inputs mapped to the same key are processed
    // one at a time.
    fun (i:'Input) (s:Service<'Input, 'Output>) ->            
        let agent =
            let key = key i
            let agent' = makeAgent s
            let x = cache.AddOrGetExisting(key, agent', policy, null)
            if Object.ReferenceEquals(x, null) then agent'
            else x :?> MailboxProcessor<'Input * AsyncReplyChannel<'Output>>
        agent.PostAndAsyncReply(fun ch -> i,ch)


let serializeBy (key:'Input -> string) (s:Service<'Input, 'Output>) : Service<'Input, 'Output> =
    serialize key |> Filter.apply s

This approach can be used to implement a variation of a group-by operation for push-driven sequences, as in https://gist.github.com/eulerfx/2bcfd5e0ef67d4ae330e. It would have type ('a -> 'Key) -> Observable<'a> -> Observable<'Key * Observable<'a>>; the difference from a typical group-by is that the same key can appear multiple times due to expiration.

@polytypic commented:

Interesting. If I'm reading the code right, and assuming that the expiration period is long enough that a substantial number of inputs are processed before a cache item expires, then the following two lines might be relatively expensive:

            let agent' = makeAgent s
            let x = cache.AddOrGetExisting(key, agent', policy, null)

If I'm reading the code right, then what happens is that for each item processed a new MBP is created and started. That means a bunch of memory allocations and hop(s) through the ThreadPool. A Hopac channel and server loop will definitely be more lightweight than an MBP here, but this pattern might be worth changing as well.

What is the purpose of the MemoryCache here? My guess is that the number of keys is large enough (perhaps unbounded) that you want to clean up the MBPs to avoid a space leak, and that there is no other essential reason for using the MemoryCache class. MemoryCache seems to be a poor fit for highly efficient in-memory caching, as it does not make it easy to create cached items lazily (get an item, or create a new item if the cache does not have it). You could try using Get first and then falling back to AddOrGetExisting. Or you might want to write your own cache.
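
A minimal sketch of that Get-then-AddOrGetExisting suggestion (getOrAddAgent is a made-up helper, not an API from the thread): probe the cache first so a new agent is only created on a miss, and let AddOrGetExisting resolve races between callers on the same key.

    let getOrAddAgent (cache: MemoryCache) (policy: CacheItemPolicy)
                      (key: string) (mk: unit -> MailboxProcessor<'Msg>) =
      match cache.Get(key, null) with
      | :? MailboxProcessor<'Msg> as existing -> existing
      | _ ->
        // Cache miss: create an agent, but let AddOrGetExisting decide which
        // instance wins if two callers race on the same key. The losing agent
        // is still created and started, so this only avoids the common-case
        // allocation rather than the race itself.
        let fresh = mk ()
        match cache.AddOrGetExisting(key, fresh, policy, null) with
        | null -> fresh
        | existing -> existing :?> MailboxProcessor<'Msg>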

@polytypic commented:

Hmm... BTW, is it guaranteed that an MBP is not processing items when it is being disposed of by the MemoryCache? Say a large number of items is queued for an MBP and then there is a gap longer than the sliding expiration before the next item for the same key. That could be another good reason to write your own cache.

@eulerfx (author) commented Dec 11, 2014

What is the purpose of the MemoryCache here? My guess is that the number of keys is large enough (perhaps unbounded) that you want to clean up the MBPs to avoid a space leak and that there is no other essential reason for using the MemoryCache class.

Yes, exactly - if all the MBPs were kept in memory, the service would very quickly run out of memory. The timeout is arbitrary to some extent - it is meant to allow the inputs read in a batch to be processed before expiration, while still keeping the lifetime short enough that there isn't a large number of agents in memory at any given point in time. It would be better to have natural expiration through the GC, but an MBP doesn't automatically become eligible for collection. It seems like Hopac would be better suited to this style of expiration.

WRT the expensive call - yes, the MemoryCache API is a bit awkward in that it doesn't allow lazy creation (as ConcurrentDictionary does, for example). But it did solve the memory issue, and the service's workload is largely IO-bound anyway, so it didn't incur a noticeable performance hit overall. I figured writing my own cache would take more time and be more error-prone, so MemoryCache seemed like a good compromise.
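
For reference, this is the lazy-creation pattern ConcurrentDictionary enables (a hypothetical sketch, not code from the service; note it never evicts entries, which is exactly what MemoryCache was providing):

    open System.Collections.Concurrent

    // Hypothetical sketch: GetOrAdd with a Lazy<_> wrapper runs the agent
    // factory at most once per key, so an agent is only built the first time
    // its key is seen. Unlike MemoryCache, nothing here ever expires.
    let makeAgentCache (mk: string -> MailboxProcessor<'Msg>) =
      let agents = ConcurrentDictionary<string, Lazy<MailboxProcessor<'Msg>>>()
      fun (key: string) -> agents.GetOrAdd(key, fun k -> lazy (mk k)).Value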

BTW, is it guaranteed that an MBP is not processing items when it is being disposed of by the MemoryCache?

That is a good point and I'm not 100% sure - I've been logging the message count during expiration and haven't seen it become an issue, but it is a hole in the design.
