Last active
July 22, 2019 08:50
-
-
Save nicksspirit/0357edceece3027c5aa9d72fd1948803 to your computer and use it in GitHub Desktop.
A middleware for falcon, python web framework, to handle multipart form requests. It stores files in `Request.files` and other form parameters in `Request.form_params`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FieldValue = Union[FieldStorage, str] | |
class MultipartMiddleware(Middleware): | |
MEDIA_FORM = "multipart/form-data" | |
METHODS_ALLOWED = {"PUT", "POST"} | |
def _get_field_val(self, field: FieldStorage) -> FieldValue: | |
""" | |
Returns input value for input text fields and FileStream object | |
""" | |
if field.file and "filename" in field.disposition_options: | |
return field | |
return unescape(str(field.value)) | |
def process_request(self, req: Request, resp: Response, **kwargs): | |
files = {} | |
form_params = {} | |
content_type: Optional[str] = req.content_type | |
is_media_form = content_type is not None and self.MEDIA_FORM in content_type | |
if req.method not in self.METHODS_ALLOWED: | |
return | |
if not is_media_form: | |
return | |
try: | |
# This must be done to avoid a bug in cgi.FieldStorage. | |
req.env.setdefault("QUERY_STRING", "") | |
field_storage = FieldStorage(fp=req.stream, environ=req.env) | |
except ValueError as e: | |
raise HTTPBadRequest("Error parsing file", str(e)) | |
for field_key in field_storage.keys(): | |
field = field_storage[field_key] | |
if isinstance(field, list): | |
param_list = [] | |
file_list = [] | |
for item in field: | |
field_val = self._get_field_val(item) | |
if isinstance(field_val, str): | |
param_list.append(field_val) | |
elif field_val is not None: | |
file_list.append(field_val) | |
if param_list: | |
form_params[field_key] = ( | |
tlz.first(param_list) if len(param_list) == 1 else param_list | |
) | |
if file_list: | |
files[field_key] = ( | |
tlz.first(file_list) if len(file_list) == 1 else file_list | |
) | |
else: | |
field_val = self._get_field_val(field) | |
if isinstance(field_val, str): | |
form_params[field_key] = field_val | |
elif field_val is not None: | |
files[field_key] = field_val | |
req.files = files | |
req.form_params = form_params |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment