Simple example of a URL fetcher with a worker pool that scales up on demand. Fan-out channels. Async reads. Sync writes.
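The pool grows lazily: a new worker goroutine is spawned (up to -k of them) only when no finished worker is available to take the next URL. All hand-off goes over unbuffered channels, and only the main goroutine prints results, so writes to stdout stay synchronized.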
// Ivan Prisyazhnyy, 2016
// An example of a parallel URL fetcher that counts "Go" entries per page.
// Test: printf 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 0
// Test: printf 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 1
// Test: printf 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 2
// Test: printf 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 3
// Test: printf 'https://golang.org\nhttps://golang.org\nars\nhttps://golang.org\n' | go run main.go -debug -k 4
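// (-k 0 exercises the NewProcessor error path and panics; -k 1..4 vary the pool size,
// and the bogus "ars" URL exercises the fetch-error path.)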
package main

import (
	"bufio"
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"strings"
)

var (
	k     = flag.Uint("k", 1, "Maximum number of concurrent fetchers")
	debug = flag.Bool("debug", false, "Enable verbose logging")
)

const search = "Go"

type result struct {
	Data  string
	Count uint
}

func (r result) String() string {
	return fmt.Sprintf("Count for %s: %d", r.Data, r.Count)
}

// Processor fans URLs out to at most max worker goroutines,
// growing the pool lazily as data arrives.
type Processor struct {
	max     uint        // upper bound on the worker pool size
	threads uint        // number of workers spawned so far
	data    chan string // fan-out channel the workers consume from
	sink    chan result // results channel, drained by the main goroutine
	total   uint64      // running sum of all counts
}

// NewProcessor returns a Processor running at most max concurrent fetchers.
func NewProcessor(max uint) (*Processor, error) {
	if max < 1 {
		return nil, fmt.Errorf("can't process anything without at least one worker thread")
	}
	return &Processor{
		max:  max,
		data: make(chan string),
		sink: make(chan result),
	}, nil
}
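
// Invariant: each Process call pushes exactly one URL and, unless it has just
// scaled the pool up, first collects exactly one earlier result. The number of
// results still in flight therefore always equals p.threads, which is what
// WaitAll relies on when draining.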

// Process hands data to a free worker, growing the pool up to max workers.
func (p *Processor) Process(data string) {
	if p.threads < p.max {
		// Pick a worker that has already finished, if any ...
		select {
		case res := <-p.sink:
			p.done(res)
		default:
			// ... or scale the pool up
			p.scale()
		}
		p.data <- data
	} else {
		// Pool is at capacity: wait for a worker to finish ...
		res := <-p.sink
		p.done(res)
		// ... then hand it the next URL
		p.data <- data
	}
	if *debug {
		fmt.Println(data, "pushed into the worker queue")
	}
}

// WaitAll drains the outstanding results (one per worker, see the invariant
// above) and then shuts the workers down.
func (p *Processor) WaitAll() {
	for p.threads > 0 {
		p.done(<-p.sink)
		p.threads--
	}
	// Closing the data channel ends each worker's range loop, so no
	// goroutines leak. The Processor must not be reused afterwards.
	close(p.data)
}

// Total returns the sum of counts over all processed URLs.
func (p *Processor) Total() uint64 {
	return p.total
}

// scale spawns one more worker goroutine.
func (p *Processor) scale() {
	p.threads++
	go worker(p.threads, p.data, p.sink)
}

// done accounts a finished result and prints it; only the main
// goroutine writes to stdout (sync writes).
func (p *Processor) done(r result) {
	p.total += uint64(r.Count)
	fmt.Println(r)
}
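
// worker fetches each URL it receives from the shared data channel (the
// fan-out) and always reports exactly one result on sink per URL, even on
// error, so the accounting in Process and WaitAll stays balanced.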
func worker(id uint, data <-chan string, sink chan<- result) {
	for req := range data {
		if *debug {
			fmt.Println(id, ": start processing", req)
		}
		// Fetch
		resp, err := http.Get(req)
		if err != nil {
			fmt.Fprintln(os.Stderr, id, ": error fetching", req, ":", err)
			sink <- result{req, 0}
			continue
		}
		// Read
		bodyBytes, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			fmt.Fprintln(os.Stderr, id, ": error reading", req, "body:", err)
			sink <- result{req, 0}
			continue
		}
		// Count
		count := uint(strings.Count(string(bodyBytes), search))
		if *debug {
			fmt.Println(id, ": finished processing", req, "with", count)
		}
		sink <- result{req, count}
	}
}

func main() {
	// Parse flags in main rather than in init, so that flag
	// registration from all packages has completed first.
	flag.Parse()

	p, err := NewProcessor(*k)
	if err != nil {
		panic(err)
	}

	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		url := strings.TrimSpace(scanner.Text())
		if len(url) < 1 {
			continue
		}
		p.Process(url)
	}
	if err := scanner.Err(); err != nil {
		fmt.Fprintln(os.Stderr, "reading standard input:", err)
	}

	p.WaitAll()
	fmt.Println("Total:", p.Total())
}
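
Each URL prints one "Count for <url>: <n>" line as its worker finishes, so
output follows completion order rather than input order, and a final
"Total: <sum>" line sums all counts. The counts themselves depend on the
live page content at fetch time.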