Skip to content

Instantly share code, notes, and snippets.

@geekgonecrazy
Last active May 30, 2019 16:36
Show Gist options
  • Save geekgonecrazy/6099301 to your computer and use it in GitHub Desktop.
Save geekgonecrazy/6099301 to your computer and use it in GitHub Desktop.
Leapcast
#!/usr/bin/python
# Python program that emulates a ChromeCast device.
from twisted.internet import reactor
from twisted.internet.protocol import DatagramProtocol
import tornado.ioloop
import tornado.web
import tornado.websocket
import threading
import string
import argparse
import signal
import logging
from textwrap import dedent
import shlex, subprocess
import json
# Per-application state, keyed by the handler class name.
status = {}
# Name reported in the device description (overridable with --name).
friendlyName = "Mopidy"
# User agent passed to the launched Chrome (overridable with --user_agent).
user_agent = (
    "Mozilla/5.0 (CrKey - 0.0.1) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/30.0.1573.2 Safari/537.36"
)
# Chrome executable used to launch receiver apps (overridable with --chrome).
chrome = "/opt/google/chrome/chrome"
class SSDP(DatagramProtocol):
    """SSDP responder that answers DIAL M-SEARCH discovery requests.

    On construction the protocol starts listening on the SSDP multicast
    port and joins the SSDP group on the given interface.  Every search
    for the DIAL service type is answered with the location of the
    device description served on port 8008.
    """

    SSDP_ADDR = '239.255.255.250'
    SSDP_PORT = 1900
    # Service type this responder answers for.
    DIAL_ST = "urn:dial-multiscreen-org:service:dial:1"
    # M-SEARCH response; $ip is replaced by the interface address.
    MS = (
        "HTTP/1.1 200 OK\r\n"
        "LOCATION: http://$ip:8008/ssdp/device-desc.xml\r\n"
        "CACHE-CONTROL: max-age=1800\r\n"
        "CONFIGID.UPNP.ORG: 7337\r\n"
        "BOOTID.UPNP.ORG: 7337\r\n"
        "USN: uuid:3e1cc7c0-f4f3-11e2-b778-0800200c9a66::urn:dial-multiscreen-org:service:dial:1\r\n"
        "ST: urn:dial-multiscreen-org:service:dial:1\r\n"
        "\r\n"
    )

    def __init__(self, iface):
        """Start listening for SSDP traffic.

        Args:
            iface: IP address of the local interface to join the
                multicast group on; also advertised in LOCATION.
        """
        self.iface = iface
        self.transport = reactor.listenMulticast(
            self.SSDP_PORT, self, listenMultiple=True)
        self.transport.setLoopbackMode(1)
        self.transport.joinGroup(self.SSDP_ADDR, interface=iface)

    def stop(self):
        """Leave the multicast group and stop listening."""
        self.transport.leaveGroup(self.SSDP_ADDR, interface=self.iface)
        self.transport.stopListening()

    def datagramReceived(self, datagram, address):
        """Answer a DIAL M-SEARCH datagram; ignore everything else.

        Args:
            datagram: Raw payload of the received UDP packet.
            address: Sender address the reply is written to.
        """
        # The payload is bytes on Python 3.  On Python 2 bytes is str, so
        # this branch is skipped and the behaviour is unchanged there.
        if not isinstance(datagram, str):
            datagram = datagram.decode("utf-8", "replace")
        if self.DIAL_ST in datagram and "M-SEARCH" in datagram:
            # MS is written flush-left, so it is used verbatim rather than
            # passed through dedent(), which is documented to normalise
            # whitespace-only lines and must not touch the bare CRLF that
            # terminates the headers.
            data = string.Template(self.MS).substitute(ip=self.iface)
            if not isinstance(data, bytes):
                data = data.encode("utf-8")
            self.transport.write(data, address)
class LEAP(tornado.web.RequestHandler):
    """Base handler for one DIAL application backed by a Chrome process.

    Subclasses override ``url`` (and optionally ``service``).  The class
    name doubles as the application name: it keys the module-level
    ``status`` dict and is used to build the ``Location`` header.
    """

    # Service description; $name, $state and $link are filled from the
    # application's entry in ``status``.
    service = (
        '<service xmlns="urn:dial-multiscreen-org:schemas:dial">\n'
        '<name>$name</name>\n'
        '<options allowStop="true"/>\n'
        '<state>$state</state>\n'
        '$link\n'
        '</service>\n'
    )
    # Value of ``request.host`` for the current request; set in prepare().
    ip = None
    # Receiver URL template; $query is replaced by the POST body.
    url = "$query"

    @classmethod
    def _stopped_status(cls):
        """Return a fresh status entry describing a stopped application."""
        return dict(name=cls.__name__, state="stopped", link="", pid=None)

    def prepare(self):
        """Ensure a status entry exists and remember the request host."""
        app_name = self.__class__.__name__
        if app_name not in status:
            status[app_name] = self._stopped_status()
        self.ip = self.request.host

    def get_app_status(self):
        """Return this application's entry in ``status``."""
        return status[self.__class__.__name__]

    def set_app_status(self, app_status):
        """Store ``app_status`` for this application.

        The ``name`` key of the passed dict is overwritten in place with
        the class name before it is stored.
        """
        app_status["name"] = self.__class__.__name__
        status[self.__class__.__name__] = app_status

    def _response(self):
        """Finish the request with the XML service description."""
        self.set_header("Content-Type", "application/xml; charset=UTF-8")
        self.set_header("Cache-control", "no-cache, must-revalidate, no-store")
        self.finish(self._toXML(self.get_app_status()))

    def post(self):
        """Start app"""
        self.set_status(201)
        self.set_header("Location", self._getLocation(self.__class__.__name__))
        pid = self.launch(self.request.body)
        self.set_app_status(dict(state="running", link="""<link rel="run" href="run"/>""", pid=pid))
        self._response()

    def get(self):
        """Status of an app"""
        proc = self.get_app_status()["pid"]
        # poll() returns an exit code once the process is gone, i.e. the
        # app crashed or was closed behind our back.
        if proc is not None and proc.poll() is not None:
            self.set_app_status(self._stopped_status())
        self._response()

    def delete(self):
        """Close app"""
        self.destroy(self.get_app_status()["pid"])
        self.set_app_status(self._stopped_status())
        self._response()

    def _getLocation(self, app):
        """Return the absolute URL of the running instance of ``app``."""
        return "http://%s/apps/%s/run" % (self.ip, app)

    def launch(self, data):
        """Start Chrome on this application's receiver URL.

        Args:
            data: Request body substituted for ``$query`` in ``url``.

        Returns:
            The ``subprocess.Popen`` object of the started Chrome.
        """
        # The body is bytes on Python 3; without decoding it would be
        # rendered as "b'...'".  On Python 2 bytes is str, so this is a no-op.
        if not isinstance(data, str):
            data = data.decode("utf-8", "replace")
        appurl = string.Template(self.url).substitute(query=data)
        command_line = """%s --incognito --kiosk --app="%s" --user-agent="%s" """ % (chrome, appurl, user_agent)
        print(command_line)
        # Build argv directly instead of re-parsing the formatted string:
        # the URL carries client-supplied data, and a quote or space in it
        # must not be able to add or split Chrome arguments.  Only the
        # configured executable is still split, as it was before.
        args = shlex.split(chrome) + [
            "--incognito",
            "--kiosk",
            "--app=%s" % appurl,
            "--user-agent=%s" % user_agent,
        ]
        return subprocess.Popen(args)

    def destroy(self, pid):
        """Terminate the process ``pid`` if there is one still running.

        Args:
            pid: ``subprocess.Popen`` of the application, or ``None``
                when the application was never started.
        """
        # A DELETE for an app that is not running reaches here with None.
        if pid is not None and pid.poll() is None:
            pid.terminate()

    def _toXML(self, data):
        """Render the service template with the values in ``data``."""
        return string.Template(dedent(self.service)).substitute(data)

    @classmethod
    def toInfo(cls):
        """Return the service description, creating the status entry if needed."""
        if cls.__name__ not in status:
            status[cls.__name__] = cls._stopped_status()
        return string.Template(dedent(cls.service)).substitute(status[cls.__name__])
class ChromeCast(LEAP):
    """DIAL handler for the ChromeCast receiver application."""

    # The <name> is fixed here; only $state and $link are placeholders.
    service = (
        '<service xmlns="urn:chrome.google.com:cast">\n'
        '<name>ChromeCast</name>\n'
        '<options allowStop="true"/>\n'
        '<activity-status>\n'
        '<description>Experimental Mopidy sink</description>\n'
        '<image src="http://www.mopidy.com/media/images/penguin_speakers.jpg"/>\n'
        '</activity-status>\n'
        '<servicedata>\n'
        '<protocols>\n'
        '<protocol>video_playback</protocol>\n'
        '<protocol>audio_playback</protocol>\n'
        '</protocols>\n'
        '</servicedata>\n'
        '<state>$state</state>\n'
        '$link\n'
        '</service>\n'
    )
    # $query is replaced by the request body when the app is launched.
    url = "https://www.gstatic.com/cv/receiver.html?$query"
class YouTube(LEAP):
    """DIAL handler for YouTube; the request body becomes the query string."""

    url = "https://www.youtube.com/tv?$query"
class PlayMovies(LEAP):
    """DIAL handler for Play Movies; the request body becomes the query string."""

    url = "https://play.google.com/video/avi/eureka?$query"
class GoogleMusic(LEAP):
    """DIAL handler for Google Music; the URL has no $query placeholder."""

    url = "https://play.google.com/music/cast/player"
class GoogleCastSampleApp(LEAP):
    """DIAL handler for the Google Cast sample app; the URL has no $query placeholder."""

    url = "http://anzymrcvr.appspot.com/receiver/anzymrcvr.html"
class Geektac(LEAP):
    """DIAL handler for the Geektac receiver; the URL has no $query placeholder."""

    url = "http://labs.geekgonecrazy.com/chromecast/receiver.html"
class Netflix(LEAP):
    """DIAL handler registered for Netflix; its receiver URL is empty."""

    url = ""
class GoogleCastPlayer(LEAP):
    """DIAL handler for GoogleCastPlayer; the URL has no $query placeholder."""

    url = "http://anzymrcvr.appspot.com/receiver/anzymrcvr.html"
class Fling(LEAP):
    """DIAL handler for Fling; the URL has no $query placeholder."""

    url = "http://anzymrcvr.appspot.com/receiver/anzymrcvr.html"
class DeviceHandler(tornado.web.RequestHandler):
    """Serve the UPnP device description XML."""

    # Device description; $path and $friendlyName are filled in by get().
    device = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<root xmlns="urn:schemas-upnp-org:device-1-0">\n'
        '<specVersion>\n'
        '<major>1</major>\n'
        '<minor>0</minor>\n'
        '</specVersion>\n'
        '<URLBase>$path</URLBase>\n'
        '<device>\n'
        '<deviceType>urn:schemas-upnp-org:device:dail:1</deviceType>\n'
        '<friendlyName>$friendlyName</friendlyName>\n'
        '<manufacturer>Google Inc.</manufacturer>\n'
        '<modelName>Eureka Dongle</modelName>\n'
        '<UDN></UDN>\n'
        '<iconList>\n'
        '<icon>\n'
        '<mimetype>image/png</mimetype>\n'
        '<width>98</width>\n'
        '<height>55</height>\n'
        '<depth>32</depth>\n'
        '<url>/setup/icon.png</url>\n'
        '</icon>\n'
        '</iconList>\n'
        '<serviceList>\n'
        '<service>\n'
        '<serviceType>urn:schemas-upnp-org:service:dail:1</serviceType>\n'
        '<serviceId>urn:upnp-org:serviceId:dail</serviceId>\n'
        '<controlURL>/ssdp/notfound</controlURL>\n'
        '<eventSubURL>/ssdp/notfound</eventSubURL>\n'
        '<SCPDURL>/ssdp/notfound</SCPDURL>\n'
        '</service>\n'
        '</serviceList>\n'
        '</device>\n'
        '</root>'
    )

    def get(self):
        """Write the device description for the host the client addressed."""
        base_url = "http://%s" % self.request.host
        self.add_header("Application-URL", base_url)
        self.set_header("Content-Type", "application/xml; charset=UTF-8")
        self.set_header("Cache-control", "no-cache")

        advertised_apps = (
            ChromeCast,
            YouTube,
            PlayMovies,
            GoogleMusic,
            GoogleCastSampleApp,
            GoogleCastPlayer,
            Fling,
            Geektac,
        )
        # toInfo() also creates each app's status entry on first use.
        services_xml = "\n".join([app.toInfo() for app in advertised_apps])

        # NOTE: the template has no $services placeholder, so that value is
        # passed along but does not appear in the output.
        values = dict(services=services_xml, friendlyName=friendlyName, path=base_url)
        template = string.Template(dedent(self.device))
        self.write(template.substitute(values))
class WebSocketCast(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that answers REGISTER with a channel request."""

    def open(self):
        """Log that a client connected."""
        logging.info("WebSocket opened")

    def on_message(self, message):
        """Decode a JSON command and react to ``REGISTER``.

        Args:
            message: JSON text whose ``type`` key selects the command.
        """
        cmd = json.loads(message)
        print(cmd)
        if cmd["type"] == "REGISTER":
            self.new_request()

    def on_close(self):
        """Log that the client disconnected."""
        # Previously logged "WebSocket opened", which made the log misleading.
        logging.info("WebSocket closed")

    def new_chanell(self):
        """Send a ``NEWCHANNEL`` message (method name kept for compatibility)."""
        self.write_message((json.dumps({"type": "NEWCHANNEL"})))

    def new_request(self):
        """Send a ``CHANNELREQUEST`` message with a fixed request id."""
        self.write_message((json.dumps({"type": "CHANNELREQUEST", "requestId": "123456"})))
class HTTPThread(threading.Thread):
    """Thread that runs the Tornado application on port 8008."""

    def run(self):
        """Build the route table, listen on port 8008 and start the IOLoop."""
        app_handlers = (
            ChromeCast,
            YouTube,
            PlayMovies,
            GoogleMusic,
            GoogleCastSampleApp,
            GoogleCastPlayer,
            Fling,
            Geektac,
            Netflix,
        )
        routes = [
            (r"/ssdp/device-desc.xml", DeviceHandler),
            (r"/apps", DeviceHandler),
        ]
        # Every application is served under /apps/<handler class name>.
        routes.extend(("/apps/" + handler.__name__, handler) for handler in app_handlers)
        routes.extend([
            (r"/connection", WebSocketCast),
            (r"/system/control", WebSocketCast),
        ])

        self.application = tornado.web.Application(routes)
        self.application.listen(8008)
        tornado.ioloop.IOLoop.instance().start()

    def shutdown(self):
        """Ask the Twisted reactor to stop, then stop the Tornado IOLoop."""
        logging.info('Stopping HTTP server')
        reactor.callFromThread(reactor.stop)
        logging.info('Stopping DIAL server')
        tornado.ioloop.IOLoop.instance().stop()

    def sig_handler(self, sig, frame):
        """Signal handler: schedule shutdown() as an IOLoop callback."""
        ioloop = tornado.ioloop.IOLoop.instance()
        ioloop.add_callback(self.shutdown)
# Script entry point: parse the command line, start the HTTP thread, then
# run the Twisted reactor with the SSDP responder.
# NOTE(review): this copy of the file has lost its indentation, so the extent
# of each `if args.*:` body below (in particular whether the logging.info
# lines belong to it) cannot be determined here -- confirm against the
# original before re-indenting.
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument('iface', help='Interface you want to bind to (for example 192.168.1.22)')
parser.add_argument('--name', help='Friendly name for this device')
parser.add_argument('--user_agent', help='Custom user agent')
parser.add_argument('--chrome', help='Path to Google Chrome executable')
args = parser.parse_args()
# Optional overrides of the module-level defaults friendlyName, user_agent
# and chrome.
if args.name:
friendlyName = args.name
logging.info("Service name is %s" % friendlyName)
if args.user_agent:
user_agent = args.user_agent
logging.info("User agent is %s" % user_agent)
if args.chrome:
chrome = args.chrome
logging.info("Chrome path is %s" % chrome)
# Start the HTTP thread, then route SIGTERM and SIGINT to its sig_handler.
server = HTTPThread()
server.start()
signal.signal(signal.SIGTERM, server.sig_handler)
signal.signal(signal.SIGINT, server.sig_handler)
# Registered below with reactor.callWhenRunning: creates the SSDP responder
# on the requested interface and registers its stop() as a 'before shutdown'
# reactor trigger.
def LeapUPNPServer():
logging.info("Listening on %s" % args.iface)
sobj = SSDP(args.iface)
reactor.addSystemEventTrigger('before', 'shutdown', sobj.stop)
reactor.callWhenRunning(LeapUPNPServer)
# Run the Twisted reactor; HTTPThread.shutdown() is what asks it to stop.
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment