-
-
Save mybigman/49643f0200396fcd6d7e469836bafa18 to your computer and use it in GitHub Desktop.
FastAPI file upload with streaming_form_data to parse multipart/form-data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI, UploadFile, HTTPException | |
from starlette.requests import Request | |
from streaming_form_data import StreamingFormDataParser | |
from streaming_form_data.targets import FileTarget | |
from tempfile import NamedTemporaryFile | |
import os | |
import shutil | |
uploadDir = os.getcwd() | |
def abspath(name: str): | |
return os.path.join(uploadDir, os.path.basename(name)) | |
class UploadFileTarget(FileTarget): | |
def __init__( | |
self, dir: str, *args, **kwargs | |
): | |
super().__init__(None, *args, **kwargs) | |
self.file = UploadFile(None, file=NamedTemporaryFile(delete=False, dir=dir)) | |
self._fd = self.file.file | |
def on_start(self): | |
self.file.filename = self.filename = self.multipart_filename | |
if os.path.exists(abspath(self.filename)): | |
raise HTTPException(409, "File already exists") | |
app = FastAPI() | |
@app.post("/") | |
async def upload(request: Request): | |
parser = StreamingFormDataParser(request.headers) | |
target = UploadFileTarget(uploadDir) | |
try: | |
parser.register("file", target) | |
async for chunk in request.stream(): | |
parser.data_received(chunk) | |
if target.filename: | |
shutil.move(target.file.file.name, abspath(target.filename)) | |
else: | |
raise HTTPException(422, "Could not find file in body") | |
finally: | |
await target.file.close() | |
if os.path.exists(target.file.file.name): | |
os.unlink(target.file.file.name) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment