Importing data using PyMongo.
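The script below reads a directory of dictionary entries saved as HTML files, unpacks each entry's head block with BeautifulSoup into a Python dictionary, and inserts one document per entry into the 'sanat' collection of a local MongoDB database. The source directory 'Data-FI-EN/' and the closing 'Valmis!' ('Done!' in Finnish) suggest a Finnish-English data set.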
from bs4 import BeautifulSoup
import re      # Regular expressions
import sys     # Error output and exit
import os      # Directory listing
import locale  # Number formatting
from pymongo import MongoClient               # For the DB connection
from pymongo.errors import ConnectionFailure  # For catching connection exceptions
def load_entry(file_name):
    # source_path is a module-level global set under __main__
    raw_file = source_path + file_name
    # Read the whole file into a soup
    with open(raw_file, encoding="utf-8") as raw_handle:
        raw_soup = BeautifulSoup(raw_handle, "html.parser")
    # Extract the contents of the entry DIV and discard the outermost wrapper
    entry_contents = raw_soup.find('div', class_="entry").contents
    entries_str = ""
    for entry_item in entry_contents:
        entries_str = entries_str + str(entry_item)
    return entries_str
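# The loader above assumes each raw file wraps its entries like this
# (a hypothetical sketch inferred from the class names used in this script):
#
#   <div class="entry">
#     <div class="head-block">...</div>
#     ...senses markup...
#     <div class="divider"></div>
#     <div class="head-block">...</div>
#     ...
#   </div>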
def extract_string(soup_item):
    # Join all the text nodes inside the tag and trim surrounding whitespace
    out_string = "".join(soup_item.find_all(string=True)).strip()
    return out_string
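# For example, with a hypothetical fragment:
#   extract_string(BeautifulSoup('<div> a <b>b</b> </div>', "html.parser").div)
# returns "a b".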
def unpack_head(head_soup):
    # Extract HEADWORD
    headword_soup = head_soup.find('div', class_="headword")
    headword = extract_string(headword_soup)
    # Extract INDICES
    indices_soup = head_soup.find('div', class_="indices")
    # Extract PRIMARY
    primary_soup = indices_soup.find('div', class_="primary")
    primary = extract_string(primary_soup)
    # Create the dictionary object
    h_dict = {
        "headword": headword,
        "primary": primary
    }
    # Contingent updates: the blocks below are optional in the source HTML
    # Try to find FACTORS
    factor_set = indices_soup.find_all('div', class_="factor")
    if factor_set:
        # Extract and pack FACTORS, then add the array to the dictionary
        factors = [extract_string(factor_tag) for factor_tag in factor_set]
        h_dict.update({'factors': factors})
    # Try to find DOMAINS
    domains_soup = head_soup.find('div', class_="domains")
    if domains_soup is not None:
        domain_set = domains_soup.find_all('div', class_="domain")
        domains = [extract_string(domain_tag) for domain_tag in domain_set]
        # Add the array to the dictionary
        h_dict.update({'domains': domains})
    # Try to find NOTES
    notes_tag = head_soup.find('div', class_="notes")
    if notes_tag is not None:
        h_dict.update({'notes': extract_string(notes_tag)})
    # Try to find HG-NUMBER
    hg_tag = head_soup.find('div', class_="hg-number")
    if hg_tag is not None:
        h_dict.update({'hg_number': extract_string(hg_tag)})
    return h_dict
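# Example of the dictionary unpack_head() returns for one entry
# (hypothetical values; "headword" and "primary" are always present,
# the other keys only when the corresponding block exists):
#
#   {
#       "headword": "talo",          # hypothetical Finnish headword
#       "primary": "house",          # hypothetical primary translation
#       "factors": ["..."],
#       "domains": ["..."],
#       "notes": "...",
#       "hg_number": "..."
#   }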
def process_all(files_list):
    # MongoDB connection
    print("Connecting to database...")
    try:
        # Here the default parameters are specified explicitly
        db_connection = MongoClient(host="localhost", port=27017)
        # MongoClient connects lazily, so ping to verify the server is reachable
        db_connection.admin.command('ping')
        print("Connected to MongoDB successfully!")
    except ConnectionFailure as e:
        sys.stderr.write("Could not connect to MongoDB: %s" % e)
        sys.exit(1)
    # Specify a database
    db = db_connection["stage"]
    # Iterate through all the files
    for i, file_name in enumerate(files_list):
        if i % 5000 == 0:
            print("Progress: " + str(int(i * 100 / len(files_list))) + "%")
        # Fetch the page content converted to a string
        entries_string = load_entry(file_name)
        # Split the content on dividers
        entry_set = entries_string.split('<div class="divider"></div>')
        for flat_entry in entry_set:
            try:  # For catching per-entry exceptions
                # Convert the string back into soup
                entry = BeautifulSoup(flat_entry, "html.parser")
                # Unpack the headword block into a dictionary
                head_soup = entry.find('div', class_="head-block")
                head_dict = unpack_head(head_soup)
                # Remove the headword block from the soup
                head_soup.decompose()
                # Start the entry dictionary from the headword data
                entry_dict = head_dict
                # Convert what is left of the soup into a string
                senses_string = str(entry).strip()
                # Remove line breaks between tags
                senses_string = re.sub(r'>\n<', '><', senses_string)
                # Add the key SENSES and its value to the entry dictionary
                entry_dict.update({'senses': senses_string})
                # Insert the document into the DB
                # Collections (such as 'sanat' here) are lazily created
                db.sanat.insert_one(entry_dict)
            except Exception:  # Skip malformed entries but report them
                print("Something wrong with: " + file_name)
                print("Error message: ", sys.exc_info())
                continue
    return
def load_directory(source_path):
    # Collect the names of all the HTML files in the source directory
    files_list = [file_name for file_name in os.listdir(source_path)
                  if file_name.endswith(".html")]
    # Report the file count with thousands separators
    locale.setlocale(locale.LC_ALL, 'en_AU')
    nr_loaded = locale.format_string("%d", len(files_list), grouping=True)
    print(nr_loaded + " files loaded.")
    return files_list
if __name__ == '__main__':
    source_path = "Data-FI-EN/"
    print("Loading files...")
    files_list = load_directory(source_path)  # Load the list of raw files
    print("Processing all files...")
    process_all(files_list)
    print("Valmis!")  # Finnish for "Done!"