@vandorjw
Created July 13, 2017 20:31
#!/usr/bin/env python3
from datetime import datetime
from ftplib import FTP
from functools import partial
import hashlib
import logging
import os
import sys
import boto3
from botocore.errorfactory import ClientError
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
try:
    server = os.environ['ftp_server']
    user = os.environ['ftp_user']
    password = os.environ['ftp_password']
    tld = os.environ['ftp_tld']
    local_root = os.environ['ftp_local_root']
    s3_bucket_name = os.environ['ftp_s3_bucket_name']
    s3_folder_name = os.environ['ftp_s3_folder_name']
except KeyError as error:
    logging.error("Environment variable `{}` is not set".format(error.args[0]))
    sys.exit(1)
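# Example configuration (hypothetical values), exported before running the script:
#   export ftp_server=ftp.example.com
#   export ftp_user=feeduser
#   export ftp_password=secret
#   export ftp_tld=/Daily/NAV/CAN_FO/
#   export ftp_local_root=/var/data/ftp-mirror
#   export ftp_s3_bucket_name=my-archive-bucket
#   export ftp_s3_folder_name=daily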
data_files = "{local_root}/data-files/".format(local_root=local_root)
trigger_files = "{local_root}/trigger-files/".format(local_root=local_root)
archived_files = "{local_root}/archived-files/".format(local_root=local_root)
if not os.path.exists(data_files):
    os.makedirs(data_files)
if not os.path.exists(trigger_files):
    os.makedirs(trigger_files)
if not os.path.exists(archived_files):
    os.makedirs(archived_files)
def push_to_s3():
    """
    Uses boto3 to connect to AWS S3 and archive the data_files.
    See: http://boto3.readthedocs.io/en/stable/reference/services/s3.html
    Since all .zip and .gz files have a timestamp in the filename,
    I organized the folder structure on S3 accordingly.
    Daily archives can be found under:
        /daily/yyyymmdd/filename
    """
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(s3_bucket_name)
    file_list = os.listdir(data_files)
    archived_list = os.listdir(archived_files)
    for filename in file_list:
        if filename in archived_list:
            # If a file has been archived in the past, I try to save
            # bandwidth and not overwrite an existing file.
            logging.info("skipping {}".format(filename))
            continue
        else:
            filepath = data_files + filename
            timestamp = filename.split('.')[0].split('_')[-1]
            with open(filepath, 'rb') as data:
                key = "{}/{}/{}".format(s3_folder_name, timestamp, filename)
                logging.info("uploading {}".format(key))
                bucket.put_object(Key=key, Body=data)
            archived_path = archived_files + filename
            with open(archived_path, 'w') as f:
                f.write("{} uploaded at: {}".format(filename, datetime.now().timestamp()))
def md5sum(filename):
    """
    Calculates the md5sum of a file in the data_files directory.
    `filename` is a bare filename, not a full path.
    """
    filepath = "{data_files}/{filename}".format(data_files=data_files, filename=filename)
    with open(filepath, mode='rb') as f:
        d = hashlib.md5()
        for buf in iter(partial(f.read, 128), b''):
            d.update(buf)
    return d.hexdigest()
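# Usage sketch (hypothetical file): md5sum("CAN_FO_20170713.zip") returns the
# 32-character hex digest of data_files/CAN_FO_20170713.zip.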
def get_filenames(ftp, directory):
    """
    ftp: an FTP interface
    directory: the complete directory path.
        - Example: /Daily/NAV/CAN_FO/
    Returns a list of the filenames located in the current working directory
    (the caller is expected to have already changed into `directory` via
    ftp.cwd). Folders are excluded.
    """
    items = ftp.mlsd()
    filenames = []
    for filename, fact in items:
        if fact['type'] not in ['dir', 'cdir']:
            filenames.append(filename)
    return filenames
def verify_archive_integrity(ftp, archive_name):
    """
    Since an md5 checksum does not always exist for every file,
    we do our best to check the md5sum when it exists.
    If the md5sum of the downloaded archive does not match the checksum given
    in the checksum file, we log the mismatch and return False.
    If no checksum file is found, we hope for the best...
    """
    try:
        CMD = "RETR {archive_name}.checksum".format(archive_name=archive_name)
        checksum_file = "{data_files}/{archive_name}.checksum".format(data_files=data_files, archive_name=archive_name)
        remote_checksum_file = "{archive_name}.checksum".format(archive_name=archive_name)
        try:
            filelist = [filename for filename, fact in ftp.mlsd()]
        except Exception as error:
            logging.error("Failed to list the remote directory while looking for a checksum file")
            logging.error(error)
            return False
        if remote_checksum_file in filelist:
            with open(checksum_file, 'wb') as f:
                ftp.retrbinary(CMD, f.write)
            with open(checksum_file, 'rb') as f:
                checksum_value = f.readline()
            try:
                computed_sum = md5sum(archive_name)
            except Exception as error:
                logging.error("Failed to compute the local md5sum of {}".format(archive_name))
                logging.error(error)
                return False
            checksum_value = str(checksum_value, 'utf-8').rstrip()
            if computed_sum == checksum_value:
                logging.info("{archive_name} is valid".format(archive_name=archive_name))
                return True
            else:
                logging.info("{archive_name} is invalid: {computed_sum} vs {checksum_value}".format(archive_name=archive_name, computed_sum=computed_sum, checksum_value=checksum_value))
                return False
        else:
            logging.info("{remote_checksum_file} does not exist".format(remote_checksum_file=remote_checksum_file))
            return True
    except Exception as error:
        logging.error("Error in verify_archive_integrity")
        logging.error(error)
        return False
def download_compressed_feed(ftp, filenames):
    """
    Takes as input the list of filenames generated by get_filenames.
    From that list, two sub-lists are constructed:
        a) the trigger files found in the current working directory
        b) the archives found in the current working directory
    successful_triggers is a list of all successfully downloaded
    archives. If a trigger name is found in successful_triggers,
    the download is skipped rather than repeated.
    successful_triggers is NOT the same as archived_files:
        - archived_files have been pushed to S3.
        - successful_triggers have been downloaded to the local filesystem.
    Notice that this method downloads an archive only IF there is an
    associated trigger file.
    This means that if a trigger file exists for which no archive exists,
    no error is logged.
    If an archive exists and no trigger file is found, the archive is not
    downloaded and no error is logged. The assumption is that whoever is
    uploading to the FTP server has not finished uploading the archive, and a
    trigger file will be created once the upload is finished!
    """
    trigger_list = [filename for filename in filenames if filename.endswith(".trigger") or filename.endswith("trigger.txt")]
    archive_list = [filename for filename in filenames if filename.endswith(".gz") or filename.endswith(".zip")]
    successful_triggers = os.listdir(trigger_files)
    for trigger_name in trigger_list:
        if trigger_name in successful_triggers:
            continue  # skip downloading files we already have
        if trigger_name.endswith('txt'):
            prefix = trigger_name.split('_trigger')[0]
        else:
            prefix = trigger_name.split('.')[0]
        for archive_name in archive_list:
            if archive_name.startswith(prefix):
                try:
                    CMD = "RETR {archive_name}".format(archive_name=archive_name)
                    download_location = "{data_files}/{archive_name}".format(data_files=data_files, archive_name=archive_name)
                    with open(download_location, 'wb') as f:
                        ftp.retrbinary(CMD, f.write)
                except Exception as error:
                    logging.error("Error while downloading archive")
                    logging.error(error)
                    continue  # do not verify or record a failed download
                is_valid = verify_archive_integrity(ftp, archive_name)
                if is_valid:
                    # If the archive is verified to be valid, we download the
                    # trigger file to a local directory.
                    # We use this directory to skip downloading known successful files.
                    # NOTICE: An archive is assumed to be valid if the ftp server
                    # failed to provide a checksum file.
                    # The archive may be corrupt, but I had no way to know!
                    try:
                        CMD = "RETR {trigger_name}".format(trigger_name=trigger_name)
                        trigger_download_location = "{trigger_files}/{trigger_name}".format(trigger_files=trigger_files, trigger_name=trigger_name)
                        logging.info("Downloading {trigger_download_location}".format(trigger_download_location=trigger_download_location))
                        with open(trigger_download_location, 'wb') as f:
                            ftp.retrbinary(CMD, f.write)
                    except Exception as error:
                        logging.error("Error while downloading trigger file")
                        logging.error(error)
                else:
                    # The archive was not valid, remove the broken file.
                    logging.info("Archive is corrupt. Removing: {download_location}".format(download_location=download_location))
                    # I am not removing anything just yet...
                    # os.remove(download_location)
def ftp_walk(ftp, directory):
    """
    Starting at the given directory, this method will recursively
    "walk" down each subfolder and attempt to download the data archives.
    """
logging.info("Path: {}".format(directory))
ftp.cwd(directory)
filenames = get_filenames(ftp, directory)
download_compressed_feed(ftp, filenames)
items = ftp.mlsd()
for filename, fact in items:
if fact['type'] == 'dir':
new_path = "{directory}/{filename}".format(directory=directory, filename=filename)
try:
ftp_walk(ftp, new_path)
except Exception as error:
logging.error("Error in ftp_walk recursive call")
logging.error(error)
def main(server, user, password, tld):
    """
    A wrapper function which calls `ftp_walk` and then `push_to_s3`.
    tld: Top Level Directory, the directory where we begin our search
    for data archives. Every subfolder in tld is checked for files.
    See ftp_walk for details.
    """
    try:
        with FTP(server) as ftp:
            ftp.login(user, password)
            ftp_walk(ftp, tld)
    except Exception as error:
        logging.error("Exception in main")
        logging.error(error)
        sys.exit(1)
    # archive files on S3
    try:
        push_to_s3()
        # TODO: After a successful push, delete the files from the local filesystem.
    except Exception as error:
        logging.error("Failed to push to S3 bucket!")
        logging.error(error)
        sys.exit(1)
    # successfully exit
    sys.exit(0)
if __name__ == "__main__":
main(server, user, password, tld)