@remittor
Last active August 5, 2023 15:25
WSGI test server
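A minimal WSGI test app exposing /hw, /plaintext, /json, /static/&lt;name&gt; and /stream/&lt;name&gt; endpoints, runnable under several servers selected with -g: wz (werkzeug, default), fw (fastwsgi), bj (bjoern), jp (japronto), wr (waitress), si (socketify), gr (granian). The -f flag swaps in a Flask app with the same /plaintext and /json routes, -t sets the worker/thread count and -s the stream chunk size. Example invocation (the gist does not name the file, so wsgi_test.py is only a placeholder):

python wsgi_test.py -g fw -p 5000 -t 4 -s 64k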
import os
import sys
import io
import optparse
#from email.utils import formatdate
try:
    # prefer ujson when available, fall back to the stdlib json module
    from ujson import dumps as jsonify
except ImportError:
    from json import dumps as jsonify

parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False)
parser.add_option("-h", "--host", dest="host", default='0.0.0.0', type="string")
parser.add_option("-p", "--port", dest="port", default=5000, type="int")
parser.add_option("-g", "--gateway", dest="gateway", default="wz", type="string")
parser.add_option("-f", "--flask", dest="flask", action="store_true", default=False)
parser.add_option("-b", "--binary", dest="binary", action="store_true", default=False)
parser.add_option("-s", "--size", dest="size", default="", type="string")
parser.add_option("-t", "--threads", dest="threads", default=1, type="int")
parser.add_option("-v", "--verbose", dest="verbose", default=0, type="int")
(opt, args) = parser.parse_args()
g_chunksize = 64*1024
if opt.size:
    csize = opt.size.lower()
    mlt = 1
    if csize.endswith('k'):
        mlt = 1024
        csize = opt.size[:-1]
    g_chunksize = int(csize) * mlt

print(f'{g_chunksize=}')
class MyBytesIO:
    # Iterable that yields a bytes buffer in fixed-size chunks; used as a streaming WSGI response body.
    def __init__(self, buffer, chunk_size = 64*1024):
        self.buffer = buffer
        self.chunk_size = chunk_size
        self.totalsize = len(buffer)
        self.pos = 0
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.pos >= self.totalsize or self.closed:
            raise StopIteration
        chunk = self.buffer[self.pos : self.pos + self.chunk_size]
        self.pos += len(chunk)
        return chunk

    def close(self):
        self.closed = True

g_file_cache = { }
def get_file(path):
    # load the file named by the last path component from the working directory and cache it
    try:
        return g_file_cache[path]
    except KeyError:
        pass
    fn_list = path.split('/')
    fn = fn_list[-1]
    print(f'File "{fn}" loading...')
    with open(fn, "rb") as file:
        payload = file.read()
    print(f'File "{fn}" loaded and cached! (size = {len(payload)})')
    g_file_cache[path] = payload
    return payload

def get_stream(path):
    payload = get_file(path)
    return MyBytesIO(payload, g_chunksize)

def app(environ, start_response):
    path = environ["PATH_INFO"]
    headers = [ ('Server', 'FastWSGI') ]
    if path == "/hw":
        headers.append( ('Content-Type', 'text/plain') )
        start_response('200 OK', headers)
        return b'Hello, World!'  # bare bytes, not wrapped in a list (non-standard per PEP 3333)
    if path == "/plaintext":
        headers.append( ('Content-Type', 'text/plain') )
        start_response('200 OK', headers)
        return [ b'Hello, World!' ]
    if path == "/json":
        headers.append( ('Content-Type', 'application/json') )
        start_response('200 OK', headers)
        return [ jsonify( {"message":"Hello, World!"} ).encode('utf8') ]
    if path.startswith("/static/"):
        # whole file in a single response body
        payload = get_file(path)
        headers.append( ('Content-Type', 'application/octet-stream') )
        #headers.append( ('Content-Disposition', 'attachment; filename="test.jpg"') )
        start_response('200 OK', headers)
        return [ payload ]
    if path.startswith("/stream/"):
        # same file, but served as an iterable of g_chunksize chunks
        stream = get_stream(path)
        headers.append( ('Content-Type', 'application/octet-stream') )
        start_response('200 OK', headers)
        return stream
    start_response('400 Bad Request', headers)
    return [ b'' ]

if opt.flask:
    # Optionally replace the raw WSGI app with a Flask app exposing /plaintext and /json
    import flask
    app = flask.Flask(__name__)

    @app.route('/plaintext', methods=['GET', 'POST'])
    def plaintext():
        response = flask.make_response(b"Hello, World!")
        response.content_type = "text/plain"
        response.headers.set('Server', opt.gateway)
        return response

    @app.route('/json', methods=['GET', 'POST'])
    def jsondata():
        return flask.jsonify(message="Hello, World!")

if __name__ == '__main__':
    if opt.gateway == "wz":
        import logging
        import werkzeug
        werkzeug.serving.WSGIRequestHandler.protocol_version = "HTTP/1.1"
        wzlog = logging.getLogger("werkzeug")
        wzlog.setLevel(logging.WARN)
        use_reloader = False  # True = use a reloader process to restart the server process when files are changed
        werkzeug.serving.run_simple(opt.host, opt.port, app, use_reloader=use_reloader)
    if opt.gateway == "fw":
        import fastwsgi
        fastwsgi.server.num_workers = opt.threads
        fastwsgi.run(wsgi_app=app, host=opt.host, port=opt.port, loglevel=opt.verbose)
    if opt.gateway == "bj":
        import bjoern
        bjoern.run(wsgi_app=app, host=opt.host, port=opt.port)
if opt.gateway == "jp":
import japronto
app = japronto.Application
app.add_route('/', hello, method='GET')
app.run(host=opt.host, port=opt.port)
#japronto.run(wsgi_app=app, host=opt.host, port=opt.port)
if opt.gateway == "wr":
import waitress
waitress.serve(app=app, host=opt.host, port=opt.port)
if opt.gateway == "si":
import socketify
socketify.WSGI(app).listen(opt.port, lambda config: print(f"Listening on port http://localhost:{opt.port} now\n")).run(opt.threads)
if opt.gateway == "gr":
import granian
app_fn = os.path.basename(__file__)
srv = granian.Granian(f"{app_fn}:app")
srv.bind_addr = opt.host
srv.bind_port = opt.port
srv.workers = opt.threads
srv.threading_mode = granian.constants.ThreadModes.runtime
#srv.threading_mode = granian.constants.ThreadModes.workers
srv.threads = 1
srv.pthreads = 1
#srv.loop = "auto"
#srv.loop = "asyncio"
srv.loop = "uvloop"
#srv.backlog = 2048
srv.interface = granian.constants.Interfaces.WSGI
srv.websockets = False
srv.serve()
print("==== Server close =====")