Created
December 8, 2025 02:45
-
-
Save alphabraga/d3a1de9a11db10450fdb0ee3050f0bb7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import oracledb | |
| #from rich import print | |
| from time import sleep | |
| def log_too_long(id): | |
| with open("log_too_log", "r") as f: | |
| f.write(id) | |
| f.close | |
| def fix(v): | |
| if isinstance(v, oracledb.LOB): | |
| blob = v.read() | |
| return f"q'*{blob}*'" | |
| if v is None: | |
| return 'NULL' | |
| if isinstance(v, str): | |
| return "'" + v.replace("'", "''") + "'" | |
| return v | |
| def fix_resulado(v): | |
| if v is None: | |
| return 'NULL' | |
| if isinstance(v, str): | |
| return "q'*{v}*'" | |
| if isinstance(v, oracledb.LOB): | |
| blob = v.read() | |
| return f"q'*{blob}*'" | |
| return v | |
| def create_row(row): | |
| r = { | |
| "ID": fix(row[0]), | |
| "DADOS": fix(row[1]), | |
| "DISPOSITIVO": fix(row[2]), | |
| "ENTRADA": fix(row[3]), | |
| "ESTADO": fix(row[4]), | |
| "HOST_NAME": fix(row[5]), | |
| "INICIO": fix(row[6]), | |
| "IP": fix(row[7]), | |
| "RESULTADO": fix_resulado(row[8]), | |
| "STATUS_CODE": fix(row[9]), | |
| "TERMINO": fix(row[10]), | |
| "TIPO": fix(row[11]), | |
| "PESO_MAXIMO": fix(row[12]), | |
| "TENTATIVAS": fix(row[13]), | |
| "PLACA": fix(row[14]), | |
| "UUID": fix(row[15]) | |
| } | |
| return r | |
| ORIGEM_DSN = "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=172.16.80.13)(PORT=1521))(CONNECT_DATA=(SERVICE_NAME=tosplus)))" | |
| DESTINO_DSN = "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=oracle19c.portodoitaqui.com)(PORT=1521))(CONNECT_DATA=(SERVICE_NAME=tosplus)))" | |
| USER = "emap" | |
| PASSWORD = "Athenas2015" | |
| conn_origem = oracledb.connect(user="tosp", password="Athenas2015", dsn=ORIGEM_DSN) | |
| conn_destino = oracledb.connect(user="emap", password="Athenas2015", dsn=DESTINO_DSN) | |
| cur_origem = conn_origem.cursor() | |
| cur_destino = conn_destino.cursor() | |
| sql_select = """ | |
| SELECT | |
| ID, | |
| DADOS, | |
| DISPOSITIVO, | |
| ENTRADA, | |
| ESTADO, | |
| HOST_NAME, | |
| INICIO, | |
| IP, | |
| RESULTADO, | |
| STATUS_CODE, | |
| TERMINO, | |
| TIPO, | |
| PESO_MAXIMO, | |
| TENTATIVAS, | |
| PLACA, | |
| UUID | |
| FROM EMAP.INTEGRADOR | |
| WHERE ENTRADA BETWEEN TO_DATE('01/06/2025 00:00:00', 'DD/MM/YYYY HH24:MI:SS') AND TO_DATE('08/06/2025 23:59:59', 'DD/MM/YYYY HH24:MI:SS') | |
| order by id DESC | |
| """ | |
| cur_origem.execute(sql_select) | |
| rows = cur_origem.fetchmany(500) | |
| total = 0 | |
| cur_destino = conn_destino.cursor() | |
| conn_destino.call_timeout = 15000 | |
| while rows: | |
| for row in rows: | |
| r = create_row(row) | |
| print(r["ID"]) | |
| sql_insert = f""" | |
| INSERT INTO EMAP.INTEGRADOR ( | |
| ID, | |
| DADOS, | |
| DISPOSITIVO, | |
| ENTRADA, | |
| ESTADO, | |
| HOST_NAME, | |
| INICIO, | |
| IP, | |
| RESULTADO, | |
| STATUS_CODE, | |
| TERMINO, | |
| TIPO, | |
| PESO_MAXIMO, | |
| TENTATIVAS, | |
| PLACA, | |
| UUID | |
| ) VALUES ( | |
| {r["ID"]}, | |
| {r["DADOS"]}, | |
| {r["DISPOSITIVO"]}, | |
| TO_TIMESTAMP('{r["ENTRADA"]}', 'YYYY-MM-DD HH24:MI:SS.FF6'), | |
| {r["ESTADO"]}, | |
| {r["HOST_NAME"]}, | |
| TO_TIMESTAMP('{r["INICIO"]}', 'YYYY-MM-DD HH24:MI:SS.FF6'), | |
| {r["IP"]}, | |
| {r["RESULTADO"]}, | |
| {r["STATUS_CODE"]}, | |
| TO_TIMESTAMP('{r["TERMINO"]}', 'YYYY-MM-DD HH24:MI:SS.FF6'), | |
| {r["TIPO"]}, | |
| {r["PESO_MAXIMO"]}, | |
| {r["TENTATIVAS"]}, | |
| {r["PLACA"]}, | |
| {r["UUID"]} | |
| ) | |
| """ | |
| #print(sql_insert) | |
| try: | |
| cur_destino.execute(sql_insert) | |
| conn_destino.commit() | |
| except Exception as e: | |
| error_obj, = e.args | |
| # Verifica se é ORA-00001 | |
| if error_obj.code == 1: | |
| print(e) | |
| print(sql_insert) | |
| print("[red]Registro duplicado — ignorando e continuando.") | |
| continue | |
| if "DPY-4025" in str(e): | |
| print("Timeout!") | |
| log_too_long(r["ID"]) | |
| else: | |
| print(sql_insert) | |
| raise | |
| print("pegando mais linhas.......................................................") | |
| sleep(1) | |
| rows = cur_origem.fetchmany(500) | |
| print(f"total:{total}") | |
| total += len(rows) | |
| # sleep(2) | |
| print(f"total:{total}") | |
| print(f"Transferência concluída! {total} registros inseridos.") | |
| # Fecha tudo | |
| cur_origem.close() | |
| cur_destino.close() | |
| conn_origem.close() | |
| conn_destino.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment