Last active
February 4, 2017 09:09
-
-
Save robbiet480/e0291fce8a9e58e683a19b5dee5c1be5 to your computer and use it in GitHub Desktop.
A Python 3/asyncio implementation of DACP discovery and pairing for use with iTunes or Apple TV (tested on aTV Generation 4)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiohttp | |
from aiohttp import web | |
from zeroconf import ServiceInfo, ServiceBrowser, Zeroconf | |
import socket | |
import struct | |
import random | |
import hashlib | |
def generate_hex_string(bits): | |
return hex(random.getrandbits(bits))[2:-1] | |
pairing = generate_hex_string(64).upper() | |
print('Pairing code', pairing) | |
guid = generate_hex_string(64).upper() | |
print('GUID', guid) | |
type_ = '_touch-remote._tcp.local.' | |
registration_name = 'pyATV-{}._touch-remote._tcp.local.'.format(pairing) | |
local_ip = socket.inet_aton(socket.gethostbyname(socket.gethostname())) | |
device_name = 'pyATV - {}'.format(pairing) | |
print('device_name', device_name) | |
info = ServiceInfo('_touch-remote._tcp.local.', registration_name, | |
local_ip, 3689, 0, 0, { | |
'DvNm': device_name, | |
'RemV': '10000', | |
'DvTy': 'computer', | |
'RemN': 'Remote', | |
'Pair': pairing, | |
'txtvers': '1' | |
}) | |
zeroconf = Zeroconf() | |
def encode_msg(msg): | |
encoded = ''.join([('%s%s%s' % (key, | |
struct.pack('>i', len(value)).decode(), | |
value)) for key, value in msg.items()]) | |
return 'cmpa%s%s' % (struct.pack('>i', len(encoded)).decode(), encoded) | |
def calculate_paring_code(pair, code): | |
payload = str(pair + ''.join([c + '\x00' for c in code])).encode('utf-8') | |
return hashlib.md5(payload).hexdigest().upper() | |
def serve(): | |
loop = asyncio.get_event_loop() | |
loop.run_in_executor(None, zeroconf.register_service, info) | |
app = aiohttp.web.Application(loop=loop) | |
srv = None | |
pin_code = ''.join(['%s' % random.randint(0, 9) for num in range(0, 4)]) | |
print('Please enter', pin_code) | |
@asyncio.coroutine | |
def delayed_shutdown(): | |
yield from asyncio.sleep(0.2) | |
srv.close() | |
zeroconf.unregister_service(info) | |
zeroconf.close() | |
@asyncio.coroutine | |
def accept_pairing(request): | |
received_pair_code = request.rel_url.query.get('pairingcode') | |
expected_code = calculate_paring_code(pairing, pin_code) | |
print('RECEIVED PAIRING CODE', received_pair_code) | |
print('EXPECTED CODE', expected_code) | |
if (received_pair_code == expected_code): | |
print('Pairing code matched!!!') | |
else: | |
print('WRONG PAIRING CODE :(') | |
cmpg = ''.join([chr(int(guid[i:i + 2], 16)) for i in range(0, 16, 2)]) | |
# Option 1 | |
encoded = encode_msg({'cmpg': cmpg, 'cmnm': device_name, | |
'cmty': 'computer'}) | |
# Option 2 | |
# values = {'cmpg': cmpg, 'cmnm': device_name, 'cmty': 'computer'} | |
# encoded = '' | |
# for key, value in values.items(): | |
# packed = struct.pack('>i', len(value)).decode() | |
# encoded += '%s%s%s' % (key, packed, value) | |
# header = 'cmpa%s' % (struct.pack('>i', len(encoded)).decode()) | |
# encoded = '%s%s' % (header, encoded) | |
# loop.create_task(delayed_shutdown()) | |
return aiohttp.web.Response(content_type='text/plain', text=encoded) | |
app.router.add_get('/pair', accept_pairing) | |
handler = app.make_handler() | |
f = loop.create_server(handler, '0.0.0.0', 3689) | |
srv = loop.run_until_complete(f) | |
print('serving on', srv.sockets[0].getsockname()) | |
try: | |
loop.run_forever() | |
finally: | |
srv.close() | |
loop.run_until_complete(srv.wait_closed()) | |
loop.run_until_complete(app.shutdown()) | |
loop.run_until_complete(handler.shutdown(60.0)) | |
loop.run_until_complete(app.cleanup()) | |
loop.close() | |
serve() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment