Skip to content

Instantly share code, notes, and snippets.

@2ndsky
Created April 23, 2013 13:01
Show Gist options
  • Select an option

  • Save 2ndsky/5443368 to your computer and use it in GitHub Desktop.

Select an option

Save 2ndsky/5443368 to your computer and use it in GitHub Desktop.
First shot of a smarthome.py plugin for controlling luxtronic 2 heating units.
#!/usr/bin/env python
# vim: set encoding=utf-8 tabstop=4 softtabstop=4 shiftwidth=4 expandtab
#########################################################################
# Copyright 2012-2013 KNX-User-Forum e.V. http://knx-user-forum.de/
#########################################################################
# This file is part of SmartHome.py. http://smarthome.sourceforge.net/
#
# SmartHome.py is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SmartHome.py is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SmartHome.py. If not, see <http://www.gnu.org/licenses/>.
#########################################################################
import logging
import socket
import threading
import struct
import time
# Module-wide logger; an empty name selects the root logger, so the messages
# emitted by the classes below go through the application's root handlers.
logger = logging.getLogger('')
class luxex(Exception):
    """Error raised when talking to the Luxtronic 2 controller fails.

    Used for a missing connection, send/receive errors, timeouts and
    truncated answers.
    """
class LuxBase():
    """Low-level TCP client for a Luxtronic 2 heating controller.

    The controller is queried with small binary commands (3002 = write a
    parameter, 3003 = read parameters, 3004 = read calculated values,
    3005 = read attributes).  All integers on the wire are handled in
    network byte order.  A single lock serialises every request/response
    exchange so that answers cannot be interleaved between threads.
    """

    def __init__(self, host, port=8888):
        """Store the connection settings; no network I/O happens here.

        :param host: host name or IP address of the controller
        :param port: TCP port (anything accepted by ``int()``)
        """
        self.host = host
        self.port = int(port)
        self._sock = False
        self._lock = threading.Lock()
        self.is_connected = False
        # Counts down failed attempts so that the connection error is only
        # logged on every ``_connection_errorlog``-th failure.
        self._connection_attempts = 0
        self._connection_errorlog = 60
        self._params = []
        self._attrs = []
        self._calc = []

    @staticmethod
    def _lookup(table, identifier):
        """Return ``table[identifier]`` or None when the index is out of range."""
        if 0 <= identifier < len(table):
            return table[identifier]
        return None

    def get_attribute(self, identifier):
        """Return the cached attribute at index *identifier*, or None."""
        return self._lookup(self._attrs, identifier)

    def get_parameter(self, identifier):
        """Return the cached parameter at index *identifier*, or None."""
        return self._lookup(self._params, identifier)

    def get_calculated(self, identifier):
        """Return the cached calculated value at index *identifier*, or None."""
        return self._lookup(self._calc, identifier)

    def connect(self):
        """Open the TCP connection (2 second timeout).

        Failures are not raised: they are logged (throttled) and
        ``is_connected`` stays False.
        """
        with self._lock:
            sock = None
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(2)
                sock.connect((self.host, self.port))
            except Exception as e:
                if sock is not None:
                    # do not leak the half-open socket
                    try:
                        sock.close()
                    except socket.error:
                        pass
                self._connection_attempts -= 1
                if self._connection_attempts <= 0:
                    logger.error('Luxtronic2: could not connect to {0}:{1}: {2}'.format(self.host, self.port, e))
                    self._connection_attempts = self._connection_errorlog
                return
            self._sock = sock
            self.is_connected = True
            self._connection_attempts = 0
        logger.info('Luxtronic2: connected to {0}:{1}'.format(self.host, self.port))

    def close(self):
        """Mark the connection as closed and release the socket, if any."""
        self.is_connected = False
        sock, self._sock = self._sock, False
        if sock:
            try:
                sock.close()
            except socket.error:
                # closing is best effort; the socket is discarded either way
                pass

    def _recv_exact(self, length):
        """Read up to *length* bytes, looping over partial TCP reads.

        Returns fewer bytes only if the peer closed the connection.
        Socket exceptions (including timeouts) propagate to the caller.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self._sock.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _request(self, request, length):
        """Send *request* and return the answer (at most *length* bytes).

        The caller must hold ``self._lock``; this method never touches it.

        :raises luxex: if not connected, or on send/receive errors.  The
            connection is closed on every error except a receive timeout.
        """
        if not self.is_connected:
            raise luxex("no connection to luxtronic.")
        try:
            self._sock.sendall(request)
        except Exception as e:
            self.close()
            raise luxex("error sending request: {0}".format(e))
        try:
            return self._recv_exact(length)
        except socket.timeout:
            raise luxex("error receiving answer: timeout")
        except Exception as e:
            self.close()
            raise luxex("error receiving answer: {0}".format(e))

    def _request_more(self, length):
        """Read the *length* byte payload that follows an answer header.

        The caller must hold ``self._lock``.

        :raises luxex: on timeout, socket error or truncated payload.
        """
        try:
            payload = self._recv_exact(length)
        except socket.timeout:
            raise luxex("error receiving payload: timeout")
        except Exception as e:
            self.close()
            raise luxex("error receiving payload: {0}".format(e))
        if len(payload) != length:
            # peer closed the connection in the middle of the payload
            self.close()
            raise luxex("error receiving payload: no data")
        return payload

    def _fetch_table(self, command, header_format, item_code):
        """Run one read command and decode its answer.

        :param command: command code to send (also expected back)
        :param header_format: struct format of the answer header; its first
            field is the echoed command and its last field the item count
        :param item_code: struct type code of one payload item
        :return: ``(header, values)``; *values* is None when the controller
            answered with a different command code
        :raises luxex: on any communication problem
        """
        request = struct.pack('!II', command, 0)  # command, delimiter
        header_size = struct.calcsize(header_format)
        with self._lock:
            answer = self._request(request, header_size)
            if len(answer) != header_size:
                self.close()
                raise luxex("error receiving answer: no data")
            header = struct.unpack(header_format, answer)
            if header[0] != command:
                return header, None
            # The payload is decoded in network byte order, consistent with
            # the header handling.
            payload_format = '!{0}{1}'.format(header[-1], item_code)
            payload = self._request_more(struct.calcsize(payload_format))
        return header, struct.unpack(payload_format, payload)

    def set_param(self, param, value):
        """Write *value* to parameter number *param* on the controller.

        :return: True if the controller acknowledged the write, else False
        :raises luxex: on any communication problem
        """
        param = int(param)
        # the value is packed as a signed integer so negative values work
        payload = struct.pack('!IIi', 3002, param, int(value))
        with self._lock:
            answer = self._request(payload, 8)
        if len(answer) != 8:
            self.close()
            raise luxex("error receiving answer: no data")
        cmd, stored = struct.unpack('!Ii', answer)
        if cmd == 3002 and stored == param:
            logger.debug("Luxtronic2: value {0} for parameter {1} stored".format(value, param))
            return True
        logger.warning("Luxtronic2: value {0} for parameter {1} not stored".format(value, param))
        return False

    def refresh_parameters(self):
        """Reload the parameter table; return True if values were received."""
        header, values = self._fetch_table(3003, '!II', 'i')
        if values is None:
            logger.warning("Luxtronic2: failed to retrieve parameters")
            return False
        if len(values) > 0:
            self._params = values
            return True
        return False

    def refresh_attributes(self):
        """Reload the attribute table; return True if values were received."""
        header, values = self._fetch_table(3005, '!II', 'b')
        if values is None:
            logger.warning("Luxtronic2: failed to retrieve attributes")
            return False
        if len(values) > 0:
            self._attrs = values
            return True
        return False

    def refresh_calculated(self):
        """Reload the calculated values.

        :return: the ``stat`` field of the answer header, or 0 when no
            values were received
        """
        header, values = self._fetch_table(3004, '!III', 'i')
        if values is None:
            logger.warning("Luxtronic2: failed to retrieve calculated")
            return 0
        if len(values) > 0:
            self._calc = values
            return header[1]
        return 0
class Luxtronic2(LuxBase):
    """SmartHome.py plugin that mirrors Luxtronic 2 values into items.

    Items are bound through their configuration: ``lux2_a`` (attribute
    index), ``lux2_c`` (calculated value index) and ``lux2_p`` (parameter
    index).  Parameter items are also written back to the controller when
    they are changed by anyone but this plugin.
    """

    alive = True

    def __init__(self, smarthome, host, port=8888, cycle=30):
        """Set up the plugin and try to connect to the controller.

        :param smarthome: SmartHome.py core object (its ``scheduler`` is used)
        :param host: host name or IP address of the controller
        :param port: TCP port of the controller
        :param cycle: value passed to the scheduler as ``cycle``
        """
        LuxBase.__init__(self, host, port)
        self._sh = smarthome
        self._cycle = int(cycle)
        # Per-instance maps from controller index to the bound item.
        self._parameter = {}
        self._attribute = {}
        self._calculated = {}
        self.connect()

    def run(self):
        """Register the periodic refresh with the SmartHome.py scheduler."""
        self.alive = True
        self._sh.scheduler.add('Luxtronic2', self._refresh, cycle=self._cycle)

    def stop(self):
        """Mark the plugin as stopped."""
        self.alive = False

    @staticmethod
    def _publish(items, getter):
        """Push the value returned by *getter* for each index to its item."""
        for identifier, item in list(items.items()):
            val = getter(identifier)
            # None means "no such value"; 0 is valid and must be forwarded.
            if val is not None:
                item(val, 'Luxtronic2')

    def _refresh(self):
        """Read all tables from the controller and update the bound items.

        Does nothing while the connection is down.  Communication errors
        are logged and end the current run.
        """
        if not self.is_connected:
            return
        start = time.time()
        try:
            self.refresh_parameters()
            self._publish(self._parameter, self.get_parameter)
            self.refresh_attributes()
            self._publish(self._attribute, self.get_attribute)
            self.refresh_calculated()
            self._publish(self._calculated, self.get_calculated)
        except luxex as e:
            logger.warning("Luxtronic2: refresh failed: {0}".format(e))
            return
        cycletime = time.time() - start
        logger.debug("cycle takes {0} seconds".format(cycletime))

    def parse_item(self, item):
        """Bind *item* according to its ``lux2_*`` configuration.

        :return: ``update_item`` for parameter items (so changes are written
            to the controller), otherwise None
        """
        if 'lux2_a' in item.conf:
            self._attribute[int(item.conf['lux2_a'])] = item
        if 'lux2_c' in item.conf:
            self._calculated[int(item.conf['lux2_c'])] = item
        if 'lux2_p' in item.conf:
            self._parameter[int(item.conf['lux2_p'])] = item
            # only parameters are writable, so only they get the callback
            return self.update_item
        return None

    def update_item(self, item, caller=None, source=None):
        """Write a changed parameter item to the controller.

        Changes made by this plugin itself are ignored so that a refresh
        is not echoed back to the controller.
        """
        if caller == 'Luxtronic2':
            return
        try:
            self.set_param(item.conf['lux2_p'], item())
        except luxex as e:
            logger.warning("Luxtronic2: could not set parameter {0}: {1}".format(item.conf['lux2_p'], e))
@2ndsky
Copy link
Copy Markdown
Author

2ndsky commented Apr 23, 2013

@2ndsky
Copy link
Copy Markdown
Author

2ndsky commented Apr 23, 2013

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment