Skip to content

Instantly share code, notes, and snippets.

@jjesusfilho
Last active January 15, 2025 12:49
Show Gist options
  • Save jjesusfilho/31b505195dda1080447dcfeaed481c82 to your computer and use it in GitHub Desktop.
Save jjesusfilho/31b505195dda1080447dcfeaed481c82 to your computer and use it in GitHub Desktop.
Consulta processo do tjsp via mni
from zeep import Client
import xmltodict
import json
from dotenv import load_dotenv
import os
import getpass
def consultar_processo(
usuario= None,
senha = None,
numero_processo = None,
cabecalho = True,
incluir_movimentos = False,
data_referencia = None,
incluir_documentos = False,
diretorio = "."
):
if isinstance(numero_processo, list) == False:
return "Número do processo deve ser uma lista"
load_dotenv()
wsdl = 'http://esaj.tjsp.jus.br/mniws/servico-intercomunicacao-2.2.2/intercomunicacao?wsdl'
if usuario is None or senha is None:
usuario = os.getenv("TJSPMNIUSUARIO")
senha = os.getenv("TJSPMNISENHA")
if not usuario or not senha:
usuario = input("Forneça o usuario: ")
senha = getpass.getpass("Forneça a senha: ")
client = Client(wsdl = wsdl)
for p in numero_processo:
try:
with client.settings( raw_response = True, strict = False):
resposta = client.service.consultarProcesso(
idConsultante = usuario,
senhaConsultante = senha,
numeroProcesso = p,
incluirCabecalho = cabecalho,
movimentos = incluir_movimentos,
dataReferencia = data_referencia,
incluirDocumentos = incluir_documentos
)
dict_data = xmltodict.parse(resposta.content,
process_namespaces= True,
attr_prefix = '',
namespaces = {'http://www.cnj.jus.br/tipos-servico-intercomunicacao-2.2.2':None,
'http://www.cnj.jus.br/intercomunicacao-2.2.2':None,
'http://www.cnj.jus.br/mni/cda': None,
'http://www.cnj.jus.br/servico-intercomunicacao-2.2.2/':None}
)
dict_data = dict_data["http://schemas.xmlsoap.org/soap/envelope/:Envelope"]["http://schemas.xmlsoap.org/soap/envelope/:Body"]["consultarProcessoResposta"]["processo"]
del dict_data['dadosBasicos']['xmlns']
json_data = json.dumps(dict_data, indent=4)
arquivo = os.path.join(diretorio, "api_processo_" + p + ".json")
with open(arquivo, "w") as json_file:
json_file.write(json_data)
except:
pass
def baixar_documento(
usuario= None,
senha = None,
numero_processo = None,
documento = None,
diretorio = "."
):
if isinstance(numero_processo, list) == False:
return "Número do processo deve ser uma lista"
load_dotenv()
wsdl = 'http://esaj.tjsp.jus.br/mniws/servico-intercomunicacao-2.2.2/intercomunicacao?wsdl'
if usuario is None or senha is None:
usuario = os.getenv("TJSPMNIUSUARIO")
senha = os.getenv("TJSPMNISENHA")
if not usuario or not senha:
usuario = input("Forneça o usuario: ")
senha = getpass.getpass("Forneça a senha: ")
client = Client(wsdl = wsdl)
for p in numero_processo:
try:
with client.settings( raw_response = True, strict = False):
resposta = client.service.consultarProcesso(
idConsultante = usuario,
senhaConsultante = senha,
numeroProcesso = p,
documento = documento
)
dict_data = xmltodict.parse(resposta.content,
process_namespaces= True,
attr_prefix = '',
namespaces = {'http://www.cnj.jus.br/tipos-servico-intercomunicacao-2.2.2':None,
'http://www.cnj.jus.br/intercomunicacao-2.2.2':None,
'http://www.cnj.jus.br/mni/cda': None,
'http://www.cnj.jus.br/servico-intercomunicacao-2.2.2/':None}
)
dict_data = dict_data["http://schemas.xmlsoap.org/soap/envelope/:Envelope"]["http://schemas.xmlsoap.org/soap/envelope/:Body"]["consultarProcessoResposta"]["processo"]
del dict_data['dadosBasicos']['xmlns']
json_data = json.dumps(dict_data, indent=4)
arquivo = os.path.join(diretorio, "api_processo_documento" + p + ".json")
with open(arquivo, "w") as json_file:
json_file.write(json_data)
except:
pass
import os
import json
import getpass
import xmltodict
from zeep import Client
from dotenv import load_dotenv
class MNIClient:
def __init__(self, usuario=None, senha=None):
load_dotenv()
self.wsdl = 'http://esaj.tjsp.jus.br/mniws/servico-intercomunicacao-2.2.2/intercomunicacao?wsdl'
self.usuario = usuario or os.getenv("TJSPMNIUSUARIO") or input("Forneça o usuario: ")
self.senha = senha or os.getenv("TJSPMNISENHA") or getpass.getpass("Forneça a senha: ")
self.client = Client(wsdl=self.wsdl)
def consultar_processo(self, numeros_processo, cabecalho=True, incluir_movimentos=False,
data_referencia=None, incluir_documentos=False, formato="xml", diretorio="."):
if not isinstance(numeros_processo, list):
raise ValueError("Argumento numeros_processo deve ser uma lista")
for p in numeros_processo:
try:
with self.client.settings(raw_response=True, strict=False):
resposta = self.client.service.consultarProcesso(
idConsultante=self.usuario,
senhaConsultante=self.senha,
numeroProcesso=p,
incluirCabecalho=cabecalho,
movimentos=incluir_movimentos,
dataReferencia=data_referencia,
incluirDocumentos=incluir_documentos
)
self._save_response(resposta.content, p, formato, diretorio)
except Exception as e:
print(f"Erro ao processar o processo {p}: {e}")
def baixar_documento(self, numeros_processo, documento=None, formato="xml", diretorio="."):
if not isinstance(numeros_processo, list):
raise ValueError("Número do processo deve ser uma lista")
for p in numeros_processo:
try:
with self.client.settings(raw_response=True, strict=False):
resposta = self.client.service.consultarProcesso(
idConsultante=self.usuario,
senhaConsultante=self.senha,
numeroProcesso=p,
documento=documento
)
self._save_response(resposta.content, f"documento_{p}", formato, diretorio)
except Exception as e:
print(f"Erro ao processar o processo {p}: {e}")
def _save_response(self, content, identifier, formato, diretorio):
if formato.lower() == "xml":
file_path = os.path.join(diretorio, f"mni_processo_{identifier}.xml")
with open(file_path, "wb") as file:
file.write(content)
elif formato.lower() == "json":
dict_data = xmltodict.parse(
content,
process_namespaces=True,
attr_prefix='',
namespaces={
'http://www.cnj.jus.br/tipos-servico-intercomunicacao-2.2.2': None,
'http://www.cnj.jus.br/intercomunicacao-2.2.2': None,
'http://www.cnj.jus.br/mni/cda': None,
'http://www.cnj.jus.br/servico-intercomunicacao-2.2.2/': None
}
)
dict_data = dict_data["http://schemas.xmlsoap.org/soap/envelope/:Envelope"][
"http://schemas.xmlsoap.org/soap/envelope/:Body"]["consultarProcessoResposta"]["processo"]
del dict_data['dadosBasicos']['xmlns']
file_path = os.path.join(diretorio, f"mni_processo_{identifier}.json")
with open(file_path, "w") as json_file:
json.dump(dict_data, json_file, indent=4)
else:
raise ValueError("Formato inválido. Escolha 'json' ou 'xml'.")
# Example usage:
# client = MNIClient()
# client.consultar_processo(["123456789", "987654321"], formato="json")
# client.baixar_documento(["123456789"], documento="some_document_id", formato="xml")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment