Created
December 13, 2013 22:44
-
-
Save habnabit/7952702 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pprint | |
import sys | |
import oauth2 | |
from twisted.internet import defer, task | |
from twisted.web.client import Agent, HTTPConnectionPool | |
import twits | |
consumer = oauth2.Consumer('...', '...') | |
token = oauth2.Token('...', '...') | |
def main(reactor, terms): | |
pool = HTTPConnectionPool(reactor) | |
agent = Agent(reactor, pool=pool) | |
oauth = twits.OAuthAgent(agent, consumer, token) | |
twitter = twits.Twitter(oauth, streamingAPI='https://stream.twitter.com/1.1/') | |
streamer = twits.StreamPreserver(twitter, 'statuses/filter.json', track=terms) | |
streamer.addDelegate(pprint.pprint) | |
streamer.startService() | |
return defer.Deferred() # run forever | |
task.react(main, sys.argv[1:]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) Aaron Gallagher <[email protected]> | |
# See COPYING for details. | |
"I HATE TWITTER" | |
import json | |
import re | |
import urllib | |
import urlparse | |
from twisted.application.service import Service | |
from twisted.internet import defer, protocol | |
from twisted.internet.error import ConnectionDone, ConnectionLost, TimeoutError | |
from twisted.protocols.basic import LineOnlyReceiver | |
from twisted.protocols.policies import TimeoutMixin | |
from twisted.python import failure, log | |
from twisted.web.client import ResponseDone, ResponseFailed | |
from twisted.web.http import PotentialDataLoss | |
from twisted.web.http_headers import Headers | |
import oauth2 | |
defaultSignature = oauth2.SignatureMethod_HMAC_SHA1() | |
defaultTwitterAPI = 'https://api.twitter.com/1.1/' | |
defaultStreamingAPI = 'https://userstream.twitter.com/1.1/' | |
OAUTH_KEYS = { | |
'oauth_nonce', 'oauth_timestamp', 'oauth_consumer_key', | |
'oauth_signature_method', 'oauth_version', 'oauth_signature', | |
} | |
class StringReceiver(protocol.Protocol): | |
def __init__(self, byteLimit=None): | |
self.bytesRemaining = byteLimit | |
self.deferred = defer.Deferred() | |
self._buffer = [] | |
def dataReceived(self, data): | |
data = data[:self.bytesRemaining] | |
self._buffer.append(data) | |
if self.bytesRemaining is not None: | |
self.bytesRemaining -= len(data) | |
if not self.bytesRemaining: | |
self.transport.stopProducing() | |
def connectionLost(self, reason): | |
if ((reason.check(ResponseFailed) and any(exn.check(ConnectionDone, ConnectionLost) | |
for exn in reason.value.reasons)) | |
or reason.check(ResponseDone, PotentialDataLoss)): | |
self.deferred.callback(''.join(self._buffer)) | |
else: | |
self.deferred.errback(reason) | |
def receive(response, receiver): | |
response.deliverBody(receiver) | |
return receiver.deferred | |
class UnexpectedHTTPStatus(Exception): | |
pass | |
def trapBadStatuses(response, goodStatuses=(200,)): | |
if response.code not in goodStatuses: | |
raise UnexpectedHTTPStatus(response.code, response.phrase) | |
return response | |
class OAuthAgent(object): | |
"An Agent wrapper that adds OAuth authorization headers." | |
def __init__(self, agent, consumer, token=None, | |
signatureMethod=defaultSignature): | |
self.agent = agent | |
self.consumer = consumer | |
self.token = token | |
self.signatureMethod = signatureMethod | |
def request(self, method, uri, headers=None, bodyProducer=None, | |
parameters=None): | |
"""Make a request, optionally signing it. | |
Any query string passed in `uri` will get clobbered by the urlencoded | |
version of `parameters`. | |
""" | |
if headers is None: | |
headers = Headers() | |
if parameters is None: | |
parameters = {} | |
req = oauth2.Request.from_consumer_and_token( | |
self.consumer, token=self.token, | |
http_method=method, http_url=uri, parameters=parameters, | |
is_form_encoded=True) | |
req.sign_request(self.signatureMethod, self.consumer, self.token) | |
for header, value in req.to_header().iteritems(): | |
# oauth2, for some bozotic reason, gives unicode header values | |
headers.addRawHeader(header, value.encode()) | |
parsed = urlparse.urlparse(uri) | |
uri = urlparse.urlunparse(parsed._replace(query=urllib.urlencode(req.get_nonoauth_parameters()))) | |
return self.agent.request(method, uri, headers, bodyProducer) | |
class TwitterStream(LineOnlyReceiver, TimeoutMixin): | |
"Receive a stream of JSON in twitter's weird streaming format." | |
def __init__(self, delegate, timeoutPeriod=60): | |
self.delegate = delegate | |
self.timeoutPeriod = timeoutPeriod | |
self.deferred = defer.Deferred(self._cancel) | |
self._done = False | |
def connectionMade(self): | |
"Start the timeout once the connection has been established." | |
self.setTimeout(self.timeoutPeriod) | |
LineOnlyReceiver.connectionMade(self) | |
def _cancel(self, ign): | |
"A Deferred canceler that drops the connection." | |
if self._done: | |
return | |
self._done = True | |
self.transport.stopProducing() | |
self.deferred.errback(defer.CancelledError()) | |
def dataReceived(self, data): | |
"Reset the timeout and parse the received data." | |
self.resetTimeout() | |
LineOnlyReceiver.dataReceived(self, data) | |
def lineReceived(self, line): | |
"Ignoring empty-line keepalives, inform the delegate about new data." | |
if not line: | |
return | |
obj = json.loads(line) | |
try: | |
self.delegate(obj) | |
except: | |
log.err(None, 'error in stream delegate %r' % (self.delegate,)) | |
def timeoutConnection(self): | |
"We haven't received data in too long, so drop the connection." | |
if self._done: | |
return | |
self._done = True | |
self.transport.stopProducing() | |
self.deferred.errback(TimeoutError()) | |
def connectionLost(self, reason): | |
"Report back how the connection was lost unless we already did." | |
self.setTimeout(None) | |
if self._done: | |
return | |
self._done = True | |
if reason.check(ResponseDone, PotentialDataLoss): | |
self.deferred.callback(None) | |
else: | |
self.deferred.errback(reason) | |
class Twitter(object): | |
"Close to the most minimal twitter interface ever." | |
def __init__(self, agent, twitterAPI=defaultTwitterAPI, | |
streamingAPI=defaultStreamingAPI): | |
self.agent = agent | |
self.twitterAPI = twitterAPI | |
self.streamingAPI = streamingAPI | |
def _makeRequest(self, whichAPI, method, resource, parameters): | |
d = self.agent.request(method, urlparse.urljoin(whichAPI, resource), | |
parameters=parameters) | |
d.addCallback(trapBadStatuses) | |
return d | |
def request(self, resource, method='GET', as_json=True, **parameters): | |
"""Make a GET request from the twitter 1.1 API. | |
`resource` is the part of the resource URL not including the API URL, | |
e.g. 'statuses/show.json'. As everything gets decoded by `json.loads`, | |
this should always end in '.json'. Any parameters passed in as keyword | |
arguments will be added to the URL as the query string. The `Deferred` | |
returned will fire with the decoded JSON. | |
""" | |
d = self._makeRequest(self.twitterAPI, method, resource, parameters) | |
d.addCallback(receive, StringReceiver()) | |
if as_json: | |
d.addCallback(json.loads) | |
return d | |
def stream(self, resource, delegate, **parameters): | |
"""Receive from the twitter 1.1 streaming API. | |
`resource` and keyword arguments are treated the same as the in | |
`request`, and `delegate` will be called with each JSON object which is | |
received from the stream. The `Deferred` returned will fire when the | |
stream has ended. | |
""" | |
d = self._makeRequest(self.streamingAPI, 'GET', resource, parameters) | |
d.addCallback(receive, TwitterStream(delegate)) | |
return d | |
class StreamPreserver(Service): | |
"Keep a stream connected as a service." | |
def __init__(self, twitter, resource, **parameters): | |
self.twitter = twitter | |
self.resource = resource | |
self.parameters = parameters | |
self._streamDone = None | |
self._delegates = set() | |
def __repr__(self): | |
return '<StreamPreserver %#x for %r/%r>' % ( | |
id(self), self.resource, self.parameters) | |
def _connectStream(self, r): | |
if isinstance(r, failure.Failure) and r.check(defer.CancelledError): | |
log.msg('not reconnecting twitter stream %r' % self) | |
return | |
log.msg('reconnecting twitter stream %r' % self) | |
self._streamDelegate({'started_connecting': True}) | |
d = self._streamDone = self.twitter.stream( | |
self.resource, self._streamDelegate, **self.parameters) | |
d.addBoth(self._connectStream) | |
d.addErrback(log.err, 'error reading from twitter stream %r' % self) | |
return r | |
def _streamDelegate(self, data): | |
for delegate in self._delegates: | |
try: | |
delegate(data) | |
except Exception: | |
log.err(None, 'error calling delegate %r' % (delegate,)) | |
def addDelegate(self, delegate): | |
"Add a delegate to receive stream data." | |
self._delegates.add(delegate) | |
def removeDelegate(self, delegate): | |
"Remove a previously-added stream data delegate." | |
self._delegates.discard(delegate) | |
def startService(self): | |
"Start reading from the stream." | |
if not self.running: | |
self._connectStream(None) | |
Service.startService(self) | |
def stopService(self): | |
"Stop reading from the stream." | |
ret = None | |
if self.running and self._streamDone is not None: | |
self._streamDone.cancel() | |
ret = self._streamDone | |
Service.startService(self) | |
return ret | |
entityReplacements = [ | |
('media', 'media_url_https'), | |
('urls', 'expanded_url'), | |
] | |
# SERIOUSLY why the FUCK do I have to do this | |
dumbCrapReplacements = { | |
'&': '&', | |
'<': '<', | |
'>': '>', | |
} | |
dumbCrapRegexp = re.compile( | |
'|'.join(re.escape(s) for s in dumbCrapReplacements)) | |
def extractRealTwitText(twit): | |
"Oh my god why is this necessary." | |
if 'retweeted_status' in twit: | |
rt = twit['retweeted_status'] | |
return u'RT @%s: %s' % ( | |
rt['user']['screen_name'], extractRealTwitText(rt)) | |
replacements = sorted( | |
(entity['indices'], entity[replacement]) | |
for entityType, replacement in entityReplacements | |
if entityType in twit['entities'] | |
for entity in twit['entities'][entityType]) | |
mutableText = list(twit['text']) | |
for (l, r), replacement in reversed(replacements): | |
mutableText[l:r] = replacement | |
text = u''.join(mutableText) | |
return dumbCrapRegexp.sub(lambda m: dumbCrapReplacements[m.group()], text) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment