Created
December 12, 2014 06:16
-
-
Save mdaniel/7817c7fda1b665c6aecf to your computer and use it in GitHub Desktop.
Run a fake copy of api.parse.com that is sufficient for demos
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python2.7 | |
from __future__ import absolute_import, print_function | |
__docformat__ = 'reStructuredText' | |
import io
import json
import mimetypes
import os
import re
import stat
import time
from datetime import datetime
from uuid import uuid4

try:  # Python 2 module names
    from urllib import quote
    from urlparse import parse_qsl, urlparse
    from SimpleHTTPServer import SimpleHTTPRequestHandler
except ImportError:  # Python 3 moved these modules
    from urllib.parse import parse_qsl, quote, urlparse
    from http.server import SimpleHTTPRequestHandler
# noinspection PyPep8Naming | |
class ParseAPIHandler(SimpleHTTPRequestHandler):
    """
    Attempts to recreate enough of `api.parse.com` to permit
    offline demos. Clearly, not every functionality is implemented
    nor are the ones here implemented fully or (necessarily) correctly.
    But I suspect the project for which I made this will not be the
    last one who would like to have some offline parse.com support.

    All state lives in the current working directory:

    * objects are stored as ``<objectId>.json`` where the objectId
      starts with the class name (that is how ``GET`` finds them)
    * uploaded files are stored as ``<uuid>-<original name>``
    * logins are answered from ``<username>,<password>.json``
    """

    LOGIN_URI = '/1/login/'
    """
    ``GET`` (yup) to ``?username&password`` yields a reply of
    ::
        {
            "username": "cooldude6",
            "phone": "415-392-0202",
            "createdAt": "2011-11-07T20:58:34.448Z",
            "updatedAt": "2011-11-07T20:58:34.448Z",
            "objectId": "g7y9tkhB7O",
            "sessionToken": "pnktnjyb996sj4p156gjtp4im"
        }
    This path is used in a `startswith` context; the same path
    without its trailing slash is accepted as an exact match, too.
    """

    CLASS_URI = re.compile(r'(/1/classes)/(\w+)(?:/([^/?&]+))?')
    FILE_URI = re.compile(r'(/1/files)/([^/?&]+)')
    """
    This regex is **much** more lenient than is parse.com
    since this is designed to be used in demos, where
    *strictness* is not usually a requirement.
    """

    CORS_HEADERS = {
        'Access-Control-Allow-Headers': (
            'Accept'
            ',Content-Type'
            ',X-Parse-REST-API-Key'
            ',X-Parse-Application-Id'
            ',X-Parse-Client-Version'
            ',X-Parse-Session-Token'
            ',X-Requested-With'
        ).lower(),
        'Access-Control-Allow-Methods': 'GET, OPTIONS, POST, PUT',
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Max-Age': '300',  # seconds
    }

    # JSON replies are labelled as JSON (not as executable javascript).
    JSON_CTYPE = 'application/json; charset=utf-8'

    # ------------------------------------------------------------------
    # HTTP verbs
    # ------------------------------------------------------------------

    def do_GET(self):
        """
        Supports 3 `GET` URIs: classes, files and the login.
        Anything else is answered with a 404 instead of silence.
        """
        cls_ma = self.CLASS_URI.match(self.path)
        file_ma = self.FILE_URI.match(self.path)
        if cls_ma:
            self._get_class(cls_ma.group(2), cls_ma.group(3))
        elif file_ma:
            self._get_file(file_ma.group(2))
        elif self._is_login_path():
            self._get_login()
        else:
            self.log_error('I do not understand GET:%s', self.path)
            self._send_cors_error(404, 'Quoi?')

    def do_POST(self):
        """
        This works in two ways:

        1. post to `/1/classes/Foo` creates a new object
        2. post to `/1/files/{filename}` stores an uploaded file
        """
        cls_ma = self.CLASS_URI.match(self.path)
        file_ma = self.FILE_URI.match(self.path)
        if cls_ma:
            self._post_class(cls_ma)
        elif file_ma:
            self._post_file(file_ma)
        else:
            self.log_error('I do not understand POST:%s', self.path)
            self._send_cors_error(500, 'Quoi?')

    def do_PUT(self):
        """
        Expects a `classes` style URI that contains the `objectId`,
        whereupon it will read in the JSON, apply the incoming
        JSON as an update, and take care of patching in the `url`
        for any discovered `__type: File` properties.

        Replies with ``{"updatedAt": ...}`` on success, 404 when the
        object is unknown and 400 when the body is not a JSON object.
        """
        cls_ma = self.CLASS_URI.match(self.path)
        if not cls_ma:
            self.log_error('I do not understand PUT:%s', self.path)
            self._send_cors_error(500, 'Quoi?')
            return
        # the class_name is embedded in the oid;
        # that's how "GET" knows about them :-)
        oid = cls_ma.group(3)
        stored_fn = self._find_object_file(oid) if oid else None
        if stored_fn is None:
            self.log_error('I could not find "%s.json"', oid)
            self._send_cors_error(404, 'Gone Walkabout')
            return
        item = self._load_json(stored_fn)
        payload = self._read_json_object()
        if payload is None:
            return  # the 400 has already been sent
        # the server owns these; the client does not get to set them
        for forbidden in ('objectId', 'createdAt', 'updatedAt'):
            payload.pop(forbidden, None)
        # find File references and fix them up before writes
        # so the subsequent 'GET' will JFW
        for value in payload.values():
            if (isinstance(value, dict) and 'name' in value
                    and value.get('__type') == 'File'):
                value['url'] = self._make_file_url(value['name'])
        item.update(payload)
        item['updatedAt'] = self._utc_timestamp()
        with open(stored_fn, 'w') as fh:
            json.dump(item, fh, indent=2)
        self._send_json(200, 'Updated', {'updatedAt': item['updatedAt']})

    def do_OPTIONS(self):
        """
        Deals with CORS requests.

        Request::

            Accept:*/*
            Accept-Encoding:gzip, deflate, sdch
            Accept-Language:en-US,en;q=0.8
            Access-Control-Request-Headers:accept, x-parse-application-id, x-parse-rest-api-key
            Access-Control-Request-Method:GET
            Connection:keep-alive
            Host:api.parse.com
            Origin:http://localhost:9000
            Referer:http://localhost:9000/
            User-Agent:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1)
                AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36

        Response::

            Access-Control-Allow-Headers:X-Parse-REST-API-Key,
                X-Parse-Application-Id, X-Parse-Client-Version, X-Parse-Session-Token,
                X-Requested-With, Content-Type
            Access-Control-Allow-Methods:OPTIONS, POST, GET
            Access-Control-Allow-Origin:*
            Access-Control-Max-Age:86400
            Connection:keep-alive
            Content-Length:0
            Content-Type:application/json; charset=utf-8
            Date:Fri, 12 Dec 2014 00:18:23 GMT
            Server:nginx/1.6.0
            X-Parse-Platform:G1
            X-Runtime:0.000076
        """
        origin_header = self.headers.get('origin')
        self.log_message('OPTIONS for "%s"\n[origin=%s]',
                         self.path, origin_header)
        self._send_reply(200, 'OK')

    # ------------------------------------------------------------------
    # GET helpers
    # ------------------------------------------------------------------

    def _get_class(self, class_name, object_id=None):
        """
        Replies with one stored object when `object_id` is given,
        otherwise with ``{"results": [...]}`` holding every stored
        object whose file name starts with `class_name`.
        """
        if object_id is not None:
            stored_fn = self._find_object_file(object_id)
            if stored_fn is None:
                self.log_error('I could not find "%s.json"', object_id)
                self._send_cors_error(404, 'Gone Walkabout')
                return
            self._send_json(200, 'OK', self._load_json(stored_fn))
            return
        lc_class = class_name.lower()
        # sorted so that the reply order does not depend on the filesystem
        results = [self._load_json(fn) for fn in sorted(os.listdir('.'))
                   if fn.lower().startswith(lc_class) and fn.endswith('.json')]
        self._send_json(200, 'OK', {'results': results})

    def _get_file(self, file_name):
        """Replies with the bytes of a previously uploaded file, or a 404."""
        if not os.path.isfile(file_name):
            self._send_reply(404, 'No such file', content_type='text/plain')
            return
        c_type, _ = mimetypes.guess_type(file_name)
        with open(file_name, 'rb') as fh:
            data = fh.read()
        # guess_type yields None for unknown extensions
        self._send_reply(200, 'OK', data, c_type or 'application/octet-stream')

    def _is_login_path(self):
        """True when the request targets the login URI (slash or no slash)."""
        if self.path.startswith(self.LOGIN_URI):
            return True
        return urlparse(self.path).path == self.LOGIN_URI.rstrip('/')

    def _get_login(self):
        """
        Answers a login from the file ``<username>,<password>.json``
        (both lower-cased and URL-quoted); 404 when it does not exist
        or when either credential is missing.
        """
        qs = dict(parse_qsl(urlparse(self.path).query))
        un = qs.get('username')
        pw = qs.get('password')
        if not un or not pw:
            self._send_reply(404, 'Missing credentials',
                             content_type=self.JSON_CTYPE)
            return
        # safe='' so that "/" is escaped too and the lookup cannot
        # wander outside of the current directory
        credentials_file = '%s,%s.json' % (quote(un.lower(), safe=''),
                                           quote(pw.lower(), safe=''))
        self.log_message('Checking for "%s"', credentials_file)
        if not os.path.isfile(credentials_file):
            self.log_error('Nope, I do not know "%s"', un)
            self._send_reply(404, 'Bad credentials',
                             content_type=self.JSON_CTYPE)
            return
        with open(credentials_file, 'rb') as fh:
            reply = fh.read()
        self._send_reply(200, 'OK', reply, self.JSON_CTYPE)

    # ------------------------------------------------------------------
    # POST helpers
    # ------------------------------------------------------------------

    def _post_class(self, cls_ma):
        """
        Stores the posted JSON object as ``<objectId>.json`` and replies
        ``201`` with its `objectId`, `createdAt` and a `Location` header.
        """
        payload = self._read_json_object()
        if payload is None:
            return  # the 400 has already been sent
        class_name = cls_ma.group(2)
        # the uuid suffix keeps two posts within the same second apart
        payload['objectId'] = '%s-%d-%s' % (
            class_name, int(time.time()), uuid4().hex[:8])
        payload['createdAt'] = self._utc_timestamp()
        payload_js = json.dumps(payload, indent=2)
        self.log_message('Payload: %s', payload_js)
        with open('%s.json' % payload['objectId'], 'w') as fh:
            fh.write(payload_js)
        location = 'http://%(host)s%(path)s/%(cls)s/%(oid)s' % {
            'host': self.headers.get('host'),
            'path': cls_ma.group(1),
            'cls': class_name,
            'oid': payload['objectId'],
        }
        reply = {
            'objectId': payload['objectId'],
            'createdAt': payload['createdAt'],
        }
        self._send_json(201, 'Created', reply,
                        headers=[('Location', location)])

    def _post_file(self, file_ma):
        """
        Stores the request body under a unique name derived from the
        name in the URI and replies ``201`` with its `name` and `url`.
        """
        input_fn = file_ma.group(2)
        output_fn = '%s-%s' % (uuid4(), input_fn)
        while os.path.exists(output_fn):
            output_fn = '%s-%s' % (uuid4(), input_fn)
        input_bytes = self._read_body()
        with open(output_fn, 'wb') as fh:
            fh.write(input_bytes)
        payload = {
            'name': output_fn,
            'url': 'http://%(host)s%(path)s/%(fn)s' % {
                'host': self.headers.get('host'),
                'path': file_ma.group(1),
                'fn': output_fn,
            },
        }
        self._send_json(201, 'Created', payload,
                        headers=[('Location', payload['url'])])

    # ------------------------------------------------------------------
    # shared plumbing
    # ------------------------------------------------------------------

    def _read_body(self):
        """Returns the raw request body as bytes."""
        c_len = self.headers.get('content-length')
        if c_len:
            return self.rfile.read(int(c_len))
        # without a length this blocks until the client hangs up
        return self.rfile.read()  # good luck!

    def _read_json_object(self):
        """
        Returns the request body parsed as a JSON object (`dict`).

        The `Content-Type` is deliberately not inspected. When the body
        is not valid JSON, or is not an object, a 400 is sent and
        `None` is returned; the caller should just return.
        """
        raw = self._read_body()
        try:
            payload = json.loads(raw.decode('utf-8'))
        except ValueError:  # covers both bad UTF-8 and bad JSON
            self.log_error('Unparsable JSON body for %s', self.path)
            self._send_cors_error(400, 'Invalid JSON')
            return None
        if not isinstance(payload, dict):
            self.log_error('JSON body for %s is not an object', self.path)
            self._send_cors_error(400, 'Expected a JSON object')
            return None
        return payload

    @staticmethod
    def _load_json(file_name):
        """Returns the parsed content of a UTF-8 encoded JSON file."""
        with io.open(file_name, encoding='utf-8') as fh:
            return json.load(fh)

    @staticmethod
    def _find_object_file(object_id):
        """
        Returns the name of the file in the current directory holding
        `object_id` (matched case-insensitively), or `None`.
        """
        wanted = ('%s.json' % object_id).lower()
        for entry in os.listdir('.'):
            if entry.lower() == wanted:
                return entry
        return None

    @staticmethod
    def _utc_timestamp():
        """Returns "now" in UTC as ``YYYY-MM-DDTHH:MM:SS.mmmZ``."""
        now = time.time()
        return '%s.%03dZ' % (
            time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(now)),
            int(now * 1000) % 1000)

    def _send_reply(self, code, message=None, body=b'',
                    content_type=None, headers=()):
        """
        Writes one complete response: status line, CORS headers,
        ``Connection: close``, the optional `content_type` and extra
        `headers` (an iterable of ``(name, value)`` pairs), the
        `Content-Length` of `body` and finally `body` itself (bytes).
        """
        self.send_response(code, message)
        self._send_cors_headers()
        self.send_header('Connection', 'close')
        if content_type:
            self.send_header('Content-Type', content_type)
        for name, value in headers:
            self.send_header(name, value)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        if body:
            self.wfile.write(body)
            self.wfile.flush()

    def _send_json(self, code, message, obj, headers=()):
        """Serializes `obj` to JSON and sends it via `_send_reply`."""
        body = json.dumps(obj).encode('utf-8')
        self._send_reply(code, message, body, self.JSON_CTYPE, headers)

    def _send_cors_headers(self):
        """
        Outputs the CORS headers into the current Response.
        """
        for k, v in sorted(self.CORS_HEADERS.items()):
            self.send_header(k, v)

    def _send_cors_error(self, code, message=None):
        """
        Sends a complete, body-less error response carrying the CORS
        headers. It is expected you will just return after calling this.

        `send_error` is not used here: it finishes the headers and writes
        its own body, so nothing can be added to its response afterwards.
        """
        self._send_reply(code, message)

    def _make_file_url(self, fn):
        """
        Places the "files" path hard-coding in one place.
        I thought about fishing it out of the regex
        but thought better of it.
        """
        return 'http://%(host)s/1/files/%(fn)s' % {
            'host': self.headers.get('host'),
            'fn': fn
        }
def main():
    """
    Serves `ParseAPIHandler` on port 8000 of every interface until
    interrupted with Ctrl-C, then closes the listening socket and
    exits with status 0. Any other failure propagates (and therefore
    produces a traceback and a non-zero exit) instead of being
    reported as success.
    """
    import sys
    # if one does not use the Threading flavor,
    # Chrome will eat your Python script for breakfast
    try:
        from SocketServer import ThreadingTCPServer  # Python 2
    except ImportError:
        from socketserver import ThreadingTCPServer  # Python 3

    class _DemoServer(ThreadingTCPServer):
        # permit an immediate restart on the same port
        allow_reuse_address = True
        # do not let lingering keep-alive connections block the exit
        daemon_threads = True

    tcp = _DemoServer(('0.0.0.0', 8000), ParseAPIHandler)
    print('Serving on http://%s:%d' % tcp.server_address)
    try:
        tcp.serve_forever()
    except KeyboardInterrupt:
        # don't let these ugly things print
        print('Bye, now!')
    finally:
        # serve_forever() is no longer running at this point, so only
        # the listening socket needs to be released
        tcp.server_close()
        print('Shutdown, OK')
    sys.exit(0)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment