Created
June 20, 2018 17:54
-
-
Save JoaoVagner/a1d6f31c5ca3fc9d63e9bf318c90c2aa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import generators
from google.cloud import bigquery
from datetime import datetime, date, time
import conn
import os
import sys
# Python 2-only hack: reload(sys) re-exposes setdefaultencoding, then force
# UTF-8 so SQL Server text columns round-trip without UnicodeDecodeError.
# NOTE(review): this breaks under Python 3 (reload is in importlib there).
reload(sys)
sys.setdefaultencoding('utf8')
# Module-level BigQuery client and the target dataset used by the helpers below.
bigquery_client = bigquery.Client()
dataset_id = 'infosaude'
dataset_ref = bigquery_client.dataset(dataset_id)
def ResultIter(cursor, arraysize=1000):
    """Yield rows from *cursor* one at a time, fetching in batches.

    Repeatedly calls fetchmany(arraysize) instead of fetchall() so that at
    most one batch of rows is resident in memory at any moment.
    """
    while True:
        batch = cursor.fetchmany(arraysize)
        if not batch:
            return
        for row in batch:
            yield row
def root_dir():
    """Return the absolute path of the directory containing this module."""
    here = os.path.dirname(__file__)
    return os.path.abspath(here)
def get_file(filename):
    """Read *filename* (relative to this module's directory) and return its text.

    On I/O failure the error message string is returned instead of the
    contents.  NOTE(review): that conflates data with errors, but callers may
    rely on this best-effort behaviour, so it is preserved.
    """
    src = os.path.join(root_dir(), filename)
    try:
        # ``with`` guarantees the handle is closed -- the original
        # ``open(src).read()`` leaked the file descriptor.
        with open(src) as handle:
            return handle.read()
    except IOError as exc:
        return str(exc)
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    NOTE(review): ALLOWED_EXTENSIONS is not defined anywhere in this file --
    confirm it is supplied by another module before relying on this helper.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
def process_xls(file):
    """Pass the uploaded spreadsheet to get_data and return whatever it yields.

    NOTE(review): ``get_data`` is neither defined nor imported in this file --
    verify it comes from another module before calling this.
    """
    return get_data(file)
def count_rows(table):
    """Return the number of rows in SQL Server table *table*.

    WARNING: *table* is interpolated directly into the SQL text; call this
    only with trusted names (here they come from INFORMATION_SCHEMA).
    """
    cursor = conn.connection()
    cursor.execute("SELECT COUNT(*) as total FROM {}".format(table))
    row = cursor.fetchone()
    return row.total
def get_structure_table(table):
    """Return column metadata rows (TABLE_NAME, COLUMN_NAME, IS_NULLABLE,
    DATA_TYPE) for SQL Server table *table*.

    WARNING: the table name is string-formatted into the query; trusted
    (catalog-derived) input only.
    """
    cursor = conn.connection()
    query = ("select TABLE_NAME, COLUMN_NAME, IS_NULLABLE, DATA_TYPE "
             "from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME='{}'").format(table)
    cursor.execute(query)
    return cursor.fetchall()
# SQL Server column type -> BigQuery legacy-SQL type name.
# NOTE(review): "binary" -> "BOOL" looks suspicious (varbinary maps to
# STRING) -- preserved from the original, confirm against the real schemas.
_SQL_TO_BQ_TYPE = {
    "varchar": "STRING",
    "nvarchar": "STRING",
    "varbinary": "STRING",
    "char": "STRING",
    "text": "STRING",
    "decimal": "FLOAT64",
    "tinyint": "INTEGER",  # was lowercase "integer"; normalized for consistency
    "smallint": "INTEGER",
    "int": "INTEGER",
    "numeric": "INTEGER",
    "binary": "BOOL",
    "datetime": "DATETIME",
    "date": "DATE",
    "time": "TIME",
}


def replace_type(type):
    """Translate a SQL Server column type into a BigQuery type name.

    Unknown types fall back to "STRING".  The parameter shadows the builtin
    ``type``; kept as-is so existing keyword callers are unaffected.
    """
    return _SQL_TO_BQ_TYPE.get(type, "STRING")
def replace_required(value):
    """Map INFORMATION_SCHEMA IS_NULLABLE ('YES'/'NO') to a BigQuery mode."""
    return "REQUIRED" if value == "NO" else "NULLABLE"
def create_dataset():
    """Create the BigQuery dataset named by the module-level ``dataset_id``."""
    dataset_ref = bigquery_client.dataset(dataset_id)
    dataset = bigquery.Dataset(dataset_ref)
    # Create first, then report -- the original printed "created" before the
    # API request had actually been issued.
    dataset = bigquery_client.create_dataset(dataset)
    print('Dataset {} created.'.format(dataset.dataset_id))
    # return render_template('bigquery/dataset/create.html')
def create_dataset_tables():
    """Mirror every base table of the INFOSAUDE SQL Server database as an
    empty BigQuery table with an equivalent schema."""
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    database = 'INFOSAUDE'
    sql_list_tables = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG='"+ database +"'"
    cursor = conn.connection()
    cursor.execute(sql_list_tables)
    rows = cursor.fetchall()
    for table_row in rows:
        # Build the BigQuery schema from the SQL Server column metadata.
        schema = [
            bigquery.SchemaField(
                column.COLUMN_NAME,
                replace_type(column.DATA_TYPE),
                mode=replace_required(column.IS_NULLABLE),
            )
            for column in get_structure_table(table_row.TABLE_NAME)
        ]
        table_ref = dataset_ref.table(table_row.TABLE_NAME)
        created = client.create_table(bigquery.Table(table_ref, schema=schema))  # API request
        print("Tabela {} criada com sucesso no BigQuery".format(table_row.TABLE_NAME))
        assert created.table_id == table_row.TABLE_NAME
    # return render_template('bigquery/dataset/tables/create.html', rows=rows)
def insert_big_query(result, table_name):
    """Stream one row (*result*) into BigQuery table *table_name*.

    The schema is rebuilt from the SQL Server column metadata on every call
    so insert_rows can map the row's fields by position.
    """
    client = bigquery.Client()
    table = client.dataset(dataset_id).table(table_name)
    fields = [
        bigquery.SchemaField(
            column.COLUMN_NAME,
            replace_type(column.DATA_TYPE),
            mode=replace_required(column.IS_NULLABLE),
        )
        for column in get_structure_table(table_name)
    ]
    # API request; returns a list of per-row errors (empty on success).
    errors = client.insert_rows(table, [result], selected_fields=fields)
    print(errors)
def insert_data():
    """Stream every row of every INFOSAUDE base table into BigQuery.

    For each base table: print progress, count its rows, skip it when empty,
    otherwise iterate the full table in batches (ResultIter) and push each
    row via insert_big_query.
    """
    database = "INFOSAUDE"
    sql_list_tables = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG='"+ database +"'"
    cursor = conn.connection()
    cursor.execute(sql_list_tables)
    tables = cursor.fetchall()
    for result in tables:
        print("Iniciando data streaming da tabela {}".format(result.TABLE_NAME))
        cursor_count = conn.connection()
        cursor_count.execute("SELECT count(*) as total FROM {}".format(result.TABLE_NAME))
        count = cursor_count.fetchone()
        # Keep .format INSIDE the print call: the original
        # ``print("...").format(...)`` only worked via Python 2
        # print-statement parsing and crashes on Python 3.
        print("Total de registros: {}".format(count.total))
        # BUG FIX: the original tested ``count <= 0`` -- comparing the whole
        # row object to 0, which is always False under Python 2 mixed-type
        # ordering, so empty tables were never skipped. Compare the value.
        if count.total <= 0:
            continue
        cursor_huge_data = conn.connection()
        cursor_huge_data.execute("SELECT * FROM {}".format(result.TABLE_NAME))
        for result_data in ResultIter(cursor_huge_data):
            insert_big_query(result_data, result.TABLE_NAME)
        print("Concluido importacao da tabela {}".format(result.TABLE_NAME))
# Create the dataset in BigQuery.
create_dataset()
# Loop over the SQL Server tables and create each one (with its schema) in BigQuery.
create_dataset_tables()
# Stream every row of every table into BigQuery.
# NOTE(review): these three calls run at import time -- there is no
# ``if __name__ == "__main__"`` guard.
insert_data()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment