Last active
August 6, 2020 13:06
-
-
Save joffilyfe/88f6f111a46f9aeb663fccdc96d835fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import os | |
import re | |
import shutil | |
import logging | |
import tempfile | |
import zipfile | |
import threading | |
from datetime import datetime | |
from ftplib import FTP, all_errors | |
from minio import Minio | |
from minio.error import ResponseError, NoSuchBucket | |
# Module-wide logger, named after this module; every class below logs through it.
LOGGER = logging.getLogger(name=__name__)
class AsyncFTP(threading.Thread):
    """Upload one local file to an FTP server from a background thread.

    FTP and OS level failures (``ftplib.all_errors``) are logged and
    swallowed inside :meth:`run`; they are never propagated to the code that
    started the thread.

    Args:
        local_file_path: Path of the file to upload. Its base name is used
            as the remote file name.
        server: FTP host to connect to.
        user: Login name.
        password: Login password.
        remote_path: Remote directory to change into before uploading. A
            falsy value skips the directory change.
        timeout: Timeout, in seconds, handed to ``ftplib.FTP``.
    """

    def __init__(
        self, local_file_path, server, user, password, remote_path, timeout=60
    ):
        super().__init__()
        self.local_file_path = local_file_path
        self.server = server
        self.user = user
        self.password = password
        self.remote_path = remote_path
        self.timeout = timeout

    def run(self):
        """Connect, optionally change directory, and STOR the file."""
        LOGGER.info("FTP.START")
        try:
            # ``timeout`` has to be a keyword argument: the fourth positional
            # parameter of ``ftplib.FTP`` is ``acct``, so passing it
            # positionally left the connection without any timeout.
            ftp = FTP(self.server, self.user, self.password, timeout=self.timeout)
        except all_errors:
            LOGGER.exception("FTP: Unable to connect to %s", self.server)
            return

        remote_name = os.path.basename(self.local_file_path)
        try:
            if self.remote_path:
                ftp.cwd(self.remote_path)
            LOGGER.info("ftp %s", self.local_file_path)
            with open(self.local_file_path, "rb") as local_file:
                LOGGER.info("FTP.STOR %s - start", remote_name)
                ftp.storbinary("STOR {}".format(remote_name), local_file)
                LOGGER.info("FTP.STOR %s - end", remote_name)
        except all_errors:
            # Covers a failed ``cwd``, an unreadable local file (OSError is
            # part of ``all_errors``) and a failed transfer.
            LOGGER.exception(
                "FTP: Unable to send %s to %s", self.local_file_path, remote_name
            )
        finally:
            # The connection is released whether or not the upload succeeded.
            ftp.close()
        LOGGER.info("FTP.END")
class Exporter(abc.ABC):
    """Interface implemented by every export backend.

    The class cannot be instantiated until a subclass overrides
    :meth:`export`.
    """

    @abc.abstractmethod
    def export(self, file_path: str) -> dict:
        """Export the file located at ``file_path``; subclasses must override."""
        raise NotImplementedError()
class FTPExporter(Exporter):
    """Exporter that delegates the upload of a file to an FTP worker.

    ``ftp_service`` is called as ``ftp_service(zip_file, host, user,
    password, destination_dir_path)`` and ``start()`` is invoked on the
    object it returns. With the default ``AsyncFTP`` the upload therefore
    runs in a separate thread and ``export`` returns without waiting for it.
    """

    def __init__(
        self, host, user, password, destination_dir_path, ftp_service: object = AsyncFTP
    ):
        self.host, self.user, self.password = host, user, password
        self.destination_dir_path = destination_dir_path
        self.ftp_service = ftp_service

    def export(self, zip_file: str):
        """Build the FTP worker for ``zip_file`` and start it.

        An ``OSError`` raised while building the worker is logged and
        swallowed; errors raised by ``start()`` itself are not handled here.
        """
        try:
            worker = self.ftp_service(
                zip_file,
                self.host,
                self.user,
                self.password,
                self.destination_dir_path,
            )
        except OSError as error:  # PermissionError is a subclass of OSError
            LOGGER.error(
                "%s - Unable to move zip file '%s' to destination '%s'. "
                "The following exception was raised: '%s'",
                type(self).__name__,
                zip_file,
                self.destination_dir_path,
                error,
            )
            return
        worker.start()
class FileSystemExporter(Exporter):
    """Copy a file into a destination directory on the local file system.

    Args:
        destination_dir_path: Directory that receives the exported files. It
            is created, including missing parents, when it does not exist.
    """

    def __init__(self, destination_dir_path: str):
        self.destination_dir_path = destination_dir_path

        if not os.path.exists(self.destination_dir_path):
            # ``exist_ok`` closes the window between the check above and the
            # creation: another process or thread may create the directory
            # in between, which used to raise ``FileExistsError``.
            os.makedirs(self.destination_dir_path, exist_ok=True)

    def export(self, zip_file: str):
        """Copy ``zip_file`` into the destination directory.

        Failures are logged and swallowed, so the caller is not interrupted
        when a single export cannot be completed.
        """
        try:
            shutil.copy(zip_file, self.destination_dir_path)
        except OSError as exception:  # also covers PermissionError
            LOGGER.error(
                "%s - Unable to move zip file '%s' to destination '%s'. "
                "The following exception was raised: '%s'",
                self.__class__.__name__,
                zip_file,
                self.destination_dir_path,
                exception,
            )
class MinioExporter(Exporter):
    """Upload a file to a bucket of a Min.io object storage.

    ``POLICY_READ_ONLY`` holds a policy document granting anonymous read
    access to the configured bucket. It is only built here; nothing in this
    class applies it to the bucket.
    """

    def __init__(
        self, host, access_key, secret_key, secure=False, bucket_name="xmlconverter",
    ):
        self.bucket_name = bucket_name

        # Statement covering the bucket itself (location and listing).
        bucket_level_statement = {
            "Effect": "Allow",
            "Principal": {"AWS": ["*"]},
            "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
            "Resource": [f"arn:aws:s3:::{self.bucket_name}"],
        }
        # Statement covering every object stored in the bucket.
        object_level_statement = {
            "Effect": "Allow",
            "Principal": {"AWS": ["*"]},
            "Action": ["s3:GetObject"],
            "Resource": [f"arn:aws:s3:::{self.bucket_name}/*"],
        }
        self.POLICY_READ_ONLY = {
            "Version": "2012-10-17",
            "Statement": [bucket_level_statement, object_level_statement],
        }

        self._client = Minio(
            host, access_key=access_key, secret_key=secret_key, secure=secure,
        )

    def export(self, file_path: str):
        """Store ``file_path`` in the bucket under the file's base name."""
        self._client.fput_object(
            self.bucket_name,
            object_name=os.path.basename(file_path),
            file_path=file_path,
        )
        LOGGER.info("ENVIANDO ARQUIVO PARA O MINIO %s", file_path)
class TemporaryZipFile:
    """Pack the entries of a directory into a zip file in a temporary directory.

    Usable as a context manager: ``with TemporaryZipFile(path) as zip_path``
    yields the path of the archive that was created.

    Despite the name, nothing is removed on exit: ``__exit__`` is a no-op,
    so the temporary directory and the archive stay on disk and the caller
    is responsible for deleting them.
    NOTE(review): ``FTPExporter`` reads the file from a background thread
    that can outlive the ``with`` block, so deleting the archive in
    ``__exit__`` would likely break it - confirm before adding cleanup.
    """

    def __init__(self, package_source_dir_path: str):
        self.package_source_dir_path = package_source_dir_path

    def _create_temporary_zip_file(
        self, directory_files_path: str, zip_filename: str
    ) -> str:
        """Create the archive in a new temporary directory and return its path.

        Only the direct entries of ``directory_files_path`` are written;
        sub-directories are not walked. If the archive cannot be written the
        temporary directory is removed and the error is re-raised.
        """
        destination_directory_path = tempfile.mkdtemp()
        # Keep only the last component of ``zip_filename``: an absolute path
        # would make ``os.path.join`` drop the temporary directory, and a
        # relative one with folders would point into a missing sub-directory.
        file_path = os.path.join(
            destination_directory_path, os.path.basename(zip_filename)
        )

        try:
            with zipfile.ZipFile(file_path, "w") as zf:
                LOGGER.debug(
                    "%s - Creating zip file '%s' with files from %s.",
                    self.__class__.__name__,
                    file_path,
                    directory_files_path,
                )
                for item in os.listdir(directory_files_path):
                    zf.write(os.path.join(directory_files_path, item), arcname=item)
        except Exception:
            # Do not leave an empty or partial temporary directory behind.
            shutil.rmtree(destination_directory_path, ignore_errors=True)
            raise

        LOGGER.debug(
            "%s - Zip file '%s' created.", self.__class__.__name__, file_path,
        )
        return file_path

    def get_zip_file(self):
        """Zip the source directory and return the path of the archive.

        The archive is named ``<timestamp>_<source directory name>.zip``,
        where the timestamp is the current date and time with every
        non-word character replaced by ``-``.

        Raises:
            OSError: If the source directory cannot be read or the archive
                cannot be written. The error is logged before being
                re-raised.
        """
        # ``normpath`` drops a trailing separator, which would otherwise make
        # ``basename`` return an empty string.
        dir_base_name = os.path.basename(
            os.path.normpath(self.package_source_dir_path)
        )

        # Prepend the current datetime to the package name.
        timestamp = re.sub(r"[\W\s]", "-", str(datetime.now()))
        package_file_name = "%s_%s" % (timestamp, dir_base_name)

        try:
            return self._create_temporary_zip_file(
                directory_files_path=self.package_source_dir_path,
                zip_filename="{}.zip".format(package_file_name),
            )
        except OSError:  # IOError and PermissionError are both OSError
            LOGGER.exception(
                "%s - Unable to create zip for package '%s'. The following "
                "exception was raised",
                self.__class__.__name__,
                package_file_name,
            )
            raise

    def __enter__(self):
        return self.get_zip_file()

    def __exit__(self, type, value, traceback):
        # Intentionally empty: the archive is left on disk (see class docstring).
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment