Last active
April 8, 2016 00:37
-
-
Save justinatomatic/def5550ffcfe1736c817360180d75ab5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import json
import re
import socket
import time
import unicodedata

import mosquitto
from TwitterAPI import TwitterAPI

from webpurify import *
'''
Query Twitter, sanitise results and publish to MQTT.

Dependencies
    TwitterAPI  pip install TwitterAPI
    webpurify   local module file
    mosquitto

TODO
    Logging
'''
# Comma-separated hashtags passed as the 'track' parameter of the Twitter request.
TRACK_TERM = '#SyPy,#myotherhashtagisaporsche'

# Twitter API credentials (redacted placeholders).
CONSUMER_KEY = '****'
CONSUMER_SECRET = '****'
ACCESS_TOKEN_KEY = '****'
ACCESS_TOKEN_SECRET = '****'

# WebPurify client used to check tweet text; the argument is a redacted
# placeholder (presumably the API key -- confirm against the webpurify module).
web_purifier = WebPurify('****')
# --- MQTT Callbacks --- | |
def on_connect(mosq, obj, rc):
    """Report the outcome of a broker connection attempt.

    ``rc`` is the result code handed to the callback; 0 is reported as
    success and any other value as failure.
    """
    outcome = "Connected successfully." if rc == 0 else "Connected failed."
    print(outcome)
def on_disconnect(mosq, obj, rc):
    """Report a disconnection from the broker.

    ``rc`` is the result code handed to the callback. Only ``rc == 0`` is
    reported as a successful (requested) disconnect; any other value is
    reported as unexpected, together with the code, instead of being
    logged as a success.
    """
    if rc == 0:
        print("Disconnected successfully.")
    else:
        print("Disconnected unexpectedly (rc=" + str(rc) + ").")
def on_publish(mosq, obj, mid):
    """Log that the message with id ``mid`` has been published."""
    line = "".join(("Message ", str(mid), " published."))
    print(line)
def on_message(mosq, obj, msg):
    """Log an incoming message.

    When ``msg`` exposes ``topic`` and ``payload`` attributes they are
    printed, so the log shows the message content rather than the object's
    default repr. Any other object falls back to ``str(msg)``.
    """
    topic = getattr(msg, "topic", None)
    if topic is None:
        description = str(msg)
    else:
        description = str(topic) + ": " + str(getattr(msg, "payload", ""))
    print("Message " + description + " received.")
def on_subscribe(mosq, obj, mid, qos_list):
    """Log that the subscribe request with id ``mid`` was acknowledged."""
    mid_text = str(mid)
    print("Subscribe with mid " + mid_text + " received.")
def connectToBroker():
    """Register the last-will message and connect ``client`` to localhost.

    A falsy return value from ``client.connect`` is reported as a
    successful connection. A socket error is reported and followed by a
    five second pause; it is not re-raised, so the caller continues either
    way.
    """
    client.will_set('SyPy/clients', payload="twitter Stream Fail", qos=0)
    try:
        if not client.connect("localhost"):
            print("Connected to Broker")
    except socket.error as err:
        # Do not unpack the exception into (errno, message): socket errors
        # do not always carry exactly two args, and unpacking would raise
        # inside the handler. Prefer the OS message when one is available.
        reason = getattr(err, 'strerror', None) or str(err)
        print("Could not open socket: " + reason)
        time.sleep(5)
# MQTT client shared by connectToBroker() and the publishing loop;
# "SyPy_TweetStream" is the name handed to the Mosquitto constructor.
client = mosquitto.Mosquitto("SyPy_TweetStream")

# Attach the logging callbacks defined above.
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.on_message = on_message
client.on_subscribe = on_subscribe
# Twitter client built from the four credentials configured above.
api = TwitterAPI(CONSUMER_KEY, CONSUMER_SECRET,
                 ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET)

# Request 'statuses/filter' for the tracked terms; the response object is
# iterated item by item by the loop that follows.
r = api.request('statuses/filter', {'track': TRACK_TERM})
# Patterns for the parts of a tweet removed before the text is checked.
_URL_PATTERN = re.compile(r'\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*')
_HASHTAG_PATTERN = re.compile(r'#([^\\\n\r\s]*)')
_MENTION_PATTERN = re.compile(r'@([^\\ ]*)')
_NEWLINE_PATTERN = re.compile(r'\r?\n|\r')
# \b keeps words that merely end in "RT" (e.g. "SMART phone") intact.
_RETWEET_PATTERN = re.compile(r'\bRT ')


def _strip_tweet_text(text):
    """Return *text* without URLs, hashtags, @mentions, newlines and RT markers.

    The HTML entity ``&amp;`` is turned back into ``&`` and surrounding
    whitespace is removed, so a tweet made up only of stripped elements
    yields an empty string.
    """
    stripped = _URL_PATTERN.sub('', text)
    stripped = _HASHTAG_PATTERN.sub('', stripped)      # strip hashtags
    stripped = _MENTION_PATTERN.sub('', stripped)      # strip @user
    stripped = _NEWLINE_PATTERN.sub(' ', stripped)     # CR/LF -> space
    stripped = stripped.replace('&amp;', '&')          # decode ampersand
    stripped = _RETWEET_PATTERN.sub('', stripped)      # strip retweet marker
    return stripped.strip()


# Number of stream items carrying text seen so far. It must exist before
# the loop increments it.
count = 0

for item in r:
    # Items without a 'text' key are ignored.
    if 'text' not in item:
        continue

    # ASCII-only rendering of the tweet. Decoding back to text keeps the
    # value JSON-serialisable on Python 3 as well as Python 2.
    ascii_string = (unicodedata.normalize('NFKD', item['text'])
                    .encode('ascii', 'ignore')
                    .decode('ascii'))
    count += 1
    print(ascii_string)

    Striped_String = _strip_tweet_text(item['text'])
    if not Striped_String:
        # Nothing left to check, so there is no purity verdict to act on.
        continue

    WebPurifyData = web_purifier.check(Striped_String)
    print("Message Purity " + str(WebPurifyData))

    tweetData = {
        'id': item['id'],
        'time': item['created_at'],
        'tweet': ascii_string,
        'type': "tweet",
        'replyto': item['user']['screen_name'],
    }
    jsonstring = json.dumps(tweetData)
    print(jsonstring)

    # A 'found' value of 0 is treated as clean; anything else is routed to
    # the dropped-tweets topic.
    if WebPurifyData["found"] == 0:
        print("Send Tweet to Queue")
        topic = "SyPy/TweetStream"
    else:
        print("Un pure!!!")
        topic = "SyPy/droppedTweets"

    connectToBroker()
    client.publish(topic, jsonstring)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment