Last active
November 27, 2019 13:02
-
-
Save flinox/ca2301c5ee806f6d628d37fcdd4da8c8 to your computer and use it in GitHub Desktop.
A python script to to purge inactive records of many different sources from informatica MDM calling the service SIF ExecuteBatchDelete
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ------------------------------------------------------------------------------ | |
# v1.0 - 2019-08-13 - Created by: Fernando Lino Di Tomazzo Silva | |
# v2.0 - 2019-11-27 - Updated by: Fernando Lino Di Tomazzo Silva | |
# | |
# Script para realizar expurgo de registros logicamente inativos das BO's do MDM da INFORMATICA. | |
# Mais informacoes sobre MDM da INFORMATICA em: https://www.informatica.com/br/products/master-data-management.html | |
# | |
# ##### PRE REQS ###### | |
# 1) Ter uma tabela criada que será usada para carregar os ID's dos registros para expurgar, Ex.: | |
# create table CMX_ORS.TB_PURGE_AUX | |
# ( | |
# ROWID_XREF CHAR(14) not null | |
# ) | |
# tablespace CMX_DATA; | |
# | |
# 2) Python 3.6.5 ou + e library cx_Oracle instalada | |
# pip3 install cx_Oracle | |
# | |
# 3) No servidor linux que for executar o script, é necessário ter 2 pares de usuario e senha setados em variaveis de ambiente, o primeiro par | |
# deve ter permissão para realizar consultas nas tabelas do MDM e para insert e delete na tabela criada no | |
# passo 1, Precisa ter os nomes abaixo: | |
# SRCUSER e SRCPASS | |
# | |
# O segundo par deve ter permissão para executar os serviços do MDM, precisa ser os nomes abaixo de variaveis de ambiente: | |
# MDMUSER e MDMPASS | |
# | |
# 4) Para ajuda ou maiores informações sobre os parametros use: | |
# python mdm_purge.py -h | |
# | |
# 5) Exemplo de uso: | |
# python mdm_purge.py 192.168.0.1/servicedata http://192.168.0.2:7000/cmx/services/SifService servicedata-CMX_ORS C_B_PESSOA TB_PURGE_AUX 500 false true true false | |
# | |
# ------------------------------------------------------------------------------ | |
import cx_Oracle | |
import requests | |
import time | |
import sys | |
import xml.etree.ElementTree as ET | |
import datetime | |
import argparse | |
import textwrap | |
import os | |
# Realiza conexao com banco | |
def conexao(cs): | |
# Realiza a conexao | |
print(">>> Conectando no banco de origem ...") | |
con = cx_Oracle.connect(cs) | |
return con | |
# Conta qtde de registros por origem | |
def get_qtde_para_expurgo_por_origem(con,bo): | |
lista = [] | |
try: | |
# Contando os registros para expurgo | |
print(">>> Contando os registros para o expurgo ...") | |
#print(">"*80 +"\n") | |
registros = "SELECT XRPE.ROWID_SYSTEM,COUNT(1) FROM CMX_ORS.@mdmbo@_XREF XRPE INNER JOIN CMX_ORS.@mdmbo@ BOPE ON (XRPE.ROWID_OBJECT = BOPE.ROWID_OBJECT) WHERE BOPE.HUB_STATE_IND = -1 GROUP BY XRPE.ROWID_SYSTEM ORDER BY 2" | |
registros = registros.replace('@mdmbo@',bo) | |
cur = con.cursor() | |
cur.execute(registros) | |
time.sleep(1) | |
for result in cur: | |
lista.append({result[0].strip(" ") : int(result[1])}) | |
except: | |
e = sys.exc_info()[0] | |
print("Error: %s" % e) | |
print(sys.exc_info()) | |
finally: | |
cur.close() | |
if len(lista) > 0: | |
for item in lista: | |
for key,val in item.items(): | |
print(">>> Origem %s possui %s registros inativos pendentes para expurgo ..." % (key,str(val))) | |
else: | |
print(">>> Nenhum sistema origem possui registros para expurgar.") | |
return lista | |
# Define os parametros esperados e obrigatorios | |
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,description=textwrap.dedent('''\ | |
Expurgo de registros do MDM da informatica usando o serviço SOAP SIF ExecuteBatchDelete! | |
Maiores informacoes leia a doc. MDM_XXXHFX_ServicesIntegrationFramework[SIF]Guide_en.pdf | |
---------------------------------------------------------------------------------------- | |
Created By: Fernando Lino Di Tomazzo Silva ( https://www.linkedin.com/in/flinox ) | |
Version 1.0 - 2019-08-13 | |
''')) | |
parser.add_argument("connectstring", help="Informe o endereco para o banco oracle onde estão os dados do MDM, Ex.: hostname:port/service") | |
parser.add_argument("mdmsifurl", help="Informe a url dos serviços SIF do MDM, Ex.: http://hostname:port/cmx/services/SifService") | |
parser.add_argument("mdmorsid", help="Informe o nome do ORS ID do seu MDM, Ex.: banco-CMX_ORS") | |
parser.add_argument("mdmbo", help="Informe o nome da BO que quer realizar o expurgo de registros inativos HUB_STATE_IND = -1, Ex.: C_B_PESSOA ") | |
parser.add_argument("mdmtabaux", help="Informe o nome da tabela auxiliar que deve conter os ROWID_XREF's dos registros que deverão ser expurgados, ela deve conter apenas um campo ROWID_XREF, Ex.: PURGE_AUX") | |
parser.add_argument("lote", help="Informe a qtde de registros por lote para expurgo", default=500, type=int) | |
parser.add_argument("recalculatebvt", help="Informe se após o expurgo você deseva recalcular o BVT") | |
parser.add_argument("cascading", help="Informe se você deseja que o expurgo seja em cascata para todas as tabelas relacionadas aquele registro que será expurgado") | |
parser.add_argument("overridehistory", help="Informe se você quer que o MDM registre a atividade executada pela exclusão do lote nas tabelas do histórico.") | |
parser.add_argument("purgehistory", help="Informe se você quer que o MDM exclua todos os registros do histórico de não mesclagem relacionados ao registro de cross reference excluído.") | |
args = parser.parse_args() | |
print(">"*80 +"\n") | |
print(">>> Verificando parametros e variaveis ...") | |
listaorigens = [] | |
origem = '' | |
lote = args.lote | |
origemuser = os.environ['SRCUSER'] | |
origempass = os.environ['SRCPASS'] | |
mdmuser = os.environ['MDMUSER'] | |
mdmpass = os.environ['MDMPASS'] | |
connectstring = origemuser+'/'+origempass+'@'+args.connectstring #args.connectstring | |
mdmbo = args.mdmbo | |
mdmtabaux = args.mdmtabaux | |
mdmorsid = args.mdmorsid | |
url=args.mdmsifurl | |
recalculatebvt = args.recalculatebvt | |
cascading = args.cascading | |
overridehistory = args.overridehistory | |
purgehistory = args.purgehistory | |
print(">>> Inicio do processo de expurgo de inativos %s " % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")) | |
while True: | |
try: | |
# Realiza a conexão com o banco ... | |
con = conexao(connectstring) | |
listaorigens = get_qtde_para_expurgo_por_origem(con,mdmbo) | |
for item in listaorigens: | |
for key,val in item.items(): | |
origem = key | |
origem_qtde = val | |
recount = 0 | |
while(origem_qtde > 0): | |
qtdedeleted = 0 | |
# Deleta tabela auxiliar, insere novos registros selecionados | |
try: | |
# Seleciona 100 registros para expurgo | |
print(">>> Selecionando um lote de até %s registros para expurgo da origem %s ..." % ((lote),origem)) | |
queryselect = "SELECT XRPE.ROWID_XREF FROM CMX_ORS.@mdmbo@_XREF XRPE INNER JOIN CMX_ORS.@mdmbo@ BOPE ON (XRPE.ROWID_OBJECT = BOPE.ROWID_OBJECT) WHERE BOPE.HUB_STATE_IND = -1 AND XRPE.ROWID_SYSTEM = '@origem@' AND ROWNUM < @lote@" | |
queryselect = queryselect.replace('@origem@',origem) | |
queryselect = queryselect.replace('@lote@',str(lote+1)) | |
queryselect = queryselect.replace('@mdmbo@',str(mdmbo)) | |
cur = con.cursor() | |
# Deleta a tabela atual | |
print(">>> Limpando a tabela CMX_ORS.%s ..." % mdmtabaux) | |
querydelete = ("DELETE FROM CMX_ORS.%s" % mdmtabaux) | |
cur.execute(querydelete) | |
con.commit() | |
time.sleep(1) | |
print(">>> Inserindo os rowid_xref dos registros selecionados na CMX_ORS.PURGE_AUX ...") | |
queryinsert = ("INSERT INTO CMX_ORS.%s %s " % (mdmtabaux,queryselect)) | |
cur.execute(queryinsert) | |
con.commit() | |
time.sleep(1) | |
except: | |
e = sys.exc_info()[0] | |
print("Error: %s" % e) | |
print(sys.exc_info()) | |
finally: | |
# Encerra a conexao com o banco | |
print(">>> Commitando as transacoes ...") | |
time.sleep(1) | |
cur.close() | |
con.commit() | |
# Realiza a chamada do serviço para o expurgo | |
try: | |
# Realiza a chamada do serviço para o expurgo | |
print(">>> Executando o serviço SIF ExecuteBatchDelete do MDM ...") | |
headers = {'content-type': 'text/xml', 'SOAPAction': 'POST'} | |
body = """<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:urn="urn:siperian.api"> | |
<soapenv:Header/> | |
<soapenv:Body> | |
<urn:executeBatchDelete> | |
<!--Optional:--> | |
<urn:username>@mdmuser@</urn:username> | |
<!--Optional:--> | |
<urn:password> | |
<urn:password>@mdmpass@</urn:password> | |
<urn:encrypted>false</urn:encrypted> | |
</urn:password> | |
<!--Optional:--> | |
<urn:orsId>@mdmorsid@</urn:orsId> | |
<!--Optional:--> | |
<urn:interactionId></urn:interactionId> | |
<!--Optional:--> | |
<urn:asynchronousOptions> | |
<urn:isAsynchronous>false</urn:isAsynchronous> | |
<!--Optional:--> | |
<urn:jmsReplyTo></urn:jmsReplyTo> | |
<!--Optional:--> | |
<urn:jmsCorrelationId></urn:jmsCorrelationId> | |
</urn:asynchronousOptions> | |
<urn:tableName>@mdmbo@</urn:tableName> | |
<urn:sourceTableName>@mdmtabaux@</urn:sourceTableName> | |
<urn:recalculateBvt>@recalculatebvt@</urn:recalculateBvt> | |
<urn:cascading>@cascading@</urn:cascading> | |
<urn:overrideHistory>@overridehistory@</urn:overrideHistory> | |
<urn:purgeHistory>@purgehistory@</urn:purgeHistory> | |
</urn:executeBatchDelete> | |
</soapenv:Body> | |
</soapenv:Envelope>""" | |
body = body.replace("@mdmuser@",mdmuser) | |
body = body.replace("@mdmpass@",mdmpass) | |
body = body.replace("@mdmbo@",mdmbo) | |
body = body.replace("@mdmtabaux@",mdmtabaux) | |
body = body.replace("@mdmorsid@",mdmorsid) | |
body = body.replace("@recalculatebvt@",recalculatebvt) | |
body = body.replace("@cascading@",cascading) | |
body = body.replace("@overridehistory@",overridehistory) | |
body = body.replace("@purgehistory@",purgehistory) | |
response = requests.post(url,data=body,headers=headers) | |
print("") | |
print(response.content.decode('utf8')) | |
xml=ET.fromstring(response.content.decode('utf8')) | |
for child in xml[0][0]: | |
if child.tag == "{urn:siperian.api}processedXrefsCount": | |
qtdedeleted = int(child.text) | |
print("") | |
print(">>> Registros deletados: %s " % str(qtdedeleted)) | |
print("") | |
except: | |
e = sys.exc_info()[0] | |
print("Error: %s" % e) | |
finally: | |
print(">"*80 +"\n") | |
if qtdedeleted == 0: | |
recount += 1 | |
if recount > 6: | |
recountqry = "SELECT COUNT(1) FROM CMX_ORS.@mdmbo@_XREF XRPE INNER JOIN CMX_ORS.@mdmbo@ BOPE ON (XRPE.ROWID_OBJECT = BOPE.ROWID_OBJECT) WHERE BOPE.HUB_STATE_IND = -1 AND XRPE.ROWID_SYSTEM = '@origem@'" | |
recountqry = recountqry.replace('@origem@',origem) | |
recountqry = recountqry.replace('@mdmbo@',str(mdmbo)) | |
cur = con.cursor() | |
cur.execute(recountqry) | |
for result in cur: | |
origem_qtde = int(result[0]) | |
if origem_qtde == 0: | |
print(">>> Não existem mais registros para expurgar da origem %s " % origem) | |
continue | |
cur.close() | |
origem_qtde = origem_qtde - ( qtdedeleted ) | |
print(">>> Processo em execucao %s " % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")) | |
print(">>> Restando %s registros inativos pendentes para %s ..." % (origem_qtde,origem)) | |
print(">>> Lote finalizado para origem %s " % origem) | |
print(">>> Termino do lote em %s <<< " % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")) | |
# Encerrando a conexão com o banco | |
con.close() | |
print(">>> Termino do processo de expurgo de inativos %s " % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")) | |
print("") | |
print(">"*80 +"\n") | |
exit(0) | |
except KeyboardInterrupt: | |
print(">>> Processo finalizado pelo usuário ( CTRL + C ) <<<") | |
print(">"*80 +"\n") | |
exit(888) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment