Skip to content

Instantly share code, notes, and snippets.

@marek-obuchowicz
Created October 10, 2021 12:35
Show Gist options
  • Save marek-obuchowicz/7d1ea1f2f3e104891c441ad9802a11f9 to your computer and use it in GitHub Desktop.
Save marek-obuchowicz/7d1ea1f2f3e104891c441ad9802a11f9 to your computer and use it in GitHub Desktop.
Basic example of communication layer with Emotiva XMC-1.
#!/usr/bin/env python3
import argparse
import logging
import socket
import xml.etree.ElementTree as ET
TRANSPONDER_REQUEST_PORT = 7000
TRANSPONDER_RESPONSE_PORT = 7001
SEND_TIMEOUT = 1
RECV_TIMEOUT = 1
DISCOVERY_TIMEOUT = 1
MAX_RESPONSE_SIZE = 1024 * 100
logger = logging.getLogger(__name__)
class client:
XMC_PING='<?xml version="1.0" encoding="utf-8"?><emotivaPing />'
def __init__(self, bind_ip=''):
self.connected = False
self.bind_ip = bind_ip
def discover_devices(self, broadcast_addr='255.255.255.255'):
try:
sock_resp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock_resp.settimeout(DISCOVERY_TIMEOUT)
sock_resp.bind((self.bind_ip, TRANSPONDER_RESPONSE_PORT))
logger.debug(f"Sending broadcast ping request to {broadcast_addr}")
sock_req = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock_req.settimeout(SEND_TIMEOUT)
sock_req.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock_req.sendto(self.XMC_PING.encode('UTF-8'), (broadcast_addr, TRANSPONDER_REQUEST_PORT))
logger.debug(f"Waiting for ping responses")
responses = {}
try:
while True:
data, sender_addr = sock_resp.recvfrom(1024)
responses[sender_addr] = data
logger.debug(f"Discovered {sender_addr}")
except:
pass
finally:
sock_resp.close()
sock_req.close()
ret = list(map(lambda x: x[0], responses.keys()))
logging.info(f"Discovered devices: {','.join(ret)}")
return ret
def connect(self, hostname):
try:
sock_resp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock_resp.settimeout(RECV_TIMEOUT)
sock_resp.bind((self.bind_ip, TRANSPONDER_RESPONSE_PORT))
logger.debug(f"Sending ping request to {hostname}")
sock_req = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock_req.settimeout(SEND_TIMEOUT)
sock_req.sendto(self.XMC_PING.encode('UTF-8'), (hostname, TRANSPONDER_REQUEST_PORT))
logger.debug(f"Waiting for ping response")
data, sender_addr = sock_resp.recvfrom(MAX_RESPONSE_SIZE)
logger.debug(f"Received response from {sender_addr}: {data}")
finally:
sock_resp.close()
sock_req.close()
self.device = self._parse_emotiva_transponder_response(data)
if self.device['control_version'] == '2.0':
logging.info(f"Connected to {self.device['name']} on {hostname}, model: {self.device['model']}")
else:
raise ValueError(f"Emotiva control version {self.device['control_version']} is not supported")
self.hostname = hostname
self.connected = True
def call_command(self, command, value=0):
self._check_connection()
logger.info(f"Sending command: {command} value={value}")
cmd = self._prepare_command(command, value)
return self._command_response(cmd)
def poweron(self):
return self.call_command("power_on")
def poweroff(self):
return self.call_command("power_off")
def _check_connection(self):
if not self.connected:
raise ConnectionError(f("Not connected to device"))
def _command_response(self, data):
resp = None
try:
sock_resp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock_resp.settimeout(RECV_TIMEOUT)
sock_resp.bind((self.bind_ip, self.device['ports']['control']))
sock_req = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock_req.settimeout(SEND_TIMEOUT)
sock_req.sendto(data, (self.hostname, self.device['ports']['control']))
resp, sender_addr = sock_resp.recvfrom(MAX_RESPONSE_SIZE)
except socket.timeout:
pass
finally:
sock_resp.close()
sock_req.close()
return resp
def _prepare_command(self, command, value):
ret = f'<?xml version="1.0" encoding="utf-8"?><emotivaControl><{command} value="{value}" ack="yes" /></emotivaControl>'
return ret.encode('UTF-8')
def _parse_emotiva_transponder_response(self, data):
resp = ET.fromstring(data)
try:
device = {
'model': resp.findall("./model")[0].text,
'name': resp.findall("./name")[0].text,
'control_version': resp.findall("./control/version")[0].text,
'ports': {
'control': int(resp.findall("./control/controlPort")[0].text),
'notify': int(resp.findall("./control/notifyPort")[0].text),
'info': int(resp.findall("./control/infoPort")[0].text),
}
}
except IndexError:
raise ValueError('Emotiva ping response parse error')
return device
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler()
]
)
parser = argparse.ArgumentParser(description='Control Emotiva device')
parser.add_argument('command', type=str, nargs=1, help='Name of command to send (ie. power_on, power_off, volume, set_volume, loudness_on, loudness, source_1, preset1, dirac) ')
parser.add_argument('value', type=str, nargs='?', default='0', help='Command value, default: 0')
parser.add_argument('--host', type=str, default=None, help="Emotiva device hostname")
args = parser.parse_args()
client = client()
client.connect(args.host or client.discover_devices()[0])
client.call_command(args.command[0], args.value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment