GCS
#!/usr/bin/env python
import pytz
import argparse
import os
import shutil
import logging
import logging.config
from datetime import datetime
from parse import parse
from google.cloud import storage
from google.cloud import pubsub
# import google.cloud.logging
from google.cloud.exceptions import NotFound, Conflict
# from concurrent.futures import ThreadPoolExecutor
import pysftp

this_dir, _ = os.path.split(__file__)
path = os.path.join(this_dir, 'logging.conf')
logging.config.fileConfig(path, disable_existing_loggers=False)
LOGGER = logging.getLogger('bsd')

from yaml import load
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader
def force_utc(d_object):
    # Treat naive datetimes as UTC; leave timezone-aware ones untouched.
    if hasattr(d_object, 'tzinfo') and d_object.tzinfo is None:
        return d_object.replace(tzinfo=pytz.UTC)
    return d_object

def from_tz(d_object, tz):
    # Localize a naive datetime to the given timezone, then convert it to UTC.
    t = pytz.timezone(tz).localize(d_object)
    return t.astimezone(pytz.UTC)

def sftp_list_dir(sftp, path=None, tz='UTC'):
    # List files in an SFTP directory with their modification times in UTC.
    if not path:
        path = '/'
    files = []
    with sftp.cd(path):
        for file in sftp.listdir_attr():
            files.append({
                'file': file.filename,
                'modified': from_tz(datetime.fromtimestamp(file.st_mtime), tz)
            })
    return files

def gcs_list_dir(bucket, path=None):
    # List blobs under a prefix with their modification times in UTC.
    files = []
    for blob in bucket.list_blobs(prefix=path):
        # Prefer the 'lastmodified' metadata written on upload; it is stored as a
        # string of epoch seconds, so cast it before converting.
        modified = None
        if blob.metadata:
            try:
                modified = force_utc(datetime.fromtimestamp(float(blob.metadata.get('lastmodified'))))
            except (TypeError, ValueError):
                pass
        if not modified:
            modified = force_utc(blob.updated)
        # Exclude directory placeholder objects.
        if blob.name[-1] != '/':
            files.append({
                'file': blob.name,
                'modified': modified,
                '_blob': blob
            })
    return files
# Probably want to refactor this into separate functions. Simplest would be one
# function for FTP to GCS and another for GCS to FTP, passing in all of the
# parameters they need from sync() (see the sketch after the script).
def sync(config, gcs):
    for job in config.get('jobs', []):
        LOGGER.info(f"Running: {job.get('name')}")
        gcs_config = job.get('gcs')
        bucket = gcs.get_bucket(gcs_config['bucket'])
        ftp_config = job.get('ftp')
        # RSA hack: skip host key verification.
        cnopts = pysftp.CnOpts()
        cnopts.hostkeys = None
        sftp = pysftp.Connection(
            ftp_config.get('host'),
            port=ftp_config.get('port', 22),
            username=ftp_config.get('username'),
            password=ftp_config.get('password'),
            cnopts=cnopts
        )
        mode = job.get('mode')
        bucket_path = ''
        if gcs_config.get('path'):
            bucket_path = gcs_config.get('path') + '/'

        # Section 1 to break into a new function: FTP to GCS.
        if mode in ("gcs<ftp", "gcs<>ftp"):
            bucket_files = gcs_list_dir(bucket, gcs_config.get('path', None))
            ftp_files = sftp_list_dir(sftp, ftp_config.get('path'), ftp_config.get('timezone'))
            if not os.path.exists('tmp'):
                os.mkdir('tmp')
            LOGGER.info("GCS<FTP")
            download_files = []
            for ftp_file in ftp_files:
                ftp_filename = ftp_file.get('file')
                exists = False
                for gcs_file in bucket_files:
                    gcs_filename = os.path.basename(gcs_file['file'])
                    if gcs_filename == ftp_filename:
                        exists = True
                        # Transfer when the FTP copy is newer than the GCS copy.
                        if gcs_file.get('modified', datetime.min) < ftp_file.get('modified', datetime.max):
                            download_files.append(ftp_filename)
                if not exists:
                    LOGGER.info(f"GCS<FTP: {ftp_filename}")
                    download_files.append(ftp_filename)
            for filename in download_files:
                with sftp.cd(ftp_config.get('path', '/')):
                    sftp.get(filename, localpath=os.path.join('tmp', filename), preserve_mtime=True)
                blob = bucket.blob(bucket_path + filename)
                t = os.path.getmtime(os.path.join('tmp', filename))
                LOGGER.info(str(int(t)))
                # Record the source mtime so later runs can compare against it.
                blob.metadata = {'lastmodified': str(int(t))}
                blob.upload_from_filename(os.path.join('tmp', filename))
            if os.path.exists('tmp'):
                shutil.rmtree('tmp')

        # Section 2 to break into a new function: GCS to FTP. Downloading a file
        # from GCS works differently from downloading one from FTP.
        if mode in ("gcs>ftp", "gcs<>ftp"):
            bucket_files = gcs_list_dir(bucket, gcs_config.get('path', None))
            ftp_files = sftp_list_dir(sftp, ftp_config.get('path'), ftp_config.get('timezone'))
            if not os.path.exists('tmp'):
                os.mkdir('tmp')
            LOGGER.info("GCS>FTP")
            download_files = []
            for gcs_file in bucket_files:
                gcs_filename = os.path.basename(gcs_file['file'])
                exists = False
                for ftp_file in ftp_files:
                    ftp_filename = ftp_file.get('file')
                    if gcs_filename == ftp_filename:
                        exists = True
                        # Transfer when the GCS copy is newer than the FTP copy.
                        if gcs_file.get('modified', datetime.min) > ftp_file.get('modified', datetime.max):
                            download_files.append(gcs_file)
                if not exists:
                    LOGGER.info(f"GCS>FTP: {gcs_filename}")
                    download_files.append(gcs_file)
            for file in download_files:
                filename = os.path.basename(file['file'])
                with open(os.path.join('tmp', filename), "wb") as file_obj:
                    file['_blob'].download_to_file(file_obj)
                sftp.put(
                    os.path.join('tmp', filename),
                    remotepath=os.path.join(ftp_config.get('path', '/'), filename),
                    preserve_mtime=True
                )
            if os.path.exists('tmp'):
                shutil.rmtree('tmp')
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-id', type=str)
    parser.add_argument('--local-config', type=str)
    parser.add_argument('--cloud-config', type=str)
    args = parser.parse_args()
    gcs = storage.Client(project=args.project_id)
    if args.cloud_config:
        # Config lives in GCS: parse the gs:// URI and read the YAML from the bucket.
        config_path = parse("gs://{bucket}/{path}", args.cloud_config)
        bucket = gcs.get_bucket(config_path['bucket'])
        blob = bucket.get_blob(config_path['path'])
        config = load(blob.download_as_string(), Loader=Loader)
    elif args.local_config:
        with open(args.local_config) as f:
            config = load(f, Loader=Loader)
    else:
        parser.error("one of --local-config or --cloud-config is required")
    try:
        sync(config, gcs)
    finally:
        if os.path.exists('tmp'):
            shutil.rmtree('tmp')

if __name__ == "__main__":
    main()
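The comment above sync() asks for the two directions to be split into their own functions. Below is a minimal sketch of what that could look like, assuming it lives in the same module as the helpers above (gcs_list_dir, sftp_list_dir). The function name sync_ftp_to_gcs and the tempfile-based working directory are my additions, not part of the original script, and only the FTP-to-GCS direction is shown; the GCS-to-FTP direction would mirror it, using blob.download_to_file() and sftp.put() instead. sync() would then only set up the connections and call whichever direction(s) the job's mode asks for.

import os
import shutil
import tempfile

def sync_ftp_to_gcs(sftp, bucket, ftp_path, gcs_path, tz='UTC'):
    # Copy files that are new, or newer, on the SFTP server into the GCS bucket.
    bucket_files = gcs_list_dir(bucket, gcs_path)
    ftp_files = sftp_list_dir(sftp, ftp_path, tz)
    gcs_by_name = {os.path.basename(f['file']): f for f in bucket_files}
    bucket_prefix = gcs_path + '/' if gcs_path else ''

    tmp_dir = tempfile.mkdtemp()
    try:
        for ftp_file in ftp_files:
            name = ftp_file['file']
            gcs_file = gcs_by_name.get(name)
            if gcs_file and gcs_file['modified'] >= ftp_file['modified']:
                continue  # the GCS copy is already up to date
            local = os.path.join(tmp_dir, name)
            with sftp.cd(ftp_path or '/'):
                sftp.get(name, localpath=local, preserve_mtime=True)
            blob = bucket.blob(bucket_prefix + name)
            # Keep the source mtime in metadata so future runs can compare against it.
            blob.metadata = {'lastmodified': str(int(os.path.getmtime(local)))}
            blob.upload_from_filename(local)
    finally:
        shutil.rmtree(tmp_dir)

The second file in the gist, below, is an example job configuration consumed by sync():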
jobs:
  - name: Primary Sync
    ftp:
      host: sftp://app.files.com
      port: 22
      username: xxx
      password: xxx
      path: Test folder/test
      timezone: America/New_York
    gcs:
      bucket: bsd-transfer-test
      path: 'test-folder'
    mode: gcs<>ftp
    # gcs>ftp
    # gcs<>ftp
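For reference, the flag names below come straight from argparse in main(); the script filename, project id, and config filename are placeholders. To run against a local copy of the config above:

python gcs_ftp_sync.py --project-id my-project --local-config config.yaml

Alternatively, point --cloud-config at a gs://bucket/path/to/config.yaml URI to read the same YAML directly from GCS.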