Last active
August 29, 2015 14:04
-
-
Save oogali/9877e9958e72112ad812 to your computer and use it in GitHub Desktop.
Yamaha HTTP Control PoC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import time | |
import curses | |
import signal | |
import requests | |
from lxml import etree | |
class YZone:
    """One zone of a receiver, identified by its XML tag name.

    Every operation is delegated to the parent receiver object, which must
    provide ``build_request(...)`` and ``query(...)``.
    """

    def __init__(self, doctree=None, name=None, receiver=None):
        """Create a zone from an existing status tree or from a zone name.

        doctree  -- already-fetched status element; its ``tag`` becomes the
                    zone name.
        name     -- zone name; used only when no doctree is given, in which
                    case the status is fetched from the receiver right away.
        receiver -- parent receiver object (required).

        Raises Exception when the receiver is missing, or when neither a
        doctree nor a name is supplied.
        """
        if receiver is None:
            raise Exception('Expected parent receiver object')
        if doctree is None and name is None:
            raise Exception('Expected tree of document data or zone name')

        self.receiver = receiver
        if doctree is None:
            # The name must be set first: get_status() addresses the zone by it.
            self.name = name
            self.doctree = self.get_status()
        else:
            self.name = doctree.tag
            self.doctree = doctree

    def _exchange(self, verb, command, target, **extra):
        """Build a request for this zone, send it, and return the parsed reply."""
        payload = self.receiver.build_request(verb=verb, zone=self.name,
                                              command=command, target=target,
                                              **extra)
        return self.receiver.query(payload)

    def get_status(self):
        """Return the first child element of the ``Basic_Status`` reply root."""
        reply = self._exchange('GET', 'GetParam', ['Basic_Status'])
        root = reply.xpath('/YAMAHA_AV')[0]
        return list(root)[0]

    def get_mute(self):
        """Return True when the receiver reports the zone's mute as 'On'."""
        reply = self._exchange('GET', 'GetParam', ['Volume', 'Mute'])
        matches = reply.xpath('/YAMAHA_AV/{}/Volume/Mute'.format(self.name))
        return matches[0].text == 'On'

    def set_mute(self, on=True):
        """Switch mute on (default) or off; return the parsed reply."""
        if on:
            state = 'On'
        else:
            state = 'Off'
        return self._exchange('PUT', state, ['Volume', 'Mute'])

    def toggle_mute(self):
        """Invert the current mute state and return the new state as a bool."""
        flipped = not self.get_mute()
        self.set_mute(flipped)
        return flipped

    def get_volume(self):
        """Return the zone's volume level value as an int."""
        reply = self._exchange('GET', 'GetParam', ['Volume', 'Lvl'])
        matches = reply.xpath('/YAMAHA_AV/{}/Volume/Lvl/Val'.format(self.name))
        return int(matches[0].text)

    def set_volume(self, level):
        """Set the volume level value; return the parsed reply.

        The value is sent together with ``Exp`` = '1' and ``Unit`` = 'dB'
        (presumably meaning tenths of a dB -- confirm against the receiver
        API documentation).
        """
        return self._exchange('PUT', str(level), ['Volume', 'Lvl', 'Val'],
                              args={'Exp': '1', 'Unit': 'dB'})

    def adjust_volume(self, offset):
        """Add ``offset`` to the current volume and return the new level."""
        target_level = self.get_volume() + int(offset)
        self.set_volume(target_level)
        return target_level
class YReceiver:
    """HTTP client for a Yamaha receiver's ``YamahaRemoteControl`` endpoint.

    Requests are small XML documents POSTed to
    ``http://<address>/YamahaRemoteControl/ctrl``; replies are parsed with
    lxml and returned as elements.
    """

    # Seconds to wait for the receiver before giving up on a request.
    DEFAULT_TIMEOUT = 10

    def __init__(self, address=None, timeout=DEFAULT_TIMEOUT):
        """Connect to the receiver at ``address`` and load its main zone.

        address -- receiver IP address or hostname (required).
        timeout -- per-request timeout in seconds handed to ``requests``.

        Raises Exception when no address is given. Creating the main zone
        performs a network request, so connection errors surface here too.
        """
        if address is None:
            raise Exception('You must specify a receiver IP address or hostname')

        self.address = address
        self.url = 'http://{}/YamahaRemoteControl/ctrl'.format(address)
        self.timeout = timeout
        self.zones = []

        # load our main zone
        self.create_zone('Main_Zone')

    def create_zone(self, name):
        """Create a YZone called ``name`` and append it to ``self.zones``."""
        zone = YZone(name=name, receiver=self)
        self.zones.append(zone)

    def build_request(self, verb=None, zone=None, command=None, target=None, args=None):
        """Serialize a control request to XML.

        verb    -- value of the root element's ``cmd`` attribute.
        zone    -- tag of the zone element placed under the root.
        command -- text placed in the innermost target element.
        target  -- non-empty sequence of tag names nested below the zone.
        args    -- optional mapping of extra tag -> text, added as siblings
                   of the innermost target element.

        Returns the serialized document as produced by ``etree.tostring``.
        Raises Exception when verb, zone or target is missing.
        """
        if verb is None:
            raise Exception('A verb must be provided')
        if zone is None:
            raise Exception('A zone must be provided')
        if not target:
            raise Exception('A target path must be provided')

        root = etree.Element('YAMAHA_AV', cmd=verb)

        # Walk down the target path, always remembering the parent of the
        # element created last, so that ``args`` can be attached next to the
        # leaf whatever the path length is (the previous code only recorded
        # the parent for paths of three or more tags and failed otherwise).
        parent = etree.SubElement(root, zone)
        leaf = etree.SubElement(parent, target[0])
        for tag in target[1:]:
            parent = leaf
            leaf = etree.SubElement(parent, tag)
        leaf.text = command

        if args is not None:
            for k, v in args.items():
                etree.SubElement(parent, k).text = v

        return etree.tostring(root)

    def query(self, req):
        """POST ``req`` to the receiver and return the parsed reply root.

        The raw response bytes are parsed rather than the decoded text, so a
        reply carrying an XML encoding declaration is handled by the parser.
        A receiver that does not answer within ``self.timeout`` seconds makes
        ``requests`` raise instead of blocking forever.
        """
        r = requests.post(self.url, data=req, timeout=self.timeout)
        return etree.fromstring(r.content)
def catch_ctrl_c(signo, frame):
    """SIGINT handler: restore the terminal, then terminate the program.

    signo -- number of the signal being handled.
    frame -- interrupted stack frame (unused).

    Exits with the conventional ``128 + signo`` status by raising SystemExit,
    so Ctrl-C actually stops the key loop instead of only leaving curses mode.
    """
    try:
        curses.endwin()
    except curses.error:
        # The signal arrived before curses was initialised; there is no
        # terminal state to restore, so just carry on to the exit.
        pass
    sys.exit(128 + signo)
def main(argv=None):
    """Interactive volume/mute control for the receiver named in ``argv[1]``.

    Prints the main zone's status, waits four seconds, then enters a curses
    key loop: Up/Down change the volume by +5/-5, 'm' toggles mute and 'q'
    quits.

    argv -- argument vector; defaults to ``sys.argv``.

    Returns -1 (after printing usage) when no receiver address is given,
    otherwise 0 once the user quits.
    """
    if argv is None:
        argv = sys.argv

    if len(argv) < 2:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('{} <receiver ip>'.format(argv[0]))
        return -1

    rcvr = YReceiver(address=argv[1])
    zone = rcvr.zones[0]

    print(zone.name)
    print('=' * len(zone.name))
    # tostring() serializes as ASCII by default; decode so that Python 3
    # prints the document itself rather than a bytes repr.
    print(etree.tostring(zone.doctree, pretty_print=True).decode('ascii'))
    # Leave the status on screen for a moment before curses clears it.
    time.sleep(4)

    signal.signal(signal.SIGINT, catch_ctrl_c)
    screen = curses.initscr()
    try:
        curses.cbreak()
        screen.keypad(1)
        screen.addstr(0, 10, "Hit 'q' to quit")
        screen.refresh()

        key = None
        while key != ord('q'):
            key = screen.getch()
            screen.addch(20, 25, key)
            screen.refresh()

            if key == curses.KEY_UP:
                zone.adjust_volume(+5)
            elif key == curses.KEY_DOWN:
                zone.adjust_volume(-5)
            elif key == ord('m'):
                zone.toggle_mute()
    finally:
        # Always hand the terminal back in a usable state, even when a
        # request to the receiver fails in the middle of the loop.
        if not curses.isendwin():
            curses.endwin()

    return 0
if __name__ == '__main__':
    # Script entry point: main()'s return value becomes the exit status.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment