Skip to content

Instantly share code, notes, and snippets.

@rgarcia
Last active December 22, 2015 04:29
Show Gist options
  • Select an option

  • Save rgarcia/6417378 to your computer and use it in GitHub Desktop.

Select an option

Save rgarcia/6417378 to your computer and use it in GitHub Desktop.
go data piping
package main
import (
"bufio"
"encoding/json"
"fmt"
"io"
"labix.org/v2/pipe"
"os"
)
// LineDecoder is implemented by types that can decode a single line of input
// into a value. LineDencoder calls Decode once for every line it reads.
type LineDecoder interface {
// Decode decodes the raw bytes of one line into v and reports any failure.
// LineDencoder passes the line exactly as read (including the trailing
// newline when one was present) and a pointer to the destination value.
Decode(line []byte, v interface{}) error
}
// LineEncoder is implemented by types that can encode a value as the bytes of
// a single output line.
type LineEncoder interface {
// Encode returns the encoded form of v, or an error if v cannot be encoded.
// LineDencoder writes the returned bytes followed by a newline of its own.
Encode(v interface{}) ([]byte, error)
}
// JSONLineDencoder is a stateless codec that treats every line as one JSON
// document. It carries both a Decode and an Encode method, so a single value
// can be used wherever a line decoder or a line encoder is expected.
type JSONLineDencoder struct{}

// Decode unmarshals the JSON document held in line into v, which must be a
// pointer as required by json.Unmarshal. The unmarshalling error, if any, is
// returned unchanged.
func (JSONLineDencoder) Decode(line []byte, v interface{}) error {
	err := json.Unmarshal(line, v)
	return err
}

// Encode marshals v to JSON and returns the resulting bytes together with any
// error reported by json.Marshal.
func (JSONLineDencoder) Encode(v interface{}) ([]byte, error) {
	encoded, err := json.Marshal(v)
	return encoded, err
}
// LineDencoder returns a pipe stage that transforms its input one line at a
// time. Every line read from the stage's stdin is decoded with dec, handed to
// f, re-encoded with enc and written to the stage's stdout followed by a
// newline.
//
// f receives the decoded value itself, so only in-place modifications (for
// example to a decoded map) are visible in the re-encoded output.
//
// Processing stops at the first error returned by dec, f, enc, or the
// underlying reader or writer, and that error is the result of the stage. A
// final line without a trailing newline is still processed before the stage
// finishes successfully at end of input.
func LineDencoder(dec LineDecoder, enc LineEncoder, f func(line interface{}) error) pipe.Pipe {
	return pipe.TaskFunc(func(s *pipe.State) error {
		r := bufio.NewReader(s.Stdin)
		for {
			// ReadBytes can return data together with an error (typically the
			// last line plus io.EOF), so the data is handled first and the
			// read error only afterwards.
			line, readErr := r.ReadBytes('\n')
			if len(line) > 0 {
				var decoded interface{}
				// A line that cannot be decoded aborts the stage instead of
				// silently reaching f as a nil value.
				if err := dec.Decode(line, &decoded); err != nil {
					return err
				}
				// Apply the user-supplied function.
				if err := f(decoded); err != nil {
					return err
				}
				encoded, err := enc.Encode(decoded)
				if err != nil {
					return err
				}
				if _, err := s.Stdout.Write(encoded); err != nil {
					return err
				}
				// The line terminator is written separately so the slice owned
				// by the encoder is never modified; its error is checked too.
				if _, err := io.WriteString(s.Stdout, "\n"); err != nil {
					return err
				}
			}
			if readErr == io.EOF {
				return nil
			}
			if readErr != nil {
				return readErr
			}
		}
	})
}
// main builds and runs a three-stage pipe: it reads test.json, decodes each
// line as JSON, sets the "d" key of the decoded object to 0, re-encodes it,
// and passes the result through pipe.Tee(os.Stdout).
//
// Each line must hold a JSON object; any other JSON value makes the transform
// return an error rather than panic. If the pipe fails, the error is printed
// to standard error and the process exits with status 1.
func main() {
	jsonDencoder := JSONLineDencoder{}
	p := pipe.Line(
		pipe.ReadFile("test.json"),
		LineDencoder(jsonDencoder, jsonDencoder, func(line interface{}) error {
			// Decoding into interface{} yields a map only for JSON objects;
			// arrays, scalars and null must be rejected explicitly.
			doc, ok := line.(map[string]interface{})
			if !ok {
				return fmt.Errorf("expected a JSON object, got %T", line)
			}
			doc["d"] = 0
			return nil
		}),
		pipe.Tee(os.Stdout),
	)
	if err := pipe.Run(p); err != nil {
		fmt.Fprintf(os.Stderr, "Error running pipe: %v\n", err)
		// Signal the failure to the calling shell or script.
		os.Exit(1)
	}
}
{"a":1,"b":2,"c":3,"d":0}
{"a":4,"b":5,"c":6,"d":0}
{"a":1, "b":2, "c":3}
{"a":4, "b":5, "c":6}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment