Skip to content

Instantly share code, notes, and snippets.

@pckv
Created September 29, 2017 21:37
Show Gist options
  • Save pckv/0e53f7f627fd9130f76f3357a323616c to your computer and use it in GitHub Desktop.
Save pckv/0e53f7f627fd9130f76f3357a323616c to your computer and use it in GitHub Desktop.
wwww
import random
import json
import os
# Location of the JSON file that persists the database between runs.
path = "blueprints/filehost/hosts/db.json"

# In-memory database. These defaults are used until a saved copy is loaded.
db = {
    "auth": None,  # upload token; None until one has been generated
    "hosts": {"vu": {}, "pc": {}},  # per-user mapping of URL name -> file record
}
def load():
    """ Replace the in-memory database with the contents of the JSON file at `path`. """
    global db
    with open(path, encoding="utf-8") as db_file:
        loaded = json.load(db_file)
    db = loaded
def save():
    """ Write the in-memory database to the JSON file at `path`, indented by 4 spaces. """
    with open(path, mode="w", encoding="utf-8") as db_file:
        json.dump(db, db_file, indent=4)
def set(attr, value):
    """ Store `value` under the top-level key `attr` and persist the database. """
    db.update({attr: value})
    save()
def get(attr):
    """ Return the top-level database value stored under `attr`, or None when it is absent. """
    if attr in db:
        return db[attr]
    return None
def add_file(url_name, filename, realname, mimetype, user, content_length):
    """ Register a file record for `user` under `url_name` and persist the database.

    The record keeps the stored filename, the original (real) name, the
    MIME type and the content length.
    """
    db["hosts"][user][url_name] = {
        "filename": filename,
        "realname": realname,
        "type": mimetype,
        "content_length": content_length,
    }
    save()
def get_file(url_name, user):
    """ Return the record stored for `url_name` under `user`, or None if there is none. """
    return db["hosts"][user].get(url_name)
# Create the database file on first run, otherwise restore the saved state.
if os.path.exists(path):
    load()
else:
    save()

# Generate the upload token once and persist it. A saved file without an
# "auth" key is treated the same as one holding None.
if db.get("auth") is None:
    # The token is a credential, so it is drawn from the operating system's
    # randomness source instead of the predictable default generator. This
    # also leaves the global `random` state untouched.
    _rng = random.SystemRandom()
    _alphabet = "abcdefghijklmnopqrstuvwxyz1234567890"
    # Same shape as before: 64 characters, each one upper-cased half of the time.
    db["auth"] = "".join(
        c.upper() if _rng.randint(0, 1) == 0 else c
        for c in (_rng.choice(_alphabet) for _ in range(64))
    )
    save()
import json
import os
import random
import re
from collections import namedtuple
from urllib import parse as url_parse
from asyncio.subprocess import create_subprocess_exec
from sanic import Blueprint, response, exceptions
from blueprints import pckv_host, vups_party_host, jinja
from . import database as db
# Blueprint that the filehost routes and error handler are attached to.
bp = Blueprint("filehost")

# Values that generated filenames are built from (each one is passed to chr()).
with open("blueprints/filehost/emoji_json/emojis.json") as f:
    emoji_nums = json.loads(f.read())
emoji_len = len(emoji_nums)

# Number of characters in a generated filename.
num_filename_chars = 2

# Result of an upload: serving host, generated name and file extension.
FileData = namedtuple("FileData", ["host", "name", "ext"])
def route(endpoint, methods=None, *hosts):
    """ Return a decorator that registers a coroutine for `endpoint` once per host.

    Custom router used since multiple hosts doesn't work. `methods` falls back
    to GET only, and `hosts` falls back to the two filehost domains.
    """
    allowed_methods = methods or ["GET"]
    # Default to the filehosts but allow multiple
    target_hosts = hosts or (vups_party_host("f"), pckv_host("f"))

    def decorated(coroutine):
        for target in target_hosts:
            bp.add_route(coroutine, endpoint, methods=allowed_methods, host=target)
        return coroutine

    return decorated
class UserNotFound(exceptions.NotFound):
    """ Raised when a request names a user that is not a known filehost user. """
@bp.exception(UserNotFound, exceptions.InvalidUsage, exceptions.Unauthorized)
async def respond_to_filehost_error(request, exception):
    """ Report the exception to the client, as a header for HEAD and as JSON otherwise. """
    message = str(exception)
    status = exception.status_code
    # A client sending HEAD is looking for headers, so the message goes there.
    if request.method != "HEAD":
        return response.json({"error": message}, status=status)
    return response.HTTPResponse(headers={"Error": message}, status=status)
def user_dir(user: str):
    """ Return the storage directory for `user`, creating it when needed.

    Missing parent directories are created as well, and a directory that
    already exists (including one created concurrently) is not an error.
    """
    path = f"blueprints/filehost/hosts/{user}"
    # makedirs(exist_ok=True) replaces the racy "check, then mkdir" sequence
    os.makedirs(path, exist_ok=True)
    return path
def create_filename(user: str):
    """ Return a random filename that `user` has no stored file under yet. """
    # Retry until an unused name comes up (a second round is very unlikely)
    while True:
        candidate = "".join(chr(random.choice(emoji_nums)) for _ in range(num_filename_chars))
        # Files are registered under their URL-quoted name, so check that form
        if db.get_file(url_parse.quote(candidate), user):
            continue
        return candidate
@route("/<filename>", methods=["GET", "HEAD"])
async def return_files(request, filename):
    """ Serve a stored file for GET, or only its headers for HEAD.

    The host the request was sent to selects whose files are searched, and
    anything after the first "." in `filename` is ignored for the lookup.

    Raises:
        exceptions.NotFound: the host is not one of the filehosts, or no file
            is registered under the requested name.
    """
    if request.host == vups_party_host("f"):
        user = "vu"
    elif request.host == pckv_host("f"):
        user = "pc"
    else:
        # Answer with a proper 404 instead of returning no response at all
        raise exceptions.NotFound("File not found")

    # Strip the extension from the filename if there is one
    url_name = filename.split(".")[0]

    # Host the given file if it exists
    file = db.get_file(url_name, user)
    if not file:
        raise exceptions.NotFound("File not found")

    # realname was chosen by the uploader: escape it so it can neither close
    # the quoted string early nor add header lines of its own.
    safe_name = file["realname"].replace("\\", "\\\\").replace("\"", "\\\"")
    safe_name = safe_name.replace("\r", "").replace("\n", "")

    # Setup the headers needed
    headers = {
        "Accept-Ranges": "bytes",
        "Cache-Control": "max-age=120",
        "Content-Disposition": "inline; filename=\"{}\"".format(safe_name),
        "Content-Type": file["type"],
    }

    # Send the file for GET methods, return headers for HEAD methods
    if request.method == "GET":
        return await response.file_stream(os.path.join(user_dir(user), file["filename"]),
                                          mime_type=file["type"], headers=headers)
    elif request.method == "HEAD":
        headers["Content-Length"] = file["content_length"]
        return response.HTTPResponse(headers=headers, status=200)
def verify_headers(headers):
    """ Verify that the headers carry the right auth token and a known user.

    The checks run in this order: auth header present, auth token correct,
    user header present, user known.

    Raises:
        exceptions.InvalidUsage: a required header is missing.
        exceptions.Unauthorized: the auth token does not match the stored one.
        UserNotFound: the user is neither "vu" nor "pc".
    """
    import hmac

    try:
        supplied = headers["Vups-Party-Auth"]
    except KeyError:
        raise exceptions.InvalidUsage("Missing headers") from None

    expected = db.get("auth")
    # Compare in constant time so response timing does not reveal how much of
    # the token was right. Bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if expected is None or not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise exceptions.Unauthorized("Invalid authentication header", scheme=None)

    try:
        user = headers["User"]
    except KeyError:
        raise exceptions.InvalidUsage("Missing headers") from None

    if user not in ("vu", "pc"):
        raise UserNotFound("Invalid user")
@route("/verify", methods=["HEAD"])
async def verify(request):
    """ Answer 200 when the request headers pass verification. """
    # verify_headers raises on failure, so getting past it means success
    verify_headers(request.headers)
    return response.HTTPResponse(status=200)
async def upload_file(request, user, convert_protocol=None):
    """ Store the uploaded file for `user` and register it in the database.

    Args:
        request: the upload request; the file is taken from its "file" field.
        user: the user the file is stored and registered under.
        convert_protocol: optional coroutine called as
            `convert_protocol(directory, filename, url_name)` that returns
            `(filename, ext, mimetype)`, with `ext` set to None on failure.

    Returns:
        FileData with the serving host, the generated name and the extension
        of the uploaded file.

    Raises:
        exceptions.InvalidUsage: the request has no "file" field.
    """
    uploaded_file = request.files.get("file")
    if uploaded_file is None:
        raise exceptions.InvalidUsage("Missing file")

    # Return a filename
    name = create_filename(user)
    url_name = url_parse.quote(name)

    # The extension comes from the client; basename() keeps one that contains
    # path separators from pointing outside the user's directory.
    ext = os.path.basename(uploaded_file.name.split(".")[-1])
    filename = url_name + "." + ext
    realname = uploaded_file.name
    mimetype = uploaded_file.type

    # Save the file
    directory = user_dir(user)
    with open(os.path.join(directory, filename), "wb") as f:
        f.write(uploaded_file.body)

    if convert_protocol:
        converted_name, converted_ext, converted_type = await convert_protocol(directory, filename, url_name)

        # If the extension is None, the conversion failed: keep describing the
        # file exactly as it was uploaded.
        if converted_ext is not None:
            filename = converted_name
            mimetype = converted_type
            realname = ".".join(realname.split(".")[:-1]) + "." + converted_ext

    # Record the size of the file as stored. The request's Content-Length
    # covers the whole form body and is unrelated to a converted file.
    content_length = str(os.path.getsize(os.path.join(directory, filename)))
    db.add_file(url_name, filename, realname, mimetype, user, content_length)

    host = "niceme.me"
    if user == "pc":
        host = pckv_host("f")
    elif user == "vu":
        host = vups_party_host("f")

    print(f"{user} uploaded file: {name}")
    return FileData(host=host, name=name, ext=ext)
@route("/up", methods=["POST"])
async def upload_handler(request):
    """ Handle direct uploads, authenticated through the request headers. """
    headers = request.headers
    verify_headers(headers)
    uploaded = await upload_file(request, headers["User"])
    return response.json(uploaded._asdict())
async def video_to_mp4(path: str, filename: str, name: str):
    """ Repackage the given video file as an mp4 using ffmpeg.

    The video and audio streams are copied as they are; nothing is re-encoded.

    Args:
        path: directory that contains the file.
        filename: name of the file to convert, inside `path`.
        name: name, without extension, for the resulting `.mp4` file.

    Returns:
        `(f"{name}.mp4", "mp4", "video/mp4")` on success, after the source
        file has been deleted. `(filename, None, None)` when ffmpeg exits with
        a non-zero code, in which case the source file is left in place.
    """
    to_convert = os.path.join(path, filename)
    save_to = os.path.join(path, f"{name}.mp4")

    print("Converting file.")
    # ffmpeg only reads a file as input when it is introduced by -i; a bare
    # path is treated as another output file.
    process = await create_subprocess_exec("ffmpeg", "-nostats", "-loglevel", "error",
                                           "-i", to_convert, "-vcodec", "copy", "-acodec", "copy", save_to)
    returncode = await process.wait()

    if returncode != 0:
        print(f"Conversion of file \"{name}\" failed.")
        # Hand back the name of the file that still exists on disk
        return filename, None, None

    # Success
    os.remove(to_convert)
    return f"{name}.mp4", "mp4", "video/mp4"
@route("/up.w", methods=["GET", "POST"])
async def upload_web_handler(request):
    """ Handle web uploads: render the upload page for GET, store the file for POST.

    Credentials may arrive as the form fields "auth" and "user"; they are
    copied into the request headers so the usual header verification applies.
    Files whose name ends in ".mov" (any letter case) are handed to
    `video_to_mp4` for conversion.

    Raises:
        exceptions.InvalidUsage: the POST has no "file" field (also raised by
            the header verification for missing headers).
    """
    if request.method == "GET":
        return jinja.render("host_upload.html", request)

    # Change form data to header data and verify
    if "auth" in request.form and "user" in request.form:
        request.headers["Vups-Party-Auth"] = request.form["auth"][0]
        request.headers["User"] = request.form["user"][0]

    verify_headers(request.headers)
    user = request.headers["User"]

    uploaded_file = request.files.get("file")
    if uploaded_file is None:
        raise exceptions.InvalidUsage("Missing file")

    convert_protocol = None
    # fullmatch anchors the pattern at the end, so "clip.mov.png" is not
    # mistaken for a .mov file.
    if re.fullmatch(r".+\.mov", uploaded_file.name, re.IGNORECASE):
        convert_protocol = video_to_mp4

    data = await upload_file(request, user, convert_protocol)
    return response.text(f"http://{data.host}/{data.name}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment