Skip to content

Instantly share code, notes, and snippets.

@mdaniel
Created December 12, 2014 06:16
Show Gist options
  • Save mdaniel/7817c7fda1b665c6aecf to your computer and use it in GitHub Desktop.
Save mdaniel/7817c7fda1b665c6aecf to your computer and use it in GitHub Desktop.
Run a fake copy of api.parse.com that is sufficient for demos
#! /usr/bin/env python2.7
from __future__ import absolute_import, print_function
__docformat__ = 'reStructuredText'
import json
import mimetypes
import os
import re
import stat
import time
from uuid import uuid4
from datetime import datetime
from urllib import quote
from urlparse import parse_qsl, urlparse
from SimpleHTTPServer import SimpleHTTPRequestHandler
# noinspection PyPep8Naming
class ParseAPIHandler(SimpleHTTPRequestHandler):
"""
Attempts to recreate enough of `api.parse.com` to permit
offline demos. Clearly, not every functionality is implemented
nor are the ones here implemented fully or (necessarily) correctly.
But I suspect the project for which I made this will not be the
last one who would like to have some offline parse.com support.
"""
LOGIN_URI = '/1/login/'
"""
``GET`` (yup) to ``?username&password`` yields a reply of
::
{
"username": "cooldude6",
"phone": "415-392-0202",
"createdAt": "2011-11-07T20:58:34.448Z",
"updatedAt": "2011-11-07T20:58:34.448Z",
"objectId": "g7y9tkhB7O",
"sessionToken": "pnktnjyb996sj4p156gjtp4im"
}
This path is used in a `startswith` context.
"""
CLASS_URI = re.compile(r'(/1/classes)/(\w+)(?:/([^/?&]+))?')
FILE_URI = re.compile(r'(/1/files)/([^/?&]+)')
"""
This regex is **much** more lenient than is parse.com
since this is designed to be used in demos, where
*strictness* is not usually a requirement.
"""
CORS_HEADERS = {
'Access-Control-Allow-Headers': (
'Accept'
',Content-Type'
',X-Parse-REST-API-Key'
',X-Parse-Application-Id'
',X-Parse-Client-Version'
',X-Parse-Session-Token'
',X-Requested-With'
).lower(),
'Access-Control-Allow-Methods': 'GET, OPTIONS, POST, PUT',
'Access-Control-Allow-Origin': '*',
'Access-Control-Max-Age': '300', # seconds
}
JSON_CTYPE = 'application/javascript;charset=UTF-8'
def do_GET(self):
"""
Supports 3 `GET` URIs: classes, files and the login.
"""
# self.log_message('GET(%s)', self.path)
cls_ma = self.CLASS_URI.match(self.path)
file_ma = self.FILE_URI.match(self.path)
# yes, I am aware these could be in methods for clarity
if cls_ma:
class_name = cls_ma.group(2)
lc_class = class_name.lower()
result_files = [fn for fn in os.listdir('.')
if fn.lower().startswith(lc_class) and fn.endswith('.json')]
results = []
for rf in result_files:
with open(rf) as fh:
results.append(json.loads(fh.read().decode('utf-8')))
reply = {'results': results}
body = json.dumps(reply)
self.send_response(200, 'OK')
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Content-Type', self.JSON_CTYPE)
self.send_header('Content-Length', str(len(body)))
self.end_headers()
self.wfile.write(body)
self.wfile.flush()
return
elif file_ma:
fn = file_ma.group(2)
c_type, _ = mimetypes.guess_type(fn)
if os.path.isfile(fn):
file_size = os.stat(fn)[stat.ST_SIZE]
self.send_response(200, 'OK')
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Content-Type', c_type)
self.send_header('Content-Length', str(file_size))
self.end_headers()
with open(fn, 'rb') as fh:
self.wfile.write(fh.read(file_size))
self.wfile.flush()
return
else:
self.send_response(404, 'No such file')
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Content-Type', 'text/plain')
self.send_header('Content-Length', '0')
self.end_headers()
return
elif self.path.startswith(self.LOGIN_URI):
pr = urlparse(self.path)
qs = dict(parse_qsl(pr.query))
un = qs.get('username')
pw = qs.get('password')
def bogus(msg):
self.send_response(404, msg)
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Content-Type', self.JSON_CTYPE)
self.send_header('Content-Length', '0')
self.end_headers()
if not un or not pw:
bogus('Missing credentials')
return
un_l = un.lower()
pw_l = pw.lower()
credentials_file = '%s,%s.json' % (quote(un_l), quote(pw_l))
self.log_message('Checking for "%s"', credentials_file)
if os.path.isfile(credentials_file):
with open(credentials_file) as fh:
reply = fh.read()
self.send_response(200, 'OK')
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Content-Type', self.JSON_CTYPE)
self.send_header('Content-Length', str(len(reply)))
self.end_headers()
self.wfile.write(reply)
self.wfile.flush()
return
else:
self.log_error('Nope, I do not know "%s"', un)
bogus('Bad credentials')
return
def do_POST(self):
"""
This works in two ways:
1. post to `/1/classes/Foo`
1. post to `/1/files/{filename}`
"""
# self.log_message('POST(%s)', self.path)
cls_ma = self.CLASS_URI.match(self.path)
file_ma = self.FILE_URI.match(self.path)
# yes, I am aware these could be in methods for clarity
if cls_ma:
c_type = self.headers.get('content-type')
if 'javascript' in c_type.lower() or 'json' in c_type.lower():
c_len = self.headers.get('content-length')
if c_len:
input_text = self.rfile.read(int(c_len))
else:
input_text = self.rfile.read() # good luck!
class_name = cls_ma.group(2)
#: :type: dict[unicode,object]
payload = json.loads(input_text)
payload['objectId'] = '%s-%d' % (class_name, int(time.time()))
payload['createdAt'] = '%sZ' % datetime.utcnow().isoformat()
payload_js = json.dumps(payload, indent=2)
self.log_message('Payload: %s', payload_js)
with open('%s.json' % payload['objectId'], 'w') as fh:
fh.write(payload_js)
self.send_response(201, 'Created')
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Location',
'http://%(host)s%(path)s/%(oid)s' % {
'host': self.headers.get('host'),
'path': cls_ma.group(1),
'oid': payload['objectId'],
})
reply = json.dumps({
'objectId': payload['objectId'],
'createdAt': payload['createdAt']
})
self.send_header('Content-Type', self.JSON_CTYPE)
self.send_header('Content-Length', str(len(reply)))
self.end_headers()
self.wfile.write(reply)
self.wfile.flush()
return
elif file_ma:
input_fn = file_ma.group(2)
output_fn = '%s-%s' % (uuid4(), input_fn)
while os.path.exists(output_fn):
time.sleep(0.5)
output_fn = '%s-%s' % (uuid4(), input_fn)
c_len = self.headers.get('content-length')
if c_len:
input_bytes = self.rfile.read(int(c_len))
else:
input_bytes = self.rfile.read() # good luck!
with open(output_fn, 'wb') as fh:
fh.write(input_bytes)
payload = {
'name': output_fn,
'url': 'http://%(host)s%(path)s/%(fn)s' % {
'host': self.headers.get('host'),
'path': file_ma.group(1),
'fn': output_fn,
},
}
reply = json.dumps(payload)
self.send_response(201, 'Created')
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Content-Type', self.JSON_CTYPE)
self.send_header('Content-Length', str(len(reply)))
self.send_header('Location', payload['url'])
self.end_headers()
self.wfile.write(reply)
self.wfile.flush()
return
else:
self.log_error('I do not understand POST:%s', self.path)
self._send_cors_error(500, 'Quoi?')
def do_PUT(self):
"""
Expects a `classes` style URI that contains the `objectId`,
whereupon it will read in the JSON, apply the incoming
JSON as an update, and take care of patching in the `url`
for any discovered `__type: File` properties.
"""
cls_ma = self.CLASS_URI.match(self.path)
if not cls_ma:
self.log_error('I do not understand PUT:%s', self.path)
self._send_cors_error(500, 'Quoi?')
return
# class_name = cls_ma.group(2)
# the class_name is embedded in the oid;
# that's how "GET" knows about them :-)
oid = cls_ma.group(3)
fn = '%s.json' % oid
the_data = [it for it in os.listdir('.')
if it.lower() == fn.lower()]
if not the_data:
self.log_error('I could not find "%s"', fn)
self._send_cors_error(404, 'Gone Walkabout')
return
input_fn = the_data[0]
output_fn = input_fn # sorry
with open(input_fn) as fh:
#: :type: dict[unicode,object]
item = json.loads(fh.read().decode('utf-8'))
c_len = self.headers.get('content-length')
if c_len:
input_text = self.rfile.read(int(c_len))
else:
input_text = self.rfile.read() # good luck!
#: :type: dict[unicode,object]
payload = json.loads(input_text)
forbidden_keys = ['objectId', 'createdAt', 'updatedAt']
for fk in forbidden_keys:
if fk in payload:
del payload[fk]
# find File references and fix them up before writes
# so the subsequent 'GET' will JFW
for k in payload.keys():
obj = payload[k]
if isinstance(obj, dict) and 'name' in obj and '__type' in obj:
o_type = obj['__type']
if 'File' == o_type:
obj['url'] = self._make_file_url(obj['name'])
item.update(payload)
item['updatedAt'] = '%sZ' % datetime.utcnow().isoformat()
with open(output_fn, 'w') as fh:
json.dump(item, fh, indent=2)
self.send_response(200, 'Updated')
self._send_cors_headers()
self.send_header('Connection', 'close')
def do_OPTIONS(self):
"""
Deals with CORS requests.
Request::
Accept:*/*
Accept-Encoding:gzip, deflate, sdch
Accept-Language:en-US,en;q=0.8
Access-Control-Request-Headers:accept, x-parse-application-id, x-parse-rest-api-key
Access-Control-Request-Method:GET
Connection:keep-alive
Host:api.parse.com
Origin:http://localhost:9000
Referer:http://localhost:9000/
User-Agent:Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36
Response::
Access-Control-Allow-Headers:X-Parse-REST-API-Key,
X-Parse-Application-Id, X-Parse-Client-Version, X-Parse-Session-Token,
X-Requested-With, Content-Type
Access-Control-Allow-Methods:OPTIONS, POST, GET
Access-Control-Allow-Origin:*
Access-Control-Max-Age:86400
Connection:keep-alive
Content-Length:0
Content-Type:application/json; charset=utf-8
Date:Fri, 12 Dec 2014 00:18:23 GMT
Server:nginx/1.6.0
X-Parse-Platform:G1
X-Runtime:0.000076
"""
origin_header = self.headers.get('origin')
self.log_message('OPTIONS for "%s"\n[origin=%s]', self.path, origin_header)
self.send_response(200, 'OK')
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Content-Length', '0')
self.end_headers()
def _send_cors_headers(self):
"""
Outputs the CORS headers into the current Response.
"""
for k, v in self.CORS_HEADERS.iteritems():
self.send_header(k, v)
def _send_cors_error(self, code, message=None):
"""It is expected you will just return after calling this."""
self.send_error(code, message)
self._send_cors_headers()
self.send_header('Connection', 'close')
self.send_header('Content-Length', '0')
def _make_file_url(self, fn):
"""
Places the "files" path hard-coding in one place.
I thought about fishing it out of the regex
but thought better of it.
"""
return 'http://%(host)s/1/files/%(fn)s' % {
'host': self.headers.get('host'),
'fn': fn
}
def main():
import sys
# if one does not use the Threading flavor,
# Chrome will eat your Python script for breakfast
from SocketServer import ThreadingTCPServer
tcp = ThreadingTCPServer(('0.0.0.0', 8000), ParseAPIHandler)
try:
print('Serving on http://%s:%d' % tcp.server_address)
tcp.serve_forever()
except KeyboardInterrupt:
pass # don't let these ugly things print
print('Bye, now!')
finally:
tcp.shutdown()
print('Shutdown, OK')
sys.exit(0)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment