|
import os |
|
import shutil |
|
import json |
|
import logging |
|
import re |
|
from abc import ABC, abstractmethod |
|
from typing import BinaryIO, Tuple, Dict |
|
|
|
from open_webui.config import ( |
|
S3_ACCESS_KEY_ID, |
|
S3_BUCKET_NAME, |
|
S3_ENDPOINT_URL, |
|
S3_KEY_PREFIX, |
|
S3_REGION_NAME, |
|
S3_SECRET_ACCESS_KEY, |
|
S3_USE_ACCELERATE_ENDPOINT, |
|
S3_ADDRESSING_STYLE, |
|
S3_ENABLE_TAGGING, |
|
GCS_BUCKET_NAME, |
|
GOOGLE_APPLICATION_CREDENTIALS_JSON, |
|
AZURE_STORAGE_ENDPOINT, |
|
AZURE_STORAGE_CONTAINER_NAME, |
|
AZURE_STORAGE_KEY, |
|
STORAGE_PROVIDER, |
|
UPLOAD_DIR, |
|
) |
|
|
|
from open_webui.constants import ERROR_MESSAGES |
|
from open_webui.env import SRC_LOG_LEVELS |
|
|
|
|
|
log = logging.getLogger(__name__) |
|
log.setLevel(SRC_LOG_LEVELS["MAIN"]) |
|
|
|
|
|
class StorageProvider(ABC): |
|
@abstractmethod |
|
def get_file(self, file_path: str) -> str: |
|
pass |
|
|
|
@abstractmethod |
|
def upload_file( |
|
self, file: BinaryIO, filename: str, tags: Dict[str, str] |
|
) -> Tuple[bytes, str]: |
|
pass |
|
|
|
@abstractmethod |
|
def delete_all_files(self) -> None: |
|
pass |
|
|
|
@abstractmethod |
|
def delete_file(self, file_path: str) -> None: |
|
pass |
|
|
|
|
|
class LocalStorageProvider(StorageProvider): |
|
@staticmethod |
|
def upload_file( |
|
file: BinaryIO, filename: str, tags: Dict[str, str] |
|
) -> Tuple[bytes, str]: |
|
contents = file.read() |
|
if not contents: |
|
raise ValueError(ERROR_MESSAGES.EMPTY_CONTENT) |
|
file_path = f"{UPLOAD_DIR}/{filename}" |
|
with open(file_path, "wb") as f: |
|
f.write(contents) |
|
return contents, file_path |
|
|
|
@staticmethod |
|
def get_file(file_path: str) -> str: |
|
"""Handles downloading of the file from local storage.""" |
|
return file_path |
|
|
|
@staticmethod |
|
def delete_file(file_path: str) -> None: |
|
"""Handles deletion of the file from local storage.""" |
|
filename = file_path.split("/")[-1] |
|
file_path = f"{UPLOAD_DIR}/{filename}" |
|
if os.path.isfile(file_path): |
|
os.remove(file_path) |
|
else: |
|
log.warning(f"File {file_path} not found in local storage.") |
|
|
|
@staticmethod |
|
def delete_all_files() -> None: |
|
"""Handles deletion of all files from local storage.""" |
|
if os.path.exists(UPLOAD_DIR): |
|
for filename in os.listdir(UPLOAD_DIR): |
|
file_path = os.path.join(UPLOAD_DIR, filename) |
|
try: |
|
if os.path.isfile(file_path) or os.path.islink(file_path): |
|
os.unlink(file_path) # Remove the file or link |
|
elif os.path.isdir(file_path): |
|
shutil.rmtree(file_path) # Remove the directory |
|
except Exception as e: |
|
log.exception(f"Failed to delete {file_path}. Reason: {e}") |
|
else: |
|
log.warning(f"Directory {UPLOAD_DIR} not found in local storage.") |
|
|
|
|
|
def get_storage_provider(storage_provider: str): |
|
'''if storage_provider == "local": |
|
Storage = LocalStorageProvider() |
|
elif storage_provider == "s3": |
|
Storage = S3StorageProvider() |
|
elif storage_provider == "gcs": |
|
Storage = GCSStorageProvider() |
|
elif storage_provider == "azure": |
|
Storage = AzureStorageProvider() |
|
else: |
|
raise RuntimeError(f"Unsupported storage provider: {storage_provider}") |
|
return Storage''' |
|
Storage = LocalStorageProvider() |
|
return Storage |
|
|
|
Storage = get_storage_provider(STORAGE_PROVIDER) |