Created
December 20, 2020 10:29
-
-
Save asterite3/95f5f7adabce0e60526de946c953f158 to your computer and use it in GitHub Desktop.
Unbuffered with CleanContext with a second struct
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package broadcaster | |
import ( | |
"context" | |
"io" | |
"sync" | |
"sync/atomic" | |
) | |
// Unbuffered accumulates multiple io.WriteCloser by stream. | |
type Unbuffered struct { | |
mu sync.Mutex | |
writers writers | |
} | |
// Add adds new io.WriteCloser. | |
func (w *Unbuffered) Add(writer io.WriteCloser) { | |
w.mu.Lock() | |
w.writers.add(writer) | |
w.mu.Unlock() | |
} | |
// Write writes bytes to all writers. Failed writers will be evicted during | |
// this call. | |
func (w *Unbuffered) Write(p []byte) (n int, err error) { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
writers := w.writers.get() | |
if writers == nil { | |
return | |
} | |
var evict []int | |
for i, sw := range *writers { | |
if n, err := sw.Write(p); err != nil || n != len(p) { | |
// On error, evict the writer | |
evict = append(evict, i) | |
} | |
} | |
w.writers.mu.Lock() | |
for n, i := range evict { | |
*writers = append((*writers)[:i-n], (*writers)[i-n+1:]...) | |
} | |
w.writers.mu.Unlock() | |
return len(p), nil | |
} | |
// Clean closes and removes all writers. Last non-eol-terminated part of data | |
// will be saved. | |
func (w *Unbuffered) Clean() error { | |
w.mu.Lock() | |
w.writers.clean() | |
w.mu.Unlock() | |
return nil | |
} | |
// CleanContext closes and removes all writers. | |
// CleanContext supports timeouts via the context to unblock and forcefully | |
// close the io streams. This function should only be used if all writers | |
// added to Unbuffered support concurrent calls to Close and Write: it will | |
// call Close while Write may be in progress in order to forcefully close | |
// writers | |
func (w *Unbuffered) CleanContext(ctx context.Context) error { | |
var cleaningUnderway int32 = 0 | |
regularCleanDone := make(chan struct{}, 1) | |
go func() { | |
defer close(regularCleanDone) | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
if !atomic.CompareAndSwapInt32(&cleaningUnderway, 0, 1) { | |
return | |
} | |
w.writers.clean() | |
}() | |
select { | |
case <-regularCleanDone: | |
return nil | |
case <-ctx.Done(): | |
} | |
if !atomic.CompareAndSwapInt32(&cleaningUnderway, 0, 1) { | |
return nil | |
} | |
w.writers.clean() | |
return nil | |
} | |
type writers struct { | |
mu sync.Mutex | |
writers *[]io.WriteCloser | |
} | |
func (w *writers) add(writer io.WriteCloser) { | |
w.mu.Lock() | |
if w.writers == nil { | |
w.writers = new([]io.WriteCloser) | |
} | |
*w.writers = append(*w.writers, writer) | |
w.mu.Unlock() | |
} | |
func (w *writers) get() *[]io.WriteCloser { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
return w.writers | |
} | |
func (w *writers) clean() { | |
w.mu.Lock() | |
if w.writers == nil { | |
return | |
} | |
for _, sw := range *w.writers { | |
sw.Close() | |
} | |
w.writers = nil | |
w.mu.Unlock() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment