Skip to content

Instantly share code, notes, and snippets.

@JoaoVagner
Created June 20, 2018 17:54
Show Gist options
  • Save JoaoVagner/a1d6f31c5ca3fc9d63e9bf318c90c2aa to your computer and use it in GitHub Desktop.
Save JoaoVagner/a1d6f31c5ca3fc9d63e9bf318c90c2aa to your computer and use it in GitHub Desktop.
from __future__ import generators

import os
import re
import sys
from datetime import datetime, date, time

from google.cloud import bigquery

import conn
reload(sys)
sys.setdefaultencoding('utf8')
bigquery_client = bigquery.Client()
dataset_id = 'infosaude'
dataset_ref = bigquery_client.dataset(dataset_id)
def ResultIter(cursor, arraysize=1000):
'An iterator that uses fetchmany to keep memory usage down'
while True:
results = cursor.fetchmany(arraysize)
if not results:
break
for result in results:
yield result
def root_dir():
return os.path.abspath(os.path.dirname(__file__))
def get_file(filename):
try:
src = os.path.join(root_dir(), filename)
return open(src).read()
except IOError as exc:
return str(exc)
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def process_xls(file):
data = get_data(file)
return data
def count_rows(table):
cursor = conn.connection()
sql = "SELECT COUNT(*) as total FROM {}".format(table)
cursor.execute(sql)
count = cursor.fetchone()
return count.total
def get_structure_table(table):
sql = "select TABLE_NAME, COLUMN_NAME, IS_NULLABLE, DATA_TYPE from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME='{}'".format(table)
cursor = conn.connection()
cursor.execute(sql)
data = cursor.fetchall()
return data
def replace_type(type):
if(type == "varchar"):
return "STRING"
if(type == "nvarchar"):
return "STRING"
if(type == "varbinary"):
return "STRING"
if(type == "char"):
return "STRING"
if(type == "decimal"):
return "FLOAT64"
if(type == "tinyint"):
return "integer"
if(type == "binary"):
return "BOOL"
if(type == "smallint"):
return "INTEGER"
if(type == "text"):
return "STRING"
if(type == "int"):
return "INTEGER"
if(type == "numeric"):
return "INTEGER"
if(type == "datetime"):
return "DATETIME"
if(type == "date"):
return "DATE"
if(type == "time"):
return "TIME"
return "STRING"
def replace_required(value):
if(value == "NO"):
return "REQUIRED"
else:
return "NULLABLE"
def create_dataset():
dataset_ref = bigquery_client.dataset(dataset_id)
dataset = bigquery.Dataset(dataset_ref)
print('Dataset {} created.'.format(dataset.dataset_id))
# Creates the new dataset
dataset = bigquery_client.create_dataset(dataset)
# return render_template('bigquery/dataset/create.html')
def create_dataset_tables():
client = bigquery.Client()
dataset_ref = client.dataset(dataset_id)
database = 'INFOSAUDE'
sql_list_tables = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG='"+ database +"'"
cursor = conn.connection()
cursor.execute(sql_list_tables)
rows = cursor.fetchall()
for result in rows:
schema = []
structure_table = get_structure_table(result.TABLE_NAME)
for inputs in structure_table:
schema.append(
bigquery.SchemaField(inputs.COLUMN_NAME, replace_type(inputs.DATA_TYPE), mode=replace_required(inputs.IS_NULLABLE))
)
table_ref = dataset_ref.table(result.TABLE_NAME)
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table) # API request
print("Tabela {} criada com sucesso no BigQuery".format(result.TABLE_NAME))
assert table.table_id == result.TABLE_NAME
# return render_template('bigquery/dataset/tables/create.html', rows=rows)
def insert_big_query(result, table_name):
client = bigquery.Client()
dataset = client.dataset(dataset_id)
table = dataset.table(table_name)
rows_to_insert = [result]
structure_table = get_structure_table(table_name)
schema = []
for inputs in structure_table:
name = inputs.COLUMN_NAME
data_type = replace_type(inputs.DATA_TYPE)
required = replace_required(inputs.IS_NULLABLE)
schema.append(
bigquery.SchemaField(name, data_type, mode=required)
)
errors = client.insert_rows(table, rows_to_insert, selected_fields=schema) # API request
print(errors)
def insert_data():
client = bigquery.Client()
dataset_ref = client.dataset(dataset_id)
database = "INFOSAUDE"
sql_list_tables = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG='"+ database +"'"
cursor = conn.connection()
cursor.execute(sql_list_tables)
tables = cursor.fetchall()
for result in tables:
print("Iniciando data streaming da tabela {}".format(result.TABLE_NAME))
cursor_count = conn.connection();
cursor_count.execute("SELECT count(*) as total FROM {}".format(result.TABLE_NAME))
count = cursor_count.fetchone()
print("Total de registros: {}").format(count.total);
if count <= 0:
continue;
else:
cursor_huge_data = conn.connection()
cursor_huge_data.execute("SELECT * FROM {}".format(result.TABLE_NAME))
for result_data in ResultIter(cursor_huge_data):
insert_big_query(result_data, result.TABLE_NAME)
print("Concluido importacao da tabela {}".format(result.TABLE_NAME))
#cria dataset no bigquery
create_dataset()
#loopa e cria as tabelas e suas estruturas no bigquery
create_dataset_tables()
#insere todos os dados na tabela
insert_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment