Skip to content

Instantly share code, notes, and snippets.

@wil3
Last active November 15, 2017 19:43
Show Gist options
  • Save wil3/23c1c9071ae2b00d9416b64ab15feecf to your computer and use it in GitHub Desktop.
Save wil3/23c1c9071ae2b00d9416b64ab15feecf to your computer and use it in GitHub Desktop.
""" Demo creating a simple webserver and using a pub/sub design pattern
to have handlers for the request url to do work off the critcal path. This could
for example be used to process data analytics.
The publisher uses the callback to signal to the subscriber data has changed.
Conditions are used to ensure subscribers are not reading data while it is being
written. This allows the sublisher to wait (sleep) until the publisher wakes
them up to process an event. See docs for more information on Conditions,
https://docs.python.org/2/library/threading.html#condition-objects
Running the example:
$ python http-server-events.py
After the server above make a web browser request to,
localhost:8000?message=hey
The message will be echoed back and it will be published to
any subscribers.
Addtional Reading Material:
For a simple example you can also just run a standalone server like this,
$ python -m SimpleHTTPServer 8000
Where 8000 is the port it will run on. In python3 you can bind to any
interface. CAUTION Use this responsibly! All of your files will be publicly
exposed in the current directy the command is run,
$ python3 -m http.server --bind [Interface IP Address] 8000
"""
__author__ = "William Koch"
__email__ = "wfkoch [at] bu.edu"
import SimpleHTTPServer
import SocketServer
import threading
import logging
import urlparse
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("http")
class EventData(object):
""" Shared data object for web events """
def __init__(self):
self.request = None
def write(self, request):
logger.info("Data writter {}".format(request))
self.request = request
def read(self):
return self.request
class RequestSubscriber(threading.Thread):
""" A subscriber must have a callback that will be called when an event
occurs. """
def __init__(self, condition, event_data):
self.condition = condition
self.event_data = event_data
self.message_ready = False
super(RequestSubscriber, self).__init__()
def _process_request(self, request):
""" Do some work with the data we obtain like some analytics """
logger.info("Subscribe: Do work {} {}".format(threading.current_thread().name, request))
def run(self):
""" Run forever waiting for events to happen to be processed """
while True:
self.condition.acquire()
while True:
if self.message_ready:
data = self.event_data.read()
logger.info("Subscribe: {} {}".format(threading.current_thread().name, data))
self._process_request(data)
self.message_ready = False
break
# If we wait (block) until notified or timeout occurs. This also
# releases the underlying lock.
# In a perfect world in this situation could just use notify as the
# callback
self.condition.wait(timeout=5)
self.condition.release()
def callback(self):
""" Since we are demonstrating reading / writting from a single shared object
the callback just acts as a flag indicating data is ready to be written.
In practice (i.e., Javascript Events, Android UI Framework, etc) the
callback will contain the data you are interested in."""
self.message_ready = True
class HTTPEventPublisher(object):
def __init__(self, condition, event_data):
self.subscriber = None
self.condition = condition
self.event_data = event_data
def publish(self, event):
# Publish the event
self.condition.acquire()
self._write_event(event)
self._notify_subscriber()
self.condition.notify_all()
self.condition.release()
def _notify_subscriber(self):
if self.subscriber:
self.subscriber()
def subscribe(self, topic, callback):
# We only are having a single subscribe so
# ignore topic
self.subscriber = callback
def _write_event(self, event):
""" Publish the event, we only are demonstrating one here,
What if the publisher has different events that can occur? We allow
the subscribers to subscribe to particular events"""
logger.info("Publish: {} {}".format(threading.current_thread().name, event))
self.event_data.write(event)
"""
Cant pass in parameters to the constructor for the handler because this is
initialized internal by TCPServer. To bypass this setup a factory worker to create the class.
The scope will allow us to set the values properly
"""
def HandlerFactory(publisher):
# First get the simple webserve working then convert to a factory
class EventPublisherHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
""" Uses the following base class to do most work,
https://docs.python.org/2/library/basehttpserver.html#BaseHTTPServer.BaseHTTPRequestHandler
"""
DEFAULT_RESPONSE = "<html><head></head><body><h1>OI! Add message as URL parameter!</h1></body></html>"
# Hacked template
MESSAGE_RESPONSE = "<html><head></head><body><h1>{}</h1></body></html>"
# The *args and **kwargs allows us to pass in variable lenth of
# arguments and keyword arguments
# Read this for more details, https://pythontips.com/2013/08/04/args-and-kwargs-in-python-explained/
def __init__(self, *args, **kwargs):
self.publisher = publisher
# This lib uses old style class so cant use super
# https://stackoverflow.com/questions/25317696/python-webserver-script-wont-submit-to-sqlite3-database
SimpleHTTPServer.SimpleHTTPRequestHandler.__init__(self, *args, **kwargs)
def do_GET(self):
self.send_response(200)
self.send_header("Content-type", "text/html")
# Form the body, echo the message query string
# or send default
parsed_path = urlparse.urlparse(self.path)
q = urlparse.parse_qs(parsed_path.query)
if "message" in q:
message = q["message"][0]
logger.debug("Found message {}".format(message))
body = self.MESSAGE_RESPONSE.format(message)
else:
body = self.DEFAULT_RESPONSE
self.send_header("Content-length", len(body))
self.end_headers()
# Write response
self.wfile.write(body)
# Publish the event
self.publisher.publish(self.path)
return EventPublisherHandler
if __name__ == "__main__":
# Everything runs forever, get ready to killall python
condition = threading.Condition()
data = EventData()
pub = HTTPEventPublisher(condition, data)
sub = RequestSubscriber(condition, data)
pub.subscribe("path", sub.callback)
sub.start()
handler = HandlerFactory(pub)
HOST, PORT = "localhost", 8000
httpd = SocketServer.TCPServer((HOST, PORT), handler)
logger.info("Serving on port {}".format(PORT))
httpd.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment