Skip to content

Instantly share code, notes, and snippets.

@helton
Created August 2, 2024 08:40
Show Gist options
  • Save helton/6f27c94f44993f7610f0bf1d170f87b0 to your computer and use it in GitHub Desktop.
Save helton/6f27c94f44993f7610f0bf1d170f87b0 to your computer and use it in GitHub Desktop.
S3Path Pydantic validation
import re
from typing import Annotated, List, Optional, Any, Callable
from pydantic_core import core_schema
from enum import Enum
class S3PathType(Enum):
    """Kinds of S3 path a constraint can require."""

    # No restriction on the shape of the key.
    ALL = "all"
    # The path must not be a folder-style path.
    OBJECT = "object"
    # The path must be a folder-style path.
    FOLDER = "folder"
class S3Path:
    """An ``s3://bucket/key`` URI that is validated on construction.

    Attributes:
        path: The full URI string exactly as passed to the constructor.

    Raises:
        ValueError: From the constructor (via :meth:`validate`) when the URI,
            the bucket name, or the object key breaks one of the rules below.
    """

    def __init__(self, path: str):
        self.path = path
        self.validate()

    @staticmethod
    def _split(path: str):
        """Split *path* into ``(bucket, key)``.

        ``key`` is ``None`` when no ``/`` follows the bucket name and ``""``
        when the URI ends right after that slash. Returns ``None`` when the
        path does not start with ``s3://`` or has an empty bucket part.
        """
        scheme = 's3://'
        if not path.startswith(scheme):
            return None
        # str.partition keeps the whole remainder as the key. The previous
        # regex used ".", which stops at the first newline and therefore
        # silently dropped (and never validated) anything after it.
        bucket_name, slash, object_key = path[len(scheme):].partition('/')
        if not bucket_name:
            return None
        return bucket_name, (object_key if slash else None)

    def validate(self):
        """Check ``self.path`` against the URI, bucket-name and key rules.

        Raises:
            ValueError: If any rule is violated.
        """
        if not self.path.startswith('s3://'):
            raise ValueError('Invalid S3 URI. Must start with "s3://".')
        parts = self._split(self.path)
        if parts is None:
            raise ValueError('Invalid S3 URI format.')
        bucket_name, object_key = parts
        object_key = object_key or ""

        # Validate bucket name rules.
        # fullmatch (not "^...$") so a trailing newline cannot slip through.
        if not re.fullmatch(r'[a-z0-9.-]{3,63}', bucket_name):
            raise ValueError('Invalid S3 bucket name.')
        # The name must begin and end with a letter or digit, so neither "-"
        # nor "." is allowed at either end; adjacent periods are also invalid.
        if bucket_name[0] in '.-' or bucket_name[-1] in '.-' or '..' in bucket_name:
            raise ValueError('Invalid S3 bucket name.')
        if bucket_name.startswith(('xn--', 'sthree-', 'sthree-configurator')) or bucket_name.endswith(('-s3alias', '--ol-s3')):
            raise ValueError('Invalid S3 bucket name.')
        if re.fullmatch(r'\d+\.\d+\.\d+\.\d+', bucket_name):
            raise ValueError('Bucket name cannot be formatted as an IP address.')

        # Validate object key rules.
        # The S3 limit is 1024 bytes of UTF-8, not 1024 characters.
        # "surrogatepass" only measures length; it never raises on odd input.
        if len(object_key.encode('utf-8', 'surrogatepass')) > 1024:
            raise ValueError('S3 object key must be at most 1024 bytes when UTF-8 encoded.')
        if any(c in object_key for c in '\t\n\r\\'):
            raise ValueError('S3 object key contains invalid characters.')
        if '//' in object_key:
            raise ValueError('S3 object key should not contain double slashes.')
        if '?' in object_key:
            raise ValueError('S3 object key should not contain query parameters.')

    @property
    def bucket(self) -> str:
        """Bucket name parsed from the current ``path``.

        Raises:
            ValueError: If no bucket name can be extracted.
        """
        parts = self._split(self.path)
        if parts is None:
            raise ValueError('Unable to extract bucket name.')
        return parts[0]

    @property
    def key(self) -> Optional[str]:
        """Object key parsed from the current ``path``.

        ``None`` when nothing follows the bucket name (``s3://bucket``) and
        ``""`` when only the slash does (``s3://bucket/``).
        """
        parts = self._split(self.path)
        if parts is None:
            return None
        return parts[1]

    @property
    def is_folder(self) -> bool:
        """True when the key is non-empty and ends with ``/``."""
        key = self.key
        return bool(key) and key.endswith('/')

    def __repr__(self):
        return f"S3Path(path='{self.path}')"
class S3PathConstraints:
    """Pydantic ``Annotated`` metadata that parses a field into an ``S3Path``.

    Args:
        path_type: Required shape of the path. ``FOLDER`` demands
            ``S3Path.is_folder``; ``OBJECT`` rejects it; ``ALL`` accepts both.
        allowed_buckets: Bucket names the path may use. ``None`` or an empty
            list disables the bucket check.
    """

    def __init__(self, path_type: S3PathType = S3PathType.ALL, allowed_buckets: Optional[List[str]] = None):
        self.path_type = path_type
        self.allowed_buckets = allowed_buckets

    def __get_pydantic_core_schema__(self, source: Any, handler: Callable) -> core_schema.CoreSchema:
        """Build the core schema that validates input into an ``S3Path``.

        The returned schema accepts a string or an existing ``S3Path`` and
        emits the URI string when the model is serialized in JSON mode.
        """

        def validator(value: Any, info: core_schema.ValidationInfo) -> S3Path:
            if isinstance(value, S3Path):
                # Already parsed (e.g. passed in directly): re-check it,
                # because ``path`` is a plain mutable attribute.
                value.validate()
                s3_path = value
            elif isinstance(value, str):
                s3_path = S3Path(value)
            else:
                # Pydantic only turns ValueError/AssertionError into a
                # ValidationError; without this guard a non-string input
                # escaped as a raw AttributeError from str methods.
                raise ValueError(f'S3 path must be a string or S3Path, got {type(value).__name__}.')

            if self.allowed_buckets and s3_path.bucket not in self.allowed_buckets:
                raise ValueError(f'Bucket name "{s3_path.bucket}" is not allowed.')

            if self.path_type == S3PathType.FOLDER and not s3_path.is_folder:
                raise ValueError('S3 path must be a folder (end with "/").')
            if self.path_type == S3PathType.OBJECT and s3_path.is_folder:
                raise ValueError('S3 path must be an object (not end with "/").')
            return s3_path

        return core_schema.with_info_plain_validator_function(
            validator,
            # Python-mode dumps still return the S3Path object; only JSON
            # mode converts it, so existing ``model_dump()`` callers see no
            # change while ``model_dump_json()`` stops failing.
            serialization=core_schema.plain_serializer_function_ser_schema(
                lambda s3_path: s3_path.path,
                when_used='json',
            ),
        )

    def __repr__(self) -> str:
        return f"S3PathConstraints(path_type={self.path_type}, allowed_buckets={self.allowed_buckets})"
# Ready-made field types for Pydantic models. Each validates its input into
# an S3Path using the S3PathConstraints attached below:
#   AnyS3Path    - no restriction on the path shape.
#   S3ObjectPath - rejects paths for which S3Path.is_folder is true.
#   S3FolderPath - requires S3Path.is_folder to be true.
AnyS3Path = Annotated[S3Path, S3PathConstraints(path_type=S3PathType.ALL)]
S3ObjectPath = Annotated[S3Path, S3PathConstraints(path_type=S3PathType.OBJECT)]
S3FolderPath = Annotated[S3Path, S3PathConstraints(path_type=S3PathType.FOLDER)]
from pydantic import BaseModel
import pytest
from typing import Annotated
from s3 import AnyS3Path, S3FolderPath, S3ObjectPath, S3Path, S3PathConstraints
# URIs that test_valid_s3_paths expects an AnyS3Path field to accept.
valid_paths = [
's3://my-bucket/',
's3://my.bucket.name/my-key',
's3://123-bucket/another/key/path',
's3://bucket-name/with/white space/in/key',
's3://bucket/this/is/a/folder/',
's3://my-bucket/some/key/path/file.txt',
's3://10-bucket-01/nested.folder/structure/file.jpg',
's3://my-bucket/with-key',
's3://my-bucket/valid-object-path',
's3://my-bucket/another/valid/path',
's3://my-bucket/valid path with spaces',
's3://my-bucket/valid-path-with-dash-and_underscore'
]
# URIs that test_invalid_s3_paths expects an AnyS3Path field to reject;
# the trailing comment on each entry names the rule it violates.
invalid_paths = [
's3://', # too short, no bucket name
's3://my', # bucket name too short
'https://my-bucket/', # wrong schema, should be s3://
's3:/my-bucket/', # missing one forward slash after s3:
's3://my_bucket/', # underscore is not allowed in bucket names
's3://My-Bucket/', # uppercase letters are not allowed in bucket names
's3://123.bucket..name/invalid', # consecutive periods are not allowed
's3://-bucket-starts-with-hyphen/', # bucket name cannot start with a hyphen
's3://bucket-ends-with-hyphen-/', # bucket name cannot end with a hyphen
's3://my-bucket/ends/with/slash//', # key should not have a double slash
's3://toolongnameofthebucketwhichexceedssixtyfourcharacterslongforthisexample/mypath', # bucket name exceeds 63 characters
's3://my-bucket/?query=parameters', # query parameters are not a standard part of S3 paths
's3://my-bucket/this/path/has/a/tab/char\there', # contains a tab character
's3://bucket/this\\path\\uses\\backslashes' # backslashes are not standard in S3 paths
]
@pytest.mark.parametrize("path", valid_paths)
def test_valid_s3_paths(path):
    """Each entry of ``valid_paths`` is accepted by an ``AnyS3Path`` field."""

    class Holder(BaseModel):
        uri: AnyS3Path

    # Construction is the whole check: a rejected URI would raise here.
    Holder(uri=path)
@pytest.mark.parametrize("path", invalid_paths)
def test_invalid_s3_paths(path):
    """Each entry of ``invalid_paths`` is rejected by an ``AnyS3Path`` field."""

    class Holder(BaseModel):
        uri: AnyS3Path

    # The model class is defined outside the context manager so that only
    # the instantiation is expected to raise.
    with pytest.raises(ValueError):
        Holder(uri=path)
def test_constraints_valid_s3_folder_path_properties():
    """A folder URI parses through ``S3FolderPath`` with the expected parts."""

    class FolderModel(BaseModel):
        uri: S3FolderPath

    parsed = FolderModel(uri='s3://my-bucket/folder/').uri

    assert parsed.bucket == "my-bucket"
    assert parsed.key == "folder/"
    assert parsed.is_folder
def test_constraints_valid_s3_object_path_properties():
    """An object URI parses through ``S3ObjectPath`` with the expected parts."""

    class ObjectModel(BaseModel):
        uri: S3ObjectPath

    parsed = ObjectModel(uri='s3://my-bucket/folder/key').uri

    assert parsed.bucket == "my-bucket"
    assert parsed.key == "folder/key"
    assert not parsed.is_folder
def test_constraints_invalid_s3_folder_path_as_object():
    """A folder-style URI is refused by an ``S3ObjectPath`` field."""

    class ObjectModel(BaseModel):
        uri: S3ObjectPath

    folder_uri = 's3://my-bucket/folder/'
    with pytest.raises(ValueError):
        ObjectModel(uri=folder_uri)
def test_constraints_invalid_s3_object_path_as_folder():
    """An object-style URI is refused by an ``S3FolderPath`` field."""

    class FolderModel(BaseModel):
        uri: S3FolderPath

    object_uri = 's3://my-bucket/folder/key'
    with pytest.raises(ValueError):
        FolderModel(uri=object_uri)
def test_constraints_valid_s3_path_with_allowed_buckets():
    """URIs in either allow-listed bucket validate without error."""

    class Restricted(BaseModel):
        uri: Annotated[S3Path, S3PathConstraints(allowed_buckets=['my-bucket', 'another-bucket'])]

    # No assertions needed: a disallowed bucket would raise on construction.
    for accepted_uri in (
        's3://my-bucket/path/to/object',
        's3://another-bucket/path/to/object',
    ):
        Restricted(uri=accepted_uri)
def test_constraints_invalid_s3_path_with_disallowed_bucket():
    """A URI whose bucket is missing from the allow-list is refused."""

    class Restricted(BaseModel):
        uri: Annotated[S3Path, S3PathConstraints(allowed_buckets=['my-bucket', 'another-bucket'])]

    outsider_uri = 's3://not-allowed-bucket/path/to/object'
    with pytest.raises(ValueError):
        Restricted(uri=outsider_uri)
def test_empty_object_key():
    """A bucket-root URI passes ``S3ObjectPath`` and exposes an empty key."""

    class ObjectModel(BaseModel):
        uri: S3ObjectPath

    parsed = ObjectModel(uri='s3://my-bucket/').uri

    assert parsed.bucket == "my-bucket"
    # Empty string, not None: the slash after the bucket is present.
    assert parsed.key == ""
def test_various_allowed_buckets():
    """Every bucket named in a three-entry allow-list is accepted."""
    permitted = ['my-bucket', 'another-bucket', 'third-bucket']

    class Restricted(BaseModel):
        uri: Annotated[S3Path, S3PathConstraints(allowed_buckets=permitted)]

    # Same three URIs as before, generated from the allow-list itself.
    for bucket_name in permitted:
        Restricted(uri=f's3://{bucket_name}/path/to/object')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment