Skip to content

Instantly share code, notes, and snippets.

@tym-xqo
Last active April 15, 2019 15:49
Show Gist options
  • Save tym-xqo/5cfefe79d6fffdaaa5d0fdb06b5b74ba to your computer and use it in GitHub Desktop.
Save tym-xqo/5cfefe79d6fffdaaa5d0fdb06b5b74ba to your computer and use it in GitHub Desktop.
Incremental session data pull
import os
from argparse import ArgumentParser
from pathlib import Path
import records
from dotenv import load_dotenv
from sh import psql
from sqlalchemy.exc import ProgrammingError
load_dotenv()
TARGET_DB_URL = os.getenv('TARGET_DB_URL')
SOURCE_DB_URL = os.getenv('SOURCE_DB_URL')
# LOCAL_DB = 'postgres://tym@localhost/tym'
def get_target_max_id(table):
    """Return the highest ``id`` currently stored in ``table`` on the target.

    Runs ``select coalesce(max(id), 0)`` against the database at
    ``TARGET_DB_URL`` and prints the table name together with the result.

    Args:
        table: Name of the table to inspect. It is interpolated directly
            into the SQL text, so it must be a trusted identifier.

    Returns:
        The largest ``id`` in the table, or 0 when the table has no rows or
        the query raised ``ProgrammingError``.
    """
    metal = records.Database(TARGET_DB_URL)
    last_id = 0
    try:
        result = metal.query(
            f'select coalesce(max(id), 0) as last_id from {table}')
        last_id = result.first()['last_id']
    except ProgrammingError:
        # Deliberate best-effort: fall back to 0 so the caller pulls every
        # row (presumably the table does not exist on the target yet).
        pass
    finally:
        # Release the connection; a new Database is opened on every call.
        metal.close()
    print(table, last_id)
    return last_id
def get_source_data(table):
    """Export source rows newer than the target's max id to ``{table}.csv``.

    Looks up the target's current max ``id`` via ``get_target_max_id``,
    selects every row in the source table at ``SOURCE_DB_URL`` with a larger
    ``id`` and, when at least one row comes back, writes the result as CSV
    into the current working directory.

    Args:
        table: Name of the table to pull. It is interpolated directly into
            the SQL text, so it must be a trusted identifier.

    Returns:
        The name of the CSV file written, or ``None`` when there were no new
        rows or the source query raised ``ProgrammingError``.
    """
    csv_name = f'{table}.csv'
    last_id = get_target_max_id(table)
    cloud = records.Database(SOURCE_DB_URL)
    try:
        rows = cloud.query(
            f'select * from {table} where id > :last_id', last_id=last_id)
        if rows.first():
            # newline='' keeps the CSV exporter's own line terminators
            # intact instead of letting text mode translate them.
            with open(csv_name, 'w', newline='') as copyfile:
                copyfile.write(rows.export('csv'))
            return csv_name
    except ProgrammingError:
        # Best-effort, as before: a failing source query means "no data".
        pass
    finally:
        cloud.close()

    # Nothing new was exported. A CSV left over from an earlier run would
    # otherwise still be on disk, and the loader (which only checks that the
    # file exists) would import those already-loaded rows a second time.
    try:
        Path(csv_name).unlink()
    except FileNotFoundError:
        pass
    return None
def load_source_to_target(table):
    """Load ``{table}.csv`` into ``table`` on the target database.

    Shells out to ``psql`` with a client-side ``\\copy`` command, connecting
    with ``TARGET_DB_URL``. The call is skipped entirely when the CSV file
    does not exist in the current working directory.

    Args:
        table: Name of the destination table; also the stem of the CSV file.
    """
    csv_path = Path(f"{table}.csv")
    if not csv_path.is_file():
        # Nothing was exported for this table, so there is nothing to load.
        return
    copy_command = f"\\copy {table} from {csv_path} with csv header"
    psql("-d", TARGET_DB_URL, "-c", copy_command)
def fix_sequence(table):
    """Re-align ``public.{table}_id_seq`` with the ids stored in ``table``.

    Rows loaded with explicit ids do not advance the sequence, so it is set
    to the table's current ``MAX(id)`` on the database at ``TARGET_DB_URL``.

    Args:
        table: Name of a table in the ``public`` schema whose id sequence is
            named ``{table}_id_seq``. It is interpolated directly into the
            SQL text, so it must be a trusted identifier.
    """
    # The third SETVAL argument is ``is_called``. For an empty table it is
    # false, so the next generated id is 1; the two-argument form would mark
    # 1 as already used and hand out 2 first.
    sql = (f"SELECT SETVAL('public.{table}_id_seq', "
           f"COALESCE(MAX(id), 1), MAX(id) IS NOT NULL) "
           f"FROM public.{table};")
    metal = records.Database(TARGET_DB_URL)
    try:
        metal.query(sql)
    finally:
        # Release the connection; a new Database is opened on every call.
        metal.close()
if __name__ == "__main__":
    # study_sessions is fetched from the source *after* study_session_views
    # but loaded into the target *before* it. This avoids FK violations when
    # loading the views table.
    cli = ArgumentParser()
    cli.add_argument(
        "-n", "--no-load", dest="no_load", action="store_true", default=False)
    args = cli.parse_args()

    # Wider table list, currently disabled:
    #   course_user_sessions, lifetime_users, mobile_sessions, web_sessions,
    #   sessions, v2_sessions, user_reports, study_sessions,
    #   study_session_views
    fetch_order = ('v2_sessions', 'study_session_views', 'study_sessions')
    load_order = ('v2_sessions', 'study_sessions', 'study_session_views')

    for table in fetch_order:
        print(f"getting {table} data")
        get_source_data(table)
    print("Done!")

    # -n / --no-load stops after the CSV export step.
    if not args.no_load:
        for table in load_order:
            print(f"loading {table}")
            load_source_to_target(table)
            fix_sequence(table)
        print("Done!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment