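# Dump the results of a SQL query into a BigQuery table.
# Flow (as implemented below): fetch rows in chunks via SQLAlchemy, write each
# chunk to a local CSV, load that CSV into a disposable BigQuery table, append
# it to a temp table, and finally overwrite the target table from the temp table.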
import time
import os
import csv
from datetime import datetime
from sqlalchemy import create_engine
from googleapiclient import discovery
from oauth2client.client import SignedJwtAssertionCredentials
from googleapiclient.http import MediaFileUpload
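
# Service-account credentials for BigQuery and the Postgres connection string
# are read from environment variables; set these before running.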
client_email = os.environ["BQ_CLIENT_EMAIL"]
private_key = os.environ["BQ_PRIVATE_KEY"]
scope = ['https://www.googleapis.com/auth/bigquery']
project_id = os.environ["BQ_PROJECT_ID"]
psql_bind = os.environ["PSQL_BIND"]


def __main__():
    # Example: dump the user query below into dataset_name.user_table.
    dataset_id = 'dataset_name'
    table_id = 'user_table'
    schema = [{"type": "INTEGER", "name": "user_id"},
              {"type": "STRING", "name": "username"}]
    sql_query = """
        SELECT u.id user_id,
               u.username username
        FROM user_table u
    """
    bq_dump_data(schema, dataset_id, table_id, sql_query)


def bq_dump_data(schema, dataset_id, table_id, sql_query, sql_bind=psql_bind, fetch_size=3000000):
    # Stream the query results in chunks of `fetch_size` rows: each chunk is
    # written to a local CSV, loaded into a disposable table, and appended to a
    # temp table. When all chunks are loaded, the temp table replaces the target.
    now = int(time.time())
    data_path = '%s_%d.csv' % (table_id, now)
    temp_table_id = '%s_%d' % (table_id, now)
    disposable_table_id = '%s_disposable' % (temp_table_id,)
    engine = create_engine(sql_bind, pool_timeout=3000000)
    connection = engine.connect()
    cursor = connection.execute(sql_query)
    # Print BQ dump logs
    print("BQ dumping... %s.%s -- %-10s rows found" % (dataset_id, table_id, cursor.rowcount))
    results = cursor.fetchmany(fetch_size)
    first_fetch = True
    while results:
        with open(data_path, 'w') as csvfile:
            data_csv = csv.writer(csvfile, delimiter=',', lineterminator='\n', quotechar='"')
            for result in results:
                row = []
                for r in result:
                    if type(r) is bool:
                        # Write booleans as 1/0 integers.
                        row.append(int(r))
                    elif type(r) is datetime:
                        # Convert datetimes to Unix epoch seconds; naive values
                        # (no tzinfo) are treated as already being UTC.
                        if r.utcoffset() is not None:
                            r = r.replace(tzinfo=None) - r.utcoffset()
                        unix_timestamp = (r - datetime(1970, 1, 1)).total_seconds()
                        row.append(str(int(unix_timestamp)))
                    elif type(r) is unicode:
                        # Flatten embedded newlines and encode as UTF-8 (Python 2 unicode).
                        row.append(r.replace('\r\n', ' ').replace('\n', ' ').replace('\r', ' ').encode('utf-8'))
                    else:
                        row.append(r)
                data_csv.writerow(row)
        del results
        bq_load_data(schema, data_path, dataset_id, disposable_table_id)
        bq_copy_table(dataset_id, disposable_table_id, dataset_id, temp_table_id, is_write_truncate=first_fetch)
        bq_delete_table(dataset_id, disposable_table_id)
        os.remove(data_path)
        results = cursor.fetchmany(fetch_size)
        first_fetch = False
    bq_copy_table(dataset_id, temp_table_id, dataset_id, table_id, is_write_truncate=True)
    bq_delete_table(dataset_id, temp_table_id)
    # Print BQ dump logs
    print("BQ dump completed %s.%s -- completed" % (dataset_id, table_id))


# FUNCTIONS TO DUMP TO BQ
def bq_copy_table(source_dataset_id, source_table_id, destination_dataset_id, destination_table_id, is_write_truncate=False):
    # Submit a BigQuery copy job and block until it finishes.
    bigquery = get_bigquery()
    insert_request = bigquery.jobs().insert(
        projectId=project_id,
        body={
            'configuration': {
                'copy': {
                    'sourceTable': {
                        'projectId': project_id,
                        'datasetId': source_dataset_id,
                        'tableId': source_table_id
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': destination_dataset_id,
                        'tableId': destination_table_id
                    },
                    'writeDisposition': 'WRITE_TRUNCATE' if is_write_truncate else 'WRITE_APPEND'
                }
            }
        })
    job = insert_request.execute()
    status_request = bigquery.jobs().get(
        projectId=job['jobReference']['projectId'],
        jobId=job['jobReference']['jobId'])
    # Poll the job status until it is DONE, surfacing any job errors.
    while True:
        result = status_request.execute(num_retries=2)
        if result['status']['state'] == 'DONE':
            if result['status'].get('errors'):
                raise RuntimeError('\n'.join(
                    e['message'] for e in result['status']['errors']))
            return
        time.sleep(1)


def bq_delete_table(dataset_id, table_id):
    bigquery = get_bigquery()
    delete_request = bigquery.tables().delete(projectId=project_id, datasetId=dataset_id, tableId=table_id)
    return delete_request.execute()


def bq_load_data(schema, data_path, dataset_id, table_id):
    # Load a local CSV file into the given table via a media upload of the file
    # contents, then block until the load job finishes.
    bigquery = get_bigquery()
    insert_request = bigquery.jobs().insert(
        projectId=project_id,
        body={
            'configuration': {
                'load': {
                    'schema': {
                        'fields': schema
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id
                    },
                    'sourceFormat': 'CSV',
                    'allowQuotedNewlines': True,
                    'writeDisposition': 'WRITE_TRUNCATE'
                }
            }
        },
        media_body=MediaFileUpload(
            data_path,
            mimetype='application/octet-stream'))
    job = insert_request.execute()
    status_request = bigquery.jobs().get(
        projectId=job['jobReference']['projectId'],
        jobId=job['jobReference']['jobId'])
    # Poll the job status until it is DONE, surfacing any job errors.
    while True:
        result = status_request.execute(num_retries=2)
        if result['status']['state'] == 'DONE':
            if result['status'].get('errors'):
                raise RuntimeError('\n'.join(
                    e['message'] for e in result['status']['errors']))
            return
        time.sleep(1)


def get_bigquery():
    credentials = SignedJwtAssertionCredentials(client_email, private_key, scope)
    bigquery = discovery.build('bigquery', 'v2', credentials=credentials)
    return bigquery
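

# Entry point when executed as a script. The guard below is an addition:
# the original gist defines __main__() as an example driver but never calls it,
# so this is one assumed way to invoke it.
if __name__ == '__main__':
    __main__()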