Skip to content

Instantly share code, notes, and snippets.

@MarkusHackspacher
Created April 3, 2015 08:29
Show Gist options
  • Save MarkusHackspacher/79deccf3a56910114db2 to your computer and use it in GitHub Desktop.
Save MarkusHackspacher/79deccf3a56910114db2 to your computer and use it in GitHub Desktop.
Weather Station
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
license CC BY-SA 3.0
http://creativecommons.org/licenses/by-sa/3.0/deed.de
orginal:
https://github.com/Tinkerforge/weather-station/tree/master/write_to_lcd/python
http://www.tinkerforge.com/de/doc/Kits/WeatherStation/PythonToLCD.html
my changes
FHEM_Value(dvc)
FHEM_Cmd(cmd)
from http://forum.fhem.de/index.php?topic=16297.0
"""
import socket
import sys
import time
import math
import logging as log
log.basicConfig(level=log.INFO)
import os
from tinkerforge.ip_connection import IPConnection
from tinkerforge.ip_connection import Error
from tinkerforge.brick_master import Master
from tinkerforge.bricklet_lcd_20x4 import LCD20x4
from tinkerforge.bricklet_ambient_light import AmbientLight
from tinkerforge.bricklet_humidity import Humidity
from tinkerforge.bricklet_barometer import Barometer
import telnetlib
control_host= 'localhost'
control_port= 7072
control_password= ""
def FHEM_Value(dvc):
tn = telnetlib.Telnet(control_host, control_port, 5)
# tn.read_until("Password: ")
# tn.write(control_password + "\n")
tn.write("{'=' . Value('" + dvc + "')}\n" )
tn.read_until("=")
res= tn.read_until("\r")[:-1]
tn.close()
return res
def FHEM_Cmd(cmd):
tn = telnetlib.Telnet(control_host, control_port, 5)
# tn.read_until("Password:")
# tn.write(control_password + "\n")
tn.write(cmd + "\n")
# the following makes sure that we don't close the connection
# before the cmd was executed... (synchronization point)
tn.write("{ '=' }\n")
tn.read_until("=")
tn.close()
class WeatherStation:
HOST = "localhost"
PORT = 4223
ipcon = None
lcd = None
al = None
hum = None
baro = None
def __init__(self):
self.fhem_il = 10
self.fhem_hu = 7
self.fhem_ai = 4
self.ipcon = IPConnection()
while True:
try:
self.ipcon.connect(WeatherStation.HOST, WeatherStation.PORT)
break
except Error as e:
log.error('Connection Error: ' + str(e.description))
time.sleep(1)
except socket.error as e:
log.error('Socket error: ' + str(e))
time.sleep(1)
self.ipcon.register_callback(IPConnection.CALLBACK_ENUMERATE,
self.cb_enumerate)
self.ipcon.register_callback(IPConnection.CALLBACK_CONNECTED,
self.cb_connected)
while True:
try:
self.ipcon.enumerate()
break
except Error as e:
log.error('Enumerate Error: ' + str(e.description))
time.sleep(1)
def cb_illuminance(self, illuminance):
if self.lcd is not None:
text = 'Licht %6.2f lx' % (illuminance/10.0)
self.lcd.write_line(0, 0, text)
log.info('Write to line 0: ' + text)
if self.fhem_il > 10:
self.cb_fhem_illuminance(illuminance)
self.fhem_il = 0
self.fhem_il += 1
def cb_fhem_illuminance(self, illuminance):
if self.lcd is not None:
FHEM_Cmd('set illuminance {}lx'.format(illuminance / 10.0))
log.info('Write to fhem: illuminance')
def cb_humidity(self, humidity):
if self.lcd is not None:
text = 'Luftfeuchte%6.2f %%' % (humidity / 10.0)
self.lcd.write_line(1, 0, text)
log.info('Write to line 1: ' + text)
if self.fhem_hu > 10:
self.cb_fhem_humidity(humidity)
self.fhem_hu = 0
self.fhem_hu += 1
def cb_fhem_humidity(self, humidity):
if self.lcd is not None:
FHEM_Cmd('set humidity {}%'.format(humidity / 10.0))
log.info('Write to fhem: humidity')
def cb_air_pressure(self, air_pressure):
if self.lcd is not None:
text = 'Luftdruck %7.2f mb' % (air_pressure / 1000.0)
self.lcd.write_line(2, 0, text)
log.info('Write to line 2: ' + text)
try:
temperature = self.baro.get_chip_temperature()
except Error as e:
log.error('Could not get temperature: ' + str(e.description))
return
# \xDF == ° on LCD 20x4 charset
text = 'Temperatur %5.2f \xDFC' % (temperature / 100.0)
self.lcd.write_line(3, 0, text)
log.info('Write to line 3: ' + text.replace('\xDF', '°'))
if self.fhem_ai > 10:
self.cb_fhem_air_pressure(air_pressure, temperature)
self.fhem_ai = 0
self.fhem_ai += 1
def cb_fhem_air_pressure(self, air_pressure, temperature):
FHEM_Cmd('set air_pressure {:.2f}mb'.
format(air_pressure / 1000.0))
FHEM_Cmd('set temperature {:.2f}°C'.
format(temperature / 100.0))
log.info('Write to fhem: air pressure and temperature')
def cb_button_pressed(self, button):
log.info('button pressed: {}'.format(button))
hostname = socket.gethostname()
self.lcd.write_line(0, 0, hostname)
self.lcd.write_line(1, 0, socket.gethostbyname(hostname))
# self.lcd.write_line(2, 0, socket.getsockname())
def cb_enumerate(self, uid, connected_uid, position, hardware_version,
firmware_version, device_identifier, enumeration_type):
if enumeration_type == IPConnection.ENUMERATION_TYPE_CONNECTED or \
enumeration_type == IPConnection.ENUMERATION_TYPE_AVAILABLE:
if device_identifier == LCD20x4.DEVICE_IDENTIFIER:
try:
self.lcd = LCD20x4(uid, self.ipcon)
self.lcd.clear_display()
self.lcd.backlight_on()
self.lcd.register_callback(self.lcd.CALLBACK_BUTTON_PRESSED,
self.cb_button_pressed)
log.info('LCD 20x4 initialized')
except Error as e:
log.error('LCD 20x4 init failed: ' + str(e.description))
self.lcd = None
elif device_identifier == AmbientLight.DEVICE_IDENTIFIER:
try:
self.al = AmbientLight(uid, self.ipcon)
self.al.set_illuminance_callback_period(1000)
self.al.register_callback(self.al.CALLBACK_ILLUMINANCE,
self.cb_illuminance)
log.info('Ambient Light initialized')
except Error as e:
log.error('Ambient Light init failed: ' + str(e.description))
self.al = None
elif device_identifier == Humidity.DEVICE_IDENTIFIER:
try:
self.hum = Humidity(uid, self.ipcon)
self.hum.set_humidity_callback_period(1000)
self.hum.register_callback(self.hum.CALLBACK_HUMIDITY,
self.cb_humidity)
log.info('Humidity initialized')
except Error as e:
log.error('Humidity init failed: ' + str(e.description))
self.hum = None
elif device_identifier == Barometer.DEVICE_IDENTIFIER:
try:
self.baro = Barometer(uid, self.ipcon)
self.baro.set_air_pressure_callback_period(1000)
self.baro.register_callback(self.baro.CALLBACK_AIR_PRESSURE,
self.cb_air_pressure)
log.info('Barometer initialized')
except Error as e:
log.error('Barometer init failed: ' + str(e.description))
self.baro = None
def cb_connected(self, connected_reason):
if connected_reason == IPConnection.CONNECT_REASON_AUTO_RECONNECT:
log.info('Auto Reconnect')
while True:
try:
self.ipcon.enumerate()
break
except Error as e:
log.error('Enumerate Error: ' + str(e.description))
time.sleep(1)
if __name__ == "__main__":
log.info('Weather Station: Start')
weather_station = WeatherStation()
if sys.version_info < (3, 0):
input = raw_input # Compatibility for Python 2.x
input('Press key to exit\n')
if weather_station.ipcon is not None:
weather_station.ipcon.disconnect()
log.info('Weather Station: End')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment