@mpenet
Last active August 29, 2015 14:12
```clojure
(require '[clojure.core.async :as async])

(defn put!
  "Takes a `ch`, a `msg`, a fn that enables backpressure, one that disables it,
  and a no-arg function which, when invoked, closes the upstream source."
  ([ch msg suspend! resume! close!]
   (let [status (atom ::sending)]
     (async/put! ch msg
                 (fn [result]
                   (if-not result
                     (when close! (close!))
                     (do
                       (when (identical? @status ::paused)
                         ;; if it was paused resume
                         (resume!))
                       (reset! status ::sent)))))
     ;; it's still sending, means it's parked, so suspend source
     (when (compare-and-set! status ::sending ::paused)
       (suspend!))
     nil))
  ([ch msg suspend! resume!]
   (put! ch msg suspend! resume! nil)))
```
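A possible way to exercise the function, as a rough sketch only. It assumes the `put!` above is already loaded in the current namespace. The `events` log, the `resumed` promise, and the recording versions of `suspend!` and `resume!` are hypothetical stand-ins for a real source's backpressure hooks.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-ins: each hook only records that it was called.
(def events (atom []))
(def resumed (promise))

(defn suspend! [] (swap! events conj :suspend))
(defn resume! []
  (swap! events conj :resume)
  (deliver resumed true))

;; One slot of buffer: the first put completes immediately,
;; the second has to wait for a taker.
(def ch (async/chan 1))

(put! ch :a suspend! resume!)   ; buffer has room, no backpressure
(put! ch :b suspend! resume!)   ; buffer full, so the source is suspended

(async/<!! ch)                  ; take :a, which lets :b into the buffer
(deref resumed 1000 :timeout)   ; the put callback may run on another thread

@events
;; => [:suspend :resume]
```

The first call never touches the hooks because its callback has already flipped the status to `::sent` by the time the `compare-and-set!` runs.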

mpenet commented Jan 6, 2015

@ztellman it seems that in https://gist.github.com/ztellman/fb64e81d1d7f0b261ccd the suspend clause fires every time, even when the channel has space in its buffer, since the status is never set to :sent when the put! completes immediately. But I could be overlooking something, let me know.
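To illustrate the point about immediate puts, here is a minimal sketch that uses only `clojure.core.async` itself. `parked?` is a hypothetical helper written for this illustration and is not taken from either gist. It relies on `async/put!` invoking the callback on the calling thread when the put completes immediately, which is its default behavior.

```clojure
(require '[clojure.core.async :as async])

(defn parked?
  "Hypothetical helper: puts `msg` on `ch` and reports whether the put was
  still pending when `async/put!` returned."
  [ch msg]
  (let [status (atom ::sending)]
    ;; When the put completes immediately, this callback runs on the calling
    ;; thread before async/put! returns, so the status is already ::sent below.
    (async/put! ch msg (fn [_] (reset! status ::sent)))
    (identical? @status ::sending)))

(def buffered (async/chan 1))

(parked? buffered :a) ; => false, the buffer had room
(parked? buffered :b) ; => true, the buffer is full and nobody is taking
```

If the callback did not update the status on the immediate path, the check after `async/put!` would see `::sending` in both cases and suspend the source every time.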


ztellman commented Jan 6, 2015

I'm not sure why you've split it into suspend! and resume!. There definitely was a problem with the code in question, though; I was missing an else clause.


mpenet commented Jan 6, 2015

The naming was clearer to me, at least in the context where I use this ("suspend reads", "resume reads", etc.), but it's subjective I guess.


mpenet commented Jan 6, 2015

But now that I think about it more, I prefer your argument style ... it's more anonymous to the side effect.
