Created
August 23, 2020 18:57
-
-
Save darvell/bc0a672237c1759785ed914a8d8177c0 to your computer and use it in GitHub Desktop.
Starting point for a scraper for use with FirstVoices. For archival purposes only.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
# Module setup: standard-library imports first, then the third-party
# dependencies. `requests` is mandatory; `mutagen` is optional and only
# enables ID3 tagging of downloaded MP3 files.
import csv
import json
import os

try:
    import requests
except ImportError as exc:
    # Fail early with an actionable message instead of a bare traceback.
    raise ImportError("Requests library not installed. Please run 'easy_install requests' or 'pip3 install requests'") from exc

# Flipped to True only when every mutagen import below succeeds.
modify_mp3 = False
try:
    import mutagen
    from mutagen.easyid3 import EasyID3
    from mutagen.id3 import ID3
    modify_mp3 = True
except ImportError:
    print("Mutagen not installed, MP3's won't have comments.")
class FirstVoicesApi():
    """Client for the FirstVoices web API, used to archive word lists.

    Creating an instance opens an HTTP session, performs two warm-up
    requests against the site and downloads the list of available
    languages. `self.languages` maps "family/subgroup/dialect" to a record
    holding the words fetched so far.
    """

    # Prefix that `Word.recording_url` carries in front of the server path.
    _SITE_ROOT = "https://www.firstvoices.com/"

    def __init__(self, language_name = None):
        """Open the session and cache the language list.

        Args:
            language_name: Stored on the instance; not used by this class.
        """
        self.language_name = language_name
        self.session = requests.Session()
        self.session.headers.update({
            "Accept": "*/*",
            "enrichers.document": "ancestry,dialect,character,word,media,book",
            "Accept-Encoding": "gzip, deflate, br",
            "properties": "*",
            "Origin": "https://www.firstvoices.com",
            "Nuxeo-transaction-timeout": "60000",
            "Accept-Language": "en-US,en;q=0.9",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36",
        })
        # Warm-up requests; the responses are discarded.
        # NOTE(review): presumably these establish anonymous session
        # cookies needed by later API calls - confirm.
        for warmup_url in (
            "https://www.firstvoices.com",
            "https://www.firstvoices.com/nuxeo/login.jsp?nxtimeout=true&forceAnonymousLogin=true",
        ):
            with self.session.get(warmup_url, stream=True):
                pass
        self.languages = {}
        self.language_objects = []
        for language in self.get_all_languages():
            self.languages[str(language)] = self._new_language_record(language)

    @staticmethod
    def _new_language_record(language):
        """Return an empty per-language record for `self.languages`."""
        return {
            "words": [],
            "language": {
                "family": language.language_family,
                "subgroup": language.subgroup,
                "dialect": language.dialect,
            },
        }

    def _language_record(self, language):
        """Return the record for `language`, creating it when missing."""
        key = str(language)
        if key not in self.languages:
            self.languages[key] = self._new_language_record(language)
        return self.languages[key]

    def get_all_languages(self):
        """Return every known language, querying the server only once.

        Returns:
            A list of LanguageDefinition objects. The list is cached on
            `self.language_objects` and reused while it is non-empty.
        """
        if self.language_objects:
            return self.language_objects
        print("Getting all languages...")
        data = self.session.get("https://www.firstvoices.com/nuxeo/api/v1/query/get_dialects?pageProvider=get_dialects&queryParams=sections").json()
        result = []
        for entry in data["entries"]:
            # Family, subgroup and dialect are read from fixed positions
            # counted from the end of the document path.
            path_split = entry["path"].split("/")
            result.append(LanguageDefinition(path_split[-4], path_split[-3], path_split[-2], entry["uid"], entry["path"]))
        print(f"Retrieved {len(result)} languages.")
        self.language_objects = result
        return result

    def get_language_by_path(self, family, group, dialect):
        """Find a language by family, subgroup and dialect together.

        This preserves the three-argument lookup that used to be declared
        as `get_language` but was shadowed by the one-argument version
        (and read a non-existent `group` attribute).

        Args:
            family: Text that must contain the language family name.
            group: Text that must contain the subgroup name.
            dialect: Text that must contain the dialect name.

        Returns:
            The first matching LanguageDefinition, or None.
        """
        for language in self.get_all_languages():
            if (language.language_family in family
                    and language.subgroup in group
                    and language.dialect in dialect):
                return language
        return None

    def get_language(self,dialect):
        """Find the first language whose names contain `dialect`.

        The match is a case-insensitive substring test against the
        dialect, then the subgroup, then the family of each language in
        list order.

        Raises:
            Exception: If no language matches.
        """
        needle = dialect.lower()
        for language in self.get_all_languages():
            names = (language.dialect, language.subgroup, language.language_family)
            if any(needle in name.lower() for name in names):
                return language
        raise Exception("Can't find language.")

    def get_language_family(self, family):
        """Return all languages whose family name occurs inside `family`.

        The comparison is case-insensitive.
        """
        wanted = family.lower()
        return [
            language
            for language in self.get_all_languages()
            if language.language_family.lower() in wanted
        ]

    def _fetch_all_entries(self, db_name, language, page_size):
        """Fetch every result page for `language` and return the entries.

        The first request also reports `numberOfPages`. Page indexes are
        zero-based, so the remaining pages are 1 .. numberOfPages - 1.
        """
        print("Getting page 1 of ?")
        first = self.query(db_name, language, page_size=page_size)
        entries = list(first["entries"])
        total_pages = first["numberOfPages"]
        for page_index in range(1, total_pages):
            print("Getting page {0} of {1}".format(page_index + 1, total_pages))
            entries.extend(self.query(db_name, language, page_size=page_size, page=page_index)["entries"])
        return entries

    def query_words(self, language):
        """Download all words of a language and cache them.

        Args:
            language: A LanguageDefinition, or text passed to
                `get_language` to resolve one.

        Returns:
            The list of Word objects, also stored under "words" (and the
            raw entries under "entries") in the language's record.
        """
        if not isinstance(language, LanguageDefinition):
            language = self.get_language(language)
        print("Setting up session for full word query.")
        # The response is not used; the page is requested before querying.
        self.session.get(f"https://www.firstvoices.com/explore/FV/sections/Data/{str(language)}/learn/words/")
        entries = self._fetch_all_entries("FVWord", language, 2000)
        words = [Word(entry) for entry in entries]
        record = self._language_record(language)
        record["entries"] = entries
        record["words"] = words
        return words

    def query(self, db_name = "FVWord", language = None, **kwargs):
        """Run one page of an NXQL query and return the decoded JSON.

        Args:
            db_name: Document type to select from. The original note lists
                FVWord, FVCharacter, FVDialect and FVPhrase.
            language: Object whose `str()` is the path below
                /FV/sections/Data/ to search.
            **kwargs: `page_size` (default 10) and `page` (zero-based
                index, default 0).
        """
        base_url = "https://www.firstvoices.com/nuxeo/api/v1/automation/Document.EnrichedQuery"
        # Single quotes would terminate the NXQL string literal.
        escaped_lang = str(language).replace("'","\\'")
        query = f"SELECT * FROM {db_name} WHERE ecm:path STARTSWITH '/FV/sections/Data/{escaped_lang}' AND ecm:isVersion = 0 AND ecm:isTrashed = 0 "
        page_size = kwargs.get('page_size', 10)
        current_page = kwargs.get("page", 0)
        payload = {
            "params": {
                "language": "NXQL",
                "dialectId": "undefined",
                "sortBy": "fv:custom_order",
                "pageSize": page_size,
                "sortOrder": "asc",
                "currentPageIndex": f"{current_page}",
                "query": query,
            },
            "context": {},
        }
        return self.session.post(base_url, json=payload).json()

    @classmethod
    def _original_recording_url(cls, recording_url):
        """Return the server URL for a recording, or "" when there is none.

        `recording_url` may already start with the site root; that prefix
        is removed before the /nuxeo/ prefix is added so the host is not
        repeated inside the result.
        """
        if not recording_url:
            return ""
        relative = recording_url
        if relative.startswith(cls._SITE_ROOT):
            relative = relative[len(cls._SITE_ROOT):]
        return "https://www.firstvoices.com/nuxeo/" + relative.lstrip("/")

    def _tag_mp3(self, mp3_path, word):
        """Best-effort: write title and comment tags to a downloaded MP3."""
        try:
            # Make sure the file carries an ID3v2.4 tag before EasyID3 opens it.
            tags = ID3(mp3_path, translate=True, v2_version=4)
            tags.save()
            tags = EasyID3(mp3_path)
            original_filename = word.original_filename
            tags["title"] = word.word
            comment = f"{word.word} -> {word.translation}\n({original_filename})"
            # Long comments are replaced by just the original file name.
            if len(comment.encode("utf8")) >= 28:
                comment = original_filename
            tags["comment"] = comment
            tags.save()
        except Exception as exc:
            # Tagging is optional; report the failure and keep the download.
            print(f"Unable to tag {mp3_path}: {exc}")

    def dump_language(self, language, target_directory = None):
        """Write a language's words, raw data and recordings to disk.

        Creates `language.csv`, `language_raw.json` and one audio file per
        word that has a recording.

        Args:
            language: A LanguageDefinition, or text passed to
                `get_language` to resolve one.
            target_directory: Output directory. Defaults to
                ./<family>/<subgroup>/<dialect> under the current directory.
        """
        if not isinstance(language, LanguageDefinition):
            language = self.get_language(language)
        if target_directory is None:
            target_directory = os.path.join(os.path.abspath("."), language.language_family,language.subgroup,language.dialect)
        os.makedirs(target_directory, exist_ok=True)

        record = self._language_record(language)
        if not record["words"]:
            print(f"Retrieving words for {language}.")
            self.query_words(language)
        words = record["words"]

        print("Generating CSV.")
        # newline="" is what the csv module expects; utf8 matches the JSON
        # dump below and avoids platform-default encoding errors.
        with open(os.path.join(target_directory,'language.csv'), 'w', newline="", encoding="utf8") as csvf:
            csvfile = csv.writer(csvf)
            csvfile.writerow(["Word","Plural","Translation","Category","Lexical Category","Notes","Pronunciation", "Recording", "Original Recording URL"])
            for word in words:
                csvfile.writerow([
                    word.word,
                    word.plural,
                    word.translation,
                    word.category,
                    word.lexical_category,
                    word.notes,
                    word.pronunciation,
                    # Same name the recording is saved under further down.
                    word.get_sanitized_filename() if word.recording_url else "",
                    self._original_recording_url(word.recording_url),
                ])

        print("Generating JSON dump.")
        words_raw = {word.word: word.raw_entry for word in words}
        with open(os.path.join(target_directory, "language_raw.json"), "w",encoding="utf8") as f:
            f.write(json.dumps(words_raw, indent=2, sort_keys=True,ensure_ascii=False))

        print("Downloading recordings.")
        for word in words:
            audio = word.get_audio_dl()
            if not audio:
                continue
            filename = word.get_sanitized_filename()
            destination = os.path.join(target_directory, filename)
            try:
                # Fetch first so a failed request leaves no file behind.
                with self.session.get(audio[1], stream=True) as response:
                    response.raise_for_status()
                    content = response.content
                with open(destination, "wb") as f:
                    f.write(content)
            except Exception as exc:
                print(f"Unable to download {audio[0]} {audio[1]} -> {filename} ({exc})")
                continue
            if modify_mp3 and filename.endswith(".mp3"):
                self._tag_mp3(destination, word)

    def download_word_data(self, word):
        """Placeholder; not implemented."""
        pass
class Word():
    """One dictionary entry, built from a raw API result.

    Attributes set by the constructor: raw_entry, word, lexical_category,
    notes, plural, cultural_note, category, translation, pronunciation,
    picture_url, recording_url, original_filename, filetype and
    recording_source.
    """

    # Prefix added in front of the audio path to form `recording_url`.
    _SITE_ROOT = "https://www.firstvoices.com/"

    @staticmethod
    def _join_values(value, separator):
        """Join a list property into text; tolerate None and plain strings."""
        if not value:
            return ""
        if isinstance(value, str):
            return value
        return separator.join(value)

    def __init__(self, word_obj):
        """Extract the fields of interest from `word_obj`.

        Args:
            word_obj: Mapping with "title", "properties" and
                "contextParameters" -> "word" keys, as read below.
        """
        self.raw_entry = word_obj
        property_obj = word_obj["properties"]
        self.word = word_obj["title"]
        self.lexical_category = property_obj["fv-word:part_of_speech"]

        self.notes = self._join_values(property_obj["fv-word:notes"], "\n")
        # Notes made only of newlines carry no text: normalise to "".
        if not self.notes.strip('\n'):
            self.notes = ""
        self.plural = self._join_values(property_obj["fv-word:plural"], ", ").strip()
        self.cultural_note = self._join_values(property_obj["fv:cultural_note"], ", ")

        context = word_obj["contextParameters"]["word"]

        self.category = ""
        if context.get("categories"):
            joined = ", ".join([x["dc:title"] for x in context["categories"]]).strip()
            # An empty last title leaves a dangling comma after strip().
            if joined.endswith(","):
                joined = joined[:-1]
            self.category = joined

        self.picture_url = None
        if context.get("related_pictures"):
            self.picture_url = context["related_pictures"][0]["path"]

        # Always defined so callers can read them for words without audio.
        self.recording_url = None
        self.original_filename = ""
        self.filetype = ""
        if context.get("related_audio"):
            self.recording_url = "https://www.firstvoices.com/" + context["related_audio"][0]["path"]
            self.original_filename = self.recording_url.split("/")[-1]
            self.filetype = self.original_filename.split(".")[-1]

        self.recording_source = ""
        if context.get("sources"):
            sources = ", ".join([x["dc:title"] for x in context["sources"]])
            if sources.endswith(", "):
                sources = sources[:-2]
            self.recording_source = sources

        # The last definition whose language contains "en" wins.
        self.translation = ""
        for translations in property_obj["fv:definitions"] or []:
            if "en" in translations["language"]:
                self.translation = translations["translation"]

        self.pronunciation = property_obj["fv-word:pronunciation"] or ""

    def get_audio_dl(self):
        """Return (filename, download_url) for the recording, or None."""
        if self.recording_url is None:
            return None
        return self.parse_dl(self.recording_url)

    def get_sanitized_filename(self):
        """Return a file name for the recording, or "" without one.

        Slashes in the word become "%2F" and backslashes "_"; the
        extension is the recording's file type.
        """
        if self.recording_url is None:
            return ""
        return self.word.replace("/","%2F").replace("\\","_") + "." + self.filetype.replace(".","")

    def get_picture_dl(self):
        """Return (filename, download_url) for the picture, or None."""
        if self.picture_url is None:
            return None
        return self.parse_dl(self.picture_url)

    def parse_dl(self, path):
        """Split a media path into its file name and download URL.

        Args:
            path: A server-relative path, or one already prefixed with the
                site root (as `recording_url` is).

        Returns:
            (file name, "https://www.firstvoices.com/nuxeo/" + directory
            part of the path), or None for an empty path.
        """
        if path is None or len(path.strip()) == 0:
            return None
        # Remove the site root first; otherwise the host would be
        # repeated inside the URL built below.
        if path.startswith(self._SITE_ROOT):
            path = path[len(self._SITE_ROOT):]
        parts = path.split('/')
        real_filename = parts[-1]
        download_url = "www.firstvoices.com/nuxeo/" + '/'.join(parts[:-1])
        download_url = "https://" + download_url.replace("//","/")
        return (real_filename, download_url)
class LanguageDefinition():
    """Identifies one language by family, subgroup and dialect.

    `portal_id` and `path` are optional extra fields stored unchanged.
    """

    def __init__(self, language_family, subgroup, dialect, portal_id = None, path = None):
        """Store the three name parts and the optional identifiers."""
        self.language_family, self.subgroup, self.dialect = language_family, subgroup, dialect
        self.portal_id, self.path = portal_id, path

    def __str__(self):
        """Return the name parts joined as "family/subgroup/dialect"."""
        return "{}/{}/{}".format(self.language_family, self.subgroup, self.dialect)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment