Created
September 29, 2017 21:37
-
-
Save pckv/0e53f7f627fd9130f76f3357a323616c to your computer and use it in GitHub Desktop.
wwww
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import json | |
import os | |
path = "blueprints/filehost/hosts/db.json" | |
db = { | |
"auth": None, | |
"hosts": { | |
"vu": {}, | |
"pc": {}, | |
} | |
} | |
def load(): | |
global db | |
with open(path, encoding="utf-8") as f: | |
db = json.load(f) | |
def save(): | |
with open(path, "w", encoding="utf-8") as f: | |
json.dump(db, f, indent=4) | |
def set(attr, value): | |
db[attr] = value | |
save() | |
def get(attr): | |
return db.get(attr, None) | |
def add_file(url_name, filename, realname, mimetype, user, content_length): | |
data = { | |
"filename": filename, | |
"realname": realname, | |
"type": mimetype, | |
"content_length": content_length | |
} | |
db["hosts"][user][url_name] = data | |
save() | |
def get_file(url_name, user): | |
if url_name in db["hosts"][user]: | |
return db["hosts"][user][url_name] | |
if not os.path.exists(path): | |
save() | |
else: | |
load() | |
if db["auth"] is None: | |
random.seed(os.urandom(32)) | |
auth = (random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(64)) | |
db["auth"] = "".join(c.upper() if random.randint(0, 1) == 0 else c for c in auth) | |
save() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import random | |
import re | |
from collections import namedtuple | |
from urllib import parse as url_parse | |
from asyncio.subprocess import create_subprocess_exec | |
from sanic import Blueprint, response, exceptions | |
from blueprints import pckv_host, vups_party_host, jinja | |
from . import database as db | |
bp = Blueprint("filehost") | |
with open("blueprints/filehost/emoji_json/emojis.json") as f: | |
emoji_nums = json.load(f) | |
emoji_len = len(emoji_nums) | |
num_filename_chars = 2 | |
FileData = namedtuple("FileData", "host name ext") | |
def route(endpoint, methods=None, *hosts): | |
""" Implement custom router since multiple hosts doesn't work. """ | |
if not methods: | |
methods = ["GET"] | |
# Default to the filehosts but allow multiple | |
if not hosts: | |
hosts = (vups_party_host("f"), pckv_host("f")) | |
def decorated(coroutine): | |
for host in hosts: | |
bp.add_route(coroutine, endpoint, methods=methods, host=host) | |
return coroutine | |
return decorated | |
class UserNotFound(exceptions.NotFound): | |
pass | |
@bp.exception(UserNotFound, exceptions.InvalidUsage, exceptions.Unauthorized) | |
async def respond_to_filehost_error(request, exception): | |
""" Return a JSON response of the exception. """ | |
# If the method was HEAD, the client is looking for headers. | |
if request.method == "HEAD": | |
return response.HTTPResponse(headers={"Error": str(exception)}, status=exception.status_code) | |
else: | |
return response.json({"error": str(exception)}, status=exception.status_code) | |
def user_dir(user: str): | |
path = f"blueprints/filehost/hosts/{user}" | |
if not os.path.exists(path): | |
os.mkdir(path) | |
return path | |
def create_filename(user: str): | |
""" Returns a filename for the given user. """ | |
# Continue until a random filename is found (this will probably never run twice lmao) | |
while True: | |
name = "".join(chr(random.choice(emoji_nums)) for _ in range(num_filename_chars)) | |
if not db.get_file(url_parse.quote(name), user): | |
break | |
return name | |
@route("/<filename>", methods=["GET", "HEAD"]) | |
async def return_files(request, filename): | |
""" """ | |
if request.host == vups_party_host("f"): | |
user = "vu" | |
elif request.host == pckv_host("f"): | |
user = "pc" | |
else: | |
return | |
# Strip the extension from the filename if there is one | |
url_name = filename.split(".")[0] | |
# Host the given file if it exists | |
file = db.get_file(url_name, user) | |
if not file: | |
raise exceptions.NotFound(f"File not found") | |
# Setup the headers needed | |
headers = { | |
"Accept-Ranges": "bytes", | |
"Cache-Control": "max-age=120", | |
"Content-Disposition": "inline; filename=\"{}\"".format(file["realname"]), | |
"Content-Type": file["type"], | |
} | |
# Send the file for GET methods, return headers for HEAD methods | |
if request.method == "GET": | |
return await response.file_stream(os.path.join(user_dir(user), file["filename"]), | |
mime_type=file["type"], headers=headers) | |
elif request.method == "HEAD": | |
headers["Content-Length"] = file["content_length"] | |
return response.HTTPResponse(headers=headers, status=200) | |
def verify_headers(headers): | |
""" Verify that the headers are valid. """ | |
try: | |
if not headers["Vups-Party-Auth"] == db.get("auth"): | |
raise exceptions.Unauthorized("Invalid authentication header", scheme=None) | |
if headers["User"] not in ("vu", "pc"): | |
raise UserNotFound("Invalid user") | |
except KeyError: | |
raise exceptions.InvalidUsage("Missing headers") | |
@route("/verify", methods=["HEAD"]) | |
async def verify(request): | |
""" Verify authentication. """ | |
verify_headers(request.headers) | |
return response.HTTPResponse(status=200) | |
async def upload_file(request, user, convert_protocol=None): | |
# Return a filename | |
name = create_filename(user) | |
# Save the uploaded file | |
uploaded_file = request.files.get("file") | |
url_name = url_parse.quote(name) | |
filename = url_name + "." + uploaded_file.name.split(".")[-1] | |
realname = uploaded_file.name | |
mimetype = uploaded_file.type | |
# Save the file | |
with open(os.path.join(user_dir(user), filename), "wb") as f: | |
f.write(uploaded_file.body) | |
if convert_protocol: | |
filename, ext, mimetype = await convert_protocol(user_dir(user), filename, url_name) | |
# If ext is None, the conversion failed | |
if ext is not None: | |
realname = ".".join(realname.split(".")[:-1]) + "." + ext | |
db.add_file(url_name, filename, realname, mimetype, user, request.headers["Content-Length"]) | |
host = "niceme.me" | |
if user == "pc": | |
host = pckv_host("f") | |
elif user == "vu": | |
host = vups_party_host("f") | |
print(f"{user} uploaded file: {name}") | |
return FileData(host=host, name=name, ext=uploaded_file.name.split(".")[-1]) | |
@route("/up", methods=["POST"]) | |
async def upload_handler(request): | |
""" Handle direct uploads. """ | |
verify_headers(request.headers) | |
user = request.headers["User"] | |
data = await upload_file(request, user) | |
return response.json(data._asdict()) | |
async def video_to_mp4(path: str, filename: str, name: str): | |
""" Converts the given video file to H.264 mp4 and compress to max 5000kbps. """ | |
to_convert = os.path.join(path, filename) | |
save_to = os.path.join(path, f"{name}.mp4") | |
print("Converting file.") | |
process = await create_subprocess_exec("ffmpeg", "-nostats", "-loglevel", "error", | |
to_convert, "-vcodec", "copy", "-acodec", "copy", save_to) | |
returncode = await process.wait() | |
# Success | |
if returncode == 0: | |
os.remove(to_convert) | |
return f"{name}.mp4", "mp4", "video/mp4" | |
else: | |
print(f"Conversion of file \"{name}\" failed.") | |
return name, None, None | |
@route("/up.w", methods=["GET", "POST"]) | |
async def upload_web_handler(request): | |
""" Handle web uploads. """ | |
if request.method == "GET": | |
return jinja.render("host_upload.html", request) | |
# Change form data to header data and verify | |
if "auth" in request.form and "user" in request.form: | |
request.headers["Vups-Party-Auth"] = request.form["auth"][0] | |
request.headers["User"] = request.form["user"][0] | |
verify_headers(request.headers) | |
user = request.headers["User"] | |
convert_protocol = None | |
if re.match(".+\.(mov)", request.files.get("file").name, re.IGNORECASE): | |
convert_protocol = video_to_mp4 | |
data = await upload_file(request, user, convert_protocol) | |
return response.text(f"http://{data.host}/{data.name}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment