Last active
February 18, 2022 20:58
-
-
Save sumnerboy12/bc40668005b3e4358d2a to your computer and use it in GitHub Desktop.
Slack & openHAB integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Add the following to rtmbot.conf | |
DEBUG: True | |
SLACK_TOKEN: "xoxb-xxxxxxxxxxxxxxxxxxxxx" | |
ACTIVE_PLUGINS: | |
- plugins.slackhab.SlackhabPlugin |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[slackhab] | |
debug = False | |
openhab_url = http://localhost:8080 | |
slackhab_user_id = U0XXXXXXX |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ConfigParser | |
import os | |
import requests | |
import time | |
import xml.etree.ElementTree as xml | |
from rtmbot.core import Plugin | |
SCRIPTDIR = os.path.dirname(__file__) | |
SCRIPTNAME = os.path.splitext(os.path.basename(__file__))[0] | |
CONFIGFILE = os.getenv(SCRIPTNAME.upper() + 'INI', os.path.join(SCRIPTDIR, SCRIPTNAME + '.ini')) | |
# parse our config/ini file | |
config = ConfigParser.ConfigParser() | |
config.read(CONFIGFILE) | |
debug = config.get('slackhab', 'debug') | |
openhab_url = config.get('slackhab', 'openhab_url') | |
slackhab_user_id = config.get('slackhab', 'slackhab_user_id') | |
# headers required for openhab REST API requests | |
headers = { 'Content-Type': 'text/plain' } | |
class SlackhabPlugin(Plugin): | |
def process_message(self, data): | |
print_debug("rx'd message: %s" % (str(data))) | |
# check we have sufficient details | |
if 'channel' not in data or 'user' not in data or 'text' not in data: | |
return | |
channel = data['channel'] | |
user = data['user'] | |
text = data['text'] | |
# first check if we are interested in this command | |
command_text = get_command_text(channel, user, text) | |
if command_text is None: | |
return | |
tokens = command_text.split() | |
if len(tokens) == 0: | |
return | |
command = tokens[0].lower() | |
print_debug("command: %s" % (command)) | |
if command == "send" and len(tokens) >= 3: | |
filter = tokens[1] | |
value = " ".join(tokens[2:]) | |
item = self.get_single_item(filter, channel) | |
if item is None: | |
return | |
url = openhab_url + '/rest/items/' + get_item_attr(item, 'name') | |
r = requests.post(url, headers=headers, data=normalise_value(value)) | |
if self.check_response(r, channel): | |
self.outputs.append([ channel, "```Sent %s command to %s```" % (value, get_item_attr(item, 'name')) ]) | |
elif command == "update" and len(tokens) >= 3: | |
filter = tokens[1] | |
value = " ".join(tokens[2:]) | |
item = self.get_single_item(filter, channel) | |
if item is None: | |
return | |
url = openhab_url + '/rest/items/' + get_item_attr(item, 'name') + '/state' | |
r = requests.put(url, headers=headers, data=normalise_value(value)) | |
if self.check_response(r, channel): | |
self.outputs.append([ channel, "```Sent %s update to %s```" % (value, get_item_attr(item, 'name')) ]) | |
elif command == "status" and len(tokens) >= 2: | |
filter = tokens[1] | |
item = self.get_single_item(filter, channel) | |
if item is None: | |
return | |
url = openhab_url + '/rest/items/' + get_item_attr(item, 'name') + '/state' | |
r = requests.get(url, headers=headers) | |
if self.check_response(r, channel): | |
self.outputs.append([ channel, "```%s is %s```" % (get_item_attr(item, 'name'), r.text) ]) | |
elif command == "items": | |
filter = None | |
if len(tokens) > 1: | |
filter = tokens[1] | |
output = "" | |
maxtypelen = 0 | |
maxnamelen = 0 | |
items = self.get_items(filter, channel) | |
print_debug("%d items match %s" % (len(items), filter)) | |
if len(items) == 0: | |
if filter is None: | |
self.outputs.append([ channel, "```No items found```" ]) | |
else: | |
self.outputs.append([ channel, "```No items found matching '%s'```" % (filter) ]) | |
else: | |
self.print_items(items, channel) | |
def get_single_item(self, filter, channel): | |
items = self.get_items(filter, channel) | |
items_count = len(items) | |
if items_count == 1: | |
print_debug("found single matching item: %s" % (get_item_attr(items[0], 'name'))) | |
return items[0] | |
if items_count > 1: | |
self.outputs.append([ channel, "```Found %d items matching '%s', please restrict your filter```" % (items_count, filter) ]) | |
self.print_items(items, channel) | |
else: | |
self.outputs.append([ channel, "```No item found matching '%s'```" % (filter) ]) | |
return None | |
def get_items(self, filter, channel): | |
# cache this maybe? | |
url = openhab_url + '/rest/items' | |
r = requests.get(url, headers=headers) | |
if not self.check_response(r, channel): | |
return [] | |
filter = filter.lower() | |
items = [] | |
for item in xml.fromstring(r.content).findall('item'): | |
name = get_item_attr(item, 'name').lower() | |
if filter is None or filter in name: | |
items.append(item) | |
return items | |
def print_items(self, items, channel): | |
output = "" | |
maxtypelen = 0 | |
maxnamelen = 0 | |
# get the max name and type lengths so we can format our output nicely | |
for item in items: | |
name = get_item_attr(item, 'name') | |
type = get_item_attr(item, 'type') | |
if len(type) > maxtypelen: | |
maxtypelen = len(type) | |
if len(name) > maxnamelen: | |
maxnamelen = len(name) | |
for item in items: | |
name = get_item_attr(item, 'name') | |
state = get_item_attr(item, 'state') | |
type = get_item_attr(item, 'type') | |
output = output + "%s%s%s\n" % ( type.ljust( maxtypelen + 5 ), name.ljust( maxnamelen + 5 ), state ) | |
if len(output) >= 8000: | |
output = output + "... (too much output, please specify a filter)" | |
break | |
self.outputs.append([ channel, "```%s```" % (output) ]) | |
def check_response(self, r, channel): | |
# check our rest api call was successful | |
if r.status_code == 200: | |
return True | |
if r.status_code == 201: | |
return True | |
# log the response code/reason back to our slack channel | |
self.outputs.append([ channel, "```%d: %s```" % (r.status_code, r.reason) ]) | |
return False | |
def normalise_value(state): | |
if state.lower() == "on": | |
return "ON" | |
if state.lower() == "off": | |
return "OFF" | |
if state.lower() == "open": | |
return "OPEN" | |
if state.lower() == "closed": | |
return "CLOSED" | |
return state | |
def get_command_text(channel, user, text): | |
if channel == "" or channel is None: | |
return None | |
if user == "" or user is None: | |
return None | |
if text == "" or text is None: | |
return None | |
# check for a message directed at our bot | |
user_tag = "<@%s>:" % (slackhab_user_id) | |
if text.startswith(user_tag): | |
return text[len(user_tag):] | |
user_tag = "<@%s>" % (slackhab_user_id) | |
if text.startswith(user_tag): | |
return text[len(user_tag):] | |
# check for a DM to our bot | |
if channel.startswith("D"): | |
return text | |
return None | |
def get_item_names(items): | |
return ", ".join(get_item_attr(item, 'name') for item in items) | |
def get_item_attr(item, attr): | |
return item.find(attr).text | |
def print_debug(message): | |
if debug: | |
print message |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
* create a new Slack Real-Time Messaging Bot integration | |
* copy the API token | |
* install python-rtmbot (https://github.com/slackhq/python-rtmbot) and follow instructions set your API token and user id | |
* copy `slackhab.py' and 'slackhab.ini' to ./plugins and update with your openHAB URL and Slack user id | |
* copy `rtmbot.conf` and update with your API token | |
* start `rtmbot` and you should be able to send commands to openHAB | |
* features | |
- send <item> <command> | |
- update <item> <state> | |
- status <item> | |
- items [<filter>] | |
* supports fuzzy logic matching; | |
- e.g. `update coffeemachine on` will match to `GF_Kitchen_CoffeeMachine` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
FYI - moved this to a proper repo and
dockerised
it - see https://hub.docker.com/repository/docker/sumnerboy12/slackhab.