Skip to content

Instantly share code, notes, and snippets.

@edwardgeorge
Created July 18, 2010 23:08
Show Gist options
  • Save edwardgeorge/480796 to your computer and use it in GitHub Desktop.
Save edwardgeorge/480796 to your computer and use it in GitHub Desktop.
Research into fake devices for syncing Cultured Code's Things task-management application with other apps.
import gzip
import logging
import plistlib
import rfc822
import StringIO
class config:
    """Identity values the fake device hands back during the handshake.

    An object exposing these three attributes is what ``ThingsSync``
    expects as its ``config`` argument.
    """

    # Returned verbatim by ThingsSync.do_SEND_DEVICE_NAME.
    DEVICE_NAME = 'Fake iPod'

    # Returned verbatim by ThingsSync.do_SEND_DEVICE_MODEL.
    DEVICE_MODEL = 'iPod touch'

    # Returned verbatim by ThingsSync.do_SEND_APPLICATION_VERSION.
    # NOTE(review): presumably the client build/API number the desktop
    # application expects -- confirm before changing.
    CLIENT_API_VER = '1520'
class ThingsSync(object):
def __init__(self, config):
self.config = config
self.db_identifier = None
self.outgoing_changes = []
self.data = {}
self.post_init()
def post_init(self):
pass
def pre_sync(self):
pass
def post_sync(self):
pass
def handle_change(self, type_, uuid, changes):
logging.getLogger('ThingsSync.handle_change').\
debug('got change: %s(%s)\n%s' % (type_, uuid,
'\n'.join([' %s: %r' % i for i in changes.items()])))
self.data.setdefault(type_, {}).setdefault(uuid, {}).update(changes)
def do_SEND_DEVICE_NAME(self, data):
return self.config.DEVICE_NAME
def do_SEND_DEVICE_MODEL(self, data):
return self.config.DEVICE_MODEL
def do_SEND_SECURITY_CODE(self, data):
code = raw_input('security code:')
return code
def do_PAIR_WITH_DESKTOP(self, data):
if self.db_identifier is not None:
raise Exception('ALREADY PAIRED')
self.db_identifier = data
return 'PAIR_WITH_DESKTOP_OK'
def do_SEND_APPLICATION_VERSION(self, data):
return self.config.CLIENT_API_VER
def do_HELLO(self, data):
if data != self.db_identifier:
raise Exception('DB ID MISMATCH')
return 'OK'
def do_BEGIN_SYNC_SESSION(self, data):
self.pre_sync()
return self._encode_data(len(self.outgoing_changes))
def do_END_SYNC_SESSION(self, data):
self.post_sync()
return self._gzip('OK')
def do_RECEIVE_CHANGES_FOR_NEXT_OBJECT(self, data):
self.handle_change(data['type'], data['uuid'], data['changes'])
return self._gzip('OK')
def do_RETURN_CHANGES_FOR_NEXT_OBJECT(self, data):
if not self.outgoing_changes:
return ''
type_, uuid, changes = self.outgoing_changes.pop(0)
data = self._encode_data({'type': type_, 'uuid': uuid, 'changes': changes})
return data
def _parse_data(self, data):
if not data:
return
if data.startswith('\x1f\x8b'):
data = gzip.GzipFile(fileobj=StringIO.StringIO(data)).read()
return plistlib.readPlistFromString(data)
def _parse_http(self, data):
data = StringIO.StringIO(data)
req = data.readline()
msg = rfc822.Message(data)
return req, msg
def _encode_data(self, data):
data = plistlib.writePlistToString(data)
return self._gzip(data)
def _gzip(self, data):
out = StringIO.StringIO()
g = gzip.GzipFile(fileobj=out, mode='wb')
g.write(data)
g.close()
return out.getvalue()
def connection(self, sock, address):
logger = logging.getLogger('ThingsSync.connection')
logger.info('connection from %s:%s' % address)
BUFSIZE = 4096
eof = False
while not eof:
data = ''
while True:
d = sock.recv(BUFSIZE)
data = data + d
if not d:
eof = True
if len(d) < BUFSIZE:
break
if not data:
break
req, msg = self._parse_http(data)
method, path, httpver = req.split()
data = self._parse_data(msg.fp.read())
logger.debug('%s %s' % (path, data))
try:
resp = getattr(self, 'do_%s' % path)(data)
logger.debug('-> %r' % resp)
resp = 'HTTP/1.0 200 OK\r\nContent-Length: %d\r\n\r\n%s' % (len(resp), resp)
except Exception, e:
import traceback
traceback.print_exc()
resp = 'HTTP/1.0 500 SERVER ERROR\r\n\r\n'
sock.send(resp)
logger.info('closing connection from %s:%s' % address)
sock.close()
if __name__ == '__main__':
    # Script entry point: listen on an ephemeral TCP port, advertise it
    # over Bonjour as a `_cciphonethings._tcp` service, then serve incoming
    # connections one at a time with a single ThingsSync instance.
    import hashlib
    import select
    import socket
    import uuid

    import pybonjour

    logging.basicConfig(level=logging.DEBUG)
    things_sync = ThingsSync(config)

    sock = socket.socket()
    sdRef = None
    try:
        # Port 0 lets the OS choose a free port; read back what it picked.
        sock.bind(('0.0.0.0', 0))
        PORT = sock.getsockname()[1]
        sock.listen(1)

        def register_callback(sdRef, flags, errorCode, name, regtype, domain):
            # Invoked from DNSServiceProcessResult once registration
            # completes; flips the readiness flag for this service ref.
            if errorCode != pybonjour.kDNSServiceErr_NoError:
                print('error registering service')
                raise Exception()
            print('service registraton OK')
            sdready[sdRef.fileno()] = True

        # The advertised service name is a fresh random-looking hex digest
        # on every run.
        sdRef = pybonjour.DNSServiceRegister(
            name=hashlib.sha1(uuid.uuid1().hex).hexdigest(),
            regtype='_cciphonethings._tcp',
            port=PORT,
            callBack=register_callback)
        sdready = {sdRef.fileno(): False}

        # Pump Bonjour events until the registration callback has fired.
        while not sdready[sdRef.fileno()]:
            select.select([sdRef], [], [])
            pybonjour.DNSServiceProcessResult(sdRef)

        # Serve connections sequentially; connection() blocks until the
        # peer disconnects.
        while True:
            conn, addr = sock.accept()
            things_sync.connection(conn, addr)
    finally:
        # Release both the Bonjour registration and the listening socket,
        # whichever stage failed or was interrupted.
        if sdRef is not None:
            sdRef.close()
        sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment