Skip to content

Instantly share code, notes, and snippets.

@dallasmarlow
Created April 20, 2015 19:33
Show Gist options
  • Save dallasmarlow/899235f5e1e5dbec3e8e to your computer and use it in GitHub Desktop.
Save dallasmarlow/899235f5e1e5dbec3e8e to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"strings"
"sync"
)
// RowMapper is one stage of a row pipeline. NewRowProcessor chains mappers by
// handing the channel returned by one stage to the Map method of the next.
//
// RowProcessor.Process ranges over the last stage's channel, so it only
// returns once that channel is closed; the implementation in this file
// (UpperCaseMapper) closes its returned channel after src has been closed and
// drained.
type RowMapper interface {
// Map consumes rows from src and returns the channel on which this stage
// emits its rows.
Map(src chan []string) chan []string
}
// RowProcessor pushes rows through a chain of RowMappers.
//
// Rows enter through Enqueue (the src channel) and leave through Process (the
// snk channel, i.e. the output of the last mapper). Both channels are
// unbuffered, so a producer calling Enqueue and a consumer calling Process
// must run concurrently.
type RowProcessor struct {
	src, snk chan []string
}

// NewRowProcessor builds a pipeline in which each mapper reads from the
// channel returned by the previous one; the first mapper reads from the
// processor's source channel.
//
// With no mappers the pipeline is the identity: rows passed to Enqueue are
// delivered unchanged to Process. (Previously the sink stayed nil in that
// case, which made both Enqueue and Process block forever.)
func NewRowProcessor(mappers ...RowMapper) *RowProcessor {
	src := make(chan []string)

	// Start the chain at src so that an empty mapper list still yields a
	// usable sink, and so no stage needs special-casing.
	snk := src
	for _, mapper := range mappers {
		snk = mapper.Map(snk)
	}

	return &RowProcessor{src: src, snk: snk}
}

// Enqueue sends r into the pipeline. It blocks until a stage (or, with no
// mappers, Process) receives the row. It must not be called after Close:
// sending on a closed channel panics.
func (p *RowProcessor) Enqueue(r []string) {
	p.src <- r
}

// Process calls handler for every row that comes out of the pipeline and
// returns once the sink channel has been closed.
func (p *RowProcessor) Process(handler func([]string)) {
	for r := range p.snk {
		handler(r)
	}
}

// Close closes the source channel, signalling that no more rows will be
// enqueued. It must be called at most once.
func (p *RowProcessor) Close() {
	close(p.src)
}
// UpperCaseMapper is a pipeline stage that upper-cases every cell of every
// row it receives.
type UpperCaseMapper struct{}

// Map starts a fixed pool of worker goroutines that read rows from src,
// upper-case each cell with strings.ToUpper, and send the row on the returned
// channel. The returned channel is closed once src has been closed and all
// workers have finished.
//
// Rows are modified in place (the same slice that was received is sent on),
// and because several workers run concurrently the output order may differ
// from the input order.
func (u UpperCaseMapper) Map(src chan []string) chan []string {
	// Size of the worker pool. The original passed this value as an argument
	// to the launcher goroutine but then hard-coded 5 in the loop.
	const numWorkers = 5

	snk := make(chan []string)

	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for w := 0; w < numWorkers; w++ {
		go func() {
			defer wg.Done()
			for row := range src {
				for i, cell := range row {
					row[i] = strings.ToUpper(cell)
				}
				snk <- row
			}
		}()
	}

	// Close the sink only after every worker has drained src, so no worker
	// can send on a closed channel.
	go func() {
		wg.Wait()
		close(snk)
	}()

	return snk
}
func main() {
mappers := []RowMapper{UpperCaseMapper{}}
rowProcessor := NewRowProcessor(mappers...)
go func() {
for _, r := range [][]string{[]string{"friday", "friday"}, []string{"i", "love", "audrey"}} {
rowProcessor.Enqueue(r)
}
rowProcessor.Close()
}()
var rows [][]string
rowProcessor.Process(func(r []string) {
rows = append(rows, r)
})
for _, r := range rows {
fmt.Println(r)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment