Skip to content

Instantly share code, notes, and snippets.

@clarkmcc
Last active April 17, 2020 00:06
Show Gist options
  • Save clarkmcc/f23a429747bc7ae7bc765636efce536a to your computer and use it in GitHub Desktop.
Save clarkmcc/f23a429747bc7ae7bc765636efce536a to your computer and use it in GitHub Desktop.
I developed this package to handle cases where I wanted to stop running goroutines and verify that they were in fact stopped. This can be paired up with a map to track named goroutines and inform respective goroutines to shut down or start again.
package concurrency
// Channel types shared by the stop informer and the goroutines it controls.
type (
	// SingleStructChan carries empty-struct signals.
	SingleStructChan chan struct{}
	// SingleAckerChan carries stop ackers for the receiving goroutine to acknowledge.
	SingleAckerChan chan *stopAcker
	// DoubleStructChan is a channel of empty-struct signal channels.
	DoubleStructChan chan chan struct{}
)
// StopInformer coordinates stopping a running resource (typically a goroutine) and learning
// when that stop has been acknowledged.
type StopInformer interface {
	// Stop requests a stop and does not return until the resource has confirmed that it
	// stopped.
	Stop()

	// StopAndNotify requests a stop like Stop, but returns immediately instead of blocking.
	// The returned channel fires once the requested resource has stopped.
	StopAndNotify(buffer int) SingleStructChan

	// Watch returns the channel on which the controlled goroutine receives the stop
	// notification; it is generally read inside a select statement.
	Watch() SingleAckerChan
}
// genericStopInformer is the channel-based implementation of StopInformer.
type genericStopInformer struct {
	// internalStopChan receives a value whenever Stop or StopAndNotify is called; it is the
	// internal trigger that starts the stop hand-off.
	internalStopChan DoubleStructChan

	// internalNotifyStoppedChan is the acknowledgement channel. It is the value sent into
	// internalStopChan, and it is signalled once the goroutine being stopped acknowledges.
	internalNotifyStoppedChan SingleStructChan

	// internalWatchChan is the channel handed out by Watch. A stop acker is delivered on it
	// after a stop has been requested.
	internalWatchChan SingleAckerChan

	// externalWatchStoppedChan is the channel returned by StopAndNotify, letting callers wait
	// for the acknowledgement on a channel instead of blocking in Stop. StopAndNotify replaces
	// it with a fresh channel on every call.
	externalWatchStoppedChan SingleStructChan
}
// stopAcker is the value delivered on the watch channel to the goroutine being stopped. It
// wraps the channel used to confirm that the stop took place.
type stopAcker struct {
	// ackChan is signalled by Acknowledge.
	ackChan SingleStructChan
}
// NewStopAcker builds a stopAcker that signals ackChan when it is acknowledged.
func NewStopAcker(ackChan SingleStructChan) *stopAcker {
	acker := new(stopAcker)
	acker.ackChan = ackChan
	return acker
}
// Acknowledge is called by the target goroutine to confirm that it honoured the stop request.
// It sends one signal on the acker's channel, which is what releases a blocked
// StopInformer.Stop call or triggers the channel returned by StopInformer.StopAndNotify.
func (s *stopAcker) Acknowledge() {
	var signal struct{}
	s.ackChan <- signal
}
// NewGenericStopInformer creates a StopInformer whose internal channels are all unbuffered.
// It takes no parameters.
//
// A helper goroutine is started that waits for the first stop request and forwards it to the
// watch channel wrapped in a stopAcker. The goroutine exits after that single hand-off, so it
// services exactly one stop request; it stays parked if no stop is ever requested or if
// nothing receives from the watch channel.
func NewGenericStopInformer() StopInformer {
	stopRequests := make(DoubleStructChan)
	watch := make(SingleAckerChan)

	go func() {
		// Acknowledge on the channel carried by the request itself instead of discarding
		// the received value and relying on a captured variable.
		ack := <-stopRequests
		watch <- NewStopAcker(ack)
	}()

	return &genericStopInformer{
		internalStopChan:          stopRequests,
		internalNotifyStoppedChan: make(SingleStructChan),
		internalWatchChan:         watch,
		externalWatchStoppedChan:  make(SingleStructChan),
	}
}
// Stop sends the acknowledgement channel into the informer's stop channel and then waits on
// that same channel, returning only once the stop has been acknowledged.
func (g *genericStopInformer) Stop() {
	ack := g.internalNotifyStoppedChan
	g.internalStopChan <- ack
	<-ack
}
// StopAndNotify requests a stop like Stop, but returns immediately instead of blocking. The
// returned channel is created with capacity buffer and receives one value once the stop has
// been acknowledged.
//
// A negative buffer is treated as 0, because make panics on a negative channel capacity.
// With a capacity of 0 the background goroutine stays blocked on the final send until the
// caller receives from the returned channel.
func (g *genericStopInformer) StopAndNotify(buffer int) SingleStructChan {
	if buffer < 0 {
		buffer = 0
	}

	// The goroutine signals this local channel rather than re-reading the struct field: the
	// field is overwritten by every call, so reading it later would race with that write and
	// could signal a channel belonging to a different call.
	stopped := make(SingleStructChan, buffer)
	g.externalWatchStoppedChan = stopped

	go func() {
		g.internalStopChan <- g.internalNotifyStoppedChan
		<-g.internalNotifyStoppedChan
		stopped <- struct{}{}
	}()

	return stopped
}
// Watch returns the informer's watch channel. The controlled goroutine reads from it,
// generally inside a select statement, to receive the stop notification.
func (g *genericStopInformer) Watch() SingleAckerChan {
	watch := g.internalWatchChan
	return watch
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment