Created
April 3, 2015 08:29
-
-
Save MarkusHackspacher/79deccf3a56910114db2 to your computer and use it in GitHub Desktop.
Weather Station
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
license CC BY-SA 3.0
http://creativecommons.org/licenses/by-sa/3.0/deed.de
original:
https://github.com/Tinkerforge/weather-station/tree/master/write_to_lcd/python
http://www.tinkerforge.com/de/doc/Kits/WeatherStation/PythonToLCD.html
my changes:
FHEM_Value(dvc)
FHEM_Cmd(cmd)
from http://forum.fhem.de/index.php?topic=16297.0
""" | |
import socket | |
import sys | |
import time | |
import math | |
import logging as log | |
log.basicConfig(level=log.INFO) | |
import os | |
from tinkerforge.ip_connection import IPConnection | |
from tinkerforge.ip_connection import Error | |
from tinkerforge.brick_master import Master | |
from tinkerforge.bricklet_lcd_20x4 import LCD20x4 | |
from tinkerforge.bricklet_ambient_light import AmbientLight | |
from tinkerforge.bricklet_humidity import Humidity | |
from tinkerforge.bricklet_barometer import Barometer | |
import telnetlib | |
# Connection settings used by FHEM_Value() and FHEM_Cmd() to open the
# telnet session.
control_host = 'localhost'
control_port = 7072
# Only referenced by the commented-out password handshake in the FHEM helpers.
control_password = ""
def FHEM_Value(dvc):
    """Query a value over the FHEM telnet port and return it as text.

    Sends the expression ``{'=' . Value('<dvc>')}`` to
    ``control_host:control_port`` and returns whatever the peer answers
    between the ``=`` marker and the next carriage return.

    :param dvc: device name that is inserted into the ``Value('...')`` call.
    :return: the answer as a native ``str`` (without the trailing ``\\r``).
    :raises socket.error: if the connection cannot be established.
    :raises EOFError: if the peer closes the connection before answering.
    """
    # NOTE: dvc is pasted into the expression unescaped; only pass trusted
    # device names.
    query = "{'=' . Value('" + dvc + "')}\n"
    # telnetlib works on bytes.  On Python 2 a plain str already is bytes,
    # on Python 3 (or for unicode input) the text has to be encoded first.
    if not isinstance(query, bytes):
        query = query.encode('utf-8')

    tn = telnetlib.Telnet(control_host, control_port, 5)
    try:
        # A password protected port would need the prompt answered here:
        # tn.read_until(b"Password: ")
        # tn.write(control_password.encode('utf-8') + b"\n")
        tn.write(query)
        tn.read_until(b"=")
        res = tn.read_until(b"\r")[:-1]
    finally:
        # Release the socket even if writing or reading failed.
        tn.close()

    # Hand back native text on both Python versions.
    if not isinstance(res, str):
        res = res.decode('utf-8')
    return res
def FHEM_Cmd(cmd):
    """Send one command line over the FHEM telnet port and wait for it.

    After the command itself the expression ``{ '=' }`` is sent and the
    function blocks until the ``=`` answer arrives, so the connection is not
    closed before the command was executed (synchronization point).

    :param cmd: command line to send; a newline is appended.
    :raises socket.error: if the connection cannot be established.
    :raises EOFError: if the peer closes the connection before answering.
    """
    line = cmd + "\n"
    # telnetlib works on bytes.  On Python 2 a plain str already is bytes,
    # on Python 3 (or for unicode input) the text has to be encoded first.
    if not isinstance(line, bytes):
        line = line.encode('utf-8')

    tn = telnetlib.Telnet(control_host, control_port, 5)
    try:
        # A password protected port would need the prompt answered here:
        # tn.read_until(b"Password:")
        # tn.write(control_password.encode('utf-8') + b"\n")
        tn.write(line)
        # the following makes sure that we don't close the connection
        # before the cmd was executed... (synchronization point)
        tn.write(b"{ '=' }\n")
        tn.read_until(b"=")
    finally:
        # Release the socket even if writing or reading failed.
        tn.close()
class WeatherStation:
    """Show Tinkerforge sensor readings on an LCD 20x4 and forward them to FHEM.

    The constructor connects to the brick daemon at HOST:PORT and triggers an
    enumeration.  Bricklets are set up in cb_enumerate(); each sensor callback
    writes one LCD line and forwards the value through FHEM_Cmd() once its
    callback counter exceeds 10.
    """

    HOST = "localhost"
    PORT = 4223

    # Handles are filled in by __init__ / cb_enumerate; None means
    # "not (yet) available".
    ipcon = None
    lcd = None
    al = None
    hum = None
    baro = None

    def __init__(self):
        """Connect to the brick daemon (retrying forever) and enumerate."""
        # Per-sensor callback counters.  A value is forwarded to FHEM when
        # its counter exceeds 10, then the counter restarts; the different
        # start values make the first forwards happen on different callbacks.
        self.fhem_il = 10
        self.fhem_hu = 7
        self.fhem_ai = 4

        self.ipcon = IPConnection()
        while True:
            try:
                self.ipcon.connect(WeatherStation.HOST, WeatherStation.PORT)
                break
            except Error as e:
                log.error('Connection Error: ' + str(e.description))
                time.sleep(1)
            except socket.error as e:
                log.error('Socket error: ' + str(e))
                time.sleep(1)

        self.ipcon.register_callback(IPConnection.CALLBACK_ENUMERATE,
                                     self.cb_enumerate)
        self.ipcon.register_callback(IPConnection.CALLBACK_CONNECTED,
                                     self.cb_connected)
        self._enumerate_until_success()

    def _enumerate_until_success(self):
        """Request an enumeration, retrying once per second on Error."""
        while True:
            try:
                self.ipcon.enumerate()
                return
            except Error as e:
                log.error('Enumerate Error: ' + str(e.description))
                time.sleep(1)

    def _send_to_fhem(self, commands, what):
        """Send each command through FHEM_Cmd() and log the outcome.

        Network failures are logged instead of raised so that an unreachable
        FHEM server does not make the calling sensor callback fail.
        NOTE(review): presumably an exception escaping a callback would
        disturb the Tinkerforge callback dispatch - confirm against the
        bindings.
        """
        try:
            for command in commands:
                FHEM_Cmd(command)
        except (socket.error, EOFError) as e:
            log.error('Write to fhem failed (' + what + '): ' + str(e))
            return
        log.info('Write to fhem: ' + what)

    def cb_illuminance(self, illuminance):
        """Write illuminance/10 to LCD line 0 and periodically forward it."""
        if self.lcd is None:
            return
        text = 'Licht %6.2f lx' % (illuminance / 10.0)
        self.lcd.write_line(0, 0, text)
        log.info('Write to line 0: ' + text)
        if self.fhem_il > 10:
            self.cb_fhem_illuminance(illuminance)
            self.fhem_il = 0
        self.fhem_il += 1

    def cb_fhem_illuminance(self, illuminance):
        """Forward illuminance/10 to FHEM (only while an LCD is present)."""
        if self.lcd is not None:
            self._send_to_fhem(
                ['set illuminance {}lx'.format(illuminance / 10.0)],
                'illuminance')

    def cb_humidity(self, humidity):
        """Write humidity/10 to LCD line 1 and periodically forward it."""
        if self.lcd is None:
            return
        text = 'Luftfeuchte%6.2f %%' % (humidity / 10.0)
        self.lcd.write_line(1, 0, text)
        log.info('Write to line 1: ' + text)
        if self.fhem_hu > 10:
            self.cb_fhem_humidity(humidity)
            self.fhem_hu = 0
        self.fhem_hu += 1

    def cb_fhem_humidity(self, humidity):
        """Forward humidity/10 to FHEM (only while an LCD is present)."""
        if self.lcd is not None:
            self._send_to_fhem(
                ['set humidity {}%'.format(humidity / 10.0)],
                'humidity')

    def cb_air_pressure(self, air_pressure):
        """Write pressure (line 2) and chip temperature (line 3) to the LCD.

        The temperature is read from the barometer on every call; if that
        read fails the callback stops after the pressure line.
        """
        if self.lcd is None:
            return
        text = 'Luftdruck %7.2f mb' % (air_pressure / 1000.0)
        self.lcd.write_line(2, 0, text)
        log.info('Write to line 2: ' + text)

        try:
            temperature = self.baro.get_chip_temperature()
        except Error as e:
            log.error('Could not get temperature: ' + str(e.description))
            return

        # \xDF == ° on LCD 20x4 charset
        text = 'Temperatur %5.2f \xDFC' % (temperature / 100.0)
        self.lcd.write_line(3, 0, text)
        log.info('Write to line 3: ' + text.replace('\xDF', '°'))
        if self.fhem_ai > 10:
            self.cb_fhem_air_pressure(air_pressure, temperature)
            self.fhem_ai = 0
        self.fhem_ai += 1

    def cb_fhem_air_pressure(self, air_pressure, temperature):
        """Forward air_pressure/1000 and temperature/100 to FHEM."""
        self._send_to_fhem(
            ['set air_pressure {:.2f}mb'.format(air_pressure / 1000.0),
             'set temperature {:.2f}°C'.format(temperature / 100.0)],
            'air pressure and temperature')

    def cb_button_pressed(self, button):
        """Show the local host name (line 0) and its address (line 1)."""
        log.info('button pressed: {}'.format(button))
        if self.lcd is None:
            return
        hostname = socket.gethostname()
        self.lcd.write_line(0, 0, hostname)
        try:
            address = socket.gethostbyname(hostname)
        except socket.error as e:
            # The host name may not be resolvable; keep the callback alive.
            log.error('Could not resolve ' + hostname + ': ' + str(e))
            return
        self.lcd.write_line(1, 0, address)
        # self.lcd.write_line(2, 0, socket.getsockname())

    def cb_enumerate(self, uid, connected_uid, position, hardware_version,
                     firmware_version, device_identifier, enumeration_type):
        """Create and configure the bricklet object for an enumerated device.

        Only CONNECTED and AVAILABLE enumerations are handled.  If setting up
        a bricklet raises Error, the failure is logged and the corresponding
        attribute is reset to None.
        """
        if enumeration_type not in (IPConnection.ENUMERATION_TYPE_CONNECTED,
                                    IPConnection.ENUMERATION_TYPE_AVAILABLE):
            return

        if device_identifier == LCD20x4.DEVICE_IDENTIFIER:
            try:
                self.lcd = LCD20x4(uid, self.ipcon)
                self.lcd.clear_display()
                self.lcd.backlight_on()
                self.lcd.register_callback(self.lcd.CALLBACK_BUTTON_PRESSED,
                                           self.cb_button_pressed)
                log.info('LCD 20x4 initialized')
            except Error as e:
                log.error('LCD 20x4 init failed: ' + str(e.description))
                self.lcd = None
        elif device_identifier == AmbientLight.DEVICE_IDENTIFIER:
            try:
                self.al = AmbientLight(uid, self.ipcon)
                self.al.set_illuminance_callback_period(1000)
                self.al.register_callback(self.al.CALLBACK_ILLUMINANCE,
                                          self.cb_illuminance)
                log.info('Ambient Light initialized')
            except Error as e:
                log.error('Ambient Light init failed: ' + str(e.description))
                self.al = None
        elif device_identifier == Humidity.DEVICE_IDENTIFIER:
            try:
                self.hum = Humidity(uid, self.ipcon)
                self.hum.set_humidity_callback_period(1000)
                self.hum.register_callback(self.hum.CALLBACK_HUMIDITY,
                                           self.cb_humidity)
                log.info('Humidity initialized')
            except Error as e:
                log.error('Humidity init failed: ' + str(e.description))
                self.hum = None
        elif device_identifier == Barometer.DEVICE_IDENTIFIER:
            try:
                self.baro = Barometer(uid, self.ipcon)
                self.baro.set_air_pressure_callback_period(1000)
                self.baro.register_callback(self.baro.CALLBACK_AIR_PRESSURE,
                                            self.cb_air_pressure)
                log.info('Barometer initialized')
            except Error as e:
                log.error('Barometer init failed: ' + str(e.description))
                self.baro = None

    def cb_connected(self, connected_reason):
        """Re-enumerate the devices after an automatic reconnect."""
        if connected_reason == IPConnection.CONNECT_REASON_AUTO_RECONNECT:
            log.info('Auto Reconnect')
            self._enumerate_until_success()
if __name__ == "__main__":
    # Script entry point: run the station until the user presses Enter.
    log.info('Weather Station: Start')
    weather_station = WeatherStation()
    if sys.version_info < (3, 0):
        input = raw_input  # Compatibility for Python 2.x
    try:
        input('Press key to exit\n')
    except (EOFError, KeyboardInterrupt):
        # No interactive stdin or Ctrl-C: still shut down in an orderly way.
        log.info('Weather Station: interrupted')
    finally:
        # Always release the brick daemon connection, whatever ended the wait.
        if weather_station.ipcon is not None:
            weather_station.ipcon.disconnect()
    log.info('Weather Station: End')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment