Skip to content

Instantly share code, notes, and snippets.

@jamesp
Created April 2, 2009 08:39
Show Gist options
  • Save jamesp/89092 to your computer and use it in GitHub Desktop.
Save jamesp/89092 to your computer and use it in GitHub Desktop.
# Built in library imports
import mimetypes
import re
import os
import sys
# Server imports
import urlparse
import cgi
class Juno(object):
def __init__(self, configuration=None):
"""Takes an optional configuration dictionary. """
if _hub is not None:
print 'warning: there is already a Juno object created;'
print ' you might get some weird behavior.'
self.routes = []
# Set options and merge in user-set options
self.config = {
# General settings
'log': True,
'content_type': 'text/html',
# Server options
'mode': 'dev',
'scgi_port': 8000,
'fcgi_port': 8000,
'dev_port': 8000,
# Static file handling
'use_static': True,
'static_url': '/static/*:file/',
'static_root': './static/',
'static_handler': static_serve,
# Template options
'use_templates': True,
'template_lib': 'jinja2',
'get_template_handler': get_template,
'render_template_handler': render_template,
'template_root': './views/',
'404_template': '404.html',
'500_template': '500.html',
}
if configuration is not None: self.config.update(configuration)
# set up the static file handler
if self.config['use_static']:
self.route(self.config['static_url'], self.config['static_handler'], '*')
# Set up templating
if self.config['template_lib'] == 'jinja2' and self.config['use_templates']:
import jinja2
self.config['template_env'] = jinja2.Environment(
loader=jinja2.FileSystemLoader(self.config['template_root'])
)
if self.config['template_lib'] == 'mako' and self.config['use_templates']:
import mako.lookup
self.config['template_env'] = mako.lookup.TemplateLookup(
directories=[self.config['template_root']]
)
def run(self, mode=None):
"""Runs the Juno hub, in the set mode (default now is scgi). """
# If a mode is specified, use it. Otherwise use the mode from the config.
mode = mode if mode else self.config['mode']
self.config['mode'] = mode
if mode == 'dev':
run_dev('', self.config['dev_port'], self.request)
elif mode == 'scgi':
run_scgi('', self.config['scgi_port'], self.request)
elif mode == 'fcgi':
run_fcgi('', self.config['scgi_port'], self.request)
else:
print 'error: only scgi, fcgi and the dev server are supported now'
print 'exiting juno...'
def request(self, request, method='*', **kwargs):
"""Called when a request is received. Routes a url to its view.
Returns a 3-tuple (status_string, headers, body) from
JunoResponse.render()."""
if self.log: print '%s request for %s...' %(method, request)
req_obj = JunoRequest(kwargs)
# Set the global response object in case the view wants to use it
global _response
_response = JunoResponse()
# Add a slash if there isn't one - avoids frustrating matching bugs
if request[-1] != '/': request += '/'
for route in self.routes:
if not route.match(request, method): continue
if self.log: print '%s matches, calling %s()...\n' %(
route.old_url, route.func.__name__)
# Get the return from the view
try:
response = route.dispatch(req_obj)
except:
return servererror(error=str(sys.exc_info())).render()
# If nothing returned, use the global object
if response is None: response = _response
# If we don't have a string, render the Response to one
if isinstance(response, JunoResponse):
return response.render()
return JunoResponse(body=response).render()
# No matches - 404
if self.log: print 'No match, returning 404...\n'
return notfound(error='No matching routes registered').render()
def route(self, url, func, method):
"""Attaches a view to a url or list of urls, for a given function. """
# An implicit route - the url is just the function name
if url is None: url = '/' + func.__name__ + '/'
# If we just have one url, add it
if type(url) == str: self.routes.append(JunoRoute(url, func, method))
# Otherwise add each url in the list
else:
for u in url: self.routes.append(JunoRoute(u, func, method))
def __getattr__(self, attr):
return self.config[attr] if attr in self.config else None
def __repr__(self): return '<Juno>'
class JunoRoute(object):
"""Uses a simplified regex to grab url parts:
i.e., '/hello/*:name/' compiles to '^/hello/(?P<name>\w+)/' """
def __init__(self, url, func, method):
# Make sure the url begins and ends in a '/'
if url[0] != '/': url = '/' + url
if url[-1] != '/': url += '/'
# Store the old one before we modify it (we use it for __repr__)
self.old_url = url
# RE to match the splat format
splat_re = re.compile('^\*?:(?P<var>\w+)$')
# Start building our modified url
buffer = '^'
for part in url.split('/'):
# Beginning and end entries are empty, so skip them
if not part: continue
match_obj = splat_re.match(part)
# If it doesn't match, just add it without modification
if match_obj is None: buffer += '/' + part
# Otherwise replace it with python's regex format
else: buffer += '/(?P<' + match_obj.group('var') + '>.*)'
# If we don't end with a wildcard, add a end of line modifier
if buffer[-1] != ')': buffer += '/$'
else: buffer += '/'
self.url = re.compile(buffer)
self.func = func
self.method = method.upper()
self.params = {}
def match(self, request, method):
"""Matches a request uri to this url object. """
match_obj = self.url.match(request)
if match_obj is None: return False
# Make sure the request method matches
if self.method != '*' and self.method != method: return False
# Store the parts that matched
self.params.update(match_obj.groupdict())
return True
def dispatch(self, req):
"""Calls the route's view with any named parameters."""
return self.func(req, **self.params)
def __repr__(self):
return '<JunoRoute: %s %s - %s()>' %(self.method, self.old_url,
self.func.__name__)
class JunoRequest(object):
"""Offers following members:
raw => the header dict used to construct the JunoRequest
location => uri being requested, without query string ('/' from '/?a=6')
full_location => uri with query string ('/?a=6' from '/?a=6')
user_agent => the user agent string of requester
"""
def __init__(self, request):
# Make sure we have a request uri, and it ends in '/'
if 'DOCUMENT_URI' not in request: request['DOCUMENT_URI'] = '/'
elif request['DOCUMENT_URI'][-1] != '/': request['DOCUMENT_URI'] += '/'
# Set some instance variables
self.raw = request
self.raw['input'] = {}
self.location = request['DOCUMENT_URI']
# If we get a REQUEST_URI, store it. Otherwise copy DOCUMENT_URI.
if 'REQUEST_URI' in request:
self.full_location = request['REQUEST_URI']
else: self.full_location = self.location
# Find the right user agent header
if 'HTTP_USER_AGENT' in request:
self.user_agent = request['HTTP_USER_AGENT']
elif 'User-Agent' in request:
self.user_agent = request['User-Agent']
else: self.user_agent = '?'
self.combine_request_dicts()
# Check for sessions
if config('use_sessions') and config('session_lib') == 'beaker':
self.session = request['beaker.session']
else:
self.session = None
def combine_request_dicts(self):
input_dict = self.raw['GET_DICT'].copy()
for k, v in self.raw['POST_DICT'].items():
# Combine repeated keys
if k in input_dict.keys(): input_dict[k].extend(v)
# Otherwise just add this key
else: input_dict[k] = v
# Escape each item in the input dict
for k, v in input_dict.items():
input_dict[k] = [cgi.escape(i) for i in v]
# Reduce the dict - change one item lists ([a] to a)
for k, v in input_dict.items():
if len(v) == 1: input_dict[k] = v[0]
self.raw['input'] = input_dict
def __getattr__(self, attr):
# Try returning values from self.raw
if attr in self.keys(): return self.raw[attr]
return None
def input(self, arg=None):
# No args: return the whole dictionary
if arg is None: return self.raw['input']
# Otherwise try to return the value for that key
if self.raw['input'].has_key(arg):
return self.raw['input'][arg]
return None
# Make JunoRequest act as a dictionary for self.raw
def __getitem__(self, key): return self.raw[key]
def __setitem__(self, key, val): self.raw[key] = val
def keys(self): return self.raw.keys()
def items(self): return self.raw.items()
def values(self): return self.raw.values()
def __len__(self): return len(self.raw)
def __contains__(self, key): return key in self.raw
def __repr__(self):
return '<JunoRequest: %s>' %self.location
class JunoResponse(object):
status_codes = {
200: 'OK',
301: 'Moved Permanently',
302: 'Found',
303: 'See Other',
304: 'Not Modified',
400: 'Bad Request',
403: 'Forbidden',
404: 'Not Found',
405: 'Method Not Allowed',
410: 'Gone',
500: 'Internal Server Error',
}
def __init__(self, configuration=None, **kwargs):
# Set options and merge in user-set options
self.config = {
'body': '',
'status': 200,
'headers': { 'Content-Type': config('content_type'), },
}
if configuration is None: configuration = {}
self.config.update(configuration)
self.config.update(kwargs)
self.config['headers']['Content-Length'] = len(self.config['body'])
# Add text and adjust content-length
def append(self, text):
self.config['body'] += str(text)
self.config['headers']['Content-Length'] = len(self.config['body'])
return self
# Implement +=
def __iadd__(self, text):
return self.append(text)
def render(self):
"""Returns a 3-tuple (status_string, headers, body)."""
status_string = '%s %s' %(self.config['status'],
self.status_codes[self.config['status']])
headers = [(k, str(v)) for k, v in self.config['headers'].items()]
body = '%s' %self.config['body']
return (status_string, headers, body)
# Set a header value
def header(self, header, value):
self.config['headers'][header] = value
return self
# Modify the headers dictionary when the response is treated like a dict
def __setitem__(self, header, value): self.header(header, value)
def __getitem__(self, header): return self.config['headers'][header]
def __getattr__(self, attr):
return self.config[attr]
def __repr__(self):
return '<JunoResponse: %s %s>' %(self.status, self.status_codes[self.status])
#
# Functions to deal with the global Juno object (_hub)
#
_hub = None
def init(configuration=None):
"""Set up Juno with an optional configuration."""
global _hub
_hub = _hub if _hub else Juno(configuration)
return _hub
def config(key, value=None):
"""Get or set configuration options."""
if value is None:
if type(key) == dict: _hub.config.update(key)
else: return _hub.config[key] if key in _hub.config else None
else: _hub.config[key] = value
def run(mode=None):
"""Start Juno, with an optional mode argument."""
if _hub is None: init()
if len(sys.argv) > 1:
if '-mode=' in sys.argv[1]: mode = sys.argv[1].split('=')[1]
elif '-mode' == sys.argv[1]: mode = sys.argv[2]
_hub.run(mode)
#
# Decorators to add routes based on request methods
#
def route(url=None, method='*'):
if _hub is None: init()
def wrap(f): _hub.route(url, f, method)
return wrap
def post(url=None): return route(url, 'post')
def get(url=None): return route(url, 'get')
def head(url=None): return route(url, 'head')
def put(url=None): return route(url, 'put')
def delete(url=None): return route(url, 'delete')
#
# Functions to deal with the global response object (_response)
#
_response = None
def append(body):
"""Add text to response body. """
global _response
return _response.append(body)
def header(key, value):
"""Set a response header. """
global _response
return _response.header(key, value)
def content_type(type):
"""Set the content type header. """
header('Content-Type', type)
def status(code):
_response.config['status'] = code
#
# Convenience functions for 404s and redirects
#
def redirect(url, code=302):
status(code)
# clear the response headers and add the location header
_response.config['headers'] = { 'Location': url }
return _response
def assign(from_, to):
if type(from_) not in (list, tuple): from_ = [from_]
for url in from_:
@route(url)
def temp(web): redirect(to)
def notfound(error='Unspecified error', file=None):
"""Sets the response to a 404, sets the body to 404_template."""
status(404)
file = file if file else config('404_template')
return template(file, error=error)
def servererror(error='Unspecified error', file=None):
"""Sets the response to a 500, sets the body to 500_template."""
status(500)
file = file if file else config('500_template')
return template(file, error=error)
#
# Serve static files.
#
def static_serve(web, file):
"""The default static file serve function. Maps arguments to dir structure."""
file = config('static_root') + file
if not yield_file(file): notfound("that file could not be found/served")
def yield_file(filename, type=None):
"""Append the content of a file to the response. Guesses file type if not
included. Returns 1 if requested file can' be accessed (often means doesn't
exist). Returns 2 if requested file is a directory. Returns 7 on success. """
if not os.access(filename, os.F_OK): return 1
if os.path.isdir(filename): return 2
if type is None:
guess = mimetypes.guess_type(filename)[0]
if guess is None: content_type('text/plain')
else: content_type(guess)
else: content_type(type)
append(open(filename, 'r').read())
return 7
#
# Templating
#
def template(template_path, template_dict=None, **kwargs):
"""Append a rendered template to response. If template_dict is provided,
it is passed to the render function. If not, kwargs is."""
# Retreive a template object.
t = config('get_template_handler')(template_path)
# Render it without arguments.
if not kwargs and not template_dict:
return append(config('render_template_handler')(t))
# Render the template with a provided template dictionary
if template_dict:
return append(config('render_template_handler')(t, **template_dict))
# Render the template with **kwargs
return append(config('render_template_handler')(t, **kwargs))
# The default value of config('get_template_handler')
def get_template(template_path):
"""Return a template object. This is defined for the Jinja2 and
Mako libraries, otherwise you have to override it. Takes one
parameter: a string containing the desired template path. Needs
to return an object that will be passed to your rendering function."""
return _hub.config['template_env'].get_template(template_path)
# The default value of config('render_template_handler')
def render_template(template_obj, **kwargs):
"""Renders template object with an optional dictionary of values.
Defined for Jinja2 and Mako - override it if you use another
library. Takes a template object as the first parameter, with an
optional **kwargs parameter. Needs to return a string."""
return template_obj.render(**kwargs)
def autotemplate(urls, template_path):
"""Automatically renders a template for a given path. Currently can't
use any arguments in the url."""
if type(urls) not in (list, tuple): urls = urls[urls]
for url in urls:
@route(url)
def temp(web): template(template_path)
####
# Juno's Servers - Development (using WSGI), and SCGI (using Flup)
####
def get_application(process_func):
def application(environ, start_response):
if environ is None:
print 'Error: environ is None for some reason.'
print 'Error: environ=%s' %environ
sys.exit()
# Ensure some variable exist (WSGI doesn't guarantee them)
if not environ['PATH_INFO']: environ['PATH_INFO'] = '/'
if not environ['QUERY_STRING']: environ['QUERY_STRING'] = ''
if not environ['CONTENT_LENGTH']: environ['CONTENT_LENGTH'] = '0'
# Standardize some header names
environ['DOCUMENT_URI'] = environ['PATH_INFO']
if environ['QUERY_STRING']:
environ['REQUEST_URI'] = environ['PATH_INFO']+'?'+environ['QUERY_STRING']
else:
environ['REQUEST_URI'] = environ['DOCUMENT_URI']
# Parse query string arguments
if environ['REQUEST_METHOD'] == 'GET':
environ['GET_DICT'] = cgi.parse_qs(environ['QUERY_STRING'],
keep_blank_values=1)
else: environ['GET_DICT'] = {}
if environ['REQUEST_METHOD'] == 'POST':
# Read from the POST file, skipping read errors or errors formatting
# the Content-Length header
try:
post_data = environ['wsgi.input'].read(int(environ['CONTENT_LENGTH']))
except:
post_data = ''
environ['POST_DICT'] = cgi.parse_qs(post_data,
keep_blank_values=1)
else: environ['POST_DICT'] = {}
# Done parsing inputs, now reading to send to Juno
(status_str, headers, body) = process_func(environ['DOCUMENT_URI'],
environ['REQUEST_METHOD'],
**environ)
start_response(status_str, headers)
return [body]
return application
def run_dev(addr, port, process_func):
from wsgiref.simple_server import make_server
app = get_application(process_func)
print ''
print 'running Juno development server, <C-c> to exit...'
print 'connect to 127.0.0.1:%s to use your app...' %port
print ''
srv = make_server(addr, port, app)
try:
srv.serve_forever()
except:
print 'interrupted; exiting juno...'
srv.socket.close()
def run_scgi(addr, port, process_func):
from flup.server.scgi import WSGIServer as SCGI
SCGI(get_application(process_func), bindAddress=(addr, port)).run()
def run_fcgi(addr, port, process_func):
from flup.server.fcgi import WSGIServer as FCGI
FCGI(get_application(process_func), bindAddress=(addr, port)).run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment