Skip to content

Instantly share code, notes, and snippets.

@jsam
Created August 15, 2014 14:34
Show Gist options
  • Save jsam/1b85ebb4d7020fea91b0 to your computer and use it in GitHub Desktop.
Save jsam/1b85ebb4d7020fea91b0 to your computer and use it in GitHub Desktop.
My ETL pipeline built with dataset/SQLAlchemy, using sample data: users -> albums -> photos.
import dataset
import json
import yaml
from lxml import etree
# Module-wide database connection (SQLite file ``data.db``) and the three
# table handles that the Load step inserts into.
db = dataset.connect('sqlite:///data.db')
users_table, albums_table, photos_table = (
    db[table_name] for table_name in ('users', 'albums', 'photos')
)
# extract
class Extract(object):
    """Read the raw users, albums and photos sample data from disk.

    Each source is stored in a different format: users as JSON, albums as
    XML and photos as YAML. The file locations default to the paths under
    ``static/`` and may be overridden per instance.
    """

    def __init__(self,
                 users_path="static/json/users.json",
                 albums_path="static/xml/albums.xml",
                 photos_path="static/yaml/photos.yaml"):
        """Remember where each source file lives.

        Args:
            users_path: Path of the JSON file read by ``get_users``.
            albums_path: Path of the XML file read by ``get_albums``.
            photos_path: Path of the YAML file read by ``get_photos``.
        """
        self.users_path = users_path
        self.albums_path = albums_path
        self.photos_path = photos_path

    def get_users(self):
        """Return the parsed content of the users JSON file."""
        with open(self.users_path) as f:
            return json.load(f)

    def get_albums(self):
        """Return the albums XML file parsed into an lxml element tree."""
        return etree.parse(self.albums_path)

    def get_photos(self):
        """Return the parsed content of the photos YAML file."""
        with open(self.photos_path) as f:
            # NOTE(review): this used to be ``yaml.load(f)``. Without an
            # explicit Loader that call can construct arbitrary Python
            # objects from tagged YAML, and PyYAML >= 6 rejects it with a
            # TypeError. ``safe_load`` only builds plain data (dicts, lists,
            # scalars); if the photos file ever relies on Python-specific
            # tags, pass an explicit Loader here instead.
            return yaml.safe_load(f)
# transform
class Transform(object):
    """Clean the raw records produced by ``Extract`` before they are loaded."""

    # Keys removed from every user record by ``get_clean_users``.
    _USER_FIELDS_TO_DROP = ('company', 'address', 'geo')
    # Number of consecutive child elements that together describe one album
    # (id, title, userId — in that order).
    _ALBUM_FIELD_COUNT = 3

    def __init__(self):
        """Create the extractor that supplies the raw data."""
        self.e = Extract()

    def get_clean_users(self):
        """Return the user records without their company/address/geo keys.

        The records returned by the extractor are modified in place and the
        same list is returned.
        """
        users = self.e.get_users()
        for user in users:
            for field in self._USER_FIELDS_TO_DROP:
                # pop with a default: a record missing the key is left alone.
                user.pop(field, None)
        return users

    def _etree_to_dict(self, t):
        """Convert XML element ``t`` into a nested dict.

        The result maps the element's tag to the list of its converted
        children, every attribute ``name`` to the key ``'@name'``, and the
        key ``'text'`` to the element's text.

        NOTE: ``'text'`` is assigned last, so for an element whose tag is
        literally ``text`` the children list is overwritten by the text.
        """
        # A list comprehension (not ``map``) so the result supports ``len``
        # and indexing on Python 3 as well; iterating an element yields its
        # children.
        d = {t.tag: [self._etree_to_dict(child) for child in t]}
        d.update(('@' + k, v) for k, v in t.attrib.items())
        d['text'] = t.text
        return d

    def get_clean_albums(self):
        """Return the albums as a list of ``id``/``title``/``userId`` dicts.

        The children of the ``data`` root element are read as a flat
        sequence in which every three consecutive elements carry the id,
        title and userId text of one album.

        Raises:
            KeyError: If the root element is not tagged ``data``.
            ValueError: If the number of child elements is not a multiple
                of three, i.e. the last album is incomplete.
        """
        root = self.e.get_albums().getroot()
        fields = self._etree_to_dict(root)['data']
        step = self._ALBUM_FIELD_COUNT
        if len(fields) % step:
            # Previously this surfaced as a bare IndexError halfway through
            # the loop; fail up front with an explanation instead.
            raise ValueError(
                "albums XML has {} child elements; expected a multiple of {} "
                "(id, title, userId per album)".format(len(fields), step)
            )
        return [
            {
                'id': fields[start]['text'],
                'title': fields[start + 1]['text'],
                'userId': fields[start + 2]['text'],
            }
            for start in range(0, len(fields), step)
        ]

    def get_clean_photos(self):
        """Return the photo records exactly as extracted (no cleaning)."""
        return self.e.get_photos()
# load
class Load(object):
    """Insert the transformed records into the module-level database tables."""

    def __init__(self):
        """Create the transformer that supplies the cleaned records."""
        self._transformer = Transform()

    def load_users(self):
        """Log and insert every cleaned user record into ``users_table``."""
        for user in self._transformer.get_clean_users():
            message = "[INFO] Inserting to users table: {}".format(user)
            print(message)
            users_table.insert(user)

    def load_albums(self):
        """Log and insert a dict copy of every album into ``albums_table``."""
        for album in self._transformer.get_clean_albums():
            message = "[INFO] Inserting to albums table: {}".format(album)
            print(message)
            albums_table.insert(dict(album))

    def load_photos(self):
        """Log and insert a dict copy of every photo into ``photos_table``."""
        for photo in self._transformer.get_clean_photos():
            message = "[INFO] Inserting to photos table: {}".format(photo)
            print(message)
            photos_table.insert(dict(photo))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment