Skip to content

Instantly share code, notes, and snippets.

@appcove
Created February 21, 2010 02:55
Show Gist options
  • Save appcove/310090 to your computer and use it in GitHub Desktop.
Save appcove/310090 to your computer and use it in GitHub Desktop.
# vim:encoding=utf-8:ts=2:sw=2:expandtab
#
# Copyright 2010 AppCove, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from ..Lib import EnvironmentContainer, HeaderContainer
###############################################################################
class application:
#============================================================================
def __new__(cls, environ, start_response):
#TODO: the traversal of __mro__ could safely be cached, and stored in a
# dictionary using the class as the key.
# Allow a subclass to override what class will be used
cls = cls.Type()
# Get an instance, and initiailze it with the environment dictionary
self = object.__new__(cls)
self.__init__(environ)
# The __mro__ will take care of intra-plugin dependancies, provided
# that it is iterated over backwards. Cannot use hasattr() because it
# traverses up the super-classes until it finds the attribute, which
# can result in the attribute being called multiple times. Baaaaddd.
for c in reversed(cls.__mro__):
if 'Init' in c.__dict__:
c.Init(self)
# Call the single Exec method
self.Exec()
# Wrap up any plugoins with the Done method
for c in cls.__mro__:
if 'Done' in c.__dict__:
c.Done(self)
start_response(self.Response_Status, self.Response_Header.All())
return self.Response_Iterator
#============================================================================
def __init__(self, environ):
self.Env = EnvironmentContainer(environ)
self.Response_Header = HeaderContainer()
self.Response_Status = '200 OK'
self.Response_Iterator = ['']
#============================================================================
@classmethod
def Type(cls):
return cls
#============================================================================
def Exec(self):
pass
#============================================================================
def Info(self):
rval = ''
rval += (
'AppStruct.WSGI.Handler.application is active!\n\n' +
'Main Application class: ' + repr(type(self)) + '\n\n' +
'type(self).__mro__ is:\n' +
str.join('', (' ' + repr(c) + '\n' for c in type(self).__mro__)) + '\n\n' +
'------------------------------------------------------------------------\n\n' +
'If you are seeing this message, it means that AppStruct is running, and \n' +
'the base Exec() method was called. Either you did this on purpose, or \n' +
'you did it by incorrect use of super(), or perhaps, you just haven\'t \n' +
'gotten around to overriding it yet. In any event, following this is \n' +
'a dump of all of the plugins that are installed.\n\n'
)
for c in reversed(type(self).__mro__):
rval += (
'\n\n------------------------------------------------------------------------\n' +
repr(c) + '\n\n'
)
if 'Dump' in c.__dict__:
rval += c.Dump(self)
else:
rval += 'No Dump() method available...\n'
return rval
#============================================================================
# Embedded encoding plugin -- so it is always avalable
Encoding = ('utf-8', 'strict')
def Text_Encode(self, s):
return s.encode(*self.Encoding)
def Text_Decode(self, b):
return b.decode(*self.Encoding)
#============================================================================
def Dump(self):
return 'self.Encoding = ' + str(self.Encoding)
#============================================================================
# vim:encoding=utf-8:ts=2:sw=2:expandtab
#
# Copyright 2010 AppCove, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from io import StringIO
from .Lib import FieldContainer
from .HTTP import OptionHeader_Parse
###############################################################################
class ContentParse:
'''
This plugin parses the Content-type header, and makes it available as
properties which can be read by any plugin.
Configuration:
(None)
Methods:
(None)
Properties:
.ContentType (str)
.ContentType_Extra (dict)
.ContentLength (int)
'''
#============================================================================
def Init(self):
# Parse out the content type header
self.ContentType, self.ContentType_Extra = OptionHeader_Parse(
self.Env.get('CONTENT_TYPE', '')
)
# Read the content length, or default to zero
try:
self.ContentLength = int(self.Env['CONTENT_LENGTH'])
except (KeyError, ValueError):
self.ContentLength = 0
#============================================================================
def Dump(self):
return \
'self.ContentType = ' + str(self.ContentType) + '\n' \
'self.ContentType_Extra = ' + str(self.ContentType_Extra) + '\n' \
'self.ContentLength = ' + str(self.ContentLength) + '\n'
###############################################################################
class PostForm(ContentParse):
'''
This plugin will enable the automatic parsing of incoming post variables,
provided the content type is set to either
application/x-www-form-urlencoded, or
application/x-url-encoded
The input stream will be consumed by this plugin ONLY if the content type
matches, so be sure not to have any conflicting plugins floating around.
Configuration:
.PostForm_MemoryLimit (int)
If this amount of memory is exceeded, a exception will be raised
Methods:
(None)
Properties:
.Post (Lib.FieldContainer)
All of the post variables
'''
PostForm_MemoryLimit = 1024*1024*8
#============================================================================
def Init(self):
qs = ''
if self.ContentType in (
'application/x-www-form-urlencoded',
'application/x-url-encoded'
):
# Verify the maximum form size is not exceeded
if self.ContentLength > self.PostForm_MemoryLimit:
raise MemoryLimitExceeded(
'Form post size of {0} exceeded the limit of {1}.'.format(
self.ContentLength, self.PostForm_MemoryLimit
))
# The data is posted as a query string of bytes. Only read ContentLength
qs = self.Env['wsgi.input'].read(self.ContentLength)
# Need to decode it
qs = self.Text_Decode(qs)
#endif
self.Post = FieldContainer(qs)
#============================================================================
def Dump(self):
return \
'self.PostForm_MemoryLimit = ' + repr(self.PostForm_MemoryLimit) + '\n' \
'self.Post = ' + repr(self.Post) + '\n'
###############################################################################
class QueryString:
'''
This plugin will enable the automatic parsing of the QueryString.
Configuration:
(None)
Methods:
(None)
Properties:
.QS (Lib.FieldContainer)
All of the query string request variables
'''
#============================================================================
def Init(self):
self.QS = FieldContainer(self.Env.get('QUERY_STRING', ''))
#============================================================================
def Dump(self):
return 'self.QS = {0}\n'.format(self.QS)
###############################################################################
class TextWriter:
'''
This plugin provides for a convenient .write method that can be used to
efficiently write content to an output buffer. At the end of the request,
the buffer is read and the follwing items are unconditionally overwritten:
self.Response_Iterator
self.Response_Header['Content-length']
If at some point during a request, you need direct access to those items
without them being overwritten, then simply call
self.TextWriter_Disable()
Incompatible:
(None)
Configuration:
(None)
Methods:
.TextWriter_Disable()
.write(str)
Properties:
.TextWriter_Buffer (StringIO or None)
'''
#============================================================================
def Init(self):
self.TextWriter_Buffer = StringIO('')
self.write = self.TextWriter_Buffer.write
#============================================================================
def TextWriter_Disable(self):
self.TextWriter_Buffer = None
del(self.write)
#============================================================================
def Done(self):
if self.TextWriter_Buffer != None:
output = self.Text_Encode(self.TextWriter_Buffer.getvalue())
self.Response_Iterator = (output,)
self.Response_Header['Content-length'] = str(len(output))
#============================================================================
def Dump(self):
return 'self.TextWriter_Buffer = ' + str(self.TextWriter_Buffer)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment