Skip to content

Instantly share code, notes, and snippets.

@kjnilsson
Created January 8, 2014 13:57
Show Gist options
  • Save kjnilsson/8317102 to your computer and use it in GitHub Desktop.
Save kjnilsson/8317102 to your computer and use it in GitHub Desktop.
Deduping agent
open System
open System.Collections.Concurrent
type DedupeProtocol<'T> =
| Add of 'T
| Remove of 'T
type DedupingAgent<'T when 'T : comparison>(handle : 'T -> unit) =
let queue = new BlockingCollection<'T>()
let agent = MailboxProcessor.Start (fun inbox ->
let rec loop state = async {
let! msg = inbox.Receive()
match msg with
| Add x ->
if not (Set.contains x state) then
queue.Add x
return! loop <| Set.add x state
| Remove x ->
return! loop <| Set.remove x state }
loop Set.empty<'T>)
do
async {
queue.GetConsumingEnumerable()
|> Seq.iter (fun m ->
handle m
agent.Post (Remove m)) }
|> Async.Start
member this.Add x = agent.Post (Add x)
let a = DedupingAgent<string>(fun s -> Threading.Thread.Sleep 500; printfn ":: %s" s)
a.Add "one"
a.Add "one"
a.Add "two"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment