package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"github.com/pkg/errors"
	"local/ripley/chucker/config"
	"log"
	"os"
	"sync"
	"time"
)

// ContentGet reads the gzip-compressed file at filename line by line and
// sends each line on *lines.
//
// wg is incremented once for every line, before that line is sent, so a
// receiver may call wg.Done as soon as it has the line. *lines is closed when
// ContentGet returns, on success and on every error path, so receivers
// ranging over the channel always terminate; callers must not close it
// themselves.
//
// The returned error wraps the failure to open the file, to initialise the
// gzip reader, to scan the content, or to close either reader.
func ContentGet(filename string, lines *chan string, wg *sync.WaitGroup) (err error) {
	// ContentGet is the only sender, so it owns closing the channel.
	defer close(*lines)

	// file reader
	f, err := os.Open(filename)
	if err != nil {
		return errors.Wrap(err, "unable to open file reader")
	}
	defer func() {
		// Always release the descriptor, but never mask an earlier error.
		if cerr := f.Close(); cerr != nil && err == nil {
			err = errors.Wrap(cerr, "unable to close file reader")
		}
	}()

	// gzip reader
	gr, err := gzip.NewReader(f)
	if err != nil {
		return errors.Wrap(err, "unable to open gzip reader")
	}
	defer func() {
		// Deferred calls run last-in first-out, so this runs before the
		// file itself is closed.
		if cerr := gr.Close(); cerr != nil && err == nil {
			err = errors.Wrap(cerr, "unable to close gzip reader")
		}
	}()

	// read file into channel
	scanner := bufio.NewScanner(gr)
	for scanner.Scan() {
		// Count the line before handing it over: a receiver that calls
		// wg.Done right after receiving must never see a zero counter.
		wg.Add(1)
		*lines <- scanner.Text()
	}
	if serr := scanner.Err(); serr != nil {
		return errors.Wrap(serr, "unable to read file")
	}

	return nil
}

// SomeLongProcess prints line to standard output, sleeps for one second, and
// then prints "DONE".
func SomeLongProcess(line string) {
	const workDuration = 1 * time.Second

	fmt.Println(line)
	time.Sleep(workDuration)
	fmt.Println("DONE")
}

// ContentPush receives from *lines until the channel is closed and starts one
// goroutine per received line. Each goroutine runs SomeLongProcess on its line
// and then calls wg.Done, so wg must already have been incremented once for
// every line that is sent.
//
// ContentPush returns as soon as the channel is drained; it does not wait for
// the goroutines it started. Callers wait on wg for that.
func ContentPush(lines *chan string, wg *sync.WaitGroup) {
	for line := range *lines {
		// Hand the line over as an argument so each goroutine owns its own
		// copy; before Go 1.22 the range variable is shared by all iterations
		// and a captured reference could observe a later line.
		go func(l string) {
			defer wg.Done()
			SomeLongProcess(l)
		}(line)
	}
}

// main wires the pipeline together: ContentPush consumes the channel in a
// background goroutine while ContentGet fills it from config.InputFilename.
// A ContentGet error ends the program via log.Fatal; otherwise main blocks
// until the wait group reports that all outstanding work is done.
func main() {
	var pending sync.WaitGroup
	queue := make(chan string, config.ChannelBuffer)

	// consumer side: runs until the channel is closed
	go ContentPush(&queue, &pending)

	// producer side: feed the file content into the channel
	if err := ContentGet(config.InputFilename, &queue, &pending); err != nil {
		log.Fatal(err)
	}

	pending.Wait()
}