Last active
August 29, 2015 14:01
-
-
Save wolf0403/9fda3361dbbf74547bf7 to your computer and use it in GitHub Desktop.
Flask / Request reverse proxy, with `admin' (upload) interface.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import unicode_literals, print_function | |
import os | |
import subprocess as sp | |
from subprocess import check_output | |
from flask import Flask, Blueprint, current_app, \ | |
request, redirect, url_for | |
from werkzeug.utils import secure_filename | |
import logging | |
from logging import warning, info, exception, debug | |
from json import dumps | |
import re | |
import fnmatch | |
UPLOAD_FOLDER = './uploads' | |
def _find_index (base): | |
print ('testing: ' + base) | |
for root, dirnames, filenames in os.walk(base): | |
print ('testing: ' + root) | |
for filename in fnmatch.filter(filenames, 'index.html'): | |
print (filename) | |
return os.path.join(root, filename) | |
def _mkbasedir (filename, ext): | |
dname = filename[:filename.rindex(ext)] | |
assert filename == (dname + ext),\ | |
'{} -> {}.{}'.format(filename, dname, ext) | |
if not os.path.isdir(dname): | |
os.mkdir(dname) | |
return dname | |
def tgz_proc (filename, ext='.tgz'): | |
dname = _mkbasedir(filename, ext) | |
check_output(['tar', 'zxf', os.path.abspath(filename)], cwd=dname) | |
return dname | |
def zip_proc (filename, ext='.zip'): | |
dname = _mkbasedir(filename, ext) | |
check_output(['unzip', '-o', os.path.abspath(filename)], cwd=dname) | |
return dname | |
ALLOWED_EXTENSIONS = {'.tgz' : tgz_proc, | |
'.zip' : zip_proc} | |
_bp = Blueprint('admin', __name__, static_folder='static') | |
def allowed_file(filename): | |
for ext, proc in ALLOWED_EXTENSIONS.iteritems(): | |
if filename.endswith(ext): | |
return lambda x: proc(x, ext) | |
return None | |
@_bp.route('/stub', methods=['GET', 'POST']) | |
def stub(): | |
rt = {'method' : request.method, | |
'data' : dict(request.values), | |
'file' : dict(request.files)} | |
return dumps(rt) | |
@_bp.route('/', methods=['GET', 'POST']) | |
def upload_file(): | |
if request.method == 'POST': | |
uploadfile = request.files['file'] | |
if uploadfile: | |
debug('received upload file: ' | |
+ uploadfile.filename) | |
proc = allowed_file(uploadfile.filename) | |
if proc: | |
filename = secure_filename(uploadfile.filename) | |
filepath = os.path.join( | |
current_app.config['UPLOAD_FOLDER'], | |
filename) | |
uploadfile.save(filepath) | |
try: | |
dname = proc(filepath) | |
idx = _find_index(dname) | |
uri = idx if idx else dname | |
debug('found index: %s', idx) | |
debug('upload file saved: {}'.format(filepath)) | |
debug('upload file dname: {}'.format(dname)) | |
return 'OK: <a href="/{uri}">{uri}</a>'\ | |
.format(uri=uri) | |
except sp.CalledProcessError as e: | |
info('extracting failed: {}'.format(filepath)) | |
try: | |
with open(os.path.join(filepath + '.log'), | |
'w') as f: | |
f.write(e.cmd) | |
f.write('\n') | |
f.write(e.output) | |
except: | |
pass | |
try: | |
return e.output | |
except: | |
return 'ERROR' | |
else: | |
debug('upload file type error') | |
return 'TYPE ERROR' | |
return ''' | |
<!doctype html> | |
<title>Upload new File</title> | |
<h1>Upload new File</h1> | |
<form action="" method=post enctype=multipart/form-data> | |
<p><input type=file name=file> | |
<input type=submit value=Upload> | |
</form> | |
''' | |
def register_admin(flask_app, url='/admin'): | |
if 'UPLOAD_FOLDER' not in flask_app.config: | |
flask_app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER | |
updir = flask_app.config['UPLOAD_FOLDER'] | |
if flask_app.config['TESTING'] == True: | |
logging.basicConfig(level=logging.DEBUG) | |
try: | |
with open(os.path.join(updir, '.test-upload-dir'), | |
'w') as f: | |
f.write('Test OK\n') | |
except Exception as e: | |
exception(e) | |
raise | |
flask_app.register_blueprint(_bp, url_prefix=url) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Reverse proxy for sub-directory mounted static file trees | |
with fallback to root for non-static stuff e.g. AJAX calls. | |
Failed to do this in Nginx so Python comes to rescue. | |
""" | |
from __future__ import unicode_literals, print_function | |
from gevent.monkey import patch_all | |
patch_all() | |
import requests | |
from flask import Flask, Blueprint, Response, request, url_for, redirect | |
from pprint import pformat | |
from admin import register_admin | |
import logging | |
from logging import debug, exception | |
__author__ = "Ryan Gao <wolf0403>" | |
app = Flask(__name__) | |
@app.route('/', defaults={'rest':'/'}) | |
@app.route('/<path:rest>', methods=['GET','POST']) | |
def any_req(rest): | |
r = requests.request(request.method, | |
'http://localhost:8081/{}'.format(rest), | |
params=dict(request.args), | |
data=request.form, | |
cookies=request.cookies) | |
resp = Response(r.text, r.status_code, headers=dict(r.headers)) | |
for k, v in r.cookies.iteritems(): | |
resp.set_cookie(k, v) | |
#return r.text, r.status_code, dict(r.headers) | |
return resp | |
def install_root(rootpath, prefix, app): | |
dbg = Blueprint('debug_{}'.format(prefix), __name__, | |
static_folder=rootpath, | |
static_url_path='') | |
@dbg.errorhandler(404) | |
def dbg_proxy(error): | |
u = request.url[len(request.url_root):] | |
debug('app 404: {} -> {}'.format(request.url, u)) | |
if '/' not in u: | |
return u | |
return redirect(u[u.index('/'):]) | |
app.register_blueprint(dbg, url_prefix=prefix) | |
app.config['UPLOAD_FOLDER'] = 'static' | |
register_admin(app) | |
install_root('/opt/app/static/public', '/v1', app) | |
install_root('/opt/app/static/optimised', '/v2', app) | |
if __name__ == '__main__': | |
app.config['TESTING'] = True | |
if app.config['TESTING'] == True: | |
logging.basicConfig(level=logging.DEBUG) | |
app.run(debug=True, host='0.0.0.0', port=81) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment