import time
import os
import csv
from datetime import datetime, timedelta
from sqlalchemy import create_engine
from googleapiclient import discovery
from oauth2client.client import SignedJwtAssertionCredentials
from googleapiclient.http import MediaFileUpload
client_email = os.environ["BQ_CLIENT_EMAIL"]
private_key = os.environ["BQ_PRIVATE_KEY"]
scope = ['https://www.googleapis.com/auth/bigquery']
project_id = os.environ["BQ_PROJECT_ID"]
psql_bind = os.environ["PSQL_BIND"]
def __main__():
    dataset_id = 'dataset_name'
    table_id = 'user_table'
    schema = [{"type": "INTEGER", "name": "user_id"},
              {"type": "STRING", "name": "username"}]
    sql_query = """
        SELECT u.id user_id,
               u.username username
        FROM user_table u
    """
    bq_dump_data(schema, dataset_id, table_id, sql_query)
def bq_dump_data(schema, dataset_id, table_id, sql_query, sql_bind=psql_bind, fetch_size=3000000):
    now = int(time.time())
    data_path = '%s_%d.csv' % (table_id, now)
    temp_table_id = '%s_%d' % (table_id, now)
    disposable_table_id = '%s_disposable' % (temp_table_id,)
    engine = create_engine(sql_bind, pool_timeout=3000000)
    connection = engine.connect()
    cursor = connection.execute(sql_query)
    # Print BQ dump logs
    print("BQ dumping... %s.%s -- %-10s rows found" % (dataset_id, table_id, cursor.rowcount))
    results = cursor.fetchmany(fetch_size)
    first_fetch = True
    while results:
        with open(data_path, 'w') as csvfile:
            data_csv = csv.writer(csvfile, delimiter=',', lineterminator='\n', quotechar='"')
            for result in results:
                row = []
                for r in result:
                    if type(r) is bool:
                        row.append(int(r))
                    elif type(r) is datetime:
                        # Convert to a Unix timestamp; naive datetimes are treated as UTC
                        offset = r.utcoffset() or timedelta(0)
                        unix_timestamp = (r.replace(tzinfo=None) - offset - datetime(1970, 1, 1)).total_seconds()
                        row.append(str(int(unix_timestamp)))
                    elif type(r) is unicode:
                        # Flatten newlines so each record stays on a single CSV line
                        row.append(r.replace('\r\n', ' ').replace('\n', ' ').replace('\r', ' ').encode('utf-8'))
                    else:
                        row.append(r)
                data_csv.writerow(row)
        del results
        # Load the chunk into a disposable table, then copy it into the temp table:
        # truncate on the first chunk, append on the rest
        bq_load_data(schema, data_path, dataset_id, disposable_table_id)
        bq_copy_table(dataset_id, disposable_table_id, dataset_id, temp_table_id, is_write_truncate=first_fetch)
        bq_delete_table(dataset_id, disposable_table_id)
        os.remove(data_path)
        results = cursor.fetchmany(fetch_size)
        first_fetch = False
    # Replace the target table with the fully loaded temp table, then clean up
    bq_copy_table(dataset_id, temp_table_id, dataset_id, table_id, is_write_truncate=True)
    bq_delete_table(dataset_id, temp_table_id)
    # Print BQ dump logs
    print("BQ dump %s.%s -- completed" % (dataset_id, table_id))
# FUNCTIONS TO DUMP TO BQ
def bq_copy_table(source_dataset_id, source_table_id, destination_dataset_id, destination_table_id, is_write_truncate=False):
    bigquery = get_bigquery()
    insert_request = bigquery.jobs().insert(
        projectId=project_id,
        body={
            'configuration': {
                'copy': {
                    'sourceTable': {
                        'projectId': project_id,
                        'datasetId': source_dataset_id,
                        'tableId': source_table_id
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': destination_dataset_id,
                        'tableId': destination_table_id
                    },
                    'writeDisposition': 'WRITE_TRUNCATE' if is_write_truncate else 'WRITE_APPEND'
                }
            }
        })
    job = insert_request.execute()
    status_request = bigquery.jobs().get(
        projectId=job['jobReference']['projectId'],
        jobId=job['jobReference']['jobId'])
    while True:
        result = status_request.execute(num_retries=2)
        if result['status']['state'] == 'DONE':
            if result['status'].get('errors'):
                raise RuntimeError('\n'.join(
                    e['message'] for e in result['status']['errors']))
            return
        time.sleep(1)
def bq_delete_table(dataset_id, table_id):
    bigquery = get_bigquery()
    delete_request = bigquery.tables().delete(projectId=project_id, datasetId=dataset_id, tableId=table_id)
    return delete_request.execute()
def bq_load_data(schema, data_path, dataset_id, table_id):
    bigquery = get_bigquery()
    insert_request = bigquery.jobs().insert(
        projectId=project_id,
        body={
            'configuration': {
                'load': {
                    'schema': {
                        'fields': schema
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id
                    },
                    'sourceFormat': 'CSV',
                    'allowQuotedNewlines': True,
                    'writeDisposition': 'WRITE_TRUNCATE'
                }
            }
        },
        media_body=MediaFileUpload(
            data_path,
            mimetype='application/octet-stream'))
    job = insert_request.execute()
    status_request = bigquery.jobs().get(
        projectId=job['jobReference']['projectId'],
        jobId=job['jobReference']['jobId'])
    while True:
        result = status_request.execute(num_retries=2)
        if result['status']['state'] == 'DONE':
            if result['status'].get('errors'):
                raise RuntimeError('\n'.join(
                    e['message'] for e in result['status']['errors']))
            return
        time.sleep(1)
def get_bigquery():
    credentials = SignedJwtAssertionCredentials(client_email, private_key, scope)
    bigquery = discovery.build('bigquery', 'v2', credentials=credentials)
    return bigquery


if __name__ == '__main__':
    __main__()