Skip to content

Instantly share code, notes, and snippets.

@swinton
Created September 24, 2012 15:21
Show Gist options
  • Save swinton/3776504 to your computer and use it in GitHub Desktop.
Save swinton/3776504 to your computer and use it in GitHub Desktop.
Connect to Twitter, track a hashtag, read out the tweets
#!/usr/bin/env python
"""Connect to Twitter, track a hashtag, read out the tweets!"""
import sys
import subprocess
import multiprocessing
import track
# Shared inter-process queue: echobot() puts (user name, tweet text, voice)
# tuples on it and say(), running in a child process, takes them off.
queue = multiprocessing.Queue()
def say():
    """Speak queued tweets aloud, one at a time, forever.

    Blocks on the module-level ``queue`` for ``(user, line, voice)`` tuples.
    Each item is echoed to stdout and then handed to the external ``say``
    command with the requested voice; ``subprocess.call`` waits for the
    command to finish before the next item is taken. This function never
    returns on its own -- it is meant to be run as the target of a child
    process that the parent terminates.
    """
    while True:
        user, line, voice = queue.get()
        # One pre-formatted argument keeps the output identical whether
        # ``print`` is the Python 2 statement or the Python 3 function.
        print("%s says: %s [ as %s ]" % (user, line, voice))
        # Arguments are passed as a list (no shell), so tweet text cannot
        # inject shell syntax.
        # NOTE(review): the spoken string starts with the user-controlled
        # name; a name beginning with "-" could presumably be parsed by
        # ``say`` as an option -- confirm whether a "--" separator is needed.
        subprocess.call(["say", "-v", voice, user + " says: " + line])
def echobot(filter, access_token, access_token_secret, consumer_key, consumer_secret):
    """Connect to Twitter, track a hashtag, read out the tweets!

    Starts a child process running ``say`` and then iterates over
    ``track.track(...)``. For every streamed object that has a ``"text"`` key,
    a ``(user name, text, voice)`` tuple is put on the module-level ``queue``
    for the child process to speak.

    The voice is the first of alex / victoria / albert / zarvox whose
    ``#<voice>`` hashtag occurs in the tweet text; ``"alex"`` is used when
    none of them occurs.

    Args:
        filter: Term to track; passed straight through to ``track.track``.
        access_token: OAuth access token, passed through to ``track.track``.
        access_token_secret: OAuth access token secret, passed through.
        consumer_key: OAuth consumer key, passed through.
        consumer_secret: OAuth consumer secret, passed through.

    Returns:
        None. ``KeyboardInterrupt`` is swallowed so Ctrl-C ends the bot
        quietly; any other exception propagates after the speaking process
        has been stopped.
    """
    voices = ("alex", "victoria", "albert", "zarvox")
    # Materialized once and reused for every tweet. A bare ``zip`` is a
    # one-shot iterator on Python 3 and would be exhausted after the first
    # tweet, silently defaulting every later tweet to "alex".
    voice_hashtags = tuple((voice, "#" + voice) for voice in voices)

    # Created before the ``try`` so the cleanup below can never hit an
    # unbound ``process`` name.
    process = multiprocessing.Process(target=say)
    process.start()
    try:
        for tweet in track.track(filter, access_token, access_token_secret,
                                 consumer_key, consumer_secret):
            # Objects without "text" are skipped, as in the original loop.
            if "text" not in tweet:
                continue
            text = tweet["text"]
            voice = next(
                (name for name, hashtag in voice_hashtags if hashtag in text),
                "alex",
            )
            queue.put((tweet["user"]["name"], text, voice))
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the bot.
        pass
    finally:
        # Always stop the speaking process. ``say`` loops forever, so leaving
        # it running after an error or end-of-stream would keep the
        # interpreter from exiting. Tweets still waiting in the queue are
        # not spoken.
        process.terminate()
        process.join()
if __name__ == "__main__":
    # Script entry point: read OAuth credentials from the local ``settings``
    # module and track the term given as the first command-line argument.
    import settings

    # Exit with a usage message instead of an IndexError traceback when the
    # filter argument is missing.
    if len(sys.argv) < 2:
        sys.exit("usage: %s <filter>" % sys.argv[0])

    echobot(
        sys.argv[1],
        access_token=settings.ACCESS_TOKEN,
        access_token_secret=settings.ACCESS_TOKEN_SECRET,
        consumer_key=settings.CONSUMER_KEY,
        consumer_secret=settings.CONSUMER_SECRET,
    )
#!/usr/bin/env python
"""
Example of connecting to the Twitter user stream using Requests.
"""
import sys
import json
import requests
from oauth_hook import OAuthHook
def track(track, access_token, access_token_secret, consumer_key, consumer_secret):
    """Yield decoded JSON objects from the Twitter user stream.

    Builds an ``OAuthHook`` from the four credentials (header-based auth),
    installs it as the session's ``pre_request`` hook, and POSTs
    ``delimited="length"`` plus the ``track`` term to the user-stream
    endpoint without prefetching the body. The response is then read line by
    line; blank lines and lines made up only of digits (presumably the length
    prefixes requested via ``delimited`` -- confirm against the API docs) are
    skipped, and every other line is parsed with ``json.loads`` and yielded.

    Args:
        track: Term to track, sent as the ``track`` form field.
        access_token: OAuth access token.
        access_token_secret: OAuth access token secret.
        consumer_key: OAuth consumer key.
        consumer_secret: OAuth consumer secret.

    Yields:
        One parsed JSON value per non-blank, non-numeric line of the stream.
    """
    signer = OAuthHook(
        access_token=access_token,
        access_token_secret=access_token_secret,
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        header_auth=True,
    )
    # ``verbose=sys.stderr`` is handed to the session config as-is;
    # presumably it makes the library log request details to stderr.
    session = requests.session(
        hooks={"pre_request": signer},
        config={"verbose": sys.stderr},
    )
    form = {"delimited": "length", "track": track}
    response = session.post(
        "https://userstream.twitter.com/1.1/user.json",
        data=form,
        prefetch=False,
    )
    # chunk_size=1 reads the stream one byte at a time, so each line is
    # delivered as soon as it is complete.
    for line in response.iter_lines(chunk_size=1):
        if not line or line.isdigit():
            continue
        yield json.loads(line)
if __name__ == "__main__":
    # Script entry point: pretty-print every object streamed for the term
    # given as the first command-line argument, using credentials from the
    # local ``settings`` module.
    import pprint
    import settings

    # Exit with a usage message instead of an IndexError traceback when the
    # track argument is missing.
    if len(sys.argv) < 2:
        sys.exit("usage: %s <track>" % sys.argv[0])

    for obj in track(
        sys.argv[1],
        access_token=settings.ACCESS_TOKEN,
        access_token_secret=settings.ACCESS_TOKEN_SECRET,
        consumer_key=settings.CONSUMER_KEY,
        consumer_secret=settings.CONSUMER_SECRET,
    ):
        pprint.pprint(obj)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment