lets you treat an S3 (Spaces) bucket like a normal folder, sort of?
from boto3 import session
from botocore.client import Config
import os
import os.path
from collections import namedtuple

class Space:
    def __init__(self, region_name, endpoint_url, aws_access_key_id, aws_secret_access_key, bucket_name):
        self._sess = session.Session()
        self._client = self._sess.client(
            's3',
            region_name=region_name,
            endpoint_url=endpoint_url,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key
        )
        self._bucket_name = bucket_name
        self._pwd = '/'  # stores present working directory as a POSIX (not S3) path.

    def abspath(self, path):
        return os.path.normpath(os.path.join(self._pwd, path))

    def _prep_s3_path(self, path, is_dir=True):
        # allow relative paths from pseudo PWD.
        path = self.abspath(path)
        # take a normalized POSIX path and make it work with S3 prefix requirements.
        # 1. must have a trailing slash. Joining with empty string will accomplish this.
        if is_dir:
            path = os.path.join(path, '')
        # 2. '/' to indicate root is not allowed, nor are any leading slashes.
        path = path.removeprefix('/')
        return path

    def cd(self, path=None):
        if not path:
            path = ''
        path = self.abspath(path)
        # check if valid path; ls() raises FileNotFoundError if it is not.
        self.ls(path, printout=False)
        self._pwd = path
        return self._pwd

    def _get_object(self, key):
        return self._client.get_object(Bucket=self._bucket_name, Key=key)

    def _put_object(self, key, **kwargs):
        return self._client.put_object(Bucket=self._bucket_name, Key=key, **kwargs)

    def _delete_object(self, key):
        return self._client.delete_object(Bucket=self._bucket_name, Key=key)

    def _file_exists(self, file, is_dir=False):
        # existence is checked by attempting a GetObject on the exact key.
        file = self._prep_s3_path(file or '', is_dir=is_dir)
        try:
            self._get_object(file)
            return True
        except self._client.exceptions.NoSuchKey:
            # raise FileNotFoundError(f'File "{file}" does not exist.')
            return False

    def _dir_exists(self, dir):
        return self._file_exists(dir, is_dir=True)

    def mkdir(self, path):
        # "directories" are represented as zero-byte objects whose key ends with '/'.
        exists = self._dir_exists(path)
        path = self._prep_s3_path(path or '')
        if exists:
            raise FileExistsError(f'cannot mkdir {path}, already exists.')
        self._put_object(path, Body=b'')

    def rmdir(self, path):
        l = self.ls(path, printout=False)
        path = self._prep_s3_path(path or '')
        if len(l.files) or len(l.folders):
            raise ValueError(f'rmdir: {path}: Directory not empty')
        return self._delete_object(path)

    def ls(self, path=None, printout=True):
        # leading slashes or just '/' are not allowed as prefixes, so we strip them.
        path = self._prep_s3_path(path or '')
        res = self._client.list_objects_v2(Bucket=self._bucket_name, Prefix=path, Delimiter='/')
        files = {*map(lambda o: o.get('Key'), res.get('Contents') or {})}
        # S3 includes the current path, e.g. "folder/", in the object list under
        # Prefix="folder/". Remove it to keep up the hierarchical abstraction.
        # But before doing this, check there is at least one key: empty folder = 1 key,
        # nonexistent folder = 0 keys.
        if len(files) == 0:
            raise FileNotFoundError(f'Prefix "{path}" does not exist in this bucket.')
        files.discard(path)
        folders = {*map(lambda o: o.get('Prefix'), res.get('CommonPrefixes') or {})}
        Result = namedtuple('Result', ['files', 'folders'])
        out = Result(files, folders)
        if printout:
            print(f'total {len(folders) + len(files)}\n', '\n'.join(sorted(folders) + sorted(files)))
        return out

    def write(self, path, content, public=False):
        if self._file_exists(path):
            raise FileExistsError(f'Cannot write existing file {path}')
        path = self._prep_s3_path(path, is_dir=False)
        print('PATH', path)
        self._put_object(path, Body=content, ACL='public-read' if public else 'private')
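

# --- Usage sketch (illustrative, not part of the original gist) ---
# The endpoint, region, credential variable names, and bucket below are
# assumptions for a DigitalOcean Spaces setup; substitute your own values.
if __name__ == '__main__':
    space = Space(
        region_name='nyc3',
        endpoint_url='https://nyc3.digitaloceanspaces.com',
        aws_access_key_id=os.environ['SPACES_KEY'],
        aws_secret_access_key=os.environ['SPACES_SECRET'],
        bucket_name='my-bucket',
    )
    space.ls()              # list the bucket root
    space.mkdir('reports')  # creates a zero-byte "folder" object 'reports/'
    space.cd('reports')     # relative paths now resolve against '/reports'
    space.write('hello.txt', b'hello from Spaces')
    space.ls()              # lists 'reports/hello.txt'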