@turlockmike
Last active April 1, 2026 19:31
  • Save turlockmike/da757caa7cacabc4816f393628d047dd to your computer and use it in GitHub Desktop.

slack-listen: Real-time Slack event listener via WebSocket — zero polling, zero rate limits
---
name: slack-listen
description: Real-time Slack event listener via WebSocket. Use when the user wants to watch for Slack messages, monitor channels, listen for notifications, or stream Slack events in real-time. Triggers: 'listen to slack', 'watch slack', 'slack notifications', 'monitor slack channels', 'slack realtime', '/slack-listen'.
---

Slack Listen

Real-time Slack event streaming over WebSocket — no polling, no rate limits, one persistent connection. Events arrive as NDJSON to stdout.

Architecture

Two scripts in ~/.claude/skills/slack-listen/scripts/:

| Script | Role | When |
| --- | --- | --- |
| slack-auth.py | Extract creds from Slack Desktop → macOS Keychain | Once, or when creds expire |
| slack-listen.py | WebSocket → NDJSON event stream | Ongoing |

Auth flow: xoxc tokens are extracted from Slack Desktop's LevelDB; the xoxd cookie comes from its SQLite Cookies DB, decrypted with AES-128-CBC using the Keychain safe-storage key. Both are stored in the macOS Keychain under the service slack-listen. Workspace URLs are resolved via an auth.test API call.
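
The cookie decryption follows Chromium's safe-storage scheme: the Keychain password is stretched with PBKDF2-HMAC-SHA1 (fixed salt `saltysalt`, 1003 iterations) into a 16-byte AES-128-CBC key. A stdlib-only sketch of the key-derivation step (the password below is a made-up placeholder, not a real safe-storage key):

```python
import hashlib

def chromium_cookie_key(safe_storage_password: str) -> bytes:
    """Derive the AES-128-CBC key Chromium uses for cookie encryption on macOS."""
    return hashlib.pbkdf2_hmac(
        "sha1",
        safe_storage_password.encode(),
        b"saltysalt",   # fixed salt used by Chromium on macOS
        1003,           # fixed iteration count on macOS
        dklen=16,       # AES-128 key size
    )

# Placeholder password for illustration only
key = chromium_cookie_key("example-password")
assert len(key) == 16
```

The derivation is deterministic, so the same Keychain password always yields the same cookie key.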

Connection: Uses client.getWebSocketURL (Slack's internal desktop endpoint), NOT rtm.connect. This matters because rtm.connect fails with enterprise_is_restricted on Enterprise Grid orgs. The desktop endpoint works with enterprise-level xoxc tokens.
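
The fallback order can be sketched with the HTTP call stubbed out. Here `fetch` is a stand-in for the real POST request the listener makes; the fake responses simulate an Enterprise Grid org:

```python
def pick_ws_url(workspace_url, fetch):
    """Try the desktop endpoint first; fall back to rtm.connect.

    `fetch` is a stand-in callable returning Slack's JSON response as a dict.
    """
    for endpoint in ("client.getWebSocketURL", "rtm.connect"):
        resp = fetch(f"{workspace_url}/api/{endpoint}")
        if resp.get("ok"):
            # client.getWebSocketURL returns primary_websocket_url; rtm.connect returns url
            return resp.get("primary_websocket_url") or resp.get("url")
    return None

# Simulated Enterprise Grid org: only the desktop endpoint works
def fake_fetch(url):
    if url.endswith("client.getWebSocketURL"):
        return {"ok": True, "primary_websocket_url": "wss://wss-primary.slack.com/..."}
    return {"ok": False, "error": "enterprise_is_restricted"}

ws_url = pick_ws_url("https://example.slack.com", fake_fetch)
```

On a non-enterprise workspace the first endpoint may fail instead, in which case the loop falls through to rtm.connect.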

Dependencies: Python websockets + cryptography (pre-installed). No npm packages, no agent-slack CLI.

Auth Setup

```bash
# First time — extracts from Slack Desktop, stores in Keychain
python3 ~/.claude/skills/slack-listen/scripts/slack-auth.py

# Verify credentials work
python3 ~/.claude/skills/slack-listen/scripts/slack-auth.py --check

# Show what's stored (redacted)
python3 ~/.claude/skills/slack-listen/scripts/slack-auth.py --show
```

If auth fails: Slack Desktop must be installed and logged in. The script reads from ~/Library/Application Support/Slack/. Re-run after logging in.

Listening

```bash
# High-signal: only events Slack thinks you should see (DMs, mentions, replies)
python3 ~/.claude/skills/slack-listen/scripts/slack-listen.py --filter notifications

# All messages across all channels you're in
python3 ~/.claude/skills/slack-listen/scripts/slack-listen.py --filter messages

# Messages + reactions + joins + file shares (default)
python3 ~/.claude/skills/slack-listen/scripts/slack-listen.py --filter activity

# Everything including typing indicators
python3 ~/.claude/skills/slack-listen/scripts/slack-listen.py --filter all

# Scoped to specific channels
python3 ~/.claude/skills/slack-listen/scripts/slack-listen.py --filter messages --channels C02DTTJ1B8Q,DUEKV5ELV

# Bounded run (seconds)
python3 ~/.claude/skills/slack-listen/scripts/slack-listen.py --filter notifications --timeout 60
```

Output Format

NDJSON to stdout. Each line is a self-contained JSON object. Nulls are pruned.

```
{"type": "init", "workspace": "extend.enterprise.slack.com", "filter": "notifications"}
{"type": "hello", "ts": 1775060480.3, "status": "connected"}
{"type": "desktop_notification", "ts": 1775060505.7, "title": "Extend, Inc.", "subtitle": "Heather", "content": "oh really!", "channel": "D022QJT844B"}
{"type": "message", "ts": 1775060524.4, "channel": "G01ETM20D9S", "user": "U0213BDE4B0", "text": "because new services won't have PITR...", "thread_ts": "1775060253.650509"}
{"type": "shutdown", "events_received": 12, "duration_s": 60.0}
```
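
Because each line is a self-contained JSON object, downstream tooling can consume the stream line by line. A minimal consumer sketch, fed here from sample lines rather than the listener's stdout:

```python
import json

def consume(lines):
    """Dispatch NDJSON events by type; return a count per event type."""
    counts = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        etype = event.get("type", "unknown")
        counts[etype] = counts.get(etype, 0) + 1
        if etype == "desktop_notification":
            # Keys absent from the record were pruned as nulls upstream
            print(f"[{event.get('channel')}] {event.get('subtitle')}: {event.get('content')}")
    return counts

sample = [
    '{"type": "init", "workspace": "extend.enterprise.slack.com", "filter": "notifications"}',
    '{"type": "hello", "ts": 1775060480.3, "status": "connected"}',
    '{"type": "desktop_notification", "ts": 1775060505.7, "subtitle": "Heather", "content": "oh really!", "channel": "D022QJT844B"}',
    '{"type": "shutdown", "events_received": 12, "duration_s": 60.0}',
]
counts = consume(sample)  # prints: [D022QJT844B] Heather: oh really!
```

In practice `lines` would be the stdout of a `subprocess.Popen` running slack-listen.py.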

Filter Guide

| Filter | Events | Use case |
| --- | --- | --- |
| notifications | desktop_notification only | "Tell me when something needs my attention" |
| messages | message only | "Watch for messages in my channels" |
| activity | messages + reactions + joins + files | Default: broad awareness |
| all | Everything including typing/read markers | Debugging or full audit |

For notification-driven workflows, --filter notifications is almost always the right choice — Slack pre-filters these to DMs, mentions, and thread replies relevant to you.
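
The filtering in slack-listen.py boils down to three checks per event: noise suppression (unless `--filter all`), the type whitelist, and the optional channel scope. A sketch of that decision, with the type sets abbreviated from the script:

```python
NOISE_TYPES = {"user_typing", "channel_marked"}  # abbreviated from the script
FILTERS = {
    "all": None,  # None means every type passes
    "messages": {"message"},
    "notifications": {"desktop_notification"},
}

def should_emit(event, mode, channels=None):
    """Return True if an event passes the active filter (hello always passes)."""
    etype = event.get("type", "unknown")
    if etype == "hello":
        return True
    if mode != "all" and etype in NOISE_TYPES:
        return False
    allowed = FILTERS[mode]
    if allowed is not None and etype not in allowed:
        return False
    if channels and event.get("channel") not in channels:
        return False
    return True
```

For example, `should_emit({"type": "user_typing"}, "messages")` is False, while the same event passes under `"all"`.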

Gotchas

  • Keychain service is slack-listen, not agent-slack. The scripts are fully independent.
  • Enterprise Grid: The enterprise-level token works for all workspaces in the org via client.getWebSocketURL. No need for workspace-specific tokens.
  • Messages have user IDs, not names. Use Slack MCP slack_read_user_profile to resolve if needed.
  • Token expiry: When hello stops appearing or you get invalid_auth on the WebSocket, re-run slack-auth.py to refresh credentials.
  • One connection per process. Slack limits concurrent WebSocket connections — don't run multiple listeners simultaneously or it may disconnect your Slack Desktop app.
scripts/slack-auth.py

#!/usr/bin/env python3
"""Extract Slack Desktop credentials and store them in macOS Keychain.

Reads xoxc tokens from Slack Desktop's LevelDB and decrypts the xoxd
cookie from its SQLite Cookies DB. Stores results in Keychain under the
service name "slack-listen" so slack-listen.py can use them.

No external dependencies beyond Python stdlib + cryptography (pre-installed).

Usage:
    slack-auth.py           # extract and store in keychain
    slack-auth.py --check   # verify stored credentials still work
    slack-auth.py --show    # print stored credentials (redacted)

Inspired by: github.com/stablyai/agent-slack, github.com/hraftery/slacktokens
"""
import argparse
import hashlib
import json
import os
import re
import shutil
import sqlite3
import struct
import subprocess
import sys
import tempfile
import urllib.parse
import urllib.request

KEYCHAIN_SERVICE = "slack-listen"
SLACK_DIR = os.path.expanduser("~/Library/Application Support/Slack")
SLACK_DIR_APPSTORE = os.path.expanduser(
    "~/Library/Containers/com.tinyspeck.slackmacgap/Data/Library/Application Support/Slack"
)

# --- Keychain helpers ---

def keychain_set(account, value):
    """Store a value in macOS Keychain."""
    # Delete existing entry first (ignore errors if it doesn't exist)
    subprocess.run(
        ["security", "delete-generic-password", "-s", KEYCHAIN_SERVICE, "-a", account],
        capture_output=True,
    )
    subprocess.check_call(
        ["security", "add-generic-password", "-s", KEYCHAIN_SERVICE, "-a", account, "-w", value],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )


def keychain_get(account):
    """Read a value from macOS Keychain."""
    try:
        return subprocess.check_output(
            ["security", "find-generic-password", "-s", KEYCHAIN_SERVICE, "-a", account, "-w"],
            stderr=subprocess.DEVNULL,
        ).decode().strip()
    except subprocess.CalledProcessError:
        return None

def get_safe_storage_password():
    """Get Slack's Safe Storage password from Keychain."""
    for service, account in [
        ("Slack Safe Storage", "Slack Key"),
        ("Slack Safe Storage", "Slack App Store Key"),
        ("Slack Safe Storage", None),
    ]:
        try:
            args = ["security", "find-generic-password", "-w", "-s", service]
            if account:
                args.extend(["-a", account])
            return subprocess.check_output(args, stderr=subprocess.DEVNULL).decode().strip()
        except subprocess.CalledProcessError:
            continue
    return None

# --- LevelDB log file reader (no compression needed) ---

def read_log_records(data):
    """Parse LevelDB .log file records. Yields raw record payloads."""
    BLOCK_SIZE = 32768
    offset = 0
    pending = []
    while offset < len(data):
        block_offset = offset % BLOCK_SIZE
        remaining = BLOCK_SIZE - block_offset
        if remaining < 7:
            offset += remaining
            continue
        if offset + 7 > len(data):
            break
        length = struct.unpack_from("<H", data, offset + 4)[0]
        rtype = data[offset + 6]
        offset += 7
        if length == 0 or offset + length > len(data):
            offset += max(0, remaining - 7)
            pending = []
            continue
        chunk = data[offset:offset + length]
        offset += length
        if rtype == 1:  # FULL
            pending = []
            yield chunk
        elif rtype == 2:  # FIRST
            pending = [chunk]
        elif rtype == 3:  # MIDDLE
            if pending:
                pending.append(chunk)
        elif rtype == 4:  # LAST
            if pending:
                pending.append(chunk)
                yield b"".join(pending)
                pending = []

def parse_log_batch(batch):
    """Parse a LevelDB write batch. Yields (key, value) tuples."""
    if len(batch) < 12:
        return
    offset = 12  # skip sequence(8) + count(4)
    while offset < len(batch):
        record_type = batch[offset]
        offset += 1
        if record_type == 1:  # Value record
            key_len, n = read_varint(batch, offset)
            offset += n
            key = batch[offset:offset + key_len]
            offset += key_len
            val_len, n = read_varint(batch, offset)
            offset += n
            value = batch[offset:offset + val_len]
            offset += val_len
            yield key, value
        elif record_type == 0:  # Deletion
            key_len, n = read_varint(batch, offset)
            offset += n
            offset += key_len
        else:
            break

def read_varint(buf, offset):
    """Read a LevelDB varint."""
    result = 0
    shift = 0
    n = 0
    while offset + n < len(buf):
        b = buf[offset + n]
        n += 1
        result |= (b & 0x7F) << shift
        if (b & 0x80) == 0:
            return result, n
        shift += 7
        if shift >= 35:
            break
    raise ValueError("bad varint")

def extract_teams_from_leveldb(slack_dir):
    """Extract xoxc team tokens from Slack's LevelDB."""
    leveldb_dir = os.path.join(slack_dir, "Local Storage", "leveldb")
    if not os.path.isdir(leveldb_dir):
        return []
    # Snapshot the LevelDB dir (Slack holds a lock on it)
    snap = tempfile.mkdtemp(prefix="slack-leveldb-")
    try:
        subprocess.run(["cp", "-cR", leveldb_dir, snap + "/db"], capture_output=True)
        lock = os.path.join(snap, "db", "LOCK")
        if os.path.exists(lock):
            os.unlink(lock)
        db_dir = snap + "/db"
    except Exception:
        shutil.copytree(leveldb_dir, snap + "/db")
        db_dir = snap + "/db"
    try:
        config_buf = None
        # Scan .log files first (most recent data, no compression)
        for fname in sorted(os.listdir(db_dir), reverse=True):
            if not fname.endswith(".log"):
                continue
            with open(os.path.join(db_dir, fname), "rb") as f:
                data = f.read()
            for record in read_log_records(data):
                for key, value in parse_log_batch(record):
                    if b"localConfig_v" in key:
                        config_buf = value
                        break
                if config_buf:
                    break
            if config_buf:
                break
        # If not in logs, scan all .ldb files containing localConfig
        if not config_buf:
            ldb_bufs = []
            for fname in sorted(os.listdir(db_dir)):
                if not fname.endswith(".ldb"):
                    continue
                with open(os.path.join(db_dir, fname), "rb") as f:
                    raw = f.read()
                if b"localConfig_v" in raw:
                    ldb_bufs.append(raw)
            # Concatenate all matching files for regex extraction
            if ldb_bufs:
                config_buf = b"\x00".join(ldb_bufs)
        if not config_buf:
            return []
        # LevelDB uses prefix compression which corrupts JSON structure.
        # Use regex to extract team data from the raw bytes.
        tokens = re.findall(rb'"token"\s*:\s*"(xoxc-[^"]+)"', config_buf)
        urls = re.findall(rb'"url"\s*:\s*"(https://[^"]+slack\.com/[^"]*)"', config_buf)
        names = re.findall(rb'"name"\s*:\s*"([^"]{1,100})"', config_buf)
        if not tokens:
            return []
        teams = []
        seen_tokens = set()
        for token in tokens:
            tok = token.decode("utf-8", errors="ignore")
            if tok in seen_tokens:
                continue
            seen_tokens.add(tok)
            # Match this token to the nearest url and name before it
            tok_pos = config_buf.find(token)
            best_url = ""
            best_name = ""
            for url in urls:
                pos = config_buf.rfind(url, 0, tok_pos)
                if pos != -1:
                    best_url = url.decode("utf-8", errors="ignore").rstrip("/")
                    break
            for name in names:
                pos = config_buf.rfind(name, 0, tok_pos)
                if pos != -1:
                    best_name = name.decode("utf-8", errors="ignore")
                    break
            teams.append({"url": best_url, "name": best_name, "token": tok})
        return teams
    finally:
        shutil.rmtree(snap, ignore_errors=True)

# --- Cookie decryption ---

def decrypt_cookie(encrypted_value, password):
    """Decrypt a Chromium cookie using AES-128-CBC (macOS)."""
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

    # Skip v10/v11 prefix
    prefix = encrypted_value[:3]
    if prefix in (b"v10", b"v11"):
        data = encrypted_value[3:]
    else:
        data = encrypted_value
    salt = b"saltysalt"
    iv = b" " * 16
    key = hashlib.pbkdf2_hmac("sha1", password.encode(), salt, 1003, dklen=16)
    cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
    decryptor = cipher.decryptor()
    decrypted = decryptor.update(data) + decryptor.finalize()
    # Find xoxd- token in decrypted bytes
    match = re.search(rb"xoxd-[A-Za-z0-9%/+_=.-]+", decrypted)
    if match:
        raw = match.group(0).decode("utf-8")
        try:
            return urllib.parse.unquote(raw)
        except Exception:
            return raw
    return None

def extract_cookie_d(slack_dir):
    """Extract and decrypt the xoxd cookie from Slack's Cookies DB."""
    # Find the cookies DB
    for candidate in [
        os.path.join(slack_dir, "Network", "Cookies"),
        os.path.join(slack_dir, "Cookies"),
    ]:
        if os.path.exists(candidate):
            cookies_path = candidate
            break
    else:
        return None
    password = get_safe_storage_password()
    if not password:
        print('{"error": "could not read Slack Safe Storage password from Keychain"}', file=sys.stderr)
        return None
    # Copy the DB to avoid locking issues
    tmp = tempfile.mktemp(suffix=".db", prefix="slack-cookies-")
    shutil.copy2(cookies_path, tmp)
    try:
        conn = sqlite3.connect(tmp)
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT host_key, name, value, encrypted_value FROM cookies "
            "WHERE name = 'd' AND host_key LIKE '%slack.com' "
            "ORDER BY length(encrypted_value) DESC"
        ).fetchall()
        conn.close()
        if not rows:
            return None
        row = rows[0]
        # Check for unencrypted value first
        if row["value"] and row["value"].startswith("xoxd-"):
            return row["value"]
        encrypted = bytes(row["encrypted_value"])
        if not encrypted:
            return None
        return decrypt_cookie(encrypted, password)
    finally:
        os.unlink(tmp)

# --- Main ---

def resolve_team_url(xoxc, xoxd):
    """Call auth.test to get the actual workspace URL for a token."""
    xoxd_enc = urllib.parse.quote(xoxd, safe="")
    try:
        req = urllib.request.Request(
            "https://slack.com/api/auth.test",
            data=urllib.parse.urlencode({"token": xoxc}).encode(),
            headers={"Cookie": f"d={xoxd_enc}"},
            method="POST",
        )
        resp = json.loads(urllib.request.urlopen(req, timeout=10).read())
        if resp.get("ok"):
            return {
                "url": resp.get("url", "").rstrip("/"),
                "team": resp.get("team", ""),
                "user": resp.get("user", ""),
                "team_id": resp.get("team_id", ""),
            }
    except Exception:
        pass
    return None

def extract_all():
    """Extract credentials from Slack Desktop and store in Keychain."""
    # Try both Electron and App Store paths
    for slack_dir in [SLACK_DIR, SLACK_DIR_APPSTORE]:
        if not os.path.isdir(slack_dir):
            continue
        teams = extract_teams_from_leveldb(slack_dir)
        cookie_d = extract_cookie_d(slack_dir)
        if teams and cookie_d:
            # Resolve actual workspace URLs via auth.test
            resolved = []
            seen_urls = set()
            for team in teams:
                info = resolve_team_url(team["token"], cookie_d)
                if info and info["url"] not in seen_urls:
                    seen_urls.add(info["url"])
                    resolved.append({
                        "url": info["url"],
                        "name": info["team"],
                        "user": info["user"],
                        "token": team["token"],
                    })
            if not resolved:
                # Fall back to LevelDB URLs if auth.test fails
                resolved = teams
            # Store in keychain
            keychain_set("xoxd", cookie_d)
            for team in resolved:
                keychain_set(f"xoxc:{team['url']}", team["token"])
            result = {
                "ok": True,
                "cookie_d": f"xoxd-...{cookie_d[-6:]}",
                "teams": [{
                    "url": t["url"],
                    "name": t.get("name", ""),
                    "user": t.get("user", ""),
                    "token": f"xoxc-...{t['token'][-6:]}",
                } for t in resolved],
                "keychain_service": KEYCHAIN_SERVICE,
            }
            print(json.dumps(result))
            return True
    print(json.dumps({"ok": False, "error": "could not extract credentials from Slack Desktop"}), file=sys.stderr)
    return False

def find_stored_workspaces():
    """Find workspace URLs stored under our keychain service."""
    dump = subprocess.check_output(["security", "dump-keychain"], stderr=subprocess.DEVNULL).decode()
    urls = []
    lines = dump.splitlines()
    in_our_service = False
    for line in lines:
        if f'"svce"<blob>="{KEYCHAIN_SERVICE}"' in line:
            in_our_service = True
        elif '"svce"<blob>=' in line:
            in_our_service = False
        if in_our_service and '"acct"' in line and "xoxc:" in line:
            url = line.split("xoxc:", 1)[1].strip().strip('"')
            if url not in urls:
                urls.append(url)
            in_our_service = False
    return urls

def check_credentials():
    """Verify stored Keychain credentials work against Slack API."""
    xoxd = keychain_get("xoxd")
    if not xoxd:
        print(json.dumps({"ok": False, "error": "no xoxd in keychain"}))
        return False
    workspaces = find_stored_workspaces()
    if not workspaces:
        print(json.dumps({"ok": False, "error": "no xoxc entries in keychain"}))
        return False
    results = []
    xoxd_enc = urllib.parse.quote(xoxd, safe="")
    for url in workspaces:
        xoxc = keychain_get(f"xoxc:{url}")
        if not xoxc:
            results.append({"url": url, "ok": False, "error": "token not found in keychain"})
            continue
        try:
            req = urllib.request.Request(
                f"{url}/api/auth.test",
                data=urllib.parse.urlencode({"token": xoxc}).encode(),
                headers={"Cookie": f"d={xoxd_enc}"},
                method="POST",
            )
            resp = json.loads(urllib.request.urlopen(req, timeout=10).read())
            results.append({"url": url, "ok": resp.get("ok", False), "user": resp.get("user"), "team": resp.get("team")})
        except Exception as e:
            results.append({"url": url, "ok": False, "error": str(e)})
    print(json.dumps({"ok": all(r["ok"] for r in results), "workspaces": results}))
    return all(r["ok"] for r in results)

def show_credentials():
    """Show stored credentials (redacted)."""
    xoxd = keychain_get("xoxd")
    dump = subprocess.check_output(["security", "dump-keychain"], stderr=subprocess.DEVNULL).decode()
    teams = []
    for line in dump.splitlines():
        if '"acct"' in line and "xoxc:" in line:
            url = line.split("xoxc:", 1)[1].strip().strip('"')
            xoxc = keychain_get(f"xoxc:{url}")
            if xoxc:
                teams.append({"url": url, "token": f"xoxc-...{xoxc[-6:]}"})
    result = {
        "keychain_service": KEYCHAIN_SERVICE,
        "cookie_d": f"xoxd-...{xoxd[-6:]}" if xoxd else None,
        "teams": teams,
    }
    print(json.dumps(result, indent=2))

def main():
    parser = argparse.ArgumentParser(description="Extract Slack Desktop credentials to Keychain")
    parser.add_argument("--check", action="store_true", help="Verify stored credentials")
    parser.add_argument("--show", action="store_true", help="Show stored credentials (redacted)")
    args = parser.parse_args()
    if args.check:
        sys.exit(0 if check_credentials() else 1)
    elif args.show:
        show_credentials()
    else:
        sys.exit(0 if extract_all() else 1)


if __name__ == "__main__":
    main()
scripts/slack-listen.py

#!/usr/bin/env python3
"""Real-time Slack event listener via WebSocket.

Connects to Slack's WebSocket and streams events as NDJSON.
Auth is pulled from macOS Keychain entries (stored by slack-auth.py).

Usage:
    slack-listen.py                         # activity filter (default)
    slack-listen.py --filter messages       # messages only
    slack-listen.py --filter notifications  # desktop_notification events only
    slack-listen.py --filter messages --channels C02DTTJ1B8Q,DUEKV5ELV
    slack-listen.py --timeout 300           # run for 5 minutes
    slack-listen.py --workspace extend-workspace.slack.com

Output: NDJSON to stdout, errors to stderr. Exit 0 on clean shutdown, 1 on error.
"""
import argparse
import asyncio
import json
import signal
import subprocess
import sys
import time
import urllib.parse
import urllib.request

# Event types worth surfacing per filter mode
MESSAGE_TYPES = {"message"}
NOTIFICATION_TYPES = {"desktop_notification"}
ACTIVITY_TYPES = {
    "message", "desktop_notification", "reaction_added",
    "member_joined_channel", "channel_joined", "group_joined",
    "im_open", "file_shared", "file_comment_added",
}
NOISE_TYPES = {
    "user_typing", "reconnect_url", "pref_change",
    "clear_mention_notification", "badge_counts_updated",
    "thread_marked", "channel_marked", "im_marked", "group_marked",
    "update_global_thread_state", "mpim_marked",
}
FILTERS = {
    "all": None,  # everything
    "messages": MESSAGE_TYPES,
    "notifications": NOTIFICATION_TYPES,
    "activity": ACTIVITY_TYPES,
}

KEYCHAIN_SERVICE = "slack-listen"

def get_keychain(account):
    """Read a value from macOS Keychain for the slack-listen service."""
    try:
        return subprocess.check_output(
            ["security", "find-generic-password", "-s", KEYCHAIN_SERVICE, "-a", account, "-w"],
            stderr=subprocess.DEVNULL,
        ).decode().strip()
    except subprocess.CalledProcessError:
        return None

def find_workspaces():
    """Find all workspace URLs stored in our keychain service."""
    # Use security dump to find all accounts for our service
    try:
        dump = subprocess.check_output(
            ["security", "dump-keychain"], stderr=subprocess.DEVNULL
        ).decode()
    except subprocess.CalledProcessError:
        return []
    workspaces = []
    lines = dump.splitlines()
    in_our_service = False
    for line in lines:
        if f'"svce"<blob>="{KEYCHAIN_SERVICE}"' in line:
            in_our_service = True
        elif '"svce"<blob>=' in line:
            in_our_service = False
        if in_our_service and '"acct"' in line and "xoxc:" in line:
            url = line.split("xoxc:", 1)[1].strip().strip('"')
            if url not in workspaces:
                workspaces.append(url)
            in_our_service = False
    return workspaces

def resolve_workspace(hint=None):
    """Find workspace URL and credentials from keychain entries."""
    workspaces = find_workspaces()
    if not workspaces:
        print(json.dumps({"error": f"no credentials in keychain (service: {KEYCHAIN_SERVICE}). Run slack-auth.py first."}), file=sys.stderr)
        sys.exit(1)
    if hint:
        matches = [w for w in workspaces if hint in w]
        if len(matches) == 1:
            return matches[0]
        elif len(matches) > 1:
            print(json.dumps({"error": f"ambiguous workspace hint '{hint}'", "matches": matches}), file=sys.stderr)
            sys.exit(1)
        else:
            print(json.dumps({"error": f"no workspace matching '{hint}'", "available": workspaces}), file=sys.stderr)
            sys.exit(1)
    return workspaces[0]

def get_ws_url(workspace_url, xoxc, xoxd_enc):
    """Get a WebSocket URL for real-time events.

    Tries client.getWebSocketURL first (works with enterprise tokens),
    falls back to rtm.connect for non-enterprise workspaces.
    """
    # Try the internal desktop endpoint first
    for endpoint in ["client.getWebSocketURL", "rtm.connect"]:
        try:
            req = urllib.request.Request(
                f"{workspace_url}/api/{endpoint}",
                data=urllib.parse.urlencode({"token": xoxc}).encode(),
                headers={"Cookie": f"d={xoxd_enc}"},
                method="POST",
            )
            resp = json.loads(urllib.request.urlopen(req, timeout=10).read())
            if not resp.get("ok"):
                continue
            # client.getWebSocketURL returns primary_websocket_url
            ws_url = resp.get("primary_websocket_url") or resp.get("url")
            if ws_url:
                return ws_url, resp.get("routing_context")
        except Exception:
            continue
    print(json.dumps({"error": "could not obtain WebSocket URL from Slack"}), file=sys.stderr)
    sys.exit(1)

def format_event(data):
    """Extract a compact NDJSON record from a raw RTM event."""
    etype = data.get("type", "unknown")
    record = {"type": etype, "ts": time.time()}
    if etype == "message":
        record["channel"] = data.get("channel")
        record["user"] = data.get("user")
        record["text"] = data.get("text", "")
        record["thread_ts"] = data.get("thread_ts")
        subtype = data.get("subtype")
        if subtype:
            record["subtype"] = subtype
        # Detect bot messages
        if data.get("bot_id"):
            record["bot_id"] = data["bot_id"]
    elif etype == "desktop_notification":
        record["title"] = data.get("title")
        record["subtitle"] = data.get("subtitle")
        record["content"] = data.get("content")
        record["channel"] = data.get("channel")
        record["thread_ts"] = data.get("thread_ts")
        record["event_ts"] = data.get("event_ts")
    elif etype == "reaction_added":
        record["user"] = data.get("user")
        record["reaction"] = data.get("reaction")
        record["item"] = data.get("item")
    elif etype == "hello":
        record["status"] = "connected"
    else:
        # Pass through other events with minimal wrapping
        for key in ("channel", "user", "text", "item"):
            if key in data:
                record[key] = data[key]
    # Strip None values for token efficiency
    return {k: v for k, v in record.items() if v is not None}

async def listen(ws_url, xoxc, xoxd_enc, args):
    """Main WebSocket listener loop."""
    import websockets

    filter_types = FILTERS.get(args.filter)
    channels = set(args.channels.split(",")) if args.channels else None
    count = 0
    shutdown = asyncio.Event()
    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown.set)
    extra_headers = {"Cookie": f"d={xoxd_enc}"}
    # Append token to WebSocket URL (like the desktop client does)
    sep = "&" if "?" in ws_url else "?"
    full_ws_url = f"{ws_url}{sep}token={xoxc}"
    async with websockets.connect(full_ws_url, additional_headers=extra_headers, close_timeout=2) as ws:
        # Start timeout AFTER connection is established
        start = time.time()
        while not shutdown.is_set():
            # Check timeout
            remaining = None
            if args.timeout:
                remaining = args.timeout - (time.time() - start)
                if remaining <= 0:
                    break
            # Use remaining time or 5s poll interval, whichever is shorter
            recv_timeout = min(remaining, 5) if remaining else 5
            try:
                msg = await asyncio.wait_for(ws.recv(), timeout=recv_timeout)
            except asyncio.TimeoutError:
                continue
            except Exception:
                break
            data = json.loads(msg)
            etype = data.get("type", "unknown")
            # Always pass hello through
            if etype == "hello":
                record = format_event(data)
                print(json.dumps(record), flush=True)
                count += 1
                continue
            # Skip noise unless showing all
            if args.filter != "all" and etype in NOISE_TYPES:
                continue
            # Apply type filter
            if filter_types and etype not in filter_types:
                continue
            # Apply channel filter
            if channels and data.get("channel") not in channels:
                continue
            record = format_event(data)
            print(json.dumps(record), flush=True)
            count += 1
    summary = {"type": "shutdown", "events_received": count, "duration_s": round(time.time() - start, 1)}
    print(json.dumps(summary), flush=True)

async def run(args):
    """Connect and listen with automatic reconnection."""
    import websockets  # noqa: F401 — verify import early

    workspace_url = resolve_workspace(args.workspace)
    hostname = urllib.parse.urlparse(workspace_url).hostname or workspace_url
    xoxc = get_keychain(f"xoxc:{workspace_url}")
    xoxd = get_keychain("xoxd")
    if not xoxc or not xoxd:
        print(json.dumps({"error": "missing credentials", "workspace": workspace_url}), file=sys.stderr)
        sys.exit(1)
    xoxd_enc = urllib.parse.quote(xoxd, safe="")
    max_retries = 5
    retry = 0
    while retry < max_retries:
        try:
            ws_url, routing = get_ws_url(workspace_url, xoxc, xoxd_enc)
            meta = {"type": "init", "workspace": hostname, "filter": args.filter}
            if args.channels:
                meta["channels"] = args.channels
            if args.timeout:
                meta["timeout_s"] = args.timeout
            print(json.dumps(meta), flush=True)
            await listen(ws_url, xoxc, xoxd_enc, args)
            break  # Clean exit
        except Exception as e:
            retry += 1
            wait = min(2 ** retry, 30)
            print(json.dumps({"type": "reconnect", "attempt": retry, "wait_s": wait, "error": str(e)}),
                  file=sys.stderr, flush=True)
            if retry < max_retries:
                await asyncio.sleep(wait)
    if retry >= max_retries:
        print(json.dumps({"error": "max retries exceeded"}), file=sys.stderr)
        sys.exit(1)

def main():
    parser = argparse.ArgumentParser(description="Real-time Slack event listener via RTM WebSocket")
    parser.add_argument("--filter", choices=list(FILTERS.keys()), default="activity",
                        help="Event filter (default: activity)")
    parser.add_argument("--channels", type=str, default=None,
                        help="Comma-separated channel IDs to watch (default: all)")
    parser.add_argument("--timeout", type=int, default=None,
                        help="Stop after N seconds (default: run until interrupted)")
    parser.add_argument("--workspace", type=str, default=None,
                        help="Workspace URL substring (default: auto-detect)")
    args = parser.parse_args()
    asyncio.run(run(args))


if __name__ == "__main__":
    main()