Created
March 29, 2019 20:51
-
-
Save rikonor/6db3dd55bc0829c066b1f4b5232b2786 to your computer and use it in GitHub Desktop.
Self-healing io.Writer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package x

import (
	"errors"
	"io"
	"sync"
	"syscall"
	"time"
)
// InitializeFunc produces the underlying io.Writer. NewWriter calls it once
// to obtain the initial writer, and Writer calls it again to obtain a
// replacement after a broken-pipe write failure.
type InitializeFunc func() (io.Writer, error)
| type Writer struct { | |
| initFn InitializeFunc | |
| backoff Backoff | |
| w io.Writer | |
| mu *sync.Mutex | |
| nowFn func() time.Time | |
| } | |
| func NewWriter(initFn InitializeFunc, backoff Backoff) (io.WriteCloser, error) { | |
| // Initialize writer | |
| w, err := initFn() | |
| if err != nil { | |
| return nil, err | |
| } | |
| // Return wrapper | |
| return &Writer{ | |
| initFn: initFn, | |
| backoff: backoff, | |
| w: w, | |
| mu: &sync.Mutex{}, | |
| nowFn: time.Now, | |
| }, nil | |
| } | |
| func (w *Writer) Write(b []byte) (int, error) { | |
| w.mu.Lock() | |
| defer w.mu.Unlock() | |
| return w.write(b) | |
| } | |
| func (w *Writer) write(b []byte) (int, error) { | |
| n, err := w.w.Write(b) | |
| if isBrokenPipe(err) { | |
| // Decide if we should try to reconnect or wait a bit (Backoff) | |
| if !w.backoff.Ready() { | |
| return n, err | |
| } | |
| w.backoff.SetLastFailureTime(w.nowFn()) | |
| // Recover | |
| ww, iErr := w.initFn() | |
| if iErr != nil { | |
| // Ignore and try again later, return original error | |
| return n, err | |
| } | |
| // Close | |
| if err := w.close(); err != nil { | |
| // Ignore this error | |
| } | |
| w.w = ww | |
| } | |
| return n, err | |
| } | |
| func (w *Writer) Close() error { | |
| w.mu.Lock() | |
| defer w.mu.Unlock() | |
| return w.close() | |
| } | |
| func (w *Writer) close() error { | |
| if c, ok := w.w.(io.Closer); ok { | |
| return c.Close() | |
| } | |
| return nil | |
| } | |
// isBrokenPipe reports whether err, or any error in its wrap chain, is
// syscall.EPIPE.
//
// The chain is walked with errors.Is, so the *net.OpError -> *os.SyscallError
// -> EPIPE shape is matched, and so are other wrappers that expose Unwrap
// (for example *os.PathError, or an error built with fmt.Errorf and %w).
// A nil error is never a broken pipe.
func isBrokenPipe(err error) bool {
	return errors.Is(err, syscall.EPIPE)
}
// Backoff gates recovery attempts. Writer asks Ready before trying to
// re-initialize its underlying writer and reports the time of each attempt
// through SetLastFailureTime.
type Backoff interface {
	// Ready reports whether a recovery attempt may be made now.
	Ready() bool
	// SetLastFailureTime records t as the time of the most recent failure.
	SetLastFailureTime(t time.Time)
}
// backoff is a fixed-interval Backoff: it is ready once at least d has
// elapsed since lastFailureTime, as measured by nowFn.
type backoff struct {
	// d is the minimum time that must pass after lastFailureTime before
	// Ready reports true.
	d time.Duration
	// lastFailureTime is the value most recently passed to
	// SetLastFailureTime.
	lastFailureTime time.Time
	// nowFn returns the current time used by Ready.
	nowFn func() time.Time
}
| func NewBackoff(d time.Duration) Backoff { | |
| return &backoff{ | |
| d: d, | |
| nowFn: time.Now, | |
| } | |
| } | |
| func (b *backoff) Ready() bool { | |
| if d := b.nowFn().Sub(b.lastFailureTime); d < b.d { | |
| return false | |
| } | |
| return true | |
| } | |
| func (b *backoff) SetLastFailureTime(t time.Time) { | |
| b.lastFailureTime = t | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment