Skip to content

Instantly share code, notes, and snippets.

@rikonor
Created March 29, 2019 20:51
Show Gist options
  • Select an option

  • Save rikonor/6db3dd55bc0829c066b1f4b5232b2786 to your computer and use it in GitHub Desktop.

Select an option

Save rikonor/6db3dd55bc0829c066b1f4b5232b2786 to your computer and use it in GitHub Desktop.
Self-healing io.Writer
package x
// InitializeFunc creates a new underlying io.Writer, or returns an error if
// one cannot be created. NewWriter calls it once up front, and Writer calls it
// again when a write fails with a broken pipe.
type InitializeFunc func() (io.Writer, error)
// Writer wraps an io.Writer and replaces it with a freshly initialized one
// after a write fails with a broken-pipe error.
type Writer struct {
// initFn creates the underlying writer, both initially and on recovery.
initFn InitializeFunc
// backoff is consulted before each recovery attempt to decide whether the
// attempt may happen now.
backoff Backoff
// w is the current underlying writer.
w io.Writer
// mu is held for the duration of Write and Close.
mu *sync.Mutex
// nowFn supplies the timestamp recorded with backoff on each recovery
// attempt; NewWriter sets it to time.Now.
nowFn func() time.Time
}
// NewWriter builds a Writer around the writer produced by initFn.
//
// initFn is invoked immediately; if it fails, its error is returned unchanged
// and no Writer is created. The same initFn is kept for later recovery
// attempts, which are gated by backoff.
func NewWriter(initFn InitializeFunc, backoff Backoff) (io.WriteCloser, error) {
	underlying, err := initFn()
	if err != nil {
		return nil, err
	}

	sw := &Writer{
		initFn:  initFn,
		backoff: backoff,
		w:       underlying,
		mu:      new(sync.Mutex),
		nowFn:   time.Now,
	}
	return sw, nil
}
// Write sends b to the underlying writer while holding the Writer's mutex.
//
// If the write fails with a broken pipe, a replacement writer may be
// initialized for subsequent calls; the byte count and error of the original
// attempt are returned either way.
func (w *Writer) Write(b []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	n, err := w.write(b)
	return n, err
}
// write sends b to the current underlying writer and, when that fails with a
// broken pipe, tries to swap in a freshly initialized writer for later calls.
//
// The result of the original write attempt is always what is returned: b is
// not re-sent on the new writer, so the caller still sees the broken-pipe
// error. The exported methods call write with w.mu held.
func (w *Writer) write(b []byte) (int, error) {
	n, err := w.w.Write(b)
	if !isBrokenPipe(err) {
		return n, err
	}

	// Gate recovery on the backoff so a dead peer is not hammered with
	// reconnect attempts on every write.
	if !w.backoff.Ready() {
		return n, err
	}
	// Record the attempt before initializing, so a failed attempt is also
	// subject to the backoff.
	w.backoff.SetLastFailureTime(w.nowFn())

	fresh, initErr := w.initFn()
	if initErr != nil {
		// Keep the old writer and surface the original write error; a later
		// write will try again once the backoff is ready.
		return n, err
	}

	// Best-effort close of the broken writer: it is being discarded either
	// way, so a close error is deliberately dropped.
	_ = w.close()
	w.w = fresh

	return n, err
}
// Close closes the current underlying writer while holding the Writer's
// mutex. If the underlying writer does not implement io.Closer, Close does
// nothing and returns nil.
func (w *Writer) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	err := w.close()
	return err
}
// close closes the current underlying writer when it implements io.Closer and
// returns that Close error; for any other writer it returns nil. It does no
// locking of its own.
func (w *Writer) close() error {
	closer, closable := w.w.(io.Closer)
	if !closable {
		return nil
	}
	return closer.Close()
}
// isBrokenPipe reports whether err is a *net.OpError whose Err is an
// *os.SyscallError carrying syscall.EPIPE.
//
// Only that exact shape is recognized: the checks are direct type assertions,
// so an error that wraps such a value is not matched, and nil yields false.
func isBrokenPipe(err error) bool {
	if opErr, ok := err.(*net.OpError); ok {
		if sysErr, ok := opErr.Err.(*os.SyscallError); ok {
			return sysErr.Err == syscall.EPIPE
		}
	}
	return false
}
// Backoff gates how often Writer may try to re-initialize its underlying
// writer after a broken pipe.
type Backoff interface {
// Ready reports whether an attempt is allowed now. Writer checks it before
// every re-initialization.
Ready() bool
// SetLastFailureTime records t as the reference time for later Ready calls.
// Writer passes the current time immediately before each attempt.
SetLastFailureTime(t time.Time)
}
// backoff is a fixed-interval Backoff: it is ready once at least d has elapsed
// since the last recorded failure time.
type backoff struct {
// d is the minimum time that must pass after lastFailureTime before Ready
// returns true.
d time.Duration
// lastFailureTime is the most recent value passed to SetLastFailureTime; it
// is the zero time until then.
lastFailureTime time.Time
// nowFn returns the current time; NewBackoff sets it to time.Now.
nowFn func() time.Time
}
// NewBackoff returns a Backoff that reports ready once at least d has elapsed
// since the last recorded failure time. It reads the clock via time.Now and
// starts with no failure time recorded.
func NewBackoff(d time.Duration) Backoff {
	b := new(backoff)
	b.d = d
	b.nowFn = time.Now
	return b
}
// Ready reports whether at least d has elapsed between lastFailureTime and the
// current time as given by nowFn.
func (b *backoff) Ready() bool {
	elapsed := b.nowFn().Sub(b.lastFailureTime)
	return elapsed >= b.d
}
// SetLastFailureTime stores t as the time from which Ready measures the
// backoff interval.
func (b *backoff) SetLastFailureTime(t time.Time) {
b.lastFailureTime = t
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment