Last active
February 22, 2021 09:23
-
-
Save leinich/1fc3b16b82d86ad593ca8716a15596bf to your computer and use it in GitHub Desktop.
Home Assistant sensor - lrabb - Landkreis Böblingen
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import requests | |
import csv | |
from datetime import datetime | |
from datetime import timedelta | |
import voluptuous as vol | |
from pprint import pprint | |
from homeassistant.components.sensor import PLATFORM_SCHEMA | |
import homeassistant.helpers.config_validation as cv | |
from homeassistant.const import (CONF_RESOURCES) | |
from homeassistant.util import Throttle | |
from homeassistant.helpers.entity import Entity | |
_LOGGER = logging.getLogger(__name__) | |
MIN_TIME_BETWEEN_UPDATES = timedelta(days=1) | |
SENSOR_PREFIX = 'Waste ' | |
SENSOR_TYPES = { | |
'restmuell': ['Restmüll', '', 'mdi:recycle'], | |
'bioabfall': ['Biomüll', '', 'mdi:recycle'], | |
'papiertonne': ['Papier', '', 'mdi:recycle'], | |
'wertstofftonne': ['Wertstoffe', '', 'mdi:recycle'], | |
} | |
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ | |
vol.Required(CONF_RESOURCES, default=[]): | |
vol.All(cv.ensure_list, [vol.In(SENSOR_TYPES)]), | |
}) | |
def setup_platform(hass, config, add_entities, discovery_info=None): | |
_LOGGER.debug("Setup Abfall API retriever") | |
try: | |
data = AbfallData() | |
except requests.exceptions.HTTPError as error: | |
_LOGGER.error(error) | |
return False | |
entities = [] | |
for resource in config[CONF_RESOURCES]: | |
sensor_type = resource.lower() | |
if sensor_type not in SENSOR_TYPES: | |
SENSOR_TYPES[sensor_type] = [ | |
sensor_type.title(), '', 'mdi:flash'] | |
entities.append(AbfallSensor(data, sensor_type)) | |
add_entities(entities) | |
class AbfallData(object): | |
def __init__(self): | |
self.data = None | |
@Throttle(MIN_TIME_BETWEEN_UPDATES) | |
def update(self): | |
_LOGGER.debug("Updating Abfall dates using remote API") | |
try: | |
payload = { | |
"f_id_kommune": "2997", | |
"f_id_bezirk": "XX", | |
"f_id_strasse": "XXXX", | |
"f_id_abfalltyp_0": "50", | |
"f_id_abfalltyp_1": "53", | |
"f_id_abfalltyp_2": "31", | |
"f_id_abfalltyp_3": "299", | |
"f_abfallarten_index_max": "7", | |
"f_abfallarten": "50,53,31,299", | |
"f_zeitraum": "20190101-20301231" | |
} | |
j = requests.post( | |
"https://api.abfall.io/?key=8215c62763967916979e0e8566b6172e&modus=d6c5855a62cf32a4dadbc2831f0f295f&waction=export_csv", data=payload, timeout=20) | |
apiRequest = j.text.split('\n') | |
reader = csv.reader(apiRequest, delimiter=";") | |
rowCounter = 0 | |
restMuell = [] | |
papierTonne = [] | |
wertstoffTonne = [] | |
bioAbfall = [] | |
for row in reader: | |
if rowCounter > 0: | |
if (row[0] != ""): | |
restMuell.append(datetime.strptime(row[0], "%d.%m.%Y")) | |
if (row[1] != ""): | |
papierTonne.append(datetime.strptime(row[1], "%d.%m.%Y")) | |
if (row[2] != ""): | |
wertstoffTonne.append(datetime.strptime(row[2], "%d.%m.%Y")) | |
if (row[3] != ""): | |
bioAbfall.append(datetime.strptime(row[3], "%d.%m.%Y")) | |
rowCounter = rowCounter + 1 | |
restMuell.sort(key=lambda date: date) | |
papierTonne.sort(key=lambda date: date) | |
wertstoffTonne.sort(key=lambda date: date) | |
bioAbfall.sort(key=lambda date: date) | |
nextDates = {} | |
for nextDate in restMuell: | |
if nextDate > datetime.now(): | |
nextDates["restMuell"] = nextDate | |
break | |
for nextDate in papierTonne: | |
if nextDate > datetime.now(): | |
nextDates["papierTonne"] = nextDate | |
break | |
for nextDate in wertstoffTonne: | |
if nextDate > datetime.now(): | |
nextDates["wertstoffTonne"] = nextDate | |
break | |
for nextDate in bioAbfall: | |
if nextDate > datetime.now(): | |
nextDates["bioAbfall"] = nextDate | |
break | |
self.data = nextDates | |
except requests.exceptions.RequestException as exc: | |
_LOGGER.error("Error occurred while fetching data: %r", exc) | |
self.data = None | |
return False | |
class AbfallSensor(Entity): | |
def __init__(self, data, sensor_type): | |
self.data = data | |
self.type = sensor_type | |
self._name = SENSOR_PREFIX + SENSOR_TYPES[self.type][0] | |
self._unit = SENSOR_TYPES[self.type][1] | |
self._icon = SENSOR_TYPES[self.type][2] | |
self._state = None | |
self._attributes = {} | |
@property | |
def name(self): | |
return self._name | |
@property | |
def icon(self): | |
return self._icon | |
@property | |
def state(self): | |
return self._state | |
@property | |
def unit_of_measurement(self): | |
return self._unit | |
@property | |
def device_state_attributes(self): | |
"""Return attributes for the sensor.""" | |
return self._attributes | |
def update(self): | |
self.data.update() | |
abfallData = self.data.data | |
try: | |
if self.type == 'restmuell': | |
self._state = abfallData.get("restMuell") | |
elif self.type == 'papiertonne': | |
self._state = abfallData.get("papierTonne") | |
elif self.type == 'wertstofftonne': | |
self._state = abfallData.get("wertstoffTonne") | |
elif self.type == 'bioabfall': | |
self._state = abfallData.get("bioAbfall") | |
if self._state is not None: | |
weekdays = ["Mo", "Di", "Mi", "Do", "Fr", "Sa", "So"] | |
self._attributes['days'] = (self._state.date() - datetime.now().date()).days | |
if self._attributes['days'] == 0: | |
printtext = "heute" | |
elif self._attributes['days'] == 1: | |
printtext = "morgen" | |
else: | |
printtext = 'in {} Tagen'.format(self._attributes['days']) | |
self._attributes['display_text'] = self._state.strftime('{}, %d.%m.%Y ({})').format(weekdays[self._state.weekday()],printtext) | |
except ValueError: | |
self._state = None |
@ChristopCaina
This integration is outdated.
I switched to https://github.com/mk-maddin/abfall_io_component which is working fine since months.
Hopes this helps.
Hi, thanks for your reply! :)
I've found this one: https://github.com/mampfes/hacs_waste_collection_schedule which does also allow to get the data from LRBB Waste collecton service.
I will just check the abfall_io_component from maddin also :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@ChristopCaina
This integration is outdated.
I switched to https://github.com/mk-maddin/abfall_io_component which is working fine since months.
Hopes this helps.