|
# !pip install -U -q PyDrive |
|
from pydrive.auth import GoogleAuth |
|
from pydrive.drive import GoogleDrive |
|
from google.colab import auth |
|
from oauth2client.client import GoogleCredentials |
|
|
|
import os |
|
import subprocess |
|
from pathlib import Path |
|
|
|
# Names exported by ``from <module> import *``.
__all__ = [
    "create_archive",
    "extract_archive",
    "GoogleDriveHandler",
]
|
|
|
def create_archive(zip_name, local_file_paths, temp_folder='/tmp', verbose=False):
    """Pack existing files/directories into a gzip-compressed tar archive.

    The archive is created inside ``temp_folder``; ``.tar.gz`` is appended to
    ``zip_name`` unless the name already contains it. Paths that do not exist
    are reported on stdout and skipped. Members are stored relative to the
    deepest directory shared by all inputs, so the archive does not reproduce
    the whole directory chain leading to them.

    Args:
        zip_name: Archive file name, with or without ``.tar.gz``.
        local_file_paths: Iterable of file or directory paths to archive.
        temp_folder: Directory in which the archive is written.
        verbose: If true, print the shared directory, the ``tar`` command and
            the output produced by ``tar``.

    Returns:
        The archive path, built as ``temp_folder + '/' + zip_name`` (plus the
        ``.tar.gz`` suffix when it was added).

    Raises:
        ValueError: If none of the given paths exists.
        subprocess.CalledProcessError: If ``tar`` exits with a non-zero status.
    """
    suffix = '' if '.tar.gz' in zip_name else '.tar.gz'
    zip_name = '{0}/{1}{2}'.format(temp_folder, zip_name, suffix)

    # Filter out non-existing files and directories. Absolute paths are kept
    # so that relative and absolute inputs can be compared with each other.
    existing = []
    for f in local_file_paths:
        if Path(f).exists():
            existing.append(os.path.abspath(f))
        else:
            print('file {0} does not exist, ignore it'.format(f))
    if not existing:
        raise ValueError('none of the given paths exists, nothing to archive')

    # Find the shared directory component-wise. A character-wise prefix breaks
    # on names without a slash ('a.txt' would lose its first character).
    common_prefix = os.path.commonpath(existing)
    if common_prefix in existing:
        # The shared path is itself one of the inputs (single input, or a
        # directory listed next to its own content): archive it by name from
        # its parent directory instead of from inside itself.
        common_prefix = os.path.dirname(common_prefix)
    members = [os.path.relpath(f, common_prefix) for f in existing]

    # Execute the tar.gz compression. An argument list (no shell) keeps paths
    # containing spaces or shell metacharacters intact; '--' stops option
    # parsing so member names starting with '-' are not read as flags.
    cmd = ['tar', '-czvf', zip_name, '-C', common_prefix, '--'] + members
    if verbose:
        print('ignore the common prefix {0}'.format(common_prefix))
        print('running shell command:', '\n' + ' '.join(cmd))
    result = subprocess.check_output(cmd).decode('utf-8')
    if verbose:
        print(result)

    # Return the path of the tar.gz file (absolute only if temp_folder is).
    return zip_name
|
|
|
def extract_archive(zip_path, target_folder='./', verbose=False):
    """Extract a tar archive into ``target_folder`` using the ``tar`` binary.

    Args:
        zip_path: Path of the archive to extract.
        target_folder: Directory the archive is unpacked into; it is passed
            to ``tar -C`` as-is and is not created here.
        verbose: If true, print the ``tar`` command and the list of extracted
            members reported by ``tar``.

    Raises:
        subprocess.CalledProcessError: If ``tar`` exits with a non-zero status.
    """
    # Without '-v' tar prints nothing on extraction, so the verbose report
    # would always be empty.
    flags = '-xvf' if verbose else '-xf'
    # Argument list instead of a shell string: paths with spaces or shell
    # metacharacters are passed to tar untouched.
    cmd = ['tar', flags, str(zip_path), '-C', str(target_folder)]
    if verbose:
        print('running shell command:', '\n' + ' '.join(cmd))
    result = subprocess.check_output(cmd).decode('utf-8')
    if verbose:
        print(result)
|
|
|
class GoogleDriveHandler:
    """Convenience wrapper around a PyDrive ``GoogleDrive`` client.

    Drive items are addressed by '/'-separated paths made of item titles,
    resolved relative to a parent folder (the Drive root by default).
    """

    # MIME type Google Drive uses to mark an item as a folder.
    _FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'

    def __init__(self):
        """Authenticate the Colab user and build the PyDrive client."""
        auth.authenticate_user()
        gauth = GoogleAuth()
        gauth.credentials = GoogleCredentials.get_application_default()
        self.drive = GoogleDrive(gauth)

    def path_to_id(self, rel_path, parent_folder_id='root'):
        """Resolve a '/'-separated path of titles to a Drive item id.

        Empty path components (leading, trailing or doubled slashes) are
        ignored, so an empty path resolves to ``parent_folder_id`` itself.

        Args:
            rel_path: Path relative to the folder ``parent_folder_id``.
            parent_folder_id: Id of the folder the path starts from.

        Returns:
            The id of the item the path points to.

        Raises:
            FileNotFoundError: If a path component is not found in its
                parent folder.
        """
        current_id = parent_folder_id
        for name in filter(None, rel_path.split('/')):
            entries = {f['title']: f for f in self.list_folder(current_id)}
            if name not in entries:
                raise FileNotFoundError('{0} not exist'.format(name))
            current_id = entries[name]['id']
        return current_id

    def list_folder(self, root_folder_id='root', max_depth=0):
        """List the non-trashed items directly inside a folder.

        Args:
            root_folder_id: Id of the folder to list.
            max_depth: Number of sub-folder levels to descend into; with the
                default ``0`` sub-folders are listed but not expanded.

        Returns:
            A list of dicts with the keys ``title``, ``id``, ``link`` and
            ``mimeType``. Folders that were expanded additionally carry a
            ``children`` key holding the same structure for their content.
        """
        query = "'{0}' in parents and trashed=false".format(root_folder_id)
        entries = []
        for f in self.drive.ListFile({'q': query}).GetList():
            entry = {
                'title': f['title'],
                'id': f['id'],
                'link': f['alternateLink'],
                'mimeType': f['mimeType'],
            }
            if f['mimeType'] == self._FOLDER_MIME_TYPE and max_depth > 0:
                entry['children'] = self.list_folder(f['id'], max_depth - 1)
            entries.append(entry)
        return entries

    def create_folder(self, folder_name, parent_path=''):
        """Create a folder under ``parent_path`` unless it already exists.

        Args:
            folder_name: Title of the folder to create.
            parent_path: Drive path of the parent folder ('' is the root).

        Returns:
            The id of the newly created or already existing folder.

        Raises:
            FileExistsError: If an item with that title exists in the parent
                folder but is not a folder.
        """
        parent_folder_id = self.path_to_id(parent_path)
        siblings = {f['title']: f for f in self.list_folder(parent_folder_id)}
        existing = siblings.get(folder_name)
        if existing is None:
            folder = self.drive.CreateFile({
                'title': folder_name,
                'mimeType': self._FOLDER_MIME_TYPE,
                'parents': [{'kind': 'drive#fileLink', 'id': parent_folder_id}],
            })
            folder.Upload()
            return folder['id']
        if existing['mimeType'] != self._FOLDER_MIME_TYPE:
            raise FileExistsError('{0} already exists as a file'.format(folder_name))
        print('{0} already exists'.format(folder_name))
        return existing['id']

    def upload(self, local_file_path, parent_path='', overwrite=True):
        """Upload a local file into the Drive folder ``parent_path``.

        Args:
            local_file_path: Path of the local file; its base name becomes
                the title of the Drive file.
            parent_path: Drive path of the destination folder ('' is the root).
            overwrite: If true and an item with the same title is already in
                the destination folder, that item is deleted before the new
                file is uploaded. If false, a new file is uploaded regardless.

        Returns:
            The id of the uploaded file.
        """
        parent_folder_id = self.path_to_id(parent_path)
        siblings = {f['title']: f for f in self.list_folder(parent_folder_id)}
        file_name = os.path.basename(local_file_path)
        existing = siblings.get(file_name)
        if existing is not None and overwrite:
            # list_folder() returns plain dicts, which have no Delete();
            # address the existing item through a PyDrive file handle.
            self.drive.CreateFile({'id': existing['id']}).Delete()
        new_file = self.drive.CreateFile({
            'title': file_name,
            'parents': [{'kind': 'drive#fileLink', 'id': parent_folder_id}],
        })
        new_file.SetContentFile(local_file_path)
        new_file.Upload()
        return new_file['id']

    def download(self, local_file_path, target_path):
        """Download the Drive item at ``target_path`` to a local file.

        Args:
            local_file_path: Local path the content is written to.
            target_path: Drive path (relative to the root) of the item to
                download.
        """
        target_id = self.path_to_id(target_path)
        remote_file = self.drive.CreateFile({'id': target_id})
        remote_file.GetContentFile(local_file_path)