PyVideo: SciPy 2015 import
#!/usr/bin/env python
# coding: utf-8
from __future__ import unicode_literals
import os
import sys
import re
import json
import urllib2
from bs4 import BeautifulSoup
from pattern import web
# Only needed for the (currently commented-out) steve post-processing at the bottom:
# from steve.util import (
#     get_project_config,
#     save_json_files,
#     load_json_files,
# )
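# Rough outline of the pipeline driven by the __main__ block below
# (paths are the ones hard-coded in this script):
#   1. Download the talk detail pages listed in ./data/scipytalklinks.txt
#      into ./data/talks_raw/.
#   2. Fetch the SciPy 2015 YouTube playlist via the API into
#      ./data/youtube_playlist.json and extract per-video records into
#      ./data/videos.json.
#   3. Parse the raw talk pages (title, description, authors), merge in the
#      matching YouTube links, and write the result to ./data/talks.json.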

def parse_speaker_data(datagroup):
    title = datagroup.find('span', attrs={'style': 'font-weight:bold;'}).text
    anchors = datagroup.find_all('a')
    speakernames = []
    for span in datagroup.find_all('span', class_='speaker_name'):
        speakernames.append(span.text)
    return {
        title: {
            'speaker_names': speakernames,
            'speaker_links': [a.text for a in anchors if 'speakerid' in a.attrs.get('href', '')],
        }
    }
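# The function above maps one "data-group" block of the speaker page to a
# single-entry dict keyed by the talk title, roughly of the shape
# (illustrative only):
#   {'Some Talk Title': {'speaker_names': ['Jane Doe'],
#                        'speaker_links': ['Jane Doe']}}
# Note that 'speaker_links' collects the anchor *text* of links whose href
# contains "speakerid", as in the original code.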

def parse_slot(slot):
    result = [s for s in slot.strings]
    if len(result) != 0:
        title = result[0]
        return {
            title: {
                "speaker_data": result[1:-1],
                "description": "".join(result[-1:]),
            }
        }
    return {'': {}}

def parse_descriptions(fname):
    talkdata = {}
    with open(fname) as fh:
        soup = BeautifulSoup(fh)
        timeslots = soup.find_all(class_='agenda_time_slot')
        for t in timeslots:
            talkdata.update(parse_slot(t))
    return talkdata

def parse_speakers(fname):
    speakerdata = {}
    with open(fname) as fh:
        soup = BeautifulSoup(fh)
        datagroups = soup.find_all(class_='data-group')
        for talk in datagroups:
            speakerdata.update(parse_speaker_data(talk))
    return speakerdata

def merge2steve(sched, stevedata):
    for f, data in stevedata:
        title = data.get("title")
        if title in sched:
            data.update(sched.get(title))

def parse_speaker_title(title):
    """
    Strip everything after the first " | " separator from titles such as:
        "LIGHTNING TALKS | SciPy 2015 | 20150709"
        "Statistical Thinking for Data Science | SciPy 2015 | Chris Fonnesbeck"
        "Keynote: Data Science at the New York Times | SciPy 2015 | Chris Wiggins"
    """
    title = title.split(' | ')
    return title[0]
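# Quick illustration (not part of the original script); uses the second
# example title from the docstring above.
assert parse_speaker_title(
    "Statistical Thinking for Data Science | SciPy 2015 | Chris Fonnesbeck"
) == "Statistical Thinking for Data Science"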

def clean_data(data):
    title = data.get('title', '')
    data["title"] = parse_speaker_title(title)
    data["tags"] = []
    data["language"] = "English"

def process_raw_data(cfg):
    # Needs load_json_files/save_json_files from steve.util (import is
    # commented out above); only used by the commented-out call at the bottom.
    file_data = load_json_files(cfg)
    for fname, data in file_data:
        data["category"] = "SciPy 2015"
        clean_data(data)
    save_json_files(cfg, file_data)

# from: http://hetland.org/coding/python/levenshtein.py
def levenshtein(a, b):
    "Calculates the Levenshtein distance between a and b."
    n, m = len(a), len(b)
    if n > m:
        # Make sure n <= m, to use O(min(n,m)) space
        a, b = b, a
        n, m = m, n
    current = range(n + 1)
    for i in range(1, m + 1):
        previous, current = current, [i] + [0] * n
        for j in range(1, n + 1):
            add, delete = previous[j] + 1, current[j - 1] + 1
            change = previous[j - 1]
            if a[j - 1] != b[i - 1]:
                change = change + 1
            current[j] = min(add, delete, change)
    return current[n]
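# Worked example (illustrative, not in the original script): turning "kitten"
# into "sitting" takes three single-character edits, so the distance is 3.
# The title-matching step below treats a YouTube title and a homepage title
# as the same talk when this distance is below 5.
assert levenshtein("kitten", "sitting") == 3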
if __name__ == "__main__":
YOUTUBE_API_KEY = '<some youtube API key>'
yt_playlist = []
talks = []
videos = []
reload(sys)
sys.setdefaultencoding('utf8')
"""
Download the talk details (if the `./data/talks_raw/` directory doesn't exist)
"""
if not os.path.isdir("./data/talks_raw/"):
os.makedirs("./data/talks_raw/")
with open("./data/scipytalklinks.txt", "r") as input_file:
talks_links = input_file.readlines()
for idx, link in enumerate(talks_links):
talk_id = int(re.match('.*sessionid=(\d+)', link).group(1))
print "%d (%d of %d)" % (talk_id, idx+1, len(talks_links))
talk_raw = urllib2.urlopen(link).read()
with open("./data/talks_raw/%d.html" % talk_id, "w") as output_file:
output_file.write(talk_raw)
"""
Download Youtube playlist data via API (if `./data/videos.json` and './data/youtube_playlist.json' don't exist)
"""
if not os.path.isfile('./data/videos.json') \
and not os.path.isfile('./data/youtube_playlist.json'):
yt_api_baselink = "https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&playlistId=PLYx7XA2nY5Gcpabmu61kKcToLz0FapmHu&key=" + YOUTUBE_API_KEY
yt_api_nextpage_token = None
yt_api_response = None
while True:
yt_api_link = yt_api_baselink
if yt_api_nextpage_token is not None:
yt_api_link = yt_api_link + "&pageToken=" + yt_api_nextpage_token
yt_api_response = urllib2.urlopen(yt_api_link).read()
yt_api_response = json.loads(yt_api_response)
yt_playlist = yt_playlist + yt_api_response['items']
if yt_api_response.has_key('nextPageToken'):
yt_api_nextpage_token = yt_api_response['nextPageToken']
else:
break
with open('./data/youtube_playlist.json', 'w') as fh:
fh.write(json.dumps(yt_playlist, fh, sort_keys=True, indent=4, ensure_ascii=False))
"""
Extract the video data from the playlist (if `./data/videos.json` doesn't exist)
"""
if not os.path.isfile('./data/videos.json'):
with open('./data/youtube_playlist.json') as fh:
yt_playlist = json.load(fh)
for item in yt_playlist:
yt_video_id = item['snippet']['resourceId']['videoId']
yt_link = 'http://www.youtube.com/watch?v=' + yt_video_id
yt_title = item['snippet']['title']
videos.append(dict(yt_id=yt_video_id, link=yt_link, yt_title=yt_title))
with open('./data/videos.json', 'w') as fh:
fh.write(json.dumps(videos, fh, sort_keys=True, indent=4, ensure_ascii=False))
if len(videos) == 0:
with open('./data/videos.json', 'r') as fh:
videos = json.load(fh)
"""
Parse the talk data: title, description, author names, author detail links
(if the file `./data/talks.json` does not exist)
"""
if not os.path.isfile('./data/talks.json'):
for rel_path in os.listdir("./data/talks_raw/"):
if not rel_path.endswith(".html"):
continue
with open("./data/talks_raw/" + rel_path) as fh:
html = fh.read()
dom = web.DOM(html)
# session id
sess_id = int(rel_path.split(".")[0])
parts = dom('body > strong')
# title
title = web.plaintext(parts[0].content)
# description
# old variant (didn't work in all cases):
# descr = str(parts[1].next.next.children[2])
for node in parts[1].next.next.children:
# look for first text node with length > 7
# (all other cases are single text nodes of the form '&#13;\n')
if node.type == 'text' and len(str(node)) > 7:
descr = str(node)
break
# authors
authors = map(lambda elem: dict(link = elem.attrs['href'],
name = re.sub('\s+',' ',elem.content)),
parts[1].next.next('td > a'))
talks.append(dict(sess_id = sess_id,
title = title,
authors = authors,
desc = descr,
yt_link = None,
type = 'talk_or_postersession'))
talks = sorted(talks, key=lambda elem: elem['title'])
"""
Merge data from the homepage and Youtube
"""
talks_added = []
for video in videos:
yt_title_raw = video['yt_title']
title_data = re.split('\s*\|\s*', yt_title_raw)
title = title_data[0]
authors = []
v_type = "talk_or_postersession"
# recognize Lightning talks and reformat title
if title.lower().startswith('lightning talks') and len(title_data) == 3:
title = " ".join(map(lambda w: w[0].upper() + w[1:], re.split('\s+', title.lower())))
title += " " + ("%s%s-%s-%s" % tuple(re.findall('..', title_data[-1])))
v_type = "lightning_talks"
# Fix cases where the "Part X" notice is placed after author names
elif len(title_data) == 3:
authors = title_data[-1]
sanitize_match = re.match('(.*)\s+(Part ([IV]+|\d+))$', title_data[-1])
if sanitize_match:
title += " " + sanitize_match.group(2)
authors = sanitize_match.group(1)
authors = re.split('\s*(?:[&,]|(?<!\w)and)\s*', authors)
else:
authors = []
title = re.sub('\s+', ' ', title)
authors = filter(len, map(lambda a: re.sub('\s+', ' ', a), authors))
authors = map(lambda a: dict(name=a, link=None), authors)
# recognize keynotes and tutorials
if title_data[0].lower().startswith('keynote'):
v_type = "keynote"
elif len(title_data) >= 2:
if (title_data[1].lower().endswith('tutorial')):
v_type = "tutorial"
# Keynotes, tutorials and Lightning talks are not contained
# in the homepage data and can be added without further checks
if v_type in ("keynote", "tutorial", 'lightning_talks'):
talks_added.append(dict(sess_id = None,
title = title,
authors = authors,
desc = None,
yt_link = video['link'],
type = v_type))
continue
# match Youtube titles against the ones frome the homepage
try:
# replace all special chars ("-", "&", etc.) by single spaces
# and lowercase everything
vt = re.sub('\s*[^\w ]\s*', ' ', title).lower()
# matching criteria (one of them must be fulfilled)
# 1. Levenshtein distance < 5
# 2 and 3. one of the title variants is prefix of the other
search = next((t for t in talks
if levenshtein(vt, re.sub('\s*[^\w ]\s*', ' ', t['title']).lower()) < 5
or re.sub('\s*[^\w ]\s*', ' ', t['title']).lower().startswith(vt)
or vt.startswith(re.sub('\s*[^\w ]\s*', ' ', t['title']).lower())
))
# ... and add the link to the existing entry
search['yt_link'] = video['link']
except StopIteration as e:
# one silly special case reaches this point (no matching):
# homepage link: http://www.scipy2015.scipy.org/ereg/popups/sessiondetails.php?eventid=115969&sessionid=7424519&sessionchoice=1&&
# youtube link: http://www.youtube.com/watch?v=XjHzLUnHeM0
pass
# save everything
talks = sorted(talks + talks_added, key=lambda elem: elem['title'])
with open('./data/talks.json', 'w') as fh:
fh.write(json.dumps(talks, fh, sort_keys=True, indent=4, ensure_ascii=False))
if len(talks) == 0:
if os.path.isfile('./data/talks_fixed.json'):
talks_path = './data/talks_fixed.json'
else:
talks_path = './data/talks.json'
with open(talks_path, 'r') as fh:
talks = json.load(fh)
# print talks[0]
# cfg = get_project_config()
# process_raw_data(cfg)