// Package main reads a gzip-compressed text file line by line, passes each
// line through a channel, and processes every line in its own goroutine.
package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/pkg/errors"

	"local/ripley/chucker/config"
)

// ContentGet opens the gzip-compressed file named by filename and sends each
// line of its decompressed content to *lines.
//
// wg is incremented by one for every line sent; the consumer is expected to
// call wg.Done once per received line. The increment happens before the send
// so that a consumer can never call Done ahead of the matching Add.
//
// *lines is closed on every return path, success or failure, so a consumer
// ranging over the channel always terminates. The file and the gzip reader
// are likewise released on every return path.
//
// It returns a wrapped error if the file cannot be opened, is not valid gzip,
// cannot be scanned, or cannot be closed cleanly. Lines are split with a
// default bufio.Scanner, so an overly long line surfaces as a scan error.
func ContentGet(filename string, lines *chan string, wg *sync.WaitGroup) (err error) {
	// Registered first so it runs last: the channel is closed only after all
	// sends have finished, and it is closed even when we bail out early.
	defer close(*lines)

	// file reader
	f, err := os.Open(filename)
	if err != nil {
		return errors.Wrap(err, "unable to open file reader")
	}
	defer func() {
		// Report a close failure only if nothing else has gone wrong.
		if cerr := f.Close(); cerr != nil && err == nil {
			err = errors.Wrap(cerr, "unable to close file reader")
		}
	}()

	// gzip reader
	gr, err := gzip.NewReader(f)
	if err != nil {
		return errors.Wrap(err, "unable to open gzip reader")
	}
	defer func() {
		// Deferred after the file close, so it runs before it: the gzip
		// reader is shut down while its underlying file is still open.
		if cerr := gr.Close(); cerr != nil && err == nil {
			err = errors.Wrap(cerr, "unable to close gzip reader")
		}
	}()

	// read file into channel
	scanner := bufio.NewScanner(gr)
	for scanner.Scan() {
		// Add must precede the send; otherwise the worker's Done could run
		// before the counter has been incremented.
		wg.Add(1)
		*lines <- scanner.Text()
	}
	if scanErr := scanner.Err(); scanErr != nil {
		return errors.Wrap(scanErr, "unable to read file")
	}

	return nil
}

// SomeLongProcess simulates slow work on a single line: it prints the line,
// sleeps for one second, and then prints "DONE".
func SomeLongProcess(line string) {
	fmt.Println(line)
	time.Sleep(time.Second)
	fmt.Println("DONE")
}

// ContentPush receives lines from *lines until the channel is closed and
// starts one goroutine per line that runs SomeLongProcess on it. Each
// goroutine calls wg.Done when it finishes, balancing the wg.Add performed by
// the sender for that line.
//
// NOTE(review): the number of goroutines is unbounded (one per line);
// consider a worker pool if inputs can be large.
func ContentPush(lines *chan string, wg *sync.WaitGroup) {
	for line := range *lines {
		// Pass the line as an argument so every goroutine works on its own
		// copy, regardless of the loop-variable semantics of the Go version.
		go func(l string) {
			defer wg.Done()
			SomeLongProcess(l)
		}(line)
	}
}

// main wires the producer and the consumer together: it starts ContentPush in
// the background, feeds it with ContentGet, and waits until every line that
// was sent has been processed.
func main() {
	lines := make(chan string, config.ChannelBuffer)
	var wg sync.WaitGroup

	// start up the pusher
	go ContentPush(&lines, &wg)

	// throw the content at the channel
	if err := ContentGet(config.InputFilename, &lines, &wg); err != nil {
		log.Fatal(err)
	}

	// All wg.Add calls happened inside ContentGet, so Wait cannot miss any.
	wg.Wait()
}