Created
February 17, 2023 18:23
-
-
Save wybiral/6444686a3dd33d70f1df996bde57f0ef to your computer and use it in GitHub Desktop.
Watch Mastodon stream for notifications and blink Thingy52 LED
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Watch Mastodon stream for notifications and blink Thingy52 | |
''' | |
import asyncio | |
import os | |
import ssl | |
try: | |
import certifi | |
except ImportError: | |
print('certifi module required: pip install certifi') | |
exit(1) | |
try: | |
from aiomast import MastodonAPI | |
except ImportError: | |
print('aiomast module required: pip install aiomast') | |
exit(1) | |
try: | |
from aiothingy import Thingy52 | |
except ImportError: | |
print('aiothingy module required: pip install aiothingy') | |
exit(1) | |
# Hostname of the Mastodon instance whose notification stream is watched.
MASTODON_INSTANCE = 'mastodon.social'

# The access token is read from the environment rather than stored in the
# source. A missing variable is reported the same way as a missing module:
# print a hint and exit with status 1.
try:
    ACCESS_TOKEN = os.environ['MASTODON_TOKEN']
except KeyError:
    print('MASTODON_TOKEN environment variable required')
    exit(1)

# True while a notification is pending, i.e. the LED has been set to breathe
# and the button has not been pressed since to dismiss it.
active = False
async def main():
    '''
    Watch the Mastodon notification stream and signal it on the Thingy52.

    Connects to the Thingy52, turns its LED off and subscribes to its
    button. Then connects to the streaming endpoint advertised by the
    Mastodon instance and subscribes to 'user:notification'. The first
    notification received while none is pending starts the LED breathing
    purple; a button event dismisses it and turns the LED off again.

    The stream is reconnected forever, so this coroutine does not return.
    '''
    global active

    # Short pause between reconnect attempts, so a stream that keeps
    # dropping does not turn the loop below into a busy loop.
    reconnect_delay = 1  # seconds

    print('connecting to thingy')
    thingy = Thingy52()
    await thingy.connect()
    print('turning led off')
    await thingy.led.off()

    async def reset(event):
        '''Button callback: dismiss the pending notification, if any.'''
        global active
        # Only button events with truthy data are acted on
        # (presumably truthy data means "pressed" -- TODO confirm).
        if event['type'] == 'button' and event['data']:
            if active:
                print('notification dismissed')
                await thingy.led.off()
                active = False

    print('subscribing to button')
    await thingy.button.subscribe(reset)

    api = MastodonAPI(MASTODON_INSTANCE)
    api.set_access_token(ACCESS_TOKEN)
    # Verify TLS against certifi's CA bundle.
    api.ssl = ssl.create_default_context(cafile=certifi.where())

    print('requesting mastodon instance')
    instance = await api.instance.info()
    # The streaming endpoint is taken from the instance's own configuration.
    streaming_url = instance['configuration']['urls']['streaming']

    async with api.session:
        while True:
            print('connecting to mastodon stream')
            stream = await api.websockets.connect(streaming_url)
            await stream.subscribe('user:notification')
            print('streaming')
            print('')
            async for event in stream:
                if event['event'] != 'notification':
                    continue
                # Start the LED only once; it keeps breathing until the
                # button callback clears the flag.
                if not active:
                    await thingy.led.breathe(color='purple')
                    active = True
                data = event['data']
                if data['type'] == 'mention':
                    print('mentioned by ' + data['account']['acct'])
                else:
                    print(data['type'])
            # The stream iterator finished: the connection was closed.
            print('disconnected')
            await asyncio.sleep(reconnect_delay)
# Only start the watcher when this file is executed as a script, so that
# importing the module does not run main().
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment