@lawrencejones
Created November 23, 2018 12:20
BigQuery to Postgres for feature repo: pulls rows from a set of BigQuery tables and upserts them into Postgres, keeping each table's primary key as a column and folding the remaining columns into a JSONB features blob.
import sqlalchemy
from google.cloud import bigquery
from itertools import chain, islice
from sqlalchemy import Column, DateTime, MetaData, String, Table
from sqlalchemy.dialects.postgresql import JSONB, insert
from sqlalchemy.sql import func


def batch(iterable, batch_size=500):
    """Lazily split an iterable into chunks of at most batch_size items."""
    source = iter(iterable)
    while True:
        # PEP 479: StopIteration must not escape a generator, so catch it
        # explicitly and return once the source is exhausted.
        try:
            head = next(source)
        except StopIteration:
            return
        yield chain((head,), islice(source, batch_size - 1))


def transform_row(row, primary_key):
    """Keep the primary key as a column and fold everything else into the
    features blob, dropping the window_end_at bookkeeping column."""
    return {
        primary_key: row[primary_key],
        "features": {
            key: value
            for key, value in row.items()
            if key not in (primary_key, "window_end_at")
        },
    }


# Maps each BigQuery table name to the column used as its primary key.
BIGQUERY_TABLES = {
    "store_hashed_bank_details": "hashed_bank_details",
}

# SQLAlchemy 1.x idioms (bound MetaData, dialect.has_table) are used here.
engine = sqlalchemy.create_engine("postgres://postgres@localhost:32769")
bqclient = bigquery.Client(project="gc-data-spike")

with engine.connect() as conn:
    for table_name, primary_key in BIGQUERY_TABLES.items():
        metadata = MetaData(engine)
        table = Table(
            table_name,
            metadata,
            # Use the configured primary key column rather than hardcoding it.
            Column(primary_key, String, primary_key=True),
            Column("updated_at", DateTime, default=func.now(), nullable=False),
            Column("features", JSONB, nullable=False),
        )
        if not engine.dialect.has_table(engine, table_name):
            metadata.create_all()

        job = bqclient.query(f"SELECT * FROM yasir_playground.{table_name} LIMIT 20")
        for rows in batch(job.result(), batch_size=10):
            # Upsert: insert new rows, or refresh features and updated_at when
            # the primary key already exists.
            insert_stmt = insert(table).values(
                [transform_row(row, primary_key) for row in rows]
            )
            do_update = insert_stmt.on_conflict_do_update(
                index_elements=[primary_key],
                set_={
                    "features": insert_stmt.excluded.features,
                    "updated_at": func.now(),
                },
            )
            print(conn.execute(do_update))
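
A quick standalone check of the batch helper (the range input is illustrative, not part of the gist): each yielded chunk is a lazy view over the shared source iterator, so it must be consumed fully before the next chunk is requested, which the insert loop above does by materialising each chunk into the VALUES list.

from itertools import chain, islice

def batch(iterable, batch_size=500):
    source = iter(iterable)
    while True:
        try:
            head = next(source)
        except StopIteration:
            return
        yield chain((head,), islice(source, batch_size - 1))

# Each chunk shares the underlying iterator, so consume it fully (here via
# list) before moving on; skipping a chunk would bleed its items into the next.
for chunk in batch(range(7), batch_size=3):
    print(list(chunk))
# [0, 1, 2]
# [3, 4, 5]
# [6]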
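
To sanity-check the generated upsert without a database, the statement can be rendered against the PostgreSQL dialect. This is a sketch with stand-in values: compile() and postgresql.dialect() are standard SQLAlchemy, while the table definition and row contents are made up to mirror the gist.

from sqlalchemy import Column, DateTime, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import JSONB, insert
from sqlalchemy.sql import func

metadata = MetaData()
table = Table(
    "store_hashed_bank_details",
    metadata,
    Column("hashed_bank_details", String, primary_key=True),
    Column("updated_at", DateTime, nullable=False),
    Column("features", JSONB, nullable=False),
)

insert_stmt = insert(table).values(
    [{"hashed_bank_details": "abc", "features": {"f": 1}}]
)
do_update = insert_stmt.on_conflict_do_update(
    index_elements=["hashed_bank_details"],
    set_={"features": insert_stmt.excluded.features, "updated_at": func.now()},
)

# Prints roughly:
#   INSERT INTO store_hashed_bank_details (hashed_bank_details, features)
#   VALUES (...) ON CONFLICT (hashed_bank_details)
#   DO UPDATE SET features = excluded.features, updated_at = now()
print(do_update.compile(dialect=postgresql.dialect()))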