Created
December 13, 2021 22:10
-
-
Save paralin/f780f514ecc0139f3c203c85c85dce52 to your computer and use it in GitHub Desktop.
keyed multi writer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package keyedmultiwriter | |
| import ( | |
| "io" | |
| "sort" | |
| "sync" | |
| ) | |
| // KeyedMultiWriter tees a Write stream to multiple writers. | |
| // | |
| // Each Write will write to (in key sorted order) the writer with the highest | |
| // Priority for each key. | |
| type KeyedMultiWriter struct { | |
| // mtx guards below fields | |
| mtx sync.Mutex | |
| // writerKeys is sorted by key | |
| writerKeys []*writerKey | |
| } | |
| // writerKey contains a set of writers w/ priorities. | |
| type writerKey struct { | |
| // key is the ID for the writer | |
| key string | |
| // writers is the list of writers, sorted by priority. | |
| writers []*priorityWriter | |
| } | |
| // priorityWriter contains a writer with a priority. | |
| type priorityWriter struct { | |
| // w is the writer | |
| w io.Writer | |
| // p is the priority | |
| p int | |
| } | |
| // Add adds a writer to a key with a priority. | |
| func (w *KeyedMultiWriter) Add(key string, priority int, writer io.Writer) { | |
| if writer == nil { | |
| return | |
| } | |
| w.mtx.Lock() | |
| defer w.mtx.Unlock() | |
| wk, _ := w.lookupWriterKey(key, true) | |
| wk.addOrUpdate(priority, writer) | |
| } | |
| // Remove removes the writer at the key. | |
| func (w *KeyedMultiWriter) Remove(key string, writer io.Writer) { | |
| w.mtx.Lock() | |
| defer w.mtx.Unlock() | |
| wk, idx := w.lookupWriterKey(key, false) | |
| if wk != nil { | |
| wk.remove(writer) | |
| if len(wk.writers) == 0 { | |
| // remove the writer key | |
| wk.writers = append(wk.writers[:idx], wk.writers[idx+1:]...) | |
| } | |
| } | |
| } | |
| // Write writes to the highest priority writer for each key. | |
| // | |
| // any error writing to any of the writers is returned. | |
| func (w *KeyedMultiWriter) Write(p []byte) (n int, err error) { | |
| w.mtx.Lock() | |
| defer w.mtx.Unlock() | |
| for _, k := range w.writerKeys { | |
| for _, writer := range k.writers { | |
| if _, werr := writer.w.Write(p); werr != nil { | |
| err = werr | |
| } | |
| } | |
| } | |
| return len(p), err | |
| } | |
| // lookupWriterKey looks up the writer key for the key. | |
| // returns the insert index if not found. | |
| func (w *KeyedMultiWriter) lookupWriterKey(key string, createIfNotFound bool) (*writerKey, int) { | |
| vals := w.writerKeys | |
| idx := sort.Search(len(vals), func(i int) bool { | |
| v := vals[i] | |
| return v.key >= key | |
| }) | |
| var wk *writerKey | |
| if idx < len(vals) && vals[idx].key == key { | |
| wk = vals[idx] | |
| } | |
| if createIfNotFound && wk == nil { | |
| wk := w.insertWriterKey(key, idx) | |
| return wk, idx | |
| } | |
| return wk, idx | |
| } | |
| // insertWriterKey inserts the writer key at an index. | |
| // note: always insert at correct index to maintain sorted order. | |
| func (w *KeyedMultiWriter) insertWriterKey(key string, idx int) *writerKey { | |
| w.writerKeys = append(w.writerKeys, nil) | |
| copy(w.writerKeys[idx+1:], w.writerKeys[idx:]) | |
| nk := &writerKey{key: key} | |
| w.writerKeys[idx] = nk | |
| return nk | |
| } | |
| // addOrUpdate adds or updates a writer to the key. | |
| func (k *writerKey) addOrUpdate(priority int, writer io.Writer) { | |
| var found bool | |
| for _, wt := range k.writers { | |
| if wt.w == writer { | |
| if wt.p == priority { | |
| return | |
| } | |
| wt.p = priority | |
| found = true | |
| break | |
| } | |
| } | |
| if !found { | |
| k.writers = append(k.writers, &priorityWriter{ | |
| w: writer, | |
| p: priority, | |
| }) | |
| } | |
| sort.Slice(k.writers, func(i, j int) bool { | |
| return k.writers[i].p < k.writers[j].p | |
| }) | |
| k.sort() | |
| } | |
| // remove removes the writer from the key. | |
| func (k *writerKey) remove(writer io.Writer) { | |
| for i, v := range k.writers { | |
| if v.w == writer { | |
| k.writers = append(k.writers[:i], k.writers[i+1:]...) | |
| break | |
| } | |
| } | |
| } | |
| // sort sorts the writers by priority | |
| func (k *writerKey) sort() { | |
| sort.Slice(k.writers, func(i, j int) bool { | |
| return k.writers[i].p < k.writers[j].p | |
| }) | |
| } | |
| // _ is a type assertion | |
| var _ io.Writer = ((*KeyedMultiWriter)(nil)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment