Skip to content

Instantly share code, notes, and snippets.

@thannaske
Created June 15, 2016 15:30
Show Gist options
  • Save thannaske/560abbbbe9e6382fe6869e366bd931ff to your computer and use it in GitHub Desktop.
Save thannaske/560abbbbe9e6382fe6869e366bd931ff to your computer and use it in GitHub Desktop.
Send a Pushover notification when @RMVInfo tweets about a subway- or train-line I take
#!/usr/bin/python
import json
import datetime
import httplib
import urllib
import sys
from tweepy import Stream, OAuthHandler
from tweepy.streaming import StreamListener
TWITTER_CONSUMER_KEY = 'xxx'
TWITTER_CONSUMER_SECRET = 'xxx'
TWITTER_ACCESS_TOKEN = 'xxx'
TWITTER_ACCESS_TOKEN_SECRET = 'xxx'
PUSHOVER_USER_KEY = 'xxx'
PUSHOVER_APP_KEY = 'xxx'
TARGET_TWITTER_USER = ['957672121'] # User-ID of @RMVInfo
# ----------------------------------------------------------- #
class TwitterListener(StreamListener):
def on_status(self, status):
try:
status_text = status.text.encode("utf8")
now = datetime.datetime.now()
pushover = Pushover(status_text)
if now.weekday() == 0 or now.weekday() == 1 or now.weekday() == 2:
# Handle Monday to Wednesday (University)
if 3 < now.hour < 21:
# Only handle messages between 0300h and 2100h
# Between Monday and Wednesday I'm normally using S6 and Subway lines U1, U2, U3 and U8
if ("S-Bahn" in status_text and "S6" in status_text) or ("Frankfurt" in status_text and ("U1" in status_text or "U2" in status_text or "U3" in status_text or "U8" in status_text)):
print "[" + str(datetime.datetime.now()) + "] HANDLE TWEET: " + str(status_text)
pushover.send()
else:
print "[" + str(datetime.datetime.now()) + "] IGNORE TWEET (MATCH): " + str(status_text)
return True
else:
print "[" + str(datetime.datetime.now()) + "] IGNORE TWEET (TIME): " + str(status_text)
return True
elif now.weekday() == 3 or now.weekday() == 4:
# Handle Thursday to Friday (Working)
if 3 < now.hour < 21:
# Only handle messages between 0300h and 2100h
# Betweend Thursday and Friday I'm normally using S6 and Tram 11
if ("S-Bahn" in status_text and "S6" in status_text) or ("Frankfurt" in status_text and "Tram" in status_text and "11" in status_text):
print "[" + str(datetime.datetime.now()) + "] HANDLE TWEET: " + str(status_text)
pushover.send()
else:
print "[" + str(datetime.datetime.now()) + "] IGNORE TWEET (MATCH): " + str(status_text)
return True
else:
print "[" + str(datetime.datetime.now()) + "] IGNORE TWEET (TIME): " + str(status_text)
return True
else:
# On weekend do nothing
print "[" + str(datetime.datetime.now()) + "] IGNORE TWEET (WEEKEND): " + str(status_text)
return True
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
print "[" + str(datetime.datetime.now()) + "] EXCEPTION: " + str(e) + " - Line " + str(exc_tb.tb_lineno)
return False
def on_error(self, status):
print "[" + str(datetime.datetime.now()) + "] ERROR: " + str(status)
class Pushover:
def __init__(self, message):
self.__message = str(message)
def send(self):
connection = httplib.HTTPSConnection("api.pushover.net:443")
connection.request("POST", "/1/messages.json", urllib.urlencode({"token": PUSHOVER_APP_KEY, "user": PUSHOVER_USER_KEY, "message": self.__message}), { "Content-type": "application/x-www-form-urlencoded" })
auth = OAuthHandler(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
auth.secure = True
auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
twitterStream = Stream(auth, TwitterListener())
twitterStream.filter(follow=TARGET_TWITTER_USER)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment