Skip to content

Instantly share code, notes, and snippets.

@sirex
Created May 13, 2015 17:06
Show Gist options
  • Save sirex/32818379a55fc8e3edf5 to your computer and use it in GitHub Desktop.
Save sirex/32818379a55fc8e3edf5 to your computer and use it in GitHub Desktop.
from sqlalchemy_utils import database_exists, create_database, drop_database
from sqlalchemy.dialects import postgresql, mysql, sqlite
from sqlalchemy import (
create_engine, select, func, and_, Column, Table, MetaData,
Integer, BigInteger, SmallInteger, LargeBinary,
)
# Primary-key integer type: BIGINT on PostgreSQL and MySQL, plain INTEGER on
# SQLite. SQLite only auto-assigns rowid values to a column declared exactly
# "INTEGER PRIMARY KEY", which is presumably why the sqlite variant differs.
BigInt = (
    BigInteger()
    .with_variant(postgresql.BIGINT(), 'postgresql')
    .with_variant(mysql.BIGINT(), 'mysql')
    .with_variant(sqlite.INTEGER(), 'sqlite')
)
class Storage(object):
    """Lazily provision databases, engines and tables for scraper tasks.

    Engines are cached per database URL and tables per
    ``(url, schema, name)`` key, so repeated lookups reuse the same objects.
    """

    def __init__(self):
        # Registry holding every Table this storage defines.
        self.metadata = MetaData()
        # Database URL -> Engine.
        self.engines = {}
        # (url, schema, name) -> Table.
        self.tables = {}
        # Schema name -> tuple of template Columns. get_table() copies the
        # templates, so a single schema can back any number of tables.
        self.schemas = {
            'tasks': self._default_columns(),
            'items': self._default_columns(),
            'processed_items': self._default_columns(),
        }

    @staticmethod
    def _default_columns():
        """Return a fresh tuple of the columns every built-in schema uses."""
        return (
            Column('pk', BigInt, primary_key=True),
            Column('key', Integer, index=True),
            Column('status', SmallInteger),
            Column('data', LargeBinary),
        )

    def get_engine(self, url):
        """Return the cached engine for *url*, creating the database if needed.

        Args:
            url: SQLAlchemy database URL, e.g. ``'sqlite:///mps.db'``.

        Returns:
            The ``Engine`` for *url*; the same object on every call.
        """
        engine = self.engines.get(url)
        if engine is None:
            if not database_exists(url):
                create_database(url)
            engine = self.engines[url] = create_engine(url)
        return engine

    def get_table(self, url, schema, name):
        """Return table *name* laid out per *schema*, ensuring it exists at *url*.

        Args:
            url: database URL, resolved through get_engine().
            schema: key into ``self.schemas`` selecting the column layout.
            name: table name.

        Returns:
            The SQLAlchemy ``Table``, cached per ``(url, schema, name)``.

        Raises:
            KeyError: if *schema* is not a key of ``self.schemas``.
            ValueError: if *name* was already defined with different columns.
        """
        key = (url, schema, name)
        if key not in self.tables:
            templates = self.schemas[schema]
            # One MetaData is shared by all URLs, and it rejects a second
            # definition of the same table name, so reuse an existing one.
            table = self.metadata.tables.get(name)
            if table is None:
                # Copy the templates: a Column object can be attached to only
                # one Table, and several tables may share one schema.
                # (Column.copy() is deprecated since SQLAlchemy 1.4 but still
                # available.)
                columns = [column.copy() for column in templates]
                table = Table(name, self.metadata, *columns)
            elif [c.name for c in table.columns] != [c.name for c in templates]:
                raise ValueError(
                    'table %r is already defined with a different schema '
                    'than %r' % (name, schema))
            # checkfirst=True: the database may already hold the table, e.g.
            # a file-backed SQLite database left over from a previous run.
            table.create(self.get_engine(url), checkfirst=True)
            self.tables[key] = table
        return self.tables[key]

    def query_new_items(self):
        """Return the items that still need processing.

        TODO: not implemented yet. Returns an empty list so callers that
        iterate the result (such as Task.process) do nothing instead of
        failing on ``None``.
        """
        return []
class Task(object):
    """One scraping step that pulls items from storage and parses them.

    The open/close/parse hooks are no-ops in this class.
    """

    def __init__(self, storage, name, source=None):
        # Object whose query_new_items() supplies the items to process.
        self.storage = storage
        # Identifier of this task.
        self.name = name
        # Presumably the name of the upstream task feeding this one; confirm.
        self.source = source

    def open(self):
        """Prepare the task before processing; no-op here."""

    def close(self):
        """Release task resources after processing; no-op here."""

    def process(self):
        """Pass every item returned by items() to parse(), in order."""
        pending = self.items()
        for entry in pending:
            self.parse(entry)

    def items(self):
        """Return whatever ``self.storage.query_new_items()`` returns."""
        backend = self.storage
        return backend.query_new_items()

    def parse(self, item):
        """Handle a single item; no-op here."""
class Orchestrator(object):
    """Run scraper tasks described by a list of group configurations.

    Going by the lookups below, ``config`` is a list of group dicts, each with
    a ``'name'`` and a ``'tasks'`` list. Each task dict has a ``'name'``, a
    ``'scraper'`` class, and any further keyword arguments for that class.
    """

    def __init__(self, config):
        self.config = config

    def run(self):
        """Run every task of every group, in configuration order."""
        for group in self.config:
            self.run_group(group['name'])

    def get_group(self, group_name):
        """Return the first group named *group_name*, or ``None``."""
        for group in self.config:
            if group['name'] == group_name:
                return group
        return None

    def run_group(self, group_name):
        """Run every task of the group named *group_name*, in order.

        Raises:
            KeyError: if no group is named *group_name*.
        """
        group = self.get_group(group_name)
        if group is None:
            raise KeyError('unknown group: %r' % (group_name,))
        for task in group['tasks']:
            self.run_task(group_name, task['name'])

    def get_task(self, group_name, task_name):
        """Return ``(group, task)`` for the named task, or ``None``."""
        for group in self.config:
            if group['name'] != group_name:
                continue
            for task in group['tasks']:
                if task['name'] == task_name:
                    return group, task
        return None

    def run_task(self, group_name, task_name):
        """Instantiate and run the scraper configured for one task.

        The task's ``'scraper'`` class is called with the remaining task
        entries as keyword arguments. Then ``open()``, ``process()`` and
        ``close()`` are called on the instance. ``close()`` runs even if
        ``process()`` raises.

        Raises:
            KeyError: if the group/task pair is not configured.
        """
        found = self.get_task(group_name, task_name)
        if found is None:
            raise KeyError(
                'unknown task %r in group %r' % (task_name, group_name))
        _group, task = found
        # Copy before popping so the shared config is not mutated. Popping in
        # place made any second run of the same task fail with KeyError.
        kwargs = dict(task)
        scraper_cls = kwargs.pop('scraper')
        # NOTE(review): nothing from the group (e.g. its 'database' URL) is
        # passed on, and Task.__init__ requires a `storage` argument. Confirm
        # how scrapers are meant to receive their storage.
        scraper = scraper_cls(**kwargs)
        scraper.open()
        try:
            scraper.process()
        finally:
            scraper.close()
class Download(object):
    """Placeholder download step; it defines no behaviour of its own yet.

    NOTE(review): the test configuration hands this class to Orchestrator,
    which instantiates it with keyword arguments (name, urls, ...). The
    inherited object initializer rejects those, so this class needs an
    __init__ (or a Task base) before that pipeline can run. TODO.
    """
import unittest
import scraper
import scrapers
class ScraperTests(unittest.TestCase):
    """End-to-end smoke test wiring the 'mps' scraping pipeline."""

    def test_(self):
        """Build the 'mps' group configuration and run it via Orchestrator."""
        # Resolve the Orchestrator class before building the config, matching
        # the evaluation order of a single call expression.
        make_orchestrator = scraper.Orchestrator
        start_urls = [
            'http://www3.lrs.lt/pls/inter/w5_show?p_r=8801&p_k=1',
        ]
        download_start = {
            'name': 'download-start-urls',
            'scraper': scraper.Download,
            'urls': start_urls,
        }
        profile_links = {
            'name': 'get-profile-links',
            'scraper': scrapers.ProfileLinks,
            'source': 'download-start-urls',
        }
        download_profiles = {
            'name': 'download-profile-links',
            'scraper': scraper.Download,
            'source': 'get-profile-links',
        }
        # NOTE(review): this step reuses scrapers.ProfileLinks and reads from
        # 'download-start-urls', exactly like 'get-profile-links'. It may have
        # been meant to parse 'download-profile-links' with a details scraper.
        # Confirm.
        profile_details = {
            'name': 'get-profile-details',
            'scraper': scrapers.ProfileLinks,
            'source': 'download-start-urls',
        }
        # NOTE(review): CsvWriter does not appear in the scraper code shown
        # alongside this test. Confirm it exists in the scraper module.
        csv_output = {
            'name': 'write-csv-file',
            'scraper': scraper.CsvWriter,
            'source': 'get-profile-details',
            'path': 'profile-details.csv',
        }
        group = {
            'name': 'mps',
            'database': 'sqlite:///mps.db',
            'tasks': [
                download_start,
                profile_links,
                download_profiles,
                profile_details,
                csv_output,
            ],
        }
        orchestrator = make_orchestrator([group])
        orchestrator.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment