Created
October 10, 2021 12:35
-
-
Save marek-obuchowicz/7d1ea1f2f3e104891c441ad9802a11f9 to your computer and use it in GitHub Desktop.
Basic example of communication layer with Emotiva XMC-1.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import logging | |
import socket | |
import xml.etree.ElementTree as ET | |
TRANSPONDER_REQUEST_PORT = 7000 | |
TRANSPONDER_RESPONSE_PORT = 7001 | |
SEND_TIMEOUT = 1 | |
RECV_TIMEOUT = 1 | |
DISCOVERY_TIMEOUT = 1 | |
MAX_RESPONSE_SIZE = 1024 * 100 | |
logger = logging.getLogger(__name__) | |
class client: | |
XMC_PING='<?xml version="1.0" encoding="utf-8"?><emotivaPing />' | |
def __init__(self, bind_ip=''): | |
self.connected = False | |
self.bind_ip = bind_ip | |
def discover_devices(self, broadcast_addr='255.255.255.255'): | |
try: | |
sock_resp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock_resp.settimeout(DISCOVERY_TIMEOUT) | |
sock_resp.bind((self.bind_ip, TRANSPONDER_RESPONSE_PORT)) | |
logger.debug(f"Sending broadcast ping request to {broadcast_addr}") | |
sock_req = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock_req.settimeout(SEND_TIMEOUT) | |
sock_req.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | |
sock_req.sendto(self.XMC_PING.encode('UTF-8'), (broadcast_addr, TRANSPONDER_REQUEST_PORT)) | |
logger.debug(f"Waiting for ping responses") | |
responses = {} | |
try: | |
while True: | |
data, sender_addr = sock_resp.recvfrom(1024) | |
responses[sender_addr] = data | |
logger.debug(f"Discovered {sender_addr}") | |
except: | |
pass | |
finally: | |
sock_resp.close() | |
sock_req.close() | |
ret = list(map(lambda x: x[0], responses.keys())) | |
logging.info(f"Discovered devices: {','.join(ret)}") | |
return ret | |
def connect(self, hostname): | |
try: | |
sock_resp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock_resp.settimeout(RECV_TIMEOUT) | |
sock_resp.bind((self.bind_ip, TRANSPONDER_RESPONSE_PORT)) | |
logger.debug(f"Sending ping request to {hostname}") | |
sock_req = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock_req.settimeout(SEND_TIMEOUT) | |
sock_req.sendto(self.XMC_PING.encode('UTF-8'), (hostname, TRANSPONDER_REQUEST_PORT)) | |
logger.debug(f"Waiting for ping response") | |
data, sender_addr = sock_resp.recvfrom(MAX_RESPONSE_SIZE) | |
logger.debug(f"Received response from {sender_addr}: {data}") | |
finally: | |
sock_resp.close() | |
sock_req.close() | |
self.device = self._parse_emotiva_transponder_response(data) | |
if self.device['control_version'] == '2.0': | |
logging.info(f"Connected to {self.device['name']} on {hostname}, model: {self.device['model']}") | |
else: | |
raise ValueError(f"Emotiva control version {self.device['control_version']} is not supported") | |
self.hostname = hostname | |
self.connected = True | |
def call_command(self, command, value=0): | |
self._check_connection() | |
logger.info(f"Sending command: {command} value={value}") | |
cmd = self._prepare_command(command, value) | |
return self._command_response(cmd) | |
def poweron(self): | |
return self.call_command("power_on") | |
def poweroff(self): | |
return self.call_command("power_off") | |
def _check_connection(self): | |
if not self.connected: | |
raise ConnectionError(f("Not connected to device")) | |
def _command_response(self, data): | |
resp = None | |
try: | |
sock_resp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock_resp.settimeout(RECV_TIMEOUT) | |
sock_resp.bind((self.bind_ip, self.device['ports']['control'])) | |
sock_req = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock_req.settimeout(SEND_TIMEOUT) | |
sock_req.sendto(data, (self.hostname, self.device['ports']['control'])) | |
resp, sender_addr = sock_resp.recvfrom(MAX_RESPONSE_SIZE) | |
except socket.timeout: | |
pass | |
finally: | |
sock_resp.close() | |
sock_req.close() | |
return resp | |
def _prepare_command(self, command, value): | |
ret = f'<?xml version="1.0" encoding="utf-8"?><emotivaControl><{command} value="{value}" ack="yes" /></emotivaControl>' | |
return ret.encode('UTF-8') | |
def _parse_emotiva_transponder_response(self, data): | |
resp = ET.fromstring(data) | |
try: | |
device = { | |
'model': resp.findall("./model")[0].text, | |
'name': resp.findall("./name")[0].text, | |
'control_version': resp.findall("./control/version")[0].text, | |
'ports': { | |
'control': int(resp.findall("./control/controlPort")[0].text), | |
'notify': int(resp.findall("./control/notifyPort")[0].text), | |
'info': int(resp.findall("./control/infoPort")[0].text), | |
} | |
} | |
except IndexError: | |
raise ValueError('Emotiva ping response parse error') | |
return device | |
if __name__ == "__main__": | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s [%(levelname)s] %(message)s", | |
handlers=[ | |
logging.StreamHandler() | |
] | |
) | |
parser = argparse.ArgumentParser(description='Control Emotiva device') | |
parser.add_argument('command', type=str, nargs=1, help='Name of command to send (ie. power_on, power_off, volume, set_volume, loudness_on, loudness, source_1, preset1, dirac) ') | |
parser.add_argument('value', type=str, nargs='?', default='0', help='Command value, default: 0') | |
parser.add_argument('--host', type=str, default=None, help="Emotiva device hostname") | |
args = parser.parse_args() | |
client = client() | |
client.connect(args.host or client.discover_devices()[0]) | |
client.call_command(args.command[0], args.value) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment