Created
November 27, 2019 21:37
-
-
Save nelsonsar/aec5283eaa01829cb65979c388724cbf to your computer and use it in GitHub Desktop.
In memory Django storage based on FileSystemStorage implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Depends on https://docs.pyfilesystem.org/en/latest/reference/memoryfs.html | |
import errno | |
from pathlib import PurePosixPath | |
from django.conf import settings | |
from django.core.files.storage import Storage | |
from django.core.files import File | |
from django.utils._os import abspathu, safe_join | |
from django.utils.encoding import filepath_to_uri, force_text | |
from django.utils.six.moves.urllib.parse import urljoin | |
from fs.memoryfs import MemoryFS | |
class InMemoryStorage(Storage): | |
def __init__(self): | |
self.os = MemoryFS() | |
@property | |
def location(self): | |
return abspathu(settings.MEDIA_ROOT) | |
@property | |
def base_url(self): | |
return settings.MEDIA_URL | |
def path(self, name): | |
return safe_join(self.location, name) | |
def delete(self, name): | |
name = self.path(name) | |
try: | |
self.os.remove(name) | |
except OSError as e: | |
if e.errno != errno.ENOENT: | |
raise | |
def exists(self, name): | |
return self.os.exists(self.path(name)) | |
def listdir(self, path): | |
return self.os.listdir(path) | |
def size(self, name): | |
return self.os.getsize(self.path(name)) | |
def url(self, name): | |
url = filepath_to_uri(name) | |
if url is not None: | |
url = url.lstrip('/') | |
return urljoin(self.base_url, url) | |
def _open(self, name, mode='rb'): | |
return File(self.os.open(self.path(name), mode)) | |
def _save(self, name, content): | |
full_path = self.path(name) | |
directory = str(PurePosixPath(full_path).parent) | |
if not self.os.exists(directory): | |
self.os.makedirs(directory) | |
_file = None | |
try: | |
for chunk in content.chunks(): | |
if _file is None: | |
mode = 'wb' if isinstance(chunk, bytes) else 'wt' | |
_file = self.os.open(self.path(name), mode) | |
_file.write(chunk) | |
finally: | |
_file.close() | |
return force_text(name.replace('\\', '/')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment