Created
August 15, 2014 14:34
-
-
Save jsam/1b85ebb4d7020fea91b0 to your computer and use it in GitHub Desktop.
My ETL built with dataset/SQLAlchemy, using some sample data: users -> albums -> photos
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dataset | |
import json | |
import yaml | |
from lxml import etree | |
# URL of the database that the ETL writes to; passed to dataset.connect().
DATABASE_URL = 'sqlite:///data.db'

# Shared connection and table handles used by the Load class.
db = dataset.connect(DATABASE_URL)
users_table = db['users']
albums_table = db['albums']
photos_table = db['photos']
# extract | |
class Extract(object):
    """Read the raw users, albums and photos data from disk.

    Users are stored as JSON, albums as XML and photos as YAML. The default
    paths are relative to the current working directory.
    """

    def __init__(self,
                 users_path="static/json/users.json",
                 albums_path="static/xml/albums.xml",
                 photos_path="static/yaml/photos.yaml"):
        """Remember where each source file lives.

        Args:
            users_path: Path of the JSON file returned by get_users().
            albums_path: Path of the XML file parsed by get_albums().
            photos_path: Path of the YAML file returned by get_photos().
        """
        self.users_path = users_path
        self.albums_path = albums_path
        self.photos_path = photos_path

    def get_users(self):
        """Return the decoded content of the users JSON file."""
        with open(self.users_path) as f:
            return json.load(f)

    def get_albums(self):
        """Return the albums XML file as parsed by ``etree.parse``."""
        return etree.parse(self.albums_path)

    def get_photos(self):
        """Return the decoded content of the photos YAML file.

        An empty YAML document decodes to ``None``.
        """
        with open(self.photos_path) as f:
            # NOTE(review): this used to be yaml.load(f) without a Loader,
            # which can construct arbitrary Python objects and is rejected
            # by recent PyYAML releases. safe_load only builds plain YAML
            # types; confirm the photos file needs nothing beyond that.
            return yaml.safe_load(f)
# transform | |
class Transform(object):
    """Reshape the extracted data into flat rows ready for insertion."""

    # Keys stripped from every user record by get_clean_users().
    _DROPPED_USER_KEYS = ('company', 'address', 'geo')
    # Output keys of an album row, in the order their values appear in the XML.
    _ALBUM_FIELDS = ('id', 'title', 'userId')

    def __init__(self):
        self.e = Extract()

    def get_clean_users(self):
        """Return the users with the 'company', 'address' and 'geo' keys removed.

        The records returned by the extractor are modified in place and the
        same list object is returned.
        """
        users = self.e.get_users()
        for user in users:
            for key in self._DROPPED_USER_KEYS:
                # pop() with a default tolerates records lacking the key.
                user.pop(key, None)
        return users

    def _etree_to_dict(self, t):
        """Convert element *t* and its descendants into nested dicts.

        The result maps the element's tag to the list of its converted
        children, each attribute name prefixed with '@' to its value, and
        'text' to the element's text. Because 'text' is written last, it
        overwrites the child list of an element whose own tag is 'text'.
        """
        # Build a real list (not a lazy map object) so callers can use
        # len() and indexing on the children.
        d = {t.tag: [self._etree_to_dict(child) for child in t]}
        d.update(('@' + k, v) for k, v in t.attrib.items())
        d['text'] = t.text
        return d

    def get_clean_albums(self):
        """Return the albums as dicts with 'id', 'title' and 'userId' keys.

        Assumes the root element is tagged 'data' and that its children form
        a flat, repeating sequence of id, title, userId elements; values are
        taken by position, not by tag name. Children left over after the
        last complete group of three are ignored.
        """
        root = self.e.get_albums().getroot()
        nodes = self._etree_to_dict(root)['data']
        width = len(self._ALBUM_FIELDS)
        albums = []
        # Stop early enough that every slice holds a complete group.
        for start in range(0, len(nodes) - width + 1, width):
            group = nodes[start:start + width]
            albums.append(dict(
                (field, node['text'])
                for field, node in zip(self._ALBUM_FIELDS, group)
            ))
        return albums

    def get_clean_photos(self):
        """Return the photos exactly as the extractor provides them."""
        return self.e.get_photos()
# load | |
class Load(object):
    """Insert the cleaned users, albums and photos into their tables."""

    def __init__(self):
        self._transformer = Transform()

    def _insert_rows(self, table, label, rows):
        """Print an info line for each row and insert it into *table*.

        Args:
            table: Object exposing ``insert(row)``.
            label: Table name used in the printed message.
            rows: Iterable of mapping-like records; ``None`` is treated as
                no rows.
        """
        for row in rows or ():
            # Parenthesised single-argument print produces the same output
            # under both the Python 2 statement and the Python 3 function.
            print("[INFO] Inserting to {} table: {}".format(label, row))
            # Insert a plain-dict copy of the record.
            table.insert(dict(row))

    def load_users(self):
        """Insert every cleaned user into the users table."""
        self._insert_rows(
            users_table, "users", self._transformer.get_clean_users())

    def load_albums(self):
        """Insert every cleaned album into the albums table."""
        self._insert_rows(
            albums_table, "albums", self._transformer.get_clean_albums())

    def load_photos(self):
        """Insert every cleaned photo into the photos table."""
        self._insert_rows(
            photos_table, "photos", self._transformer.get_clean_photos())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment