Created
May 24, 2018 03:03
-
-
Save teaualune/0b97f020d87d7682c86909424b639370 to your computer and use it in GitHub Desktop.
Minimal fan-out pattern implemented with goroutines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"sync" | |
) | |
// FanOutContext holds the state of one fan-out: the list of subscriber
// channels that each receive a copy of every value, and the lock used around
// that list.
type FanOutContext struct {
	// mutex is locked by the methods while they iterate over or reassign
	// channels.
	mutex *sync.Mutex
	// channels lists the subscriber channels in the order they were added.
	channels []chan interface{}
}
// NewFanOutContext creates new fanout context | |
func NewFanOutContext() *FanOutContext { | |
var channels []chan interface{} | |
return &FanOutContext{ | |
channels: channels, | |
mutex: &sync.Mutex{}, | |
} | |
} | |
// Run starts receiving data from sourceChan and fanning out. This function should be called as a goroutine | |
func (foc *FanOutContext) Run(fanOutCtx context.Context, sourceChan chan interface{}) { | |
for { | |
select { | |
case <-fanOutCtx.Done(): | |
return | |
case data := <-sourceChan: | |
foc.mutex.Lock() | |
for _, channel := range foc.channels { | |
channel <- data | |
} | |
foc.mutex.Unlock() | |
} | |
} | |
} | |
// AddChannel adds new channel to be fanned out. The returned index should be kept for calling RemoveAndCloseChannel(index) | |
func (foc *FanOutContext) AddChannel() (chan interface{}, int) { | |
index := len(foc.channels) | |
channel := make(chan interface{}, 1) | |
foc.mutex.Lock() | |
foc.channels = append(foc.channels, channel) | |
foc.mutex.Unlock() | |
return channel, index | |
} | |
// RemoveAndCloseChannel removes and closes channel at index | |
func (foc *FanOutContext) RemoveAndCloseChannel(index int) { | |
close(foc.channels[index]) | |
foc.mutex.Lock() | |
foc.channels = append(foc.channels[:index], foc.channels[index+1:]...) | |
foc.mutex.Unlock() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment