Skip to content

Instantly share code, notes, and snippets.

@paralin
Created December 13, 2021 22:10
Show Gist options
  • Select an option

  • Save paralin/f780f514ecc0139f3c203c85c85dce52 to your computer and use it in GitHub Desktop.

Select an option

Save paralin/f780f514ecc0139f3c203c85c85dce52 to your computer and use it in GitHub Desktop.
keyed multi writer
package keyedmultiwriter
import (
"io"
"sort"
"sync"
)
// KeyedMultiWriter is an io.Writer that tees a Write stream to writers
// registered under string keys.
//
// Every key owns a set of writers, each with an integer priority. Keys are
// kept in ascending key order and are visited in that order on Write.
// Writers are registered with Add and unregistered with Remove.
//
// The zero value is ready to use. Because the struct embeds a sync.Mutex it
// must not be copied after first use; always pass *KeyedMultiWriter.
type KeyedMultiWriter struct {
	// mtx guards below fields
	mtx sync.Mutex
	// writerKeys holds one entry per registered key, sorted ascending by key
	// so that lookups can binary-search it.
	writerKeys []*writerKey
}
// writerKey groups all writers registered under a single key, together with
// their priorities.
type writerKey struct {
	// key is the ID under which the writers were registered
	key string
	// writers is the list of writers, sorted ascending by priority value
	// (lowest p first, highest p last).
	writers []*priorityWriter
}
// priorityWriter pairs a writer with the priority it was registered at.
type priorityWriter struct {
	// w is the destination writer; it also serves as the identity used to
	// find this entry again when updating or removing it.
	w io.Writer
	// p is the priority; entries are ordered by this value within a key
	p int
}
// Add registers writer under key with the given priority.
//
// The key entry is created if it does not exist yet. If writer is already
// registered under key, its priority is updated instead of adding a second
// entry. A nil writer is ignored.
func (w *KeyedMultiWriter) Add(key string, priority int, writer io.Writer) {
	if writer != nil {
		w.mtx.Lock()
		defer w.mtx.Unlock()

		// createIfNotFound=true guarantees a non-nil entry.
		entry, _ := w.lookupWriterKey(key, true)
		entry.addOrUpdate(priority, writer)
	}
}
// Remove unregisters writer from key.
//
// It is a no-op when the key is unknown or the writer is not registered
// under it. When the last writer of a key is removed, the key itself is
// dropped so that the key list never holds empty entries.
func (w *KeyedMultiWriter) Remove(key string, writer io.Writer) {
	w.mtx.Lock()
	defer w.mtx.Unlock()

	wk, idx := w.lookupWriterKey(key, false)
	if wk == nil {
		return
	}

	wk.remove(writer)
	if len(wk.writers) != 0 {
		return
	}

	// The key has no writers left: delete it from the key list. idx indexes
	// w.writerKeys (not wk.writers), so the deletion must happen there.
	last := len(w.writerKeys) - 1
	copy(w.writerKeys[idx:], w.writerKeys[idx+1:])
	// Clear the vacated tail slot so the dropped entry can be collected.
	w.writerKeys[last] = nil
	w.writerKeys = w.writerKeys[:last]
}
// Write writes p to the highest priority writer of each key, visiting keys
// in sorted key order.
//
// Writers within a key are kept sorted ascending by priority value, so the
// highest priority writer is the last one in the list. Keys without any
// writer are skipped.
//
// A failure on one key does not stop the remaining keys from being written.
// If any write fails, the most recent error is returned; a writer that
// accepts fewer than len(p) bytes without reporting an error is treated as
// io.ErrShortWrite. The returned count is always len(p).
func (w *KeyedMultiWriter) Write(p []byte) (n int, err error) {
	w.mtx.Lock()
	defer w.mtx.Unlock()

	for _, k := range w.writerKeys {
		if len(k.writers) == 0 {
			continue
		}
		// ascending sort => last element carries the highest priority
		top := k.writers[len(k.writers)-1]
		nw, werr := top.w.Write(p)
		if werr == nil && nw != len(p) {
			werr = io.ErrShortWrite
		}
		if werr != nil {
			err = werr
		}
	}
	return len(p), err
}
// lookupWriterKey binary-searches the sorted key list for key.
//
// It returns the matching entry and its index. When there is no match the
// returned index is the position at which the key would have to be inserted
// to keep the list sorted; the entry is then either nil or, if
// createIfNotFound is set, a freshly inserted entry at that index.
//
// The caller must hold mtx.
func (w *KeyedMultiWriter) lookupWriterKey(key string, createIfNotFound bool) (*writerKey, int) {
	keys := w.writerKeys
	pos := sort.Search(len(keys), func(i int) bool {
		return keys[i].key >= key
	})

	switch {
	case pos < len(keys) && keys[pos].key == key:
		return keys[pos], pos
	case createIfNotFound:
		return w.insertWriterKey(key, pos), pos
	default:
		return nil, pos
	}
}
// insertWriterKey creates an empty entry for key and places it at idx,
// shifting the entries from idx onwards one slot to the right.
//
// note: idx must be the sorted insert position for key, otherwise the key
// list loses its ordering.
func (w *KeyedMultiWriter) insertWriterKey(key string, idx int) *writerKey {
	entry := &writerKey{key: key}

	// grow by one slot, then open a gap at idx
	w.writerKeys = append(w.writerKeys, nil)
	keys := w.writerKeys
	copy(keys[idx+1:], keys[idx:])
	keys[idx] = entry

	return entry
}
// addOrUpdate registers writer with the given priority, or changes the
// priority of writer if it is already registered under this key.
//
// The writer list is re-sorted whenever it changes; nothing happens when the
// writer is already present with the same priority.
func (k *writerKey) addOrUpdate(priority int, writer io.Writer) {
	for _, wt := range k.writers {
		if wt.w != writer {
			continue
		}
		if wt.p == priority {
			// already registered at this priority: order is unchanged
			return
		}
		wt.p = priority
		k.sort()
		return
	}

	k.writers = append(k.writers, &priorityWriter{
		w: writer,
		p: priority,
	})
	k.sort()
}
// remove deletes the first entry whose writer equals writer, keeping the
// order of the remaining entries. It does nothing if no entry matches.
func (k *writerKey) remove(writer io.Writer) {
	for i := 0; i < len(k.writers); i++ {
		if k.writers[i].w != writer {
			continue
		}
		k.writers = append(k.writers[:i], k.writers[i+1:]...)
		return
	}
}
// sort orders the writers ascending by priority value, in place.
func (k *writerKey) sort() {
	ws := k.writers
	sort.Slice(ws, func(a, b int) bool {
		return ws[a].p < ws[b].p
	})
}
// _ is a compile-time check that *KeyedMultiWriter implements io.Writer.
var _ io.Writer = (*KeyedMultiWriter)(nil)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment