Skip to content

Instantly share code, notes, and snippets.

@teaualune
Created May 24, 2018 03:03
Show Gist options
  • Save teaualune/0b97f020d87d7682c86909424b639370 to your computer and use it in GitHub Desktop.
Save teaualune/0b97f020d87d7682c86909424b639370 to your computer and use it in GitHub Desktop.
Minimal fan-out pattern implemented with goroutines
package main
import (
"context"
"sync"
)
// FanOutContext copies every value received from a source channel to each
// registered subscriber channel.
//
// Create instances with NewFanOutContext; the zero value is not usable because
// its mutex pointer is nil. All methods may be called from multiple goroutines.
type FanOutContext struct {
	channels []chan interface{} // live subscriber channels, in registration order
	mutex    *sync.Mutex        // guards channels, subs and nextID
	subs     []*fanOutSub       // subs[i] holds the bookkeeping for channels[i]
	nextID   int                // handle the next AddChannel call will return
}

// fanOutSub is the per-subscriber state that allows a channel to be removed
// and closed while Run is in the middle of delivering a value to it.
type fanOutSub struct {
	id     int           // handle returned by AddChannel; never reused
	done   chan struct{} // closed on removal to abort a send blocked on this subscriber
	sendMu sync.Mutex    // held while sending on, or closing, the subscriber's channel
}

// NewFanOutContext creates a fan-out context with no subscribers.
func NewFanOutContext() *FanOutContext {
	return &FanOutContext{mutex: &sync.Mutex{}}
}

// Run receives values from sourceChan and forwards each one to every channel
// registered at the moment the value was received, in registration order.
// It blocks, so it should be started in its own goroutine.
//
// Delivery to a subscriber waits until the subscriber accepts the value, is
// removed, or fanOutCtx is cancelled. Run returns when fanOutCtx is cancelled
// or when sourceChan is closed.
func (foc *FanOutContext) Run(fanOutCtx context.Context, sourceChan chan interface{}) {
	// Scratch slices reused for every broadcast so taking a snapshot does not
	// allocate once they have grown to the subscriber count.
	var (
		channels []chan interface{}
		subs     []*fanOutSub
	)
	for {
		select {
		case <-fanOutCtx.Done():
			return
		case data, ok := <-sourceChan:
			if !ok {
				// A closed source yields zero values forever; stop instead of
				// flooding subscribers with nil.
				return
			}
			// Snapshot the registry so the mutex is not held across blocking
			// sends; AddChannel and RemoveAndCloseChannel stay responsive even
			// when a subscriber is slow.
			foc.mutex.Lock()
			channels = append(channels[:0], foc.channels...)
			subs = append(subs[:0], foc.subs...)
			foc.mutex.Unlock()

			for i, sub := range subs {
				if !sub.deliver(fanOutCtx, channels[i], data) {
					return
				}
			}
		}
	}
}

// deliver sends data on channel unless the subscriber has been removed.
// It reports false only when ctx was cancelled before the value was accepted.
func (sub *fanOutSub) deliver(ctx context.Context, channel chan interface{}, data interface{}) bool {
	sub.sendMu.Lock()
	defer sub.sendMu.Unlock()

	// Check removal first: once done is closed the channel may be closed too,
	// and a select containing a send on a closed channel can panic.
	select {
	case <-sub.done:
		return true
	default:
	}

	// The channel cannot be closed while sendMu is held, so this send is safe
	// even if done is closed concurrently.
	select {
	case channel <- data:
		return true
	case <-sub.done:
		return true
	case <-ctx.Done():
		return false
	}
}

// AddChannel registers a new subscriber channel (buffer size 1) and returns it
// together with a handle for RemoveAndCloseChannel. Handles start at 0,
// increase by one per call, and are never reused, so a handle stays valid no
// matter which other channels are removed.
func (foc *FanOutContext) AddChannel() (chan interface{}, int) {
	channel := make(chan interface{}, 1)
	sub := &fanOutSub{done: make(chan struct{})}

	foc.mutex.Lock()
	defer foc.mutex.Unlock()
	sub.id = foc.nextID
	foc.nextID++
	foc.channels = append(foc.channels, channel)
	foc.subs = append(foc.subs, sub)
	return channel, sub.id
}

// RemoveAndCloseChannel unregisters the channel identified by index (the handle
// returned by AddChannel) and closes it. Values already buffered in the channel
// remain readable. Unknown or already-removed handles are ignored, so calling
// it twice for the same handle is safe.
func (foc *FanOutContext) RemoveAndCloseChannel(index int) {
	channel, sub := foc.detach(index)
	if sub == nil {
		return
	}
	// Wake a Run goroutine blocked on this subscriber, then close the channel
	// under sendMu so no send can be in flight at the moment of the close.
	close(sub.done)
	sub.sendMu.Lock()
	close(channel)
	sub.sendMu.Unlock()
}

// detach removes the subscriber with the given handle from the registry and
// returns its channel and bookkeeping, or nil values when the handle is unknown.
func (foc *FanOutContext) detach(index int) (chan interface{}, *fanOutSub) {
	foc.mutex.Lock()
	defer foc.mutex.Unlock()

	for pos, sub := range foc.subs {
		if sub.id != index {
			continue
		}
		channel := foc.channels[pos]
		last := len(foc.subs) - 1
		copy(foc.channels[pos:], foc.channels[pos+1:])
		copy(foc.subs[pos:], foc.subs[pos+1:])
		// Clear the vacated tail slots so the backing arrays do not keep the
		// removed subscriber reachable.
		foc.channels[last], foc.subs[last] = nil, nil
		foc.channels, foc.subs = foc.channels[:last], foc.subs[:last]
		return channel, sub
	}
	return nil, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment