Last active
September 25, 2016 15:59
-
-
Save sitano/27574bdf611e4825d6b23967e63cf9e4 to your computer and use it in GitHub Desktop.
Simple example of URL fetcher with scalable up thread pool. Fan-out channels. Async reads. Sync writes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Ivan Prisyazhnyy <[email protected]>, 2016 | |
// Package implements example of parallel URL fetcher counting Go entries. | |
// Test: echo 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 0 | |
// Test: echo 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 1 | |
// Test: echo 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 2 | |
// Test: echo 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 3 | |
// Test: echo 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 4 | |
package main | |
import ( | |
"bufio" | |
"os" | |
"strings" | |
"fmt" | |
"flag" | |
"net/http" | |
"io/ioutil" | |
) | |
// k caps how many fetcher goroutines may run concurrently (-k flag).
var k = flag.Uint("k", 1, "Maximum number of concurrent fetchers")
// debug enables verbose per-worker progress logging (-debug flag).
var debug = flag.Bool("debug", false, "Enable verbose logging")
// search is the substring counted in every fetched page body.
const search string = "Go"
// result is one finished unit of work: the URL that was fetched and
// how many occurrences of the search string its body contained.
type result struct {
	Data string
	Count uint
}

// String renders the result for human-readable output,
// e.g. "Count for https://golang.org: 12".
func (r result) String() string {
	return fmt.Sprint("Count for ", r.Data, ": ", r.Count)
}
// Processor fans URLs out to a pool of worker goroutines that is
// scaled up on demand, never exceeding max concurrent workers.
// It is driven from a single goroutine (Process / WaitAll / Total)
// and is not safe for concurrent use.
type Processor struct {
	max uint // upper bound on concurrent workers
	threads uint // workers started so far (also: results pending on sink)
	data chan string // unbuffered work queue of URLs
	sink chan result // unbuffered channel of completed results
	total uint64 // running sum of all match counts
}
func NewProcessor(max uint) (*Processor, error) { | |
if max < 1 { | |
return nil, fmt.Errorf("can't process anything w/o a single thread") | |
} | |
return &Processor{ | |
max: max, | |
threads: 0, | |
data: make(chan string), | |
sink: make(chan result), | |
total: 0, | |
}, nil | |
} | |
// Process dispatches one URL to a worker goroutine, blocking until a
// worker accepts it. Below the max cap it first drains one finished
// result if available (non-blocking) or else starts a new worker; at
// the cap it blocks until some worker reports a result. Either way a
// sink receive (or a scale) always precedes the data send, which keeps
// the invariant "pending results on sink == threads" that WaitAll
// relies on.
func (p *Processor) Process(data string) {
	if p.threads < p.max {
		// Peek free worker
		select {
		case res := <- p.sink:
			p.done(res)
		default:
			// Or scale
			p.scale()
		}
		p.data <- data
	} else {
		// Wait for free worker
		res := <- p.sink
		p.done(res)
		// Process data
		p.data <- data
	}
	if *debug {
		fmt.Println(data, "pushed into the worker queue")
	}
}
// WaitAll waits for all workers to finish | |
func (p *Processor) WaitAll() { | |
for p.threads > 0 { | |
res := <- p.sink | |
p.done(res) | |
// TODO: here should be graceful goroutine shutdown, but omitted | |
// TODO: so setting 0, at least prevent breaking FSM, yet leaking goroutines | |
// TODO: but as far as we expect next call to be sys.exit, I don't care. | |
p.threads -- | |
} | |
} | |
// Total returns the accumulated match count across every result
// collected so far; it is final only after WaitAll has returned.
func (p *Processor) Total() uint64 {
	return p.total
}
func (p *Processor) scale() { | |
p.threads ++ | |
go worker(p.threads, p.data, p.sink) | |
} | |
// done folds one finished result into the running total and prints it
// immediately — writes are serialized here, on the single driving
// goroutine, so output never interleaves.
func (p *Processor) done(r result) {
	p.total += uint64(r.Count)
	fmt.Println(r)
}
func worker(id uint, data <-chan string, sink chan<- result) { | |
for req := range data { | |
if *debug { | |
fmt.Println(id, ": start processing", req) | |
} | |
// Fetch | |
resp, err := http.Get(req) | |
if err != nil { | |
fmt.Fprintln(os.Stderr, id, ": Error fetching", req) | |
sink <- result{req, 0} | |
continue | |
} | |
// Read | |
bodyBytes, err := ioutil.ReadAll(resp.Body) | |
resp.Body.Close() | |
if err != nil { | |
fmt.Fprintln(os.Stderr, id, ": Error reading", req, "body") | |
sink <- result{req, 0} | |
continue | |
} | |
// Count | |
count := uint(strings.Count(string(bodyBytes), search)) | |
if *debug { | |
fmt.Println(id, ": finished processing", req, "with", count) | |
} | |
sink <- result{req, count} | |
} | |
} | |
// init parses command-line flags before main runs.
// NOTE(review): flag.Parse in init is generally discouraged (hidden
// side effect, breaks `go test` flag handling); moving the call to
// the top of main would be the cleaner layout.
func init() {
	flag.Parse()
}
func main() { | |
p, err := NewProcessor(*k) | |
if err != nil { | |
panic(err) | |
} | |
scanner := bufio.NewScanner(os.Stdin) | |
for scanner.Scan() { | |
url := strings.TrimSpace(scanner.Text()) | |
if len(url) < 1 { | |
continue | |
} | |
p.Process(url) | |
} | |
if err := scanner.Err(); err != nil { | |
fmt.Fprintln(os.Stderr, "reading standard input:", err) | |
} | |
p.WaitAll() | |
fmt.Println("Total:", p.Total()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment