Created
April 27, 2015 21:40
-
-
Save gburgett/2e01236c11f3cd2e0c7f to your computer and use it in GitHub Desktop.
A broadcast channel implementation in Go.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
Package broadcast implements a broadcasting channel. The broadcaster
creates or wraps a channel which, when written to, is replicated to all attached
receivers.

Example:

	ticker := time.NewTicker(1 * time.Millisecond)
	b, ch := broadcast.NewBroadcaster()
	go func() {
		for v := range ticker.C {
			ch <- v
		}
	}()
	for i := 0; i < 10; i++ {
		go func(i int, recv <-chan interface{}) {
			for v := range recv {
				// ... do something every millisecond on 10 different goroutines
			}
		}(i, b.Receive())
	}
*/
package broadcast | |
type Broadcaster struct { | |
publisher chan<- interface{} | |
toadd chan *receiver | |
receive *receiver | |
} | |
type receiver struct { | |
v chan interface{} | |
b *Broadcaster | |
next *receiver | |
closed bool | |
} | |
// Creates a new broadcaster, with associated write channel. Values written to the write channel | |
// will be propagated to every receiver channel. | |
func NewBroadcaster() (*Broadcaster, chan<- interface{}) { | |
ch := make(chan interface{}) | |
ret := &Broadcaster{ | |
publisher: ch, | |
toadd: make(chan *receiver), | |
} | |
runBroadcaster(ret, ch) | |
return ret, ch | |
} | |
// Creates a new broadcaster around the given channel. All events read from the channel | |
// will be propagated to every receiver channel. The given channel must be closed independently | |
// of the broadcaster. | |
func Broadcast(ch <-chan interface{}) *Broadcaster { | |
ret := &Broadcaster{ | |
publisher: nil, | |
toadd: make(chan *receiver), | |
} | |
runBroadcaster(ret, ch) | |
return ret | |
} | |
func runBroadcaster(b *Broadcaster, ch <-chan interface{}) { | |
go func() { | |
defer b.Close() | |
for v := range ch { | |
propagate(b, v) | |
} | |
b.publisher = nil //already closed | |
}() | |
go func() { | |
for r := range b.toadd { | |
r.next = b.receive | |
b.receive = r | |
} | |
for n := b.receive; n != nil; n = n.next { | |
close(n.v) | |
} | |
}() | |
} | |
func propagate(b *Broadcaster, v interface{}) { | |
for n := b.receive; n != nil; n = n.next { | |
n.v <- v | |
} | |
} | |
// Closes the writer channel, which propagates the close event to all receiver channels. | |
func (b *Broadcaster) Close() { | |
if b.toadd != nil { | |
close(b.toadd) | |
b.toadd = nil | |
} | |
if b.publisher != nil { | |
close(b.publisher) | |
b.publisher = nil | |
} | |
} | |
// Gets a receiver channel from the broadcaster. Every receiver channel receives all messages sent to the publisher. | |
func (b *Broadcaster) Receive() <-chan interface{} { | |
ch := make(chan interface{}) | |
ret := &receiver{ | |
v: ch, | |
b: b, | |
} | |
b.toadd <- ret | |
return ret.v | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment