Last active
April 22, 2023 15:55
-
-
Save davidbalbert/a58261c3851d3ca383334b776c2c97d2 to your computer and use it in GitHub Desktop.
Broadcast notifier sketches
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Lightly modified from the slides for "Rethinking Classical Concurrency Patterns" | |
package sync | |
import "context" | |
type state struct { | |
seq int64 | |
changed chan struct{} // closed upon notify | |
} | |
type Notifier struct { | |
st chan state | |
} | |
func NewNotifier() *Notifier { | |
st := make(chan state, 1) | |
st <- state{ | |
seq: 0, | |
changed: make(chan struct{}), | |
} | |
return &Notifier{st: st} | |
} | |
func (n *Notifier) NotifyChange() { | |
st := <-n.st | |
close(st.changed) | |
n.st <- state{ | |
seq: st.seq + 1, | |
changed: make(chan struct{}), | |
} | |
} | |
// If you call AwaitChange() with a wrong seq, it'll immediately notify you | |
// with the current one. | |
func (n *Notifier) AwaitChange(ctx context.Context, seq int64) (newSeq int64) { | |
st := <-n.st | |
n.st <- st | |
if st.seq != seq { | |
return st.seq | |
} | |
select { | |
case <-ctx.Done(): | |
return seq | |
case <-st.changed: | |
return seq + 1 | |
} | |
} | |
// Calling Seq is usually unnecessary. You can just start with 0, and AwaitChange | |
// will give you the correct seq. But if getting signaled twice is expensive, you | |
// can limit the likelyhood of getting signaled twice by calling Seq() first. | |
func (n *Notifier) Seq() int64 { | |
st := <-n.st | |
n.st <- st | |
return st.seq | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Same guarantees as ReliableNotifier (I think, none of this is tested), but | |
// with a similar interface to Notifier. Queue is copied verbatim from "Rethinking | |
// Classical Concurrency Primatives" | |
package sync | |
import ( | |
"context" | |
) | |
type Queue[T any] struct { | |
items chan []T // contains 0 or 1 non-empty slices | |
empty chan bool // contains true if items is empty | |
} | |
func NewQueue[T any]() *Queue[T] { | |
items := make(chan []T, 1) | |
empty := make(chan bool, 1) | |
empty <- true | |
return &Queue[T]{items, empty} | |
} | |
func (q *Queue[T]) Put(item T) { | |
var items []T | |
select { | |
case items = <-q.items: | |
case <-q.empty: | |
} | |
items = append(items, item) | |
q.items <- items | |
} | |
func (q *Queue[T]) Get(ctx context.Context) T { | |
var items []T | |
select { | |
case <-ctx.Done(): | |
var zero T | |
return zero | |
case items = <-q.items: | |
} | |
item := items[0] | |
items = items[1:] | |
if len(items) == 0 { | |
q.empty <- true | |
} else { | |
q.items <- items | |
} | |
return item | |
} | |
type Token struct { | |
t chan struct{} | |
} | |
// A struct that facilitates one-to-many broadcast notifications. All listeners are guaranteed | |
// to be notified of every change, and once you've registered, you won't miss any changes. | |
// | |
// St is a buffered channel that acts as a mutex for the notifier's state. The state is a map | |
// from arbitrary unique values (in this case a channelvalue ) to queues. The queues function | |
// similarly to a buffered channel, except they can grow infinitely, which means a slow listener | |
// will never block the notifier. Essentially, you're trading potentially unbounded memory growth | |
// for the guarantee that you won't miss any messages and no goroutine can slow another down. | |
// | |
// The channel is wrapped in a Token struct to hide its implementation details. No values are | |
// ever sent on the channel, it's just used a unique value. | |
// | |
// One thing this is missing right now: when you register a listener, it's sometimes useful to | |
// immediately receive the most recent value. Right now, you can't do that, but it would be | |
// pretty easy to add "lastValue" to the state. | |
type QueuedNotifier[T any] struct { | |
st chan map[chan struct{}]*Queue[T] | |
} | |
func NewQueuedNotifier[T any]() *QueuedNotifier[T] { | |
state := make(chan map[chan struct{}]*Queue[T], 1) | |
state <- make(map[chan struct{}]*Queue[T]) | |
return &QueuedNotifier[T]{ | |
st: state, | |
} | |
} | |
func (n *QueuedNotifier[T]) Register() Token { | |
q := NewQueue[T]() | |
t := make(chan struct{}) | |
st := <-n.st | |
st[t] = q | |
n.st <- st | |
return Token{t} | |
} | |
func (n *QueuedNotifier[T]) Unregister(t Token) { | |
st := <-n.st | |
delete(st, t.t) | |
n.st <- st | |
} | |
func (n *QueuedNotifier[T]) NotifyChange(v T) { | |
st := <-n.st | |
for _, q := range st { | |
q.Put(v) | |
} | |
n.st <- st | |
} | |
func (n *QueuedNotifier[T]) AwaitChange(ctx context.Context, t Token) (T, bool) { | |
st := <-n.st | |
q := st[t.t] | |
n.st <- st | |
if q == nil { | |
var zero T | |
return zero, false | |
} | |
return q.Get(ctx), true | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Ensures you don't miss a message. Interface based on | |
// signal.Notify(). Overly complicated for my tastes. | |
package sync | |
import "context" | |
type waiter[T any] struct { | |
q *Queue[T] | |
cancel context.CancelFunc | |
} | |
type ReliableNotifier[T any] struct { | |
st chan map[chan<- T]waiter[T] | |
} | |
func NewReliableNotifier[T any]() *ReliableNotifier[T] { | |
state := make(chan map[chan<- T]waiter[T], 1) | |
state <- make(map[chan<- T]waiter[T]) | |
return &ReliableNotifier[T]{ | |
st: state, | |
} | |
} | |
func (n *ReliableNotifier[T]) Broadcast(v T) { | |
st := <-n.st | |
for _, w := range st { | |
w.q.Put(v) | |
} | |
n.st <- st | |
} | |
func (n *ReliableNotifier[T]) Notify(c chan<- T) { | |
ctx, cancel := context.WithCancel(context.Background()) | |
w := waiter[T]{ | |
q: NewQueue[T](), | |
cancel: cancel, | |
} | |
st := <-n.st | |
st[c] = w | |
n.st <- st | |
go func() { | |
for { | |
v := w.q.Get(ctx) | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
} | |
c <- v | |
} | |
}() | |
} | |
func (n *ReliableNotifier[T]) Stop(c chan<- T) { | |
st := <-n.st | |
w, ok := st[c] | |
if ok { | |
w.cancel() | |
delete(st, c) | |
} | |
n.st <- st | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment