Last active
March 13, 2022 14:05
-
-
Save cyberbutler/98c7177f2e0f64756b99694e8493a772 to your computer and use it in GitHub Desktop.
A Flask web server with built-in file upload handling, tar extraction, and OpenSSL AES-256-CBC (PBKDF2) decryption
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import base64 | |
import tarfile | |
import hashlib | |
import datetime | |
import tempfile | |
from Crypto.Cipher import AES | |
from werkzeug.utils import secure_filename | |
from flask import Flask, request, jsonify | |
# The Flask application object together with its default settings.
app = Flask(__name__)
app.config.update(
    # Root directory under which the upload handler stores received files.
    UPLOAD_DIR='uploads',
    # When true, uploads recognised as tar archives are unpacked instead of saved.
    AUTO_EXTRACT_TAR=True,
    # When true, uploads are base64-decoded and decrypted if the request asks for it.
    AUTO_DECRYPT=True,
)
def _escapes_extraction_root(path):
    """Return True when *path* is absolute or climbs above the extraction root."""
    # A leading slash or backslash is treated as rooted on every platform,
    # regardless of how the local os.path.isabs() classifies it.
    if path.startswith(("/", "\\")):
        return True
    normalized = os.path.normpath(path)
    return (
        os.path.isabs(normalized)
        or normalized == os.pardir
        # Compare against "../" rather than ".." so that ordinary names which
        # merely begin with two dots (e.g. "..data/file") are not rejected.
        or normalized.startswith(os.pardir + os.sep)
    )


def tarfile_has_safe_members(tarFile: tarfile.TarFile) -> bool:
    """
    Validate that no member of tarFile can be written outside the
    extraction directory.

    A member is considered unsafe when:
      * its name is an absolute path or normalises to a path starting with "..",
      * it is a symbolic or hard link whose target is absolute or resolves
        above the extraction root,
      * it is a character device, block device or FIFO.

    Why? From the docs: https://docs.python.org/3/library/tarfile.html#tarfile.TarFile.extractall
    Warning:
    Never extract archives from untrusted sources without prior inspection.
    It is possible that files are created outside of path,
    e.g. members that have absolute filenames starting with "/" or
    filenames with two dots "..".

    NOTE: this is a purely lexical check on the names stored in the archive;
    it does not resolve links against the real filesystem.

    :param tarFile: an open TarFile (any iterable of TarInfo objects works).
    :return: True when every member passed the checks, False otherwise.
    """
    for member in tarFile:
        if _escapes_extraction_root(member.name):
            return False
        # Device nodes and FIFOs have no place in an uploaded archive.
        if member.isdev():
            return False
        if member.issym():
            # A symlink target is interpreted relative to the directory
            # that contains the link itself.
            linkTarget = os.path.join(os.path.dirname(member.name), member.linkname)
            if _escapes_extraction_root(linkTarget):
                return False
        elif member.islnk():
            # A hard link target is a member path relative to the archive root.
            if _escapes_extraction_root(member.linkname):
                return False
    return True
def decryptAESPBKDF2(encrypted, password, iterations=10000):
    """
    Decrypt a blob in OpenSSL's salted `enc` format using AES-256-CBC with a
    key and IV derived through PBKDF2-HMAC-SHA256.

    Expected layout of `encrypted`:
        bytes 0-7   the literal marker b"Salted__"
        bytes 8-15  the salt
        bytes 16-   the ciphertext, a non-empty multiple of 16 bytes

    Shamelessly borrowed from @mti2935 here: https://crypto.stackexchange.com/a/79855
    It's highly recommended reviewing the entire thread on StackExchange, there is substantial information
    about the nuances of OpenSSL's methods for encryption when using the `enc` command.

    :param encrypted: the raw (already base64-decoded) bytes.
    :param password: the passphrase as a str; it is UTF-8 encoded before derivation.
    :param iterations: the PBKDF2 iteration count used when encrypting.
    :return: the plaintext bytes with the PKCS#7 padding removed.
    :raises ValueError: when the header is missing, the length is wrong, or the
        padding is invalid (typically a wrong password or corrupted data).
    """
    magic = b"Salted__"
    blockSize = 16

    # Validate the container before doing any (deliberately slow) key derivation.
    if encrypted[:len(magic)] != magic:
        raise ValueError("Encrypted data does not start with the OpenSSL 'Salted__' header")
    ciphertext = encrypted[16:]
    if not ciphertext or len(ciphertext) % blockSize != 0:
        raise ValueError("Ciphertext must be a non-empty multiple of 16 bytes")

    salt = encrypted[8:16]
    passwordBytes = password.encode('utf-8')
    # 48 derived bytes: the first 32 are the AES-256 key, the last 16 the CBC IV.
    derivedKey = hashlib.pbkdf2_hmac('sha256', passwordBytes, salt, iterations, 48)
    key = derivedKey[0:32]
    iv = derivedKey[32:48]

    decryptor = AES.new(key, AES.MODE_CBC, iv)
    plaintext = decryptor.decrypt(ciphertext)

    # Verify the PKCS#7 padding instead of blindly trusting the final byte:
    # a final byte of 0 would otherwise slice the whole plaintext away, and a
    # wrong password would silently return garbage.
    padLength = plaintext[-1]
    if not 1 <= padLength <= blockSize or plaintext[-padLength:] != bytes([padLength]) * padLength:
        raise ValueError("Invalid padding: wrong password or corrupted data")
    return plaintext[:-padLength]
@app.route('/', methods=['POST'])
def upload_file():
    """
    Store every file of a multipart POST beneath UPLOAD_DIR/<client address>.

    Per uploaded file:
      1. If the `enc` query argument is set and AUTO_DECRYPT is enabled, the
         body is base64-decoded and decrypted with the ENCRYPT_PASS setting
         (falling back to 'EncryptMe').
      2. If AUTO_EXTRACT_TAR is enabled and the content is a tar archive (with
         any compression tarfile can detect), its members are validated and
         extracted into the client's directory.
      3. Otherwise the file is saved under a timestamp-prefixed, sanitised name.

    :return: a JSON body {"status": "success"}, or {"status": "failed"} when any
        step raised; the error is logged rather than propagated.
    """
    try:
        # If `enc=` is found in the request URI arguments, base64 decode and decrypt the file.
        # The value is converted with bool(), so any non-empty string (even
        # "0" or "false") enables decryption.
        shouldDecrypt = (
            request.args.get('enc', default=False, type=bool)
            and app.config['AUTO_DECRYPT']
        )

        for f in request.files.values():
            if shouldDecrypt:
                # Base64 decode the file stream
                decodedStream = base64.b64decode(f.stream.read())
                # Decrypt using the provided password
                decryptedFile = decryptAESPBKDF2(decodedStream, app.config.get('ENCRYPT_PASS', 'EncryptMe'))
                # Create a new Temporary File and reassign f.stream for use later
                f.stream = tempfile.SpooledTemporaryFile()
                # Write the decryptedFile bytes to the Temporary File
                f.stream.write(decryptedFile)

            # Rewind: after the write above the position is at end-of-file, and
            # everything below reads from the current position.
            f.stream.seek(0)

            # Create the upload path by concatenating the
            # specified upload directory and the Remote Address
            fileDir = os.path.join(app.config['UPLOAD_DIR'], request.remote_addr)
            # Create the fileDir directory if it doesn't exist
            os.makedirs(fileDir, exist_ok=True)

            # Test the setting first so the stream is only probed when
            # extraction is actually wanted.
            if app.config['AUTO_EXTRACT_TAR'] and tarfile.is_tarfile(f.stream):
                # If the file is a TAR Archive, extract its contents to the fileDir
                # Set the file's position to 0
                f.stream.seek(0)
                # "r:*" lets tarfile detect the compression, matching what
                # is_tarfile() accepted (plain, gzip, bz2, xz).
                with tarfile.open(fileobj=f.stream, mode="r:*") as uploadedTar:
                    # Validate that uploadedTar has safe members
                    if not tarfile_has_safe_members(uploadedTar):
                        raise ValueError("TarFile has unsafe members")
                    # Extract the TarFile's contents, using the stdlib "data"
                    # extraction filter as a second line of defence where the
                    # running interpreter provides it.
                    if hasattr(tarfile, "data_filter"):
                        uploadedTar.extractall(fileDir, filter="data")
                    else:
                        uploadedTar.extractall(fileDir)
            else:
                # The tar probe may have moved the read position; save from the start.
                f.stream.seek(0)
                currentTimeStamp = datetime.datetime.now().strftime('%d-%b-%Y-%H-%M-%S')
                # Create the filename by concatenating the currentTimeStamp and POSTed filename
                fileName = secure_filename(f"{currentTimeStamp}.{f.filename}")
                # Otherwise, Save the file to the fileDir
                f.save(os.path.join(fileDir, fileName))
    except Exception:
        # Request boundary: record the full traceback, report a generic failure.
        app.logger.exception("Upload failed")
        return jsonify({"status": "failed"})
    return jsonify({"status": "success"})
if __name__ == "__main__":
    # Run the server on socket 0.0.0.0:80, i.e. reachable from every interface.
    # The Werkzeug interactive debugger allows code execution from the browser,
    # so debug mode is off unless FLASK_DEBUG is explicitly set to a truthy
    # value (1/true/yes/on) for local development.
    debugEnabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run('0.0.0.0', port=80, debug=debugEnabled)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment