Created
October 19, 2017 01:11
-
-
Save jsvisa/01b570051373857c2961ba6e493f1372 to your computer and use it in GitHub Desktop.
picfetcher.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main
import (
	"bufio"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/signal"
	"path"
	"runtime"
	"strconv"
	"strings"
	"syscall"
	"time"
)
var ( | |
keyfile = flag.String("f", "key.file", "source key file") | |
debug = flag.Bool("d", false, "Debug print or not") | |
workers = flag.Int("w", 100, "the number of workers") | |
todos chan job | |
) | |
// Mutable program state shared between goroutines.
var (
	// interval is the pause, in milliseconds, that main inserts after
	// taking each job off the queue.
	interval = 10

	// lastLine is the checkpoint loaded at start-up; input lines before it
	// are skipped.
	lastLine = 0

	// currentLine is the number of input lines read so far; it is what
	// gets written to the checkpoint file.
	currentLine = 0

	// total and count are only printed when the program stops; nothing in
	// this file updates them.
	total = 0
	count = 0
)
// job describes one file to download. The zero value (empty key) is used
// by main as a placeholder that is received and ignored.
type job struct {
	retry      int    // number of failed attempts so far
	base       string // directory the downloaded file is written into
	id         string // first field of the input line; becomes "<id>.jpg"
	key        string // source URL exactly as read from the key file
	escapedKey string // URL actually requested; filled in by run
}
// escapeURI returns the URL that should be requested for key.
//
// It currently returns key unchanged: the per-segment query escaping that
// used to live here is disabled and kept below for reference only.
func escapeURI(key string) string {
	// srcs := strings.Split(key, "/")
	// dsts := make([]string, len(srcs))
	// for i, e := range srcs {
	// 	dsts[i] = url.QueryEscape(e)
	// }
	// return strings.Join(dsts, "/")
	return key
}
// newHTTP builds the HTTP client used for all downloads.
//
// The transport keeps at most 10 idle connections per host and waits up to
// 900 seconds for response headers; the client as a whole gives every
// request, body included, 1200 seconds.
func newHTTP() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			MaxIdleConnsPerHost:   10,
			ResponseHeaderTimeout: 900 * time.Second,
		},
		Timeout: 1200 * time.Second,
	}
}
func run(client *http.Client, job job) { | |
if job.retry > 3 { | |
return | |
} | |
job.escapedKey = escapeURI(job.key) | |
req, err := http.NewRequest("GET", job.escapedKey, nil) | |
if err != nil { | |
panic(err) | |
} | |
resp, err := client.Do(req) | |
if err != nil { | |
fmt.Printf("\033[0;31;40mGET %s Error- %v\033[0m\n", job.key, err) | |
job.retry++ | |
todos <- job | |
return | |
} | |
fmt.Printf("GET %s '%d'\n", job.escapedKey, resp.StatusCode) | |
if code := resp.StatusCode; code/100 != 2 { | |
fmt.Printf("\033[0;31;40mGET %s Error- StatusCode: %d \033[0m\n", job.key, code) | |
if code != 404 && job.retry <= 5 { | |
job.retry++ | |
todos <- job | |
} | |
return | |
} | |
defer resp.Body.Close() | |
f := fmt.Sprintf("%s.jpg", job.id) | |
fileName := path.Join(job.base, f) | |
output, err := os.Create(fileName) | |
if err != nil { | |
fmt.Println("Error while creating", fileName, "-", err) | |
job.retry++ | |
todos <- job | |
return | |
} | |
defer output.Close() | |
_, err = io.Copy(output, resp.Body) | |
if err != nil { | |
fmt.Println("Error while downloading", job.key, "-", err) | |
job.retry++ | |
todos <- job | |
return | |
} | |
// couldn't fetch Content-Length for some site at sometime | |
// fetchLen, err := strconv.Atoi(resp.Header.Get("Content-Length")) | |
// | |
// if writeLen != int64(fetchLen) { | |
// fmt.Println("Error mismatch Content-Length ", job.key, "-", writeLen, " vs ", fetchLen) | |
// todos <- job | |
// } | |
} | |
func scanning(keyfile string, sig chan<- os.Signal) { | |
// open a file | |
base := path.Base(keyfile) | |
base = strings.Replace(base, ".txt", ".dir", 1) | |
os.Mkdir(base, os.ModeDir) | |
if file, err := os.Open(keyfile); err == nil { | |
// make sure it gets closed | |
defer file.Close() | |
// create a new scanner and read the file line by line | |
scanner := bufio.NewScanner(file) | |
for scanner.Scan() { | |
currentLine++ | |
// if *debug { | |
// fmt.Printf("Scan: %s\n", scanner.Text()) | |
// } | |
if currentLine >= lastLine { | |
parts := strings.Split(scanner.Text(), " ") | |
id := parts[0] | |
key := parts[1] | |
if *debug { | |
fmt.Printf("Scan: %s\n", key) | |
} | |
if strings.HasPrefix(key, "http://") || strings.HasPrefix(key, "https://") { | |
todos <- job{id: id, key: key, base: base} | |
} | |
} | |
} | |
sig <- syscall.SIGSTOP | |
// check for errors | |
if err = scanner.Err(); err != nil { | |
log.Fatal(err) | |
} | |
} else { | |
log.Fatal(err) | |
} | |
} | |
func status() { | |
start := time.Now() | |
for { | |
time.Sleep(10 * time.Second) | |
fmt.Printf("%v TODO list length is %d, cap is %d, NumGORoutes %d, elapsed with: %v\n", | |
time.Now(), len(todos), cap(todos), runtime.NumGoroutine(), time.Since(start)) | |
} | |
} | |
func keeper(file string) { | |
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0600) | |
if err != nil { | |
log.Fatalf("open %s error: %v\n", file, err) | |
} | |
defer f.Close() | |
for { | |
time.Sleep(10 * time.Second) | |
if _, err := f.WriteString(fmt.Sprintf("%d", currentLine)); err == nil { | |
f.Seek(0, 0) | |
} else { | |
panic(err) | |
} | |
} | |
} | |
func init() { | |
flag.Parse() | |
todos = make(chan job, *workers) | |
} | |
func main() { | |
runtime.GOMAXPROCS(2) | |
keyProcessFile := fmt.Sprintf("%s.process", *keyfile) | |
if buf, err := ioutil.ReadFile(keyProcessFile); err == nil { | |
if num, err := strconv.Atoi(string(buf)); err == nil { | |
lastLine = num | |
} | |
} | |
fmt.Printf("**** starting from line: %d\n", lastLine) | |
go status() | |
go keeper(keyProcessFile) | |
sig := make(chan os.Signal, 1) | |
signal.Notify(sig, syscall.SIGSTOP, syscall.SIGUSR1, os.Interrupt, os.Kill) | |
todos <- job{} | |
go scanning(*keyfile, sig) | |
client := newHTTP() | |
for { | |
select { | |
case signal := <-sig: | |
switch signal { | |
case syscall.SIGSTOP: | |
go func() { | |
for { | |
if len(todos) <= 0 { | |
time.Sleep(time.Duration(10) * time.Second) | |
sig <- syscall.SIGUSR1 | |
} else { | |
time.Sleep(1 * time.Second) | |
} | |
} | |
}() | |
default: | |
fmt.Println("stopping...... got total length: ", total, "count: ", count) | |
os.Exit(0) | |
} | |
case job := <-todos: | |
// fmt.Println("get a new job", job.key) | |
if job.key != "" { | |
go run(client, job) | |
} | |
time.Sleep(time.Duration(interval) * time.Millisecond) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment