Skip to content

Instantly share code, notes, and snippets.

@qdm12
Created February 9, 2020 23:16
Show Gist options
  • Save qdm12/a91fdc976d09c1b4bbbd61929330ac67 to your computer and use it in GitHub Desktop.
Save qdm12/a91fdc976d09c1b4bbbd61929330ac67 to your computer and use it in GitHub Desktop.
package command
import (
"bufio"
"context"
"fmt"
"io"
)
// StreamMerger funnels the lines of several named streams into a single
// consumer callback.
type StreamMerger interface {
// Merge forwards every line read from stream, prefixed with name, to the
// CollectLines consumer. It blocks for the lifetime of the merger, so it
// should be run in its own goroutine.
Merge(name string, stream io.ReadCloser)
// CollectLines runs onNewLine on each line received from the merged streams.
// It returns the first stream error it receives, or nil once the merger's
// context is canceled.
CollectLines(onNewLine func(line string)) error
}
// streamMerger is the channel-based implementation of StreamMerger.
type streamMerger struct {
// ctx is the merger's own context; Merge and CollectLines both stop when it is done.
ctx context.Context
// cancel cancels ctx; CollectLines calls it when it returns.
cancel context.CancelFunc
// chLine carries "<name>: <line>" strings from Merge to CollectLines.
chLine chan string
// chErr carries wrapped scanner errors from Merge to CollectLines.
chErr chan error
}
// NewStreamMerger returns a StreamMerger whose lifetime is bound to a
// cancelable child of ctx: canceling ctx stops the merger, while the merger
// canceling itself leaves ctx untouched. Both internal channels are
// unbuffered.
func NewStreamMerger(ctx context.Context) StreamMerger {
	merger := &streamMerger{
		chLine: make(chan string),
		chErr:  make(chan error),
	}
	// Derive a private context so the merger can be stopped on its own.
	merger.ctx, merger.cancel = context.WithCancel(ctx)
	return merger
}
// Merge reads stream line by line and forwards each line, formatted as
// "<name>: <line>", to the CollectLines consumer. If scanning stops with an
// error, that error is forwarded wrapped as "<name>: stream error: ...".
//
// Merge blocks until the merger's context is canceled and then closes stream,
// so it should be run in its own goroutine.
func (s *streamMerger) Merge(name string, stream io.ReadCloser) {
	// Closing the stream on return releases it; for streams whose Close
	// interrupts a pending Read this also unblocks the reader goroutine below.
	defer stream.Close()
	go func() {
		scanner := bufio.NewScanner(stream)
		for scanner.Scan() {
			// Never block on the send alone: once the context is done nobody
			// may be receiving anymore, and a bare send would leak this goroutine.
			select {
			case s.chLine <- fmt.Sprintf("%s: %s", name, scanner.Text()):
			case <-s.ctx.Done():
				return
			}
		}
		if err := scanner.Err(); err != nil {
			// Same reasoning as above: give up reporting once the merger stopped.
			select {
			case s.chErr <- fmt.Errorf("%s: stream error: %w", name, err):
			case <-s.ctx.Done():
			}
		}
	}()
	<-s.ctx.Done() // blocks until context is canceled
}
// CollectLines runs onNewLine on every line received from the merged streams.
// It returns the first stream error it receives, or nil once the merger's
// context is canceled. In both cases the merger's context is canceled on
// return, which stops the other streams.
func (s *streamMerger) CollectLines(onNewLine func(line string)) error {
	// The channels are deliberately left open: the goroutines started by Merge
	// are the senders, and a send on a closed channel panics. Canceling the
	// context is the only stop signal.
	defer s.cancel()
	for {
		select {
		case line := <-s.chLine:
			onNewLine(line)
		case err := <-s.chErr:
			return err
		case <-s.ctx.Done():
			return nil
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment