Skip to content

Instantly share code, notes, and snippets.

@gartenfeld
Created December 3, 2014 14:36
Show Gist options
  • Save gartenfeld/2f27a52047121b385772 to your computer and use it in GitHub Desktop.
Save gartenfeld/2f27a52047121b385772 to your computer and use it in GitHub Desktop.
Importing data using PyMongo.
from bs4 import BeautifulSoup
import re # Regular Expressions
import collections # Data Types
import sys # File operations
import codecs # UniCode support
import os
import random
import locale
from pymongo import Connection # For DB Connection
from pymongo.errors import ConnectionFailure # For catching exceptions
def load_entry(file_name):
    """Read one raw HTML file and return the inner markup of its entry div.

    Reads ``source_path + file_name`` (``source_path`` is a module-level
    global set in ``__main__``), parses it as UTF-8 HTML, and returns the
    children of the ``<div class="entry">`` element concatenated into a
    single string.
    """
    raw_file = source_path + file_name
    # Use a context manager: the original handed an open file object to
    # BeautifulSoup and never closed it, leaking one handle per file.
    with open(raw_file) as raw_handle:
        raw_soup = BeautifulSoup(raw_handle, from_encoding="utf-8")
    # Extract the content of the entry DIV, discarding the outermost wrapper.
    entry_contents = raw_soup.find('div', class_="entry").contents
    # "".join avoids the quadratic +=-in-a-loop string build of the original.
    return "".join(str(entry_item) for entry_item in entry_contents)
def extract_string(soup_item):
    """Return all text nodes under *soup_item* joined into one string,
    with leading/trailing whitespace removed."""
    text_nodes = soup_item.find_all(text=True)
    joined_text = "".join(text_nodes)
    return joined_text.strip()
def unpack_head(head_soup):
    """Unpack the head-block soup of a dictionary entry into a plain dict.

    Always extracts the mandatory HEADWORD and PRIMARY index. Optionally
    adds 'factors', 'domains', 'notes' and 'hg_number' keys when the
    corresponding elements are present in the markup.
    """
    # Mandatory fields: HEADWORD and the PRIMARY index.
    headword_soup = head_soup.find('div', class_="headword")
    headword = extract_string(headword_soup)
    indices_soup = head_soup.find('div', class_="indices")
    primary_soup = indices_soup.find('div', class_="primary")
    primary = extract_string(primary_soup)
    h_dict = {
        "headword": headword,
        "primary": primary,
    }
    # Optional FACTORS: key is added only when at least one factor exists
    # (matches the original len() > 0 guard).
    factor_set = indices_soup.find_all('div', class_="factor")
    if factor_set:
        h_dict['factors'] = [extract_string(tag) for tag in factor_set]
    # Optional DOMAINS: key is added whenever the container div exists,
    # even if it holds no domain children (original behavior).
    domains_soup = head_soup.find('div', class_="domains")
    if domains_soup is not None:
        domain_set = domains_soup.find_all('div', class_="domain")
        h_dict['domains'] = [extract_string(tag) for tag in domain_set]
    # Optional NOTES.
    notes_tag = head_soup.find('div', class_="notes")
    if notes_tag is not None:
        h_dict['notes'] = extract_string(notes_tag)
    # Optional homograph number (HG-NUMBER).
    hg_tag = head_soup.find('div', class_="hg-number")
    if hg_tag is not None:
        h_dict['hg_number'] = extract_string(hg_tag)
    return h_dict
def process_all(files_list):
    """Parse every raw file in *files_list* and insert each dictionary
    entry as a document into the 'sanat' collection of the local
    MongoDB 'stage' database.

    Prints progress every 5000 files. Entries that fail to parse are
    reported to stdout and skipped; a failed DB connection aborts the
    program.
    """
    # MongoDB connection; default parameters are specified explicitly.
    print("Connecting to database...")
    try:
        db_connection = Connection(host="localhost", port=27017)
        print ("Connected to MongoDB successfully!")
    except ConnectionFailure as e:
        # BUG FIX: the original wrote `except (ConnectionFailure, e):`,
        # which in Python 3 raises NameError on the undefined `e` and then
        # fell through to use an undefined db_connection. Abort instead.
        sys.stderr.write("Could not connect to MongoDB: %s" % e)
        sys.exit(1)
    # Specify a database. Collections (such as 'sanat' below) are
    # lazily created on first insert.
    db = db_connection["stage"]
    # Iterate through all files.
    for i, file_name in enumerate(files_list):
        if i % 5000 == 0:
            print ("Progress: " + str(int(i * 100 / len(files_list))) + "%")
        # Fetch the page content converted to a string, then split the
        # content on divider markers into individual entries.
        entries_string = load_entry(file_name)
        entry_set = entries_string.split('<div class="divider"></div>')
        for flat_entry in entry_set:
            try:
                # Convert the string into Soup.
                entry = BeautifulSoup(flat_entry)
                # Unpack the Headword Block into a dictionary, then
                # remove the block from the soup.
                head_soup = entry.find('div', class_="head-block")
                entry_dict = unpack_head(head_soup)
                head_soup.decompose()
                # What remains of the soup is the senses markup; strip
                # inter-tag line breaks.
                senses_string = str(entry).strip()
                senses_string = re.sub(r'>\n<', '><', senses_string)
                entry_dict['senses'] = senses_string
                # Insert document into DB.
                db.sanat.insert(entry_dict, safe=True)
            except Exception:
                # Narrowed from a bare `except:` so SystemExit and
                # KeyboardInterrupt are no longer swallowed; parsing
                # remains deliberately best-effort per entry.
                print ("Something wrong with: " + file_name)
                print ("Error message: ", sys.exc_info())
                continue
    return
def load_directory(source_path):
    """Return the list of '.html' file names found in *source_path*.

    Prints the number of files found, formatted with thousands
    separators, and returns the (unsorted) list of bare file names.
    """
    # The original wrapped str.endswith in try/except IndexError, which
    # can never be raised here; a plain comprehension suffices.
    files_list = [file_name for file_name in os.listdir(source_path)
                  if file_name.endswith(".html")]
    # "{:,}" produces the same grouped output as the original
    # locale.setlocale('en_AU') + deprecated locale.format pair, without
    # crashing on systems that lack the en_AU locale.
    print("{:,} files loaded.".format(len(files_list)))
    return files_list
if __name__ == '__main__':
    # NOTE: source_path must stay a module-level global; load_entry()
    # reads it to build full file paths.
    source_path = "Data-FI-EN/"
    print("Loading files...")
    # Collect the names of all raw .html files to process.
    html_files = load_directory(source_path)
    print("Processing all files...")
    process_all(html_files)
    print("Valmis!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment