Skip to content

Instantly share code, notes, and snippets.

@maxsei
Last active January 30, 2024 22:48
Show Gist options
  • Select an option

  • Save maxsei/bc3ac6c0cecbb96d71b879979cdf677a to your computer and use it in GitHub Desktop.

Select an option

Save maxsei/bc3ac6c0cecbb96d71b879979cdf677a to your computer and use it in GitHub Desktop.
Safe channel closing in Go
package main
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
)
// How can you safely close a channel with multiple producers?
//
// main starts five producers that each send their own index on a shared
// SafeChannel once per tick, closes the channel after three seconds, and then
// prints how many values were received from each producer.
func main() {
	// ctx, cancel := context.WithCancel(context.Background())
	ctx := context.Background()
	ps := NewSafeChannel[int](ctx)
	for i := 0; i < 5; i++ {
		go func(i int) {
			const workDuration = 600 * time.Millisecond
			// One ticker per producer, stopped on exit, instead of allocating
			// a fresh never-stopped ticker on every loop iteration.
			ticker := time.NewTicker(workDuration)
			defer ticker.Stop()
			for {
				select {
				case <-ps.Done():
					return
				case <-ticker.C:
				}
				// Send already unblocks when the channel is closed, so it can
				// run inline; an error means the channel is shutting down.
				if err := ps.Send(i); err != nil {
					return
				}
			}
		}(i)
	}
	go func() {
		time.Sleep(3 * time.Second)
		fmt.Println("closing...")
		ps.Close()
		// cancel()
		fmt.Println("closing...done.")
	}()
	// The range loop ends once the SafeChannel has closed its underlying
	// channel, i.e. after Close and after all in-flight sends have returned.
	counts := make(map[int]int)
	for x := range ps.Recv() {
		fmt.Println(x)
		// A missing key reads as zero, so the first sighting counts as 1.
		counts[x]++
	}
	countsJson, err := json.MarshalIndent(counts, "", "\t")
	if err != nil {
		fmt.Printf("marshal counts: %v\n", err)
		return
	}
	fmt.Printf("counts =\n%s\n", countsJson)
}
// NewSafeChannel returns a SafeChannel derived from ctx. Once ctx is
// cancelled or Close is called, a background goroutine waits for every
// in-flight Send to return and then closes the underlying channel, so
// receivers ranging over Recv terminate.
//
// size is optional: with no argument the channel is unbuffered, with one
// argument it is buffered with that capacity. Passing more than one value
// panics.
func NewSafeChannel[T any](ctx context.Context, size ...int) *SafeChannel[T] {
	var ret SafeChannel[T]
	switch len(size) {
	case 0:
		ret.ch = make(chan T)
	case 1:
		ret.ch = make(chan T, size[0])
	default:
		panic("must either specify size or not")
	}
	ret.ctx, ret.cancel = context.WithCancel(ctx)
	go func() {
		<-ret.ctx.Done()
		// Taking the write lock waits for any Send that is in the middle of
		// registering with wg, and keeps later Sends from registering. That
		// guarantees no wg.Add can run concurrently with wg.Wait, which
		// sync.WaitGroup forbids when the counter may be zero.
		ret.mu.Lock()
		defer ret.mu.Unlock()
		ret.wg.Wait()
		close(ret.ch)
	}()
	return &ret
}

// SafeChannel wraps a channel so that multiple producers can send on it
// concurrently while it is being closed, without ever sending on a closed
// channel. Create one with NewSafeChannel; the zero value is not usable.
type SafeChannel[T any] struct {
	ch     chan T             // underlying channel; closed only by the goroutine started in NewSafeChannel
	cancel context.CancelFunc // cancels ctx; invoked by Close
	ctx    context.Context    // done once the channel is closing
	wg     sync.WaitGroup     // counts Send calls that may still touch ch
	mu     sync.RWMutex       // orders wg.Add in Send before wg.Wait in the closer
}

// Cap returns the capacity of the underlying channel.
func (x *SafeChannel[T]) Cap() int { return cap(x.ch) }

// Recv returns the receive side of the underlying channel. It is closed after
// the SafeChannel has been closed and all in-flight sends have returned.
func (x *SafeChannel[T]) Recv() <-chan T { return x.ch }

// Send delivers v on the channel, blocking until it is accepted or the
// SafeChannel is closed. It returns nil if v was sent and the context's error
// otherwise. Once the SafeChannel is closed, Send never sends and always
// returns a non-nil error.
func (x *SafeChannel[T]) Send(v T) error {
	// Register under the read lock so the closer cannot start waiting between
	// the cancellation check and wg.Add.
	x.mu.RLock()
	err := x.ctx.Err()
	if err == nil {
		x.wg.Add(1)
	}
	x.mu.RUnlock()
	if err != nil {
		return err
	}
	defer x.wg.Done()

	select {
	case <-x.ctx.Done():
		return x.ctx.Err()
	case x.ch <- v:
		return nil
	}
}

// Done returns a channel that is closed when the SafeChannel starts closing.
// Producers can select on it to stop producing.
func (x *SafeChannel[T]) Done() <-chan struct{} { return x.ctx.Done() }

// Close begins shutting the SafeChannel down: pending and future Send calls
// are released with an error and the underlying channel is closed afterwards.
// It does not wait for that to finish and is safe to call more than once.
func (x *SafeChannel[T]) Close() { x.cancel() }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment