Skip to content

Instantly share code, notes, and snippets.

@sumnerboy12
Last active February 18, 2022 20:58
Show Gist options
  • Save sumnerboy12/bc40668005b3e4358d2a to your computer and use it in GitHub Desktop.
Save sumnerboy12/bc40668005b3e4358d2a to your computer and use it in GitHub Desktop.
Slack & openHAB integration
# Add the following to rtmbot.conf
DEBUG: True
SLACK_TOKEN: "xoxb-xxxxxxxxxxxxxxxxxxxxx"
ACTIVE_PLUGINS:
- plugins.slackhab.SlackhabPlugin
[slackhab]
debug = False
openhab_url = http://localhost:8080
slackhab_user_id = U0XXXXXXX
import ConfigParser
import os
import requests
import time
import xml.etree.ElementTree as xml
from rtmbot.core import Plugin
# locate the ini file: by default it sits next to this script and shares its
# base name; the <SCRIPTNAME>INI environment variable overrides that path
SCRIPTDIR = os.path.dirname(__file__)
SCRIPTNAME = os.path.splitext(os.path.basename(__file__))[0]
CONFIGFILE = os.getenv(SCRIPTNAME.upper() + 'INI', os.path.join(SCRIPTDIR, SCRIPTNAME + '.ini'))

# parse our config/ini file
config = ConfigParser.ConfigParser()
config.read(CONFIGFILE)

# getboolean() is required here: get() returns the raw string, and a non-empty
# string such as "False" is truthy, which left debug output permanently on
debug = config.getboolean('slackhab', 'debug')
openhab_url = config.get('slackhab', 'openhab_url')
slackhab_user_id = config.get('slackhab', 'slackhab_user_id')

# headers required for openhab REST API requests
headers = { 'Content-Type': 'text/plain' }
class SlackhabPlugin(Plugin):
    """rtmbot plugin that relays Slack chat commands to the openHAB REST API.

    Commands understood by process_message:
        send <item> <command>
        update <item> <state>
        status <item>
        items [<filter>]

    Every reply is queued on self.outputs as a [channel, text] pair.
    """

    # print_items stops adding rows once the reply reaches this many characters
    MAX_OUTPUT_LENGTH = 8000

    def process_message(self, data):
        """Run the command carried by one incoming Slack event, if there is one.

        data -- the event dict; it is ignored unless it has 'channel', 'user'
                and 'text' keys and get_command_text() accepts it.
        """
        print_debug("rx'd message: %s" % (str(data)))

        # check we have sufficient details
        if 'channel' not in data or 'user' not in data or 'text' not in data:
            return

        channel = data['channel']
        user = data['user']
        text = data['text']

        # first check if we are interested in this command
        command_text = get_command_text(channel, user, text)
        if command_text is None:
            return

        tokens = command_text.split()
        if not tokens:
            return

        command = tokens[0].lower()
        print_debug("command: %s" % (command))

        if command == "send" and len(tokens) >= 3:
            # everything after the item filter is the command value
            value = " ".join(tokens[2:])
            item = self.get_single_item(tokens[1], channel)
            if item is None:
                return

            item_name = get_item_attr(item, 'name')
            url = openhab_url + '/rest/items/' + item_name
            r = requests.post(url, headers=headers, data=normalise_value(value))

            if self.check_response(r, channel):
                self.outputs.append([ channel, "```Sent %s command to %s```" % (value, item_name) ])

        elif command == "update" and len(tokens) >= 3:
            # everything after the item filter is the new state
            value = " ".join(tokens[2:])
            item = self.get_single_item(tokens[1], channel)
            if item is None:
                return

            item_name = get_item_attr(item, 'name')
            url = openhab_url + '/rest/items/' + item_name + '/state'
            r = requests.put(url, headers=headers, data=normalise_value(value))

            if self.check_response(r, channel):
                self.outputs.append([ channel, "```Sent %s update to %s```" % (value, item_name) ])

        elif command == "status" and len(tokens) >= 2:
            item = self.get_single_item(tokens[1], channel)
            if item is None:
                return

            item_name = get_item_attr(item, 'name')
            url = openhab_url + '/rest/items/' + item_name + '/state'
            r = requests.get(url, headers=headers)

            if self.check_response(r, channel):
                self.outputs.append([ channel, "```%s is %s```" % (item_name, r.text) ])

        elif command == "items":
            # the filter is optional for this command
            item_filter = tokens[1] if len(tokens) > 1 else None

            items = self.get_items(item_filter, channel)
            print_debug("%d items match %s" % (len(items), item_filter))

            if items:
                self.print_items(items, channel)
            elif item_filter is None:
                self.outputs.append([ channel, "```No items found```" ])
            else:
                self.outputs.append([ channel, "```No items found matching '%s'```" % (item_filter) ])

    def get_single_item(self, filter, channel):
        """Return the one item whose name matches filter, or None.

        When zero or several items match, an explanatory message (plus the
        list of candidates, if there are several) is queued for channel and
        None is returned.
        """
        items = self.get_items(filter, channel)
        items_count = len(items)

        if items_count == 1:
            print_debug("found single matching item: %s" % (get_item_attr(items[0], 'name')))
            return items[0]

        if items_count > 1:
            self.outputs.append([ channel, "```Found %d items matching '%s', please restrict your filter```" % (items_count, filter) ])
            self.print_items(items, channel)
        else:
            self.outputs.append([ channel, "```No item found matching '%s'```" % (filter) ])

        return None

    def get_items(self, filter, channel):
        """Fetch the items from openHAB and return those matching filter.

        filter  -- substring matched case-insensitively against each item
                   name, or None to return every item.
        channel -- channel that is told about a failed REST call.

        Returns a list of XML 'item' elements; the list is empty when the
        REST call fails.
        """
        # cache this maybe?
        url = openhab_url + '/rest/items'
        r = requests.get(url, headers=headers)

        if not self.check_response(r, channel):
            return []

        # only lower-case a real filter: None means "no filter", and calling
        # .lower() on it used to crash the plain `items` command
        wanted = None if filter is None else filter.lower()

        return [
            item for item in xml.fromstring(r.content).findall('item')
            if wanted is None or wanted in get_item_attr(item, 'name').lower()
        ]

    def print_items(self, items, channel):
        """Queue a column-aligned type/name/state listing of items for channel."""
        names = [get_item_attr(item, 'name') for item in items]
        types = [get_item_attr(item, 'type') for item in items]

        # pad each column to its longest entry (plus a gap) so the rows line up
        type_width = max([len(t) for t in types] or [0]) + 5
        name_width = max([len(n) for n in names] or [0]) + 5

        rows = []
        size = 0
        for item, item_type, name in zip(items, types, names):
            row = "%s%s%s\n" % ( item_type.ljust( type_width ), name.ljust( name_width ), get_item_attr(item, 'state') )
            rows.append(row)
            size += len(row)

            if size >= self.MAX_OUTPUT_LENGTH:
                rows.append("... (too much output, please specify a filter)")
                break

        self.outputs.append([ channel, "```%s```" % ("".join(rows)) ])

    def check_response(self, r, channel):
        """Return True when REST response r has status 200 or 201.

        Any other status is reported to channel as "<code>: <reason>" and
        False is returned.
        """
        # check our rest api call was successful
        if r.status_code in (200, 201):
            return True

        # log the response code/reason back to our slack channel
        self.outputs.append([ channel, "```%d: %s```" % (r.status_code, r.reason) ])
        return False
def normalise_value(state):
    """Return the upper-case form of on/off/open/closed, matched case-insensitively.

    Any other value is handed back exactly as it was given.
    """
    canonical = {
        "on": "ON",
        "off": "OFF",
        "open": "OPEN",
        "closed": "CLOSED",
    }
    return canonical.get(state.lower(), state)
def get_command_text(channel, user, text):
    """Return the command portion of a message meant for the bot, else None.

    A message counts as meant for the bot when it starts with a mention of
    slackhab_user_id (with or without a trailing colon), in which case the
    mention is stripped, or when the channel id starts with "D", in which
    case the text is returned unchanged.
    """
    # all three fields must be present and non-empty
    for field in (channel, user, text):
        if field == "" or field is None:
            return None

    # check for a message directed at our bot; the colon form is tried first
    # so that the colon is removed along with the mention
    for mention in ("<@%s>:" % (slackhab_user_id), "<@%s>" % (slackhab_user_id)):
        if text.startswith(mention):
            return text[len(mention):]

    # check for a DM to our bot
    return text if channel.startswith("D") else None
def get_item_names(items):
    """Return the 'name' of every item, joined into one comma-separated string."""
    names = [get_item_attr(entry, 'name') for entry in items]
    return ", ".join(names)
def get_item_attr(item, attr):
    """Return the text of child element attr of XML element item.

    An empty string is returned when the child element is missing or has no
    text, so callers can safely call len(), .lower() or .ljust() on the
    result.
    """
    node = item.find(attr)
    if node is None or node.text is None:
        return ""
    return node.text
def print_debug(message):
    """Print message to stdout when the module-level debug flag is set."""
    if debug:
        # the parenthesised form prints the same on Python 2 and also parses on Python 3
        print(message)
* create a new Slack Real-Time Messaging Bot integration
* copy the API token
* install python-rtmbot (https://github.com/slackhq/python-rtmbot) and follow the instructions to set your API token and user id
* copy `slackhab.py` and `slackhab.ini` to ./plugins and update with your openHAB URL and Slack user id
* copy `rtmbot.conf` and update with your API token
* start `rtmbot` and you should be able to send commands to openHAB
* features
- send <item> <command>
- update <item> <state>
- status <item>
- items [<filter>]
* supports case-insensitive substring matching on item names;
- e.g. `update coffeemachine on` will match to `GF_Kitchen_CoffeeMachine`
@sumnerboy12
Copy link
Author

FYI - moved this to a proper repo and dockerised it - see https://hub.docker.com/repository/docker/sumnerboy12/slackhab.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment