Last active
June 20, 2018 18:44
-
-
Save Terrance/431fb0be5b97ae18e380c0fc4d0aecd2 to your computer and use it in GitHub Desktop.
A minimal asyncio client for the Slack real-time messaging (RTM) APIs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import logging

import aiohttp
class SlackAPIError(Exception):
    """
    Raised when a Slack Web API response has a falsy ``ok`` field.

    The exception message is the ``error`` value taken from that response.
    """
class Slack(object):
    """
    A tiny async Slack client for the RTM APIs.

    Use :meth:`msg` to post messages and :meth:`rtm` to run the receive loop,
    which keeps the user/channel/DM caches current and passes every typed
    event to each entry of ``callbacks``.
    """

    def __init__(self, token, log=logging.getLogger(__name__)):
        """
        Store the credentials and create the HTTP session.

        :param token: API token sent with every Web API request.
        :param log: logger used for diagnostics; defaults to this module's logger.
        """
        self.token = token
        self.log = log
        self.sess = aiohttp.ClientSession()
        # Filled in by rtm() from the rtm.start response.
        self.team = self.users = self.channels = self.directs = None
        # When we send messages asynchronously, we'll receive an RTM event before the HTTP request
        # returns.  This lock will block event parsing whilst we're sending, to make sure the caller
        # can finish processing the new message (e.g. storing the ID) before receiving the event.
        self.lock = asyncio.BoundedSemaphore()
        # Callables taking one event dict and returning an awaitable; awaited in list order.
        self.callbacks = []

    async def close(self):
        """Close the HTTP session created in ``__init__``."""
        await self.sess.close()

    async def msg(self, **kwargs):
        """
        Post a message via ``chat.postMessage``.

        :param kwargs: form fields for the request; ``token`` is always
            overridden with this client's token.
        :return: the decoded JSON response.
        :raises SlackAPIError: if the response's ``ok`` field is falsy.
        """
        self.log.debug("Sending message")
        async with self.lock:
            # Block event processing whilst we wait for the message to go through.  Processing will
            # resume once the caller yields or returns.
            resp = await self.sess.post("https://slack.com/api/chat.postMessage",
                                        data=dict(kwargs, token=self.token))
            payload = await resp.json()
        if not payload["ok"]:
            raise SlackAPIError(payload["error"])
        return payload

    async def rtm(self):
        """
        Start an RTM session and process events until the socket read fails.

        Requests ``rtm.start``, caches the team, users, channels/groups and
        DMs from the response, connects to the returned websocket URL, then
        loops over incoming JSON events.  The loop only ends when an exception
        propagates (for example from the websocket read); the websocket is
        closed on the way out.

        :raises SlackAPIError: if the ``rtm.start`` response's ``ok`` field is falsy.
        """
        self.log.debug("Requesting RTM session")
        resp = await self.sess.post("https://slack.com/api/rtm.start",
                                    data={"token": self.token})
        payload = await resp.json()
        if not payload["ok"]:
            raise SlackAPIError(payload["error"])
        # Cache useful information about users and channels, to save on queries later.
        self.team = payload["team"]
        self.users = {u["id"]: u for u in payload["users"]}
        self.log.debug("Users (%d): %s", len(self.users), self.users.keys())
        self.channels = {c["id"]: c for c in payload["channels"] + payload["groups"]}
        self.log.debug("Channels (%d): %s", len(self.channels), self.channels.keys())
        self.directs = {c["id"]: c for c in payload["ims"]}
        self.log.debug("Directs (%d): %s", len(self.directs), self.directs.keys())
        sock = await self.sess.ws_connect(payload["url"])
        self.log.debug("Connected to websocket")
        try:
            while True:
                event = await sock.receive_json()
                async with self.lock:
                    # No critical section here, just wait for any pending messages to be sent.
                    pass
                if "type" not in event:
                    self.log.warning("Received strange message with no type")
                    continue
                self.log.debug("Received a '%s' event", event["type"])
                self._cache_event(event)
                await self._dispatch(event)
        finally:
            # Don't leak the websocket when the loop is torn down by an exception.
            await sock.close()

    def _cache_event(self, event):
        """Update the user/channel/DM caches from a single typed event."""
        kind = event["type"]
        if kind in ("team_join", "user_change"):
            # A user appears or changed, update our cache.
            self.users[event["user"]["id"]] = event["user"]
        elif kind in ("channel_joined", "group_joined"):
            # A group or channel appeared, add to our cache.
            self.channels[event["channel"]["id"]] = event["channel"]
        elif kind == "im_created":
            # A DM appeared, add to our cache.
            self.directs[event["channel"]["id"]] = event["channel"]

    async def _dispatch(self, event):
        """Await every registered callback with the event, logging failures."""
        for callback in self.callbacks:
            # Guard each callback separately so one failure doesn't starve the rest.
            try:
                await callback(event)
            except Exception:
                self.log.exception("Failed callback for event")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment