Created
July 13, 2017 20:31
-
-
Save vandorjw/950f800981fed31d8645fc125fb13ab4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
from datetime import datetime
from ftplib import FTP
from functools import partial
import hashlib
import logging
import os
import sys

import boto3
from botocore.errorfactory import ClientError

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)

# All configuration comes from environment variables; abort with a clear
# message naming the missing variable instead of a raw traceback.
try:
    server = os.environ['ftp_server']
    user = os.environ['ftp_user']
    password = os.environ['ftp_password']
    tld = os.environ['ftp_tld']
    local_root = os.environ['ftp_local_root']
    s3_bucket_name = os.environ['ftp_s3_bucket_name']
    s3_folder_name = os.environ['ftp_s3_folder_name']
except KeyError as error:
    # BUG FIX: original read `rror.args[0]` (NameError inside the handler);
    # the bound exception name is `error`.
    logging.error("Environment variable `{}` is not set".format(error.args[0]))
    sys.exit(1)

# Local working directories:
#   data_files     - downloaded archives waiting to be pushed to S3
#   trigger_files  - trigger files of successfully downloaded archives
#   archived_files - marker files for archives already uploaded to S3
data_files = "{local_root}/data-files/".format(local_root=local_root)
trigger_files = "{local_root}/trigger-files/".format(local_root=local_root)
archived_files = "{local_root}/archived-files/".format(local_root=local_root)

# exist_ok=True avoids the race between the existence check and creation.
os.makedirs(data_files, exist_ok=True)
os.makedirs(trigger_files, exist_ok=True)
os.makedirs(archived_files, exist_ok=True)
def push_to_s3():
    """
    Archive the local data files to AWS S3 using boto3.
    See: http://boto3.readthedocs.io/en/stable/reference/services/s3.html

    Every .zip / .gz file carries a timestamp in its name, so the bucket is
    organized accordingly -- daily archives live under:
        /daily/yyymmdd/filename
    A marker file is written to `archived_files` after each successful
    upload so the same archive is never uploaded twice.
    """
    bucket = boto3.resource('s3').Bucket(s3_bucket_name)
    already_archived = os.listdir(archived_files)
    for name in os.listdir(data_files):
        if name in already_archived:
            # Archived in a previous run: save bandwidth, do not overwrite.
            logging.info("skipping {}".format(name))
            continue
        source_path = data_files + name
        # The timestamp embedded in the filename picks the S3 "folder".
        stamp = name.split('.')[0].split('_')[-1]
        with open(source_path, 'rb') as payload:
            key = "{}/{}/{}".format(s3_folder_name, stamp, name)
            logging.info("uploading {}".format(key))
            bucket.put_object(Key=key, Body=payload)
        # Record the upload so future runs skip this file.
        with open(archived_files + name, 'w') as marker:
            marker.write("{} uploaded at: {}".format(name, datetime.now().timestamp()))
def md5sum(filename):
    """
    Return the hex md5 digest of `filename` located under `data_files`.

    The file is read in 128-byte chunks so arbitrarily large archives do
    not have to fit in memory.
    """
    # BUG FIX: the original format string never interpolated `filename`
    # (the keyword argument was passed but unused), so the function always
    # tried to hash a nonexistent fixed path.
    filepath = "{data_files}/{filename}".format(data_files=data_files, filename=filename)
    with open(filepath, mode='rb') as f:
        d = hashlib.md5()
        for buf in iter(partial(f.read, 128), b''):
            d.update(buf)
    return d.hexdigest()
def get_filenames(ftp, directory):
    """
    Return the names of all regular files listed by the FTP server,
    excluding directory entries ('dir') and the current directory ('cdir').

    ftp: An FTP interface.
    directory: The complete directory path.
        - Example: /Daily/NAV/CAN_FO/
        NOTE(review): this parameter is currently unused -- the listing
        comes from the server's current working directory via mlsd().
    """
    return [
        name
        for name, fact in ftp.mlsd()
        if fact['type'] not in ['dir', 'cdir']
    ]
def verify_archive_integrity(ftp, archive_name):
    """
    Best-effort md5 verification of a downloaded archive.

    If `<archive_name>.checksum` exists on the FTP server, download it and
    compare its first line against the md5 of the local copy of the archive
    (computed by `md5sum`).  Returns True when the sums match, False on a
    mismatch.  If no checksum file is published, returns True and hopes for
    the best.  Any unexpected error yields False.

    NOTE(review): if `ftp.mlsd()` or `md5sum()` raises, the inner handler
    only logs, leaving `filelist` / `computed_sum` unbound; the resulting
    NameError is swallowed by the outer `except` and the function returns
    False -- confirm this is the intended failure mode.
    """
    try:
        # FTP command and local/remote names for the companion checksum file.
        CMD = "RETR {archive_name}.checksum".format(archive_name=archive_name)
        checksum_file = "{data_files}/{archive_name}.checksum".format(data_files=data_files, archive_name=archive_name)
        remote_checksum_file = "{archive_name}.checksum".format(archive_name=archive_name)
        try:
            # Names of everything in the server's current working directory.
            filelist = [filename for filename, fact in ftp.mlsd()]
        except Exception as error:
            logging.error("Error in `filelist = [filename for filename, fact in ftp.mlsd()]`")
        if remote_checksum_file in filelist:
            # Download the checksum file, then read back the published sum.
            with open(checksum_file, 'wb') as f:
                ftp.retrbinary(CMD, f.write)
            with open(checksum_file, 'rb') as f:
                checksum_value = f.readline()
            try:
                computed_sum = md5sum(archive_name)
            except Exception as error:
                logging.error("computed_sum = md5sum(archive_name)")
                logging.error(error)
            # readline() returned bytes; decode and strip the trailing
            # newline before comparing against the hex digest string.
            checksum_value = str(checksum_value, 'utf-8').rstrip()
            if computed_sum == checksum_value:
                logging.info("{archive_name} is valid".format(archive_name=archive_name))
                return True
            else:
                logging.info("{archive_name} is invalid: {computed_sum} vs {checksum_value}".format(archive_name=archive_name, computed_sum=computed_sum, checksum_value=checksum_value))
                return False
        else:
            # No checksum published for this archive: assume it is fine.
            logging.info("{remote_checksum_file} does not exist".format(remote_checksum_file=remote_checksum_file))
            return True
    except Exception as error:
        logging.error("Error in verify_archive_integrity")
        logging.error(error)
        return False
def download_compressed_feed(ftp, filenames):
    """
    Download every archive (.gz / .zip) that has an associated trigger
    file, from the list of filenames produced by `get_filenames`.

    From `filenames` two lists are constructed:
      a) trigger_list  -- trigger files in the current working directory
      b) archive_list  -- archives in the current working directory

    `successfull_triggers` holds the triggers of all archives that were
    already downloaded to the local filesystem in previous runs; a trigger
    found there is skipped.  It is NOT the same as `archived_files`:
    archived_files have been pushed to S3, successfull_triggers have only
    been downloaded locally.

    An archive is downloaded only IF its trigger file exists:
      - a trigger with no matching archive logs no error;
      - an archive with no trigger is not downloaded and logs no error --
        the assumption is the FTP upload is still in progress and the
        trigger will appear once it finishes.
    """
    trigger_list = [filename for filename in filenames if filename.endswith(".trigger") or filename.endswith("trigger.txt")]
    archive_list = [filename for filename in filenames if filename.endswith(".gz") or filename.endswith(".zip")]
    # Triggers already downloaded in earlier runs.
    successfull_triggers = os.listdir(trigger_files)
    for trigger_name in trigger_list:
        if trigger_name in successfull_triggers:
            continue  # skip downloading files we already have
        # Derive the archive-name prefix from the trigger's name, which is
        # either "<prefix>_trigger.txt" or "<prefix>.trigger".
        if trigger_name.endswith('txt'):
            prefix = trigger_name.split('_trigger')[0]
        else:
            prefix = trigger_name.split('.')[0]
        for archive_name in archive_list:
            if archive_name.startswith(prefix):
                try:
                    CMD = "RETR {archive_name}".format(archive_name=archive_name)
                    download_location = "{data_files}/{archive_name}".format(data_files=data_files, archive_name=archive_name)
                    with open(download_location, 'wb') as f:
                        ftp.retrbinary(CMD, f.write)
                except Exception as error:
                    logging.error("Error while downloading archive")
                    logging.error(error)
                is_valid = verify_archive_integrity(ftp, archive_name)
                if is_valid:
                    # If the archive is verified to be valid, download the
                    # trigger file to a local directory; that directory is
                    # used to skip re-downloading known successful files.
                    # NOTICE: an archive is assumed valid when the server
                    # provides no checksum file -- it may still be corrupt,
                    # but there is no way to know.
                    try:
                        CMD = "RETR {trigger_name}".format(trigger_name=trigger_name)
                        trigger_download_location = "{trigger_files}/{trigger_name}".format(trigger_files=trigger_files, trigger_name=trigger_name)
                        logging.info("Downloading {trigger_download_location}".format(trigger_download_location=trigger_download_location))
                        with open(trigger_download_location, 'wb') as f:
                            ftp.retrbinary(CMD, f.write)
                    except Exception as error:
                        logging.error("Error while downloading trigger file")
                        logging.error(error)
                else:
                    # The archive was not valid; the broken file should be
                    # removed, but removal is intentionally disabled.
                    logging.info("Archive is corrupt. Removing: {download_location}".format(download_location=download_location))
                    # I am not removing anything just yet...
                    # os.remove(download_location)
def ftp_walk(ftp, directory):
    """
    Starting at `directory`, recursively "walk" down each subfolder and
    attempt to download the data archives found at every level.

    ftp: An FTP interface.
    directory: Absolute path on the FTP server to start from.

    Errors in recursive calls are logged and do not abort the walk of the
    remaining siblings.
    """
    logging.info("Path: {}".format(directory))
    ftp.cwd(directory)
    filenames = get_filenames(ftp, directory)
    download_compressed_feed(ftp, filenames)
    items = ftp.mlsd()
    for filename, fact in items:
        if fact['type'] == 'dir':
            # BUG FIX: the original format string dropped the subfolder
            # name (the `filename=filename` kwarg was passed but unused),
            # so the walk never descended into the actual subdirectory.
            new_path = "{directory}/{filename}".format(directory=directory, filename=filename)
            try:
                ftp_walk(ftp, new_path)
            except Exception as error:
                logging.error("Error in ftp_walk recursive call")
                logging.error(error)
def main(server, user, password, tld):
    """
    Wrapper that runs `ftp_walk` over the FTP server and then `push_to_s3`.

    tld: Top Level Directory -- where the search for data archives begins;
    every subfolder of tld is checked for files (see `ftp_walk`).

    Exits with status 1 on any FTP or S3 failure, 0 on success.
    """
    # Phase 1: mirror the remote tree to the local filesystem.
    try:
        with FTP(server) as connection:
            connection.login(user, password)
            ftp_walk(connection, tld)
    except Exception as error:
        logging.error("Exception in main")
        logging.error(error)
        sys.exit(1)
    # Phase 2: archive the downloaded files on S3.
    # TODO: after a successful push, delete them from the local filesystem.
    try:
        push_to_s3()
    except Exception as error:
        logging.error("Failed to push to S3 bucket!")
        logging.error(error)
        sys.exit(1)
    # Both phases succeeded.
    sys.exit(0)
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main(server, user, password, tld)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment