Skip to content

Instantly share code, notes, and snippets.

@akatrevorjay
Last active August 29, 2015 13:59
Show Gist options
  • Select an option

  • Save akatrevorjay/10950795 to your computer and use it in GitHub Desktop.

Select an option

Save akatrevorjay/10950795 to your computer and use it in GitHub Desktop.
Google IO 2014 Crawlers
#!/usr/bin/env python
##
## -----
## ATTN: Latest version has been moved to a repo, see: https://github.com/akatrevorjay/crawlfisher
## -----
##
## Crawls through URLs looking for goo.gl links, verifies they are IO event
## invites before saving them to sqlite.
## If it's a Youtube video watch link, also check annotations.
##
## If this helps you find a code, then it has served its purpose.
## FYI, I haven't yet found one that isn't already used.
##
## ~trevorj 041714
#!/usr/bin/env python
from __future__ import print_function
import gevent.monkey
gevent.monkey.patch_all()
import gevent
import logging
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
requests_log = logging.getLogger('requests');
requests_log.setLevel(logging.WARNING)
import requests
import re
import sqlite3
import datetime
from BeautifulSoup import BeautifulSoup, SoupStrainer
from urlparse import urlparse
#from Queue import Queue
from gevent.queue import Queue
q = Queue()
# Table syntax:
# create table googls (url varchar(255) unique, date datetime, found_url varchar(255));
conn = sqlite3.connect('googls.db')
crawled_urls = set()
checked_youtube_annotation_ids = set()
checked_googls = set()
class Crawler(gevent.Greenlet):
def __init__(self, q, conn):
self._q = q
self._conn = conn
self._cursor = conn.cursor()
gevent.Greenlet.__init__(self)
def run(self):
while True:
try:
url, self._parent_url = self._q.get()
except StopIteration:
log.info('Got StopIteration')
break
self.crawl(url)
def fix_relative_url(self, url):
if url.startswith('/'):
# get relative to self._parent_url
u = urlparse(self._parent_url)
url = '%s://%s%s' % (u.scheme, u.hostname, url)
log.debug('Fixed relative /url "%s" using "%s"', url, self._parent_url)
elif url.startswith('./'):
u = urlparse(self._parent_url)
url = '%s://%s%s%s' % (u.scheme, u.hostname, u.path, url)
log.debug('Fixed relative ./url "%s" using "%s"', url, self._parent_url)
return url
def test_googl(self, url):
""" Tests goo.gl url to see if it's an invite """
if not 'goo.gl' in url:
log.warning('Got bad GOOGL: "%s"', url)
return
if not url.startswith('http://'):
url = 'http://%s' % url
global checked_googls
if url in checked_googls:
log.debug('Skipping GOOGL: "%s"', url)
return
r = requests.get(url, allow_redirects=False)
loc = r.headers.get('location')
checked_googls.add(url)
if loc and loc.startswith('http://developers.google.com/events/io'):
log.error('Found GOOGL: "%s"', url)
ts = datetime.datetime.now()
try:
self._cursor.execute(
'''INSERT INTO googls (url, date, found_url)
VALUES (?, ?, ?)''',
(url, ts, self._cur_url))
self._conn.commit()
except Exception as e:
pass
return True
else:
return False
def test_googls(self, *urls):
for u in urls:
self.test_googl(u)
def find_googls(self, text):
m = re.findall(r'goo.gl/\w{6}', text)
if m:
#for i in m:
# yield i
self.test_googls(*m)
def find_urls_in_html(self, html):
for link in BeautifulSoup(html, parseOnlyThese=SoupStrainer('a')):
href = link.get('href')
if href:
yield link['href']
def fix_youtube_url(self, url):
#m = re.search(r'(?:https?://(?:www\.)?youtube.com/)?/?(?:watch\?)?(?:v=)?([-_A-z0-9]+)', url)
m = re.search(r'^(?:https?://(?:www\.)?youtube.com/)?/?(?:watch\?)?(?:v=)?([-_A-z0-9]+)$', url)
if m:
url = m.groups()[0]
return url
def check_youtube_annotations(self, url):
video_id = self.fix_youtube_url(url)
if not video_id:
return
global checked_youtube_annotation_ids
if video_id in checked_youtube_annotation_ids:
return
log.debug('Checking video annotations for: "%s"', video_id)
annot_url = 'https://www.youtube.com/annotations_invideo?features=1&legacy=1&video_id=%s' % video_id
r = requests.get(annot_url)
self.find_googls(r.text)
checked_youtube_annotation_ids.add(video_id)
def crawl(self, url):
m = re.search(r'(?:^\/+(?:www\.)google.com|account)', url)
if m:
log.debug('Skipping URL: "%s"', url)
return
url = self.fix_relative_url(url)
m = re.search(r'(?:youtube\.com|(?:plus|developers)\.google\.com)', url)
if not m:
log.debug('Skipping URL: "%s"', url)
return
if url in crawled_urls:
log.debug('Already crawled URL: "%s"', url)
return
self._cur_url = url
crawled_urls.add(url)
log.info('Crawling: "%s"', url)
self.check_youtube_annotations(url)
try:
r = requests.get(url)
except Exception as e:
log.error('Exception: %s', e)
return
self.find_googls(r.text)
urls = set(self.find_urls_in_html(r.text))
# Check child videos for annotations first
for child in urls:
self._q.put((child, url))
def crawl_one(url):
c = Crawler(q, conn)
c.start()
q.put((url, ''))
#gevent.wait()
def crawl_pool(url, num):
crawlers = []
for _ in range(num):
c = Crawler(q, conn)
c.start()
crawlers.append(c)
q.put((url, ''))
#gevent.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment