Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save cyberbutler/98c7177f2e0f64756b99694e8493a772 to your computer and use it in GitHub Desktop.
Save cyberbutler/98c7177f2e0f64756b99694e8493a772 to your computer and use it in GitHub Desktop.
A Flask web server with built-in file upload handling, tar extraction, and OpenSSL AES-256-CBC PBKDF2 decryption
import os
import base64
import tarfile
import hashlib
import datetime
import tempfile
from Crypto.Cipher import AES
from werkzeug.utils import secure_filename
from flask import Flask, request, jsonify
# Flask application object plus the settings consulted by upload_file():
#   UPLOAD_DIR       - base directory that uploads are stored beneath
#   AUTO_EXTRACT_TAR - when True, uploaded tar archives are unpacked
#   AUTO_DECRYPT     - when True, uploads flagged with `enc` are decrypted
app = Flask(__name__)
app.config.update(
    UPLOAD_DIR='uploads',
    AUTO_EXTRACT_TAR=True,
    AUTO_DECRYPT=True,
)
def tarfile_has_safe_members(tarFile: tarfile.TarFile):
    """
    Return True only if every member of tarFile looks safe to extract.

    A member is rejected (and False is returned) when:
      * its name is absolute, or climbs out of the extraction root via "..";
      * it is a symbolic or hard link whose target is absolute or resolves
        (lexically) to a location outside of the extraction root;
      * it is a character device, block device or FIFO.

    Why? From the docs: https://docs.python.org/3/library/tarfile.html#tarfile.TarFile.extractall
    Warning:
    Never extract archives from untrusted sources without prior inspection.
    It is possible that files are created outside of path,
    e.g. members that have absolute filenames starting with "/" or
    filenames with two dots "..".

    NOTE: all checks are purely lexical (os.path.normpath); links are not
    resolved against each other or against the filesystem, so this is a
    first line of defence rather than a complete sandbox.
    """
    def escapesRoot(path):
        normalized = os.path.normpath(path)
        if os.path.isabs(normalized):
            return True
        # Compare whole path components: a name that merely *begins* with two
        # dots (e.g. "..cache/file") stays inside the root and is fine.
        return normalized == os.pardir or normalized.startswith(os.pardir + os.sep)

    for member in tarFile:
        if escapesRoot(member.name):
            return False
        if member.isdev():
            # Device nodes and FIFOs have no place in an uploaded archive
            return False
        if member.issym():
            # A symlink target is interpreted relative to the directory that
            # contains the link itself (an absolute target replaces the join)
            linkTarget = os.path.join(os.path.dirname(member.name), member.linkname)
            if escapesRoot(linkTarget):
                return False
        elif member.islnk():
            # A hard link target is a member path relative to the archive root
            if escapesRoot(member.linkname):
                return False
    return True
def decryptAESPBKDF2(encrypted, password, iterations=10000):
    """
    Decrypt a blob in the salted OpenSSL `enc` layout using AES-256-CBC with a
    PBKDF2-HMAC-SHA256 derived key and IV.

    Expected layout of `encrypted` (raw bytes, already base64-decoded):
        bytes 0-7   : the literal marker b"Salted__"
        bytes 8-15  : the salt
        bytes 16-   : the ciphertext, a non-empty multiple of 16 bytes

    Shamelessly borrowed from @mti2935 here: https://crypto.stackexchange.com/a/79855
    It's highly recommended reviewing the entire thread on StackExchange, there is substantial information
    about the nuances of OpenSSL's methods for encryption when using the `enc` command.

    :param encrypted: the encrypted bytes, including the 16 byte header
    :param password: the passphrase as a str (encoded as UTF-8 before use)
    :param iterations: PBKDF2 iteration count; must match the value used to encrypt
    :return: the plaintext bytes with the PKCS#7 padding removed
    :raises ValueError: if the header or ciphertext length is malformed, or the
        padding is invalid (typically a wrong password or corrupted data)
    """
    magic = b"Salted__"
    saltLength = 8
    blockSize = 16  # AES block size in bytes
    headerLength = len(magic) + saltLength

    # Validate the envelope up front so malformed uploads fail with a clear
    # error instead of an IndexError or silently decrypting to garbage
    if len(encrypted) < headerLength or encrypted[:len(magic)] != magic:
        raise ValueError("Data is not in the salted OpenSSL format (missing 'Salted__' header)")
    ciphertext = encrypted[headerLength:]
    if not ciphertext or len(ciphertext) % blockSize != 0:
        raise ValueError("Ciphertext length must be a non-zero multiple of 16 bytes")

    salt = encrypted[len(magic):headerLength]
    passwordBytes = password.encode('utf-8')
    # 48 derived bytes: the first 32 are the AES-256 key, the last 16 the CBC IV
    derivedKey = hashlib.pbkdf2_hmac('sha256', passwordBytes, salt, iterations, 48)
    key = derivedKey[0:32]
    iv = derivedKey[32:48]
    decryptor = AES.new(key, AES.MODE_CBC, iv)
    plaintext = decryptor.decrypt(ciphertext)

    # Verify the PKCS#7 padding rather than blindly trusting the last byte
    padLength = plaintext[-1]
    if not 1 <= padLength <= blockSize or plaintext[-padLength:] != bytes([padLength]) * padLength:
        raise ValueError("Invalid padding: wrong password or corrupted data")
    return plaintext[:-padLength]
@app.route('/', methods=['POST'])
def upload_file():
    """
    Handle a multipart POST and store every uploaded file beneath
    <UPLOAD_DIR>/<remote address>/.

    For each file in the request:
      * if the `enc` query argument is set (to anything other than an empty
        value, "0", "false", "no" or "off") and AUTO_DECRYPT is enabled, the
        body is base64 decoded and decrypted with decryptAESPBKDF2 using the
        ENCRYPT_PASS setting (falling back to 'EncryptMe');
      * if the (possibly decrypted) content is a tar archive and
        AUTO_EXTRACT_TAR is enabled, it is validated with
        tarfile_has_safe_members and extracted into the upload directory;
      * otherwise it is saved under a timestamp-prefixed, sanitised filename.

    :return: JSON {"status": "success"}, or {"status": "failed"} if any
        exception was raised while processing the request
    """
    try:
        # `enc` enables decryption unless it is absent or explicitly falsy
        encFlag = request.args.get('enc', default='').strip().lower()
        shouldDecrypt = app.config['AUTO_DECRYPT'] and encFlag not in ('', '0', 'false', 'no', 'off')

        for f in request.files.values():
            if shouldDecrypt:
                # Base64 decode the file stream
                decodedStream = base64.b64decode(f.stream.read())
                # Decrypt using the provided password
                decryptedFile = decryptAESPBKDF2(decodedStream, app.config.get('ENCRYPT_PASS', 'EncryptMe'))
                # Create a new Temporary File and reassign f.stream for use later
                f.stream = tempfile.SpooledTemporaryFile()
                # Write the decryptedFile bytes to the Temporary File
                f.stream.write(decryptedFile)
                # Rewind, otherwise the tar detection and f.save() below
                # would read from the end of the stream and see no data
                f.stream.seek(0)

            # Create the upload path by concatenating the
            # specified upload directory and the Remote Address
            fileDir = os.path.join(app.config['UPLOAD_DIR'], request.remote_addr or 'unknown')
            # Create the fileDir directory if it doesn't exist
            os.makedirs(fileDir, exist_ok=True)

            if app.config['AUTO_EXTRACT_TAR'] and tarfile.is_tarfile(f.stream):
                # If the file is a TAR Archive, extract its contents to the fileDir
                # Set the file's position to 0
                f.stream.seek(0)
                # Use the restrictive 'data' extraction filter when the
                # running Python provides it, as defence in depth
                extractOptions = {'filter': 'data'} if hasattr(tarfile, 'data_filter') else {}
                # Open with transparent compression detection ("r:*") so every
                # archive accepted by is_tarfile can actually be read; the
                # context manager closes the archive even if validation fails
                with tarfile.open(fileobj=f.stream, mode="r:*") as uploadedTar:
                    # Validate that uploadedTar has safe members
                    if not tarfile_has_safe_members(uploadedTar):
                        raise Exception("TarFile has unsafe members")
                    # Extract the TarFile's contents
                    uploadedTar.extractall(fileDir, **extractOptions)
            else:
                # Make sure the whole payload is written, whatever the
                # current stream position is
                f.stream.seek(0)
                currentTimeStamp = datetime.datetime.now().strftime('%d-%b-%Y-%H-%M-%S')
                # Create the filename by concatenating the currentTimeStamp and POSTed filename
                fileName = secure_filename(f"{currentTimeStamp}.{f.filename}")
                # Otherwise, Save the file to the fileDir
                f.save(os.path.join(fileDir, fileName))
    except Exception:
        # Top-level request boundary: record the full traceback server-side
        # and report a generic failure to the client
        app.logger.exception("Upload failed")
        return jsonify({"status": "failed"})
    return jsonify({"status": "success"})
if __name__ == "__main__":
    # Run the server on socket 0.0.0.0:80.
    # Debug mode is opt-in (set FLASK_DEBUG=1): it enables the Werkzeug
    # interactive debugger, which allows arbitrary code execution and must
    # not be exposed on a publicly reachable interface.
    debugEnabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run('0.0.0.0', port=80, debug=debugEnabled)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment