Skip to content

Instantly share code, notes, and snippets.

@jpmens
Created October 22, 2013 10:52
Show Gist options
  • Save jpmens/7098570 to your computer and use it in GitHub Desktop.
Save jpmens/7098570 to your computer and use it in GitHub Desktop.
Real-time Twitter to MQTT streamer. (Thanks to Peter v. Dijk for making me do this :-)
# Twitter API credentials (placeholder values).
# Presumably the contents of the `conf.ig` file that the script below loads
# into its `config` dict -- TODO confirm against the gist's file list.
consumer_key = 'xxxxxxxxxxxxxxxxxxxxxx'
consumer_secret = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
token_secret = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
tw/jpmens {"url": "http://twitter.com/jpmens/status/392602935735365632", "date": "Tue Oct 22 10:46:48 +0000 2013", "fullname": "Jan-Piet Mens", "text": "NSEC explained #dnssec http://t.co/nJRH5P6Ugc"}
#!/usr/bin/env python
from twython import TwythonStreamer # https://github.com/ryanmcgrath/twython
import mosquitto
import socket
import time
try:
import json
except ImportError:
import simplejson as json
import sys
# Module-level MQTT client: the script attaches the broker callbacks to it and
# MyStreamer.on_success publishes every tweet through it.
mqttc = mosquitto.Mosquitto(clean_session=True)
def on_connect(mosq, userdata, rc):
    """Report the outcome of a connection attempt to the MQTT broker.

    Installed as the client's ``on_connect`` callback.

    mosq     -- the client instance (unused)
    userdata -- user data attached to the client (unused)
    rc       -- connection result code; 0 means the broker accepted the
                connection, any other value means it was refused
    """
    if rc == 0:
        print("Connected to broker")
    else:
        # Previously a refused connection was also reported as "Connected".
        print("Connection refused by broker (rc=%s)" % (rc,))
def on_disconnect(mosq, userdata, rc, retry_delay=10):
    """Keep trying to reconnect after the broker connection is lost.

    Installed as the client's ``on_disconnect`` callback. It blocks the
    calling thread, sleeping ``retry_delay`` seconds before every attempt,
    until ``mosq.reconnect()`` returns 0.

    mosq        -- the client instance to reconnect
    userdata    -- user data attached to the client (unused)
    rc          -- disconnect reason; 0 means the client itself asked to
                   disconnect, so no reconnect is attempted
    retry_delay -- seconds to wait before each reconnect attempt (default 10)
    """
    if rc == 0:
        # Deliberate disconnect: there is nothing to recover from.
        return

    while True:
        print("Disconnected from broker; retrying..")
        time.sleep(retry_delay)
        try:
            if mosq.reconnect() == 0:
                break
        except Exception as e:
            # Still best-effort, but no longer silent, and SystemExit /
            # KeyboardInterrupt are no longer swallowed by a bare except.
            print("Reconnect failed: %s" % (e,))
    print("Reconnected to broker.")
# Settings namespace filled in by running conf.ig; the streamer set-up further
# down reads consumer_key, consumer_secret, token and token_secret from it.
config = {}
# NOTE(review): execfile() runs conf.ig as Python code with `config` as its
# globals, so the file must be trusted; every name it defines becomes a key.
execfile('conf.ig', config)
class MyStreamer(TwythonStreamer):
    """Twitter stream consumer that republishes each tweet over MQTT."""

    def on_success(self, data):
        """Publish one streamed message as JSON on topic ``tw/<screen_name>``.

        Messages without a 'text' key are ignored. Any error raised while
        building or publishing the payload propagates to the caller.
        """
        if 'text' not in data:
            return

        body = data['text'].encode('utf-8')
        author = data['user']
        screen_name = author['screen_name'].encode('utf-8')
        fullname = author['name'].encode('utf-8')
        created = data['created_at']
        status_id = data['id_str']
        link = 'http://twitter.com/%s/status/%s' % (screen_name, status_id)

        message = dict(
            fullname = fullname,
            text = body,
            date = created,
            url = link
        )
        payload = json.dumps(message)
        print(payload)

        topic = 'tw/%s' % (screen_name)
        mqttc.publish(topic, payload, qos=0, retain=False)

    def on_error(self, status_code, data):
        """Print the status code and data of a stream error, then disconnect."""
        print("%s %s" % (status_code, data))
        self.disconnect()
# Reconnect back-off. Delays will be: 3, 6, 12, 24, 30, 30, ...
mqttc.reconnect_delay_set(delay=3, delay_max=30, exponential_backoff=True)

# Attach the broker callbacks before the first connection attempt.
mqttc.on_disconnect = on_disconnect
mqttc.on_connect = on_connect

try:
    mqttc.connect("localhost", 1883, 60)
except Exception as err:
    sys.exit("Cannot connect to broker: %s" % (str(err)))

# Start the MQTT client's loop; it is stopped again on Ctrl-C below.
mqttc.loop_start()

# Requires Authentication as of Twitter API v1.1
stream = MyStreamer(
    config['consumer_key'],
    config['consumer_secret'],
    config['token'],
    config['token_secret']
)

tracked_terms = ['#dns', 'dnssec', 'ansible', 'powerdns']
try:
    stream.statuses.filter(track=tracked_terms)
    # stream.user() # myself (i.e. authenticated user)
except KeyboardInterrupt:
    mqttc.loop_stop()
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment