Created
August 25, 2010 16:45
-
-
Save zentrope/549828 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer | |
import urllib2 | |
import base64 | |
import cgi | |
from xml.etree.ElementTree import XML | |
class Cws: | |
def __init__(self): | |
self.url = 'http://localhost:9998/cws' | |
def get_user(self, user): | |
xml = """ | |
<request version="1" revision="1"> | |
<connection> | |
<domain>hpstest</domain> | |
<store-id>webstore</store-id> | |
<service-user>webadmin</service-user> | |
<service-password>xj11</service-password> | |
</connection> | |
<get-user> | |
<user> | |
<id>%s</id> | |
</user> | |
</get-user> | |
</request> | |
""" | |
req = urllib2.Request(self.url, xml % user) | |
conn = urllib2.urlopen(req) | |
data = conn.read() | |
conn.close() | |
return data | |
class Mws: | |
def __init__(self, host='localhost', port=10002): | |
self.health_url = 'http://%s:%s/health' % (host, port) | |
self.url = 'http://%s:%s/mws' % (host, port) | |
self.auth = 'Basic ' + base64.b64encode('mws-user:mws-pass') | |
def health(self): | |
conn = urllib2.urlopen(self.health_url) | |
data = conn.read() | |
conn.close() | |
return data | |
def consumers(self, domain, chain, limit=0): | |
resource = '%s/%s/chain/%s/consumers' % (self.url, domain, chain) | |
if limit != 0: | |
resource = '%s?limit=%s' % (resource, limit) | |
req = urllib2.Request(resource) | |
req.add_header('Authorization', self.auth) | |
conn = urllib2.urlopen(req) | |
data = conn.read() | |
conn.close() | |
return data | |
class Handler: | |
def send_html(self, r): | |
r.send_response(200) | |
r.send_header('content-type', 'text/html') | |
r.end_headers() | |
return | |
def fparams(self, r): | |
content_length = r.headers.getheader('Content-length') | |
length = int(content_length) | |
return cgi.parse_qs(r.rfile.read(length)) | |
class CwsFormHandler(Handler): | |
def handle(self, r): | |
page = """ | |
<h1>CWS</h1> | |
<form action='/cws/result' method='post'> | |
user-id: <input type='text' name='user-id' value=''/> | |
<input type='submit'/> | |
</form> | |
""" | |
self.send_html(r) | |
r.wfile.write(page) | |
return | |
class MwsFormHandler(Handler): | |
def handle(self, r): | |
page = """ | |
<h1>MWS</h1> | |
<form action='/mws/result' method='post'> | |
domain: <input type='text' name='domain' value=''/> | |
chain: <input type='text' name='chain' value=''/> | |
<input type='submit'/> | |
</form> | |
""" | |
self.send_html(r) | |
r.wfile.write(page) | |
return | |
class CwsHandler(Handler): | |
def handle(self, r): | |
content_length = r.headers.getheader('Content-length') | |
length = int(content_length) | |
form = self.fparams(r) | |
user = form['user-id'][0] | |
cws = Cws() | |
data = cws.get_user(user) | |
data = data.replace('<', '<').replace('>', '>') | |
self.send_html(r) | |
r.wfile.write('<pre>%s</pre>' % data) | |
class MwsHandler(Handler): | |
def handle(self, request): | |
content_length = request.headers.getheader('Content-length') | |
length = int(content_length) | |
form = self.fparams(request) | |
domain = form['domain'][0] | |
chain = form['chain'][0] | |
print form | |
mws = Mws() | |
print "domain", domain | |
print "chain", chain | |
consumers = mws.consumers(domain, chain, limit=20) | |
root = XML(consumers) | |
consumers = consumers.replace('<', '<').replace('>', '>') | |
self.send_html(request) | |
request.wfile.write("""<html> | |
<head> | |
<style> | |
h1, h2, h3 { color: #666 } | |
pre { font-size: 80%; border: 1px solid #f2f2f2; | |
background-color: white; padding: 4px; height: 600px; overflow: auto; | |
-webkit-border-radius: 5px; -webkit-box-shadow: 2px 2px 6px rgba(0,0,0,0.6); | |
} | |
table { -webkit-box-shadow: 2px 2px 6px rgba(0,0,0,0.6); } | |
td, th { border: 1px solid #f2f2f2; padding: 4px; font-size: 90%; | |
background-color: white } | |
th { text-align: left; background-color: #d6d6d6; | |
} | |
body { background-color: #e6e6e6 } | |
#container { width: 700; margin: auto; padding: 1em; border: 1px solid silver; | |
margin-top: 1em; -webkit-border-radius: 5px; background-color: #f6f6f6} | |
</style> | |
</head>""") | |
request.wfile.write('<body') | |
request.wfile.write('<div id="container">') | |
request.wfile.write('<h1>Chain Consumers</h1>') | |
request.wfile.write('<p>domain: <b>%s</b></p>' % (domain)) | |
request.wfile.write('<p>chain: <b>%s</b></p>' % (chain)) | |
request.wfile.write('<p><a href="/mws/form">Try Again</a></p>') | |
request.wfile.write('<h2>Consumers:</h2>') | |
request.wfile.write('<table cellpadding="0" cellspacing="0" width="100%" style="border: 1px solid #f2f2f2">') | |
request.wfile.write('<tr><th>Id</th><th>Name</th><th>Opt in?</th></tr>') | |
for c in root.findall('consumers/consumer'): | |
id = c.find('id').text | |
title = c.find('title').text | |
first = c.find('first_name').text | |
last = c.find('last_name').text | |
optin = c.find('opt-in').text | |
if optin == '1': | |
optin = 'Yes' | |
else: | |
optin = 'No' | |
request.wfile.write(""" | |
<tr><td>%s</td><td>%s %s %s</td><td>%s</td></tr> | |
""" % (id, title, first, last, optin)) | |
request.wfile.write('</table>') | |
request.wfile.write('<h2>Raw Data</h2>') | |
request.wfile.write('<pre>%s</pre>' % consumers) | |
request.wfile.write('</div>') | |
request.wfile.write('</body></head>') | |
return | |
class HomeHandler(Handler): | |
def handle(self, r): | |
page = """ | |
<li><a href='/mws/form'>MWS demo</a></li> | |
<li><a href='/cws/form'>CWS demo</a></li> | |
""" | |
self.send_html(r) | |
r.wfile.write(page) | |
return | |
class DefaultHandler(Handler): | |
def handle(self, request): | |
self.send_html(request) | |
request.wfile.write('<h1>no handler for: %s' % request.path) | |
return | |
class ErrorHandler(Handler): | |
def handle(self, req, error): | |
self.send_html(req) | |
req.wfile.write('<h1>Error: <span style="color: red">%s</span></h1>' % error) | |
req.wfile.write('<li><a href="/mws/form">mws demo</a></li>') | |
class BaseHandler(BaseHTTPRequestHandler): | |
handlers = { | |
'/' : HomeHandler(), | |
'/mws/form' : MwsFormHandler(), | |
'/mws/result' : MwsHandler(), | |
'/cws/form' : CwsFormHandler(), | |
'/cws/result' : CwsHandler() | |
} | |
def do_GET(self): | |
if self.path == '/favicon.ico': | |
return False | |
handler = HomeHandler() | |
try: | |
handler = self.handlers[self.path] | |
except KeyError: | |
pass | |
try: | |
handler.handle(self) | |
except Exception, e: | |
ErrorHandler().handle(self, e) | |
return True | |
def do_POST(self): | |
self.do_GET() | |
return | |
def main(): | |
try: | |
server = HTTPServer(('localhost', 6789), BaseHandler) | |
print 'started httpserver...' | |
server.serve_forever() | |
except KeyboardInterrupt: | |
print '^C received, shutting down server' | |
server.socket.close() | |
if __name__ == '__main__': | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a quick and dirty Python script I wrote to create a web app in order to demo some changes to the web services I'd written. Amazing how far you can get with just the standard tools provided by a default installation of Python. As basic as the above is, it's a lot sexier than using Curl at the command line. ;)