-
-
Save Habbie/7098903 to your computer and use it in GitHub Desktop.
twitter stream to mqtt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
consumer_key = 'xxxxxxxxxxxxxxxxxxxxxx' | |
consumer_secret = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' | |
token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' | |
token_secret = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
tw/jpmens {"url": "http://twitter.com/jpmens/status/392602935735365632", "date": "Tue Oct 22 10:46:48 +0000 2013", "fullname": "Jan-Piet Mens", "text": "NSEC explained #dnssec http://t.co/nJRH5P6Ugc"} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from twython import TwythonStreamer # https://github.com/ryanmcgrath/twython | |
import mosquitto | |
import socket | |
import time | |
try: | |
import json | |
except ImportError: | |
import simplejson as json | |
import sys | |
mqttc = mosquitto.Mosquitto(clean_session=True) | |
def on_connect(mosq, userdata, rc): | |
print "Connected to broker" | |
def on_disconnect(mosq, userdata, rc): | |
while True: | |
print "Disconnected from broker; retrying.." | |
time.sleep(10) | |
try: | |
rc = mosq.reconnect() | |
if rc == 0: | |
break | |
except: | |
pass | |
print "Reconnected to broker." | |
config = {} | |
execfile('conf.ig', config) | |
class MyStreamer(TwythonStreamer): | |
def on_success(self, data): | |
if 'text' in data: | |
try: | |
text = data['text'].encode('utf-8') | |
screen_name = data['user']['screen_name'].encode('utf-8') | |
fullname = data['user']['name'].encode('utf-8') | |
date = data['created_at'] | |
id_str = data['id_str'] | |
url = 'http://twitter.com/%s/status/%s' % (screen_name, id_str) | |
payload = json.dumps(dict( | |
fullname = fullname, | |
text = text, | |
date = date, | |
url = url | |
)) | |
print payload | |
mqttc.publish('tw/%s' % (screen_name), payload, qos=0, retain=False) | |
except: | |
raise | |
def on_error(self, status_code, data): | |
print status_code, data | |
self.disconnect() | |
# Delays will be: 3, 6, 12, 24, 30, 30, ... | |
mqttc.reconnect_delay_set(delay=3, delay_max=30, exponential_backoff=True) | |
mqttc.on_connect = on_connect | |
mqttc.on_disconnect = on_disconnect | |
try: | |
mqttc.connect("localhost", 1883, 60) | |
except Exception, e: | |
sys.exit("Cannot connect to broker: %s" % (str(e))) | |
mqttc.loop_start() | |
# Requires Authentication as of Twitter API v1.1 | |
stream = MyStreamer(config['consumer_key'], config['consumer_secret'], | |
config['token'], config['token_secret']) | |
try: | |
stream.statuses.filter(track=['#dns', 'dnssec', 'ansible', 'powerdns']) | |
# stream.user() # myself (i.e. authenticated user) | |
except KeyboardInterrupt: | |
mqttc.loop_stop() | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment