Skip to content

Instantly share code, notes, and snippets.

@carlosdelfino
Last active December 13, 2022 02:13
Show Gist options
  • Save carlosdelfino/f17caa6c66cd2b96c5715edeb6b624a3 to your computer and use it in GitHub Desktop.
Save carlosdelfino/f17caa6c66cd2b96c5715edeb6b624a3 to your computer and use it in GitHub Desktop.
Carregando dados dos arquivos de histórico da B3 para banco de dados SQLite, execute primeiro os comandos do arquivo create_database.txt para criar o banco de dados e seus índices.

Use os comandos abaixo com o SQLite3 para criar o banco de dados.

sqlite3 cotacoes/database.db 'create table COTACOES (
                    DATA_PREGAO date not null,
                    CODBDI text,
                    CODIGO_NEGOCIACAO text not null,
                    TIPO_MERCADO integer,
                    NOME_RESUMIDO text,
                    ESPECI text,
                    PRAZO_TERMO integer,
                    MOEDA_REFERENCIA text,
                    PRECO_ABERTURA real,
                    PRECO_MAXIMO real,
                    PRECO_MINIMO real,
                    PRECO_MEDIO real,
                    PRECO_FECHAMENTO real,
                    MELHOR_OFERTA_COMPRA real,
                    MELHOR_OFERTA_VENDA real,
                    TOTAL_NEGOCIADO integer,
                    TOTAL_DE_TITULOS integer,
                    VOLUME_TOTAL real,
                    PREEXE real,
                    INDOPC integer,
                    DATVEN date,
                    FATCOT integer,
                    PTOEXE real,
                    CODISI text,
                    DISMES integer
            )'

É extremamente importante criar o indice abaixo, pois acelera o procesamento em pelo menos uma centena de vezes.

sqlite3 db.sqlite3 'CREATE INDEX IDX_CODIGO_DATA_TIPO_MERCADO
                                                ON COTACOES (CODIGO_NEGOCIACAO, DATA_PREGAO, PRAZO_TERMO)'
import calendar
from datetime import datetime
from datetime import date
#######################################################
def dia_util(ano=None, mes = None, meses=range(1,13), feriados=None, inicio=None, limite=None):
"""
retorna os dias úteis do período solicitado.
Feriados deve ser uma lista de 'dates' (datetime) ou um objeto Holiday obtido por exemplo com holidays.BR()
"""
resutil = []
if inicio is not None and limite is not None:
ano = int(inicio.split('/')[2])
meses = range(int(inicio.split('/')[1]), int(limite.split('/')[1])+1)
elif mes:
meses = range(mes,mes+1)
for month in meses:
calutil = calendar.monthcalendar(ano, month)
weekutil = [week[0:5] for week in calutil]
for week in weekutil:
for day in week:
if day == 0:
continue
daystr = "{:02d}/{:02d}/{:4d}".format(day, month, ano)
if inicio is not None and datetime.strptime(inicio,'%d/%m/%Y') >= datetime.strptime(daystr,'%d/%m/%Y'):
continue
if limite is not None and datetime.strptime(limite,'%d/%m/%Y') <= datetime.strptime(daystr,'%d/%m/%Y'):
break
if feriados is not None and (date(int(ano),int(month),int(day)) in feriados):
continue
resutil.append(daystr)
else:
continue
break
else:
continue
break
return resutil
import os
import requests as req
from pathlib import Path
def download_cotacoes(ano, mes=None, dia=None, overwrite=True, path='./downloads'):
"""
Obtem os históricos da B3 diretamente no site
Retorna 0 se a obteve o arquivo com sucesso, caso contrário retorna
o código de erro obtido na transação web.
"""
#
os.makedirs(path, exist_ok=True)
#
if dia and mes:
zip_file_name = "COTAHIST_D{:0>2}{:0>2}{:0>4}.ZIP".format(dia,mes,ano)
elif mes:
zip_file_name = "COTAHIST_M{:0>2}{:0>4}.ZIP".format(mes,ano)
else:
zip_file_name = "COTAHIST_A{:0>4}.ZIP".format(ano)
#
dest_path_file = Path(path + '/' + zip_file_name)
if dest_path_file.is_file() and not overwrite:
print("Arquivo {} já existe, não será baixado!".format(zip_file_name))
return 0
#
print("Obtendo histórico {}".format(zip_file_name))
url = "https://bvmf.bmfbovespa.com.br/InstDados/SerHist/"+zip_file_name
headers = { 'accept': '*/*',
'accept-language': 'en-US,en;q=0.9,pt-BR;q=0.8,pt;q=0.7,es-MX;q=0.6,es;q=0.5',
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
'x-requested-with': 'XMLHttpRequest',
'sec-ch-ua': '" Not;A Brand";v="99", "Google Chrome";v="97", "Chromium";v="97"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko)'}
get_response = req.get(url,stream=True, headers = headers)
status_code = get_response.status_code
if(status_code != 200):
print("Falha ao obter o arquivo, codigo: {}".format(status_code))
return status_code
print('#', end='')
#
with open(dest_path_file, 'wb') as f:
print('#', end='')
for chunk in get_response.iter_content(chunk_size=1024):
if chunk:
print('.', end='')
f.write(chunk)
#
print('#')
return 0
#!/usr/bin/env python3.10
"""
script de importação do histórico de cotações disponibilizado pela B3
"""
import datetime
import os, sys
from pytz import timezone
from dia_util import dia_util
from download_historico_b3 import download_cotacoes
from processa_historico_b3 import processa_cotacoes
tz = timezone('America/Fortaleza')
data_e_hora_atuais = datetime.datetime.now()
data_e_hora_atuais_tz = data_e_hora_atuais.astimezone(tz)
if __name__ == "__main__":
import holidays
import warnings
warnings.filterwarnings("ignore")
import argparse
parser = argparse.ArgumentParser(
prog='historicos_b3_django',
description='Este script importa os dados históricos da B3 que são automáticamente baixados e grava diretamente no banco de dados do Nizin Invest',
)
parser.add_argument('--yesterday',dest='yesterday', action='store_true',
help='processa apenas o dia de hoje, tem prioridade sobre todos os demais parametros')
parser.add_argument('-y', '--year', metavar='year', type=int,
help='informa o ano a ser baixado, se não for informado mês e dia será baixado o histórico do ano. esta opção ignora mês (-m) e dia (-d)')
parser.add_argument('-m', '--month', metavar='month', type=int,
help='informa o mês do ano corrente a ser baixado, esta opção ignora o dia (-d)')
parser.add_argument('-d', '--day', metavar='day', type=int,
help='informa o dia do mês e ano corrente a ser baixado')
parser.add_argument('--range_year', metavar='year', type=int, nargs=2,
help='solicita baixar uma faixa de ano, informe o ano inicial e final, este parâmetro tem precedência sobre os demais que serão ignorados')
parser.add_argument('--range_month', metavar='month', type=int, nargs=2,
help='solicita baixar uma faixa de meses, informe o mês inicial e final, este parâmetro tem precedência sobre os demais, com exceção do parâmetro --range_year')
parser.add_argument('--range_day', metavar='day', type=int, nargs=2,
help='solicita baixar uma faixa de dias, informe o dia inicial e final, este parâmetro tem precedência sobre -y, -m e -d, porém é ignorado se --range_month ou --range_year for informado')
parser.add_argument('--overwrite',dest='overwrite', action='store_true',
help='sobre escreve os históricos existente, baixando novamente, se não for informado somente será baixado os históricos faltantes')
parser.add_argument('--version', action='version', version='%(prog)s 2.0')
parser.add_argument('--sqlite', default='db.sqlite3',
help='path para o arquivo do banco de dados sqlite3')
parser.add_argument('--cotacoes', default='./downloads',
help='Path para o diretório onde as cotações baixadas serão gravados')
args = parser.parse_args()
if args.yesterday:
yesterday = datetime.date.today() - datetime.timedelta(days=1)
ano = yesterday.year
mes = yesterday.month
dia = yesterday.day
if download_cotacoes(ano=ano, mes=mes,dia=dia, overwrite=args.overwrite) == 0:
processa_cotacoes(ano=ano,mes=mes,dia=dia,
sqlite_file=args.sqlite,
path=args.cotacoes)
else:
print('Problemas ao obter as cotações!')
elif args.range_year is not None:
for ano in range(args.range_year[0],args.range_year[1]+1):
print(ano)
download_cotacoes(ano=ano, overwrite=args.overwrite)
processa_cotacoes(ano=ano,
sqlite_file=args.sqlite,
path=args.cotacoes)
elif args.range_month is not None:
ano = datetime.date.today().year
for mes in range(args.range_month[0], args.range_month[1]+1):
download_cotacoes(ano=ano,mes=mes, overwrite=args.overwrite)
processa_cotacoes(ano=ano,mes=mes,
sqlite_file=args.sqlite,
path=args.cotacoes)
elif args.range_day is not None:
ano = datetime.today().year
mes = datetime.today().month
#hoje = datetime.today().strftime("d%/m%/Y%")
inicio = "{:02d}/{:02d}/{:4d}".format(args.range_day[0],mes,ano)
limite = "{:02d}/{:02d}/{:4d}".format(args.range_day[1],mes,ano)
for data in dia_util(feriados=holidays.BR(), inicio=inicio, limite=limite):
dia = data[:2]
print(dia)
download_cotacoes(ano=ano,mes=mes,dia=dia, overwrite=args.overwrite)
processa_cotacoes(ano=ano,mes=mes,dia=dia,
sqlite_file=args.sqlite,
path=args.cotacoes)
elif args.year and args.month is None and args.day is None:
download_cotacoes(ano=args.year, overwrite=args.overwrite)
processa_cotacoes(ano=args.year,
sqlite_file=args.sqlite,
path=args.cotacoes)
else:
parser.print_help()
import os
import sqlite3
from zipfile import ZipFile
from datetime import datetime
###############################################
def existe_negociacao(cursor, data_pregao, codigo_negociacao, prazo_termo):
"""
Verifica se uma determinada negociação já existe no banco de dados com base na Data do Pregão, o Código de Negociação e o Prazo a Termo
É preciso informar o cursor de uma conexão aberta.
Parametros:
cursor: Cursor da Conexão com o banco de dados
data_pregao, data do pregão no formato YYYY-mm-aa
codigo_negociacao, código da negociação com letras maiúscula
prazo_termo, prazo da negociação a termo
retorno
Contagem de registros encontrados
"""
result = cursor.execute("select count(*) from cotacoes where DATA_PREGAO = ? and CODIGO_NEGOCIACAO = ? and PRAZO_TERMO = ?",(data_pregao, codigo_negociacao, prazo_termo)).fetchone()
return result[0]
###############################################
def processa_cotacoes(ano, mes=None, dia=None,
sqlite_file='db.sqlite',
path='./downloads'):
"""
Processa cada registro do arquivo de histórico da B3, verificando se o registro já existe com base em DATA_PREGAO, CODIGO_NEGOCIACAO e PRAZO_TERMO
Parametros:
"ano" no formato de quatro digitos
"mês" no formato númerico
"dia" no formato númerico
Retorno
Retorna uma tupla com código de resultado da operação, a quantidade de registros processados e o nome do arquivo processado.
Lança uma exeption caso o arquivo não tenha o número de registro coerentes
"""
if dia and mes:
file_name = "COTAHIST_D{:0>2}{:0>2}{:0>4}".format(dia,mes,ano)
elif mes:
file_name = "COTAHIST_M{:0>2}{:0>4}".format(mes,ano)
else:
file_name = "COTAHIST_A{:0>4}".format(ano)
#
zip_file_name = path + '/' + file_name + ".ZIP"
print('#', end='')
#
conn = sqlite3.connect(sqlite_file,
detect_types=sqlite3.PARSE_DECLTYPES)
cursor = conn.cursor()
#
count = 0
#
if not os.path.isfile(zip_file_name):
return (404, 0, zip_file_name)
with ZipFile(zip_file_name, 'r') as zip:
arq_cotacoes = zip.filelist[0]
print('#', end='')
with zip.open(arq_cotacoes) as path:
print('#', end='')
#
for linha in path:
count = count + 1
dic = {}
linha = linha.decode("iso-8859-1")
if linha[0:2] == '01': # registro de dados
#
data_pregao = "" + linha[2:6] + "-" + linha[6:8] + "-" + linha[8:10]
codigo_negociacao = linha[12:24].strip()
# converte para inteiro, se vazio prenche com 0
prazo_a_termo = int(linha[49:52].strip()) if linha[49:52].strip() else 0
if not existe_negociacao(cursor, data_pregao, codigo_negociacao, prazo_a_termo):
codigo_bdi = linha[10:12].strip()
tipo_mercado = int(linha[24:27])
nome_resumido = linha[27:39].strip()
# Remove espaços extras
especificacao = " ".join(linha[39:49].split())
moeda_referencia = linha[52:56].strip()
preco_abertura = float(linha[56:69])/100
preco_maximo = float(linha[69:82])/100
preco_minimo = float(linha[82:95])/100
preco_medio = float(linha[95:108])/100
preco_de_fechamento = float(linha[108:121])/100
melhor_oferta_compra = float(linha[121:134])/100
melhor_oferta_venda = float(linha[134:147])/100
total_de_negociado = int(linha[147:152])
total_de_titulos = int(linha[152:170])
volume_total = float(linha[170:188])/100
preco_executado = float(linha[188:201])/100
#indopc
indicador_de_correcao = int(linha[201:202])
data_vencimento = linha[202:206] + "-" + linha[206:208] + "-" + linha[208:210]
fatcot = int(linha[210:217])
ptoexe = float(linha[217:230])/1000000
codisi = linha[230:242].strip()
dismis = int(linha[242:245])
#
cursor.execute("insert into cotacoes (DATA_PREGAO, CODBDI, CODIGO_NEGOCIACAO, TIPO_MERCADO, NOME_RESUMIDO, ESPECI, PRAZO_TERMO, MOEDA_REFERENCIA, PRECO_ABERTURA, PRECO_MAXIMO, PRECO_MINIMO, PRECO_MEDIO, PRECO_FECHAMENTO, MELHOR_OFERTA_COMPRA, MELHOR_OFERTA_VENDA, TOTAL_NEGOCIADO, TOTAL_DE_TITULOS, VOLUME_TOTAL, PREEXE, INDOPC, DATVEN, FATCOT, PTOEXE, CODISI, DISMES) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
data_pregao,
codigo_bdi,
codigo_negociacao,
tipo_mercado,
nome_resumido,
especificacao,
prazo_a_termo,
moeda_referencia,
preco_abertura,
preco_maximo,
preco_minimo,
preco_medio,
preco_de_fechamento,
melhor_oferta_compra,
melhor_oferta_venda,
total_de_negociado,
total_de_titulos,
volume_total,
preco_executado,
indicador_de_correcao,
data_vencimento,
fatcot,
ptoexe,
codisi,
dismis
))
print('.', end='')
else:
print('*', end='')
#
elif linha[0:2] == '00': # registro de metadados
print('_', end='')
print("Arquivo criado em {}".format(datetime.strptime(linha[23:31], '%Y%m%d')))
elif linha[0:2] == '99': # registro de metadados
size = int(linha[31:42])
print('_', end='')
if count != size:
raise Exception("Arquivo Invalido, número de linhas diferente: foram processadas {}, mas era esperado {}".format(count, size))
#
#
if not count % 40:
print(" " + str(count))
zip.close()
#
print("# total: {}".format(count))
conn.commit()
cursor.close()
conn.close()
return (0, count, zip_file_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment