Skip to content

Instantly share code, notes, and snippets.

@mlabouardy
Created April 16, 2019 18:42
Show Gist options
  • Select an option

  • Save mlabouardy/dd58df03a2b5b20b0f47bf0bb7c05d0d to your computer and use it in GitHub Desktop.

Select an option

Save mlabouardy/dd58df03a2b5b20b0f47bf0bb7c05d0d to your computer and use it in GitHub Desktop.
Load CSV to BigQuery
import mysql.connector
import os
import time
from mysql.connector import Error
from google.cloud import bigquery
# Module-level BigQuery client shared by the load loop further down. It is
# built with no arguments, so project and credentials come from the library's
# defaults (NOTE(review): presumably Application Default Credentials -- confirm).
bigquery_client = bigquery.Client()
# Ordered (prefixes, BigQuery type) rules. Order matters because matching is by
# prefix: 'datetime' must be tested before 'date', and 'timestamp' before
# 'time', otherwise the shorter prefix would win.
_MYSQL_TO_BIGQUERY_TYPES = (
    (('tinyint', 'smallint', 'mediumint', 'bigint', 'int', 'year'), 'INT64'),
    (('decimal', 'numeric', 'float', 'double'), 'FLOAT64'),
    (('datetime',), 'DATETIME'),
    (('timestamp',), 'TIMESTAMP'),
    (('date',), 'DATE'),
    (('time',), 'TIME'),
    (('varchar', 'char', 'text', 'tinytext', 'mediumtext', 'longtext',
      'enum', 'set'), 'STRING'),
)


def mapToBigQueryDataType(columnType):
    """Translate a MySQL column type into a BigQuery column type name.

    Args:
        columnType: The type text of a column, e.g. ``'int(11)'``,
            ``'varchar(255)'`` or ``'bigint(20) unsigned'``. ``bytes`` input
            is decoded as UTF-8 before matching.

    Returns:
        One of ``'INT64'``, ``'FLOAT64'``, ``'DATETIME'``, ``'TIMESTAMP'``,
        ``'DATE'``, ``'TIME'`` or ``'STRING'``. Types that match no rule map
        to ``'STRING'`` instead of falling through and returning ``None``.
    """
    if isinstance(columnType, (bytes, bytearray)):
        columnType = columnType.decode('utf-8')
    normalized = columnType.strip().lower()

    # str.startswith accepts a tuple, so each rule is a single call.
    for prefixes, bigQueryType in _MYSQL_TO_BIGQUERY_TYPES:
        if normalized.startswith(prefixes):
            return bigQueryType

    # Unknown type (json, blob, ...): STRING is the permissive fallback.
    return 'STRING'
def wait_for_job(job, poll_interval=1, timeout=None):
    """Poll ``job`` until its state is ``'DONE'``.

    Args:
        job: Object exposing ``reload()``, ``state``, ``error_result`` and
            ``errors`` (the script passes the job returned by
            ``load_table_from_uri``).
        poll_interval: Seconds to sleep between polls. Defaults to 1, the
            value that used to be hard-coded.
        timeout: Maximum number of seconds to wait, or ``None`` (default) to
            wait indefinitely as before.

    Raises:
        RuntimeError: The job finished with a truthy ``error_result``; the
            exception carries ``job.errors``.
        TimeoutError: ``timeout`` elapsed before the job reached ``'DONE'``.
    """
    # monotonic() is unaffected by wall-clock adjustments during the wait.
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                'job did not reach DONE within {} seconds'.format(timeout))
        time.sleep(poll_interval)
# Script body: for every table named in the local 'tables' file, read the
# column list from MySQL, build a BigQuery schema from it, and load
# gs://$GCE_BUCKET/<table>.csv into $BQ_DATASET.<table>.
#
# conn is bound before the try so the finally clause can tell whether
# connect() succeeded; otherwise a failed connect would raise NameError there
# and hide the real error.
conn = None
try:
    conn = mysql.connector.connect(host=os.environ['MYSQL_HOST'],
                                   database=os.environ['MYSQL_DB'],
                                   user=os.environ['MYSQL_USER'],
                                   password=os.environ['MYSQL_PWD'])
    if conn.is_connected():
        print('Connected to MySQL database')

    # One table name per line. Blank lines (including the empty string a
    # trailing newline produces) are skipped so no query is issued for an
    # empty table name; the file handle is closed as soon as it is read.
    with open('tables') as tablesFile:
        tableNames = [line.strip() for line in tablesFile if line.strip()]

    for tableName in tableNames:
        print('Table:', tableName)

        # Each row describes one column: row[0] is used as the field name and
        # row[1] as the MySQL type text.
        cursor = conn.cursor()
        try:
            cursor.execute(
                'SHOW FIELDS FROM ' + os.environ['MYSQL_DB'] + '.' + tableName)
            rows = cursor.fetchall()
        finally:
            cursor.close()

        schema = [
            bigquery.SchemaField(row[0].replace('\'', ''),
                                 mapToBigQueryDataType(row[1]))
            for row in rows
        ]

        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.CSV
        job_config.autodetect = True
        job_config.max_bad_records = 2
        job_config.allow_quoted_newlines = True
        job_config.schema = schema

        job = bigquery_client.load_table_from_uri(
            'gs://' + os.environ['GCE_BUCKET'] + '/' + tableName + '.csv',
            bigquery_client.dataset(os.environ['BQ_DATASET']).table(tableName),
            location=os.environ['BQ_LOCATION'],
            job_config=job_config)
        print('Loading data to BigQuery:', tableName)
        wait_for_job(job)
        print('Loaded {} rows into {}:{}.'.format(
            job.output_rows, os.environ['BQ_DATASET'], tableName))
except Error as e:
    # Only MySQL connector errors are reported here; anything else (for
    # example a failed load job) propagates after the connection is closed.
    print(e)
finally:
    if conn is not None:
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment