Skip to content

Instantly share code, notes, and snippets.

@akavel
Created May 7, 2014 11:12
Show Gist options
  • Save akavel/01077e81def6929b325f to your computer and use it in GitHub Desktop.
Save akavel/01077e81def6929b325f to your computer and use it in GitHub Desktop.
piping experiment for Go, by egon
// https://groups.google.com/d/msg/golang-nuts/LqDtKSpo5YY/bd5Z5xbNIVgJ
// by egon
package main
import (
"fmt"
"reflect"
)
type Pipe chan interface{}
var X = struct{}{}
func Start() Pipe {
out := make(Pipe)
return out
}
func (p Pipe) Done() Pipe {
close(p)
return p
}
func (p Pipe) Wait() {
for _ = range p {}
}
func (p Pipe) To(fn interface{}, params ...interface{}) Pipe {
f := reflect.ValueOf(fn)
def := f.Type()
self := reflect.ValueOf(p)
piping := make(map[int]bool)
for i, param := range params {
if param == X {
piping[i] = true
}
}
pipe := -1
for i := 0; i < def.NumIn(); i += 1 {
v := def.In(i)
if v == self.Type() {
pipe = i
}
}
// if the piping positions isn't defined use values as the last argument
if pipe < 0 && len(piping) == 0 && len(params) + 1 == def.NumIn() {
piping[len(params)] = true
params = append(params, X)
}
paramCount := len(params)
if pipe >= 0 {
paramCount += 1
}
expectsAtLeast := def.NumIn()
if def.IsVariadic() {
expectsAtLeast -= 1
}
if paramCount < expectsAtLeast {
panic(fmt.Errorf("'%v' incorrect no. of args: expected %v; got %v", f, expectsAtLeast, paramCount))
}
in := make([]reflect.Value, paramCount)
k := 0
for i, _ := range in {
if i == pipe {
continue
}
in[i] = reflect.ValueOf(params[k])
k += 1
}
if pipe >= 0 {
in[pipe] = self
result := f.Call(in)
if len(result) > 0 {
return result[0].Interface().(Pipe)
}
return Start().Done()
}
out := Start()
go func() {
for v := range p {
for k, _ := range piping {
in[k] = reflect.ValueOf(v)
}
r := f.Call(in)
if len(r) > 0 {
out <- r[0].Interface()
}
}
out.Done()
}()
return out
}
func Take(from Pipe, amount int) Pipe {
out := Start()
go func() {
count := 0
for v := range from {
out <- v
count += 1
if count >= amount {
break
}
}
out.Done()
}()
return out
}
func Add(a int, b int) int {
return a + b
}
func Mul(a int, b int) int {
return a * b
}
func Gen(data []int) Pipe {
out := Start()
go func() {
for _, v := range data {
out <- v
}
out.Done()
}()
return out
}
func main() {
items := [...]int{1, 512, 3, 4, 5, 10}
pipe := Start()
go func(){
for _, v := range items {
pipe <- v
}
pipe.Done()
}()
pipe.To(Take, 4).To(Mul, X, X).To(fmt.Sprintf, "> %5.v <").To(fmt.Println).Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment