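// A small crawler for gov.uk government announcements: it walks the paginated
// JSON index and writes the extracted text of each announcement to disk.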
package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    _ "net/http/pprof"
    "os"
    "path"
    "strings"
    "time"

    "github.com/PuerkitoBio/goquery"
)
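
// StatusNotOk is returned when an HTTP request receives a non-200 status code.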
var StatusNotOk = errors.New("non-200 status code received")
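
// Doc is a single announcement entry from the announcements index.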
type Doc struct {
    Id        int    `json:"id"`
    URL       string `json:"url"`
    Timestamp string `json:"public_timestamp"`
}
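
// Page is one page of the paginated announcements index.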
type Page struct {
    Cur   int   `json:"current_page"`
    Total int   `json:"total_pages"`
    Docs  []Doc `json:"results"`
}
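
// Crawler walks the announcements index and downloads each document, using
// channels to feed work to a pool of fetcher goroutines and to report stats.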
type Crawler struct {
    base     string
    start    string
    outdir   string
    fetchers int
    count    chan struct{}
    skip     chan struct{}
    done     chan struct{}
    pending  chan Doc
    ticker   <-chan time.Time
}

func main() {
    // net/http/pprof is imported for its side effects, so this server exposes
    // profiling endpoints on localhost:6060.
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()

    c := NewCrawler()
    c.Start()
    c.Wait()
}
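
// NewCrawler returns a Crawler with default settings: five document fetchers,
// a shared 50ms ticker for rate limiting, and output under data/.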
func NewCrawler() *Crawler {
    c := &Crawler{}
    c.base = "https://www.gov.uk"
    c.start = "/government/announcements.json"
    c.fetchers = 5
    c.ticker = time.Tick(50 * time.Millisecond)
    c.outdir = "data/"
    c.count = make(chan struct{})
    c.skip = make(chan struct{})
    c.done = make(chan struct{})
    c.pending = make(chan Doc, 256)
    return c
}
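
// Start launches the page fetcher, the pool of document fetchers, and the
// stats printer.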
func (c *Crawler) Start() {
    go c.fetchPages()
    for i := 0; i < c.fetchers; i++ {
        go c.fetchDocs()
    }
    go c.printStats()
}
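
// Wait blocks until the page fetcher has read every index page.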
func (c *Crawler) Wait() {
    <-c.done
}
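
// printStats tallies fetched and skipped documents and logs a summary every
// five seconds.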
func (c *Crawler) printStats() {
    count := 0
    skip := 0
    t := time.Tick(5 * time.Second)
    for {
        select {
        case <-c.count:
            count += 1
        case <-c.skip:
            skip += 1
        case <-t:
            log.Printf("%d docs read, %d skipped, %d pending\n", count, skip, len(c.pending))
        }
    }
}
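
// fetchPages walks the paginated index, queuing every document it finds on
// c.pending, and closes the channels when the last page has been read.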
func (c *Crawler) fetchPages() {
    url := c.base + c.start

    p, err := fetchPage(url)
    if err != nil {
        panic(err)
    }

    for p.Cur <= p.Total {
        for _, d := range p.Docs {
            c.pending <- d
        }

        nextPage := p.Cur + 1
        nextURL := fmt.Sprintf("%s?page=%d", url, nextPage)

        // Read from ticker to throttle rate
        <-c.ticker

        np, err := fetchPage(nextURL)
        if err != nil {
            log.Printf("error while fetching page %v (%v), trying again", nextPage, err)
            // If we encounter an error here, then just go round again. We don't
            // update the value of p, so we do add the documents from the current
            // page to pending again, but it doesn't really matter because they'll
            // just be skipped.
            continue
        }
        p = np
    }

    close(c.pending)
    close(c.done)
}
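
// fetchDocs consumes c.pending, skipping documents that are already on disk
// and writing the rest out, queuing failures for retry.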
func (c *Crawler) fetchDocs() {
    for d := range c.pending {
        if c.shouldSkipDoc(d) {
            c.skip <- struct{}{}
            continue
        }

        // Read from ticker to throttle rate
        <-c.ticker

        content, err := fetchDoc(c.base + d.URL)
        if err != nil {
            log.Printf("error while processing %v (%v), queuing for retry", d, err)
            c.retryDoc(d)
            continue
        }

        c.writeDoc(d, content)
        c.count <- struct{}{}
    }
}
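
// docFilename builds the output path for a document: <outdir>/<date>/<id>.txt.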
func (c *Crawler) docFilename(d Doc) string {
    ts := strings.SplitN(d.Timestamp, "T", 2)
    return path.Join(c.outdir, ts[0], fmt.Sprintf("%d.txt", d.Id))
}
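
// shouldSkipDoc reports whether the document's output file already exists.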
func (c *Crawler) shouldSkipDoc(d Doc) bool {
    _, err := os.Stat(c.docFilename(d))
    return !os.IsNotExist(err)
}
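
// writeDoc writes the extracted content to the document's output file,
// creating the date directory as needed.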
func (c *Crawler) writeDoc(d Doc, content string) {
    filename := c.docFilename(d)
    dirname := path.Dir(filename)

    err := os.MkdirAll(dirname, os.FileMode(0755))
    if err != nil {
        log.Fatalln(err)
    }

    err = ioutil.WriteFile(filename, []byte(content), os.FileMode(0644))
    if err != nil {
        log.Fatalln(err)
    }
}
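
// retryDoc puts a failed document back on the pending queue.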
func (c *Crawler) retryDoc(d Doc) {
    c.pending <- d
}
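
// fetchPage downloads and decodes one page of the JSON index.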
func fetchPage(url string) (*Page, error) {
    res, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    if res.StatusCode != http.StatusOK {
        // Close the body before bailing out so the connection isn't leaked.
        res.Body.Close()
        return nil, StatusNotOk
    }

    data, err := ioutil.ReadAll(res.Body)
    res.Body.Close()
    if err != nil {
        return nil, err
    }

    page := &Page{}
    err = json.Unmarshal(data, page)
    if err != nil {
        return nil, err
    }

    log.Printf("read page %d\n", page.Cur)
    return page, nil
}
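
// fetchDoc downloads a single announcement page and extracts its title,
// summary, and body text as plain text with minimal markup.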
func fetchDoc(url string) (string, error) {
    res, err := http.Get(url)
    if err != nil {
        return "", err
    }
    if res.StatusCode != http.StatusOK {
        // Close the body before bailing out so the connection isn't leaked.
        res.Body.Close()
        return "", StatusNotOk
    }

    doc, err := goquery.NewDocumentFromResponse(res)
    res.Body.Close()
    if err != nil {
        // Return the parse error so the caller can queue the document for
        // retry rather than aborting the whole crawl.
        return "", err
    }

    s := "# "
    s += strings.Trim(doc.Find("h1").First().Text(), " \t\r\n")
    s += "\n\n## "
    s += strings.Trim(doc.Find("div.summary").First().Text(), " \t\r\n")
    s += "\n\n"
    s += strings.Trim(doc.Find("div.document.body").First().Text(), " \t\r\n")
    s += "\n"
    return s, nil
}