Skip to content

Instantly share code, notes, and snippets.

@glciampaglia
Created November 12, 2018 19:27
Show Gist options
  • Save glciampaglia/2ec146be16796fbac8821d27739f530e to your computer and use it in GitHub Desktop.
Save glciampaglia/2ec146be16796fbac8821d27739f530e to your computer and use it in GitHub Desktop.
Client for the filter endpoint of the Twitter Streaming API
#!/usr/bin/env python
# vim: set sw=4 sts=4 expandtab:
# author: Giovanni Luca Ciampaglia <[email protected]>
# NOTE: requests use a 90-second timeout to detect stalls, and reconnections
# follow the backoff strategies documented here:
# https://dev.twitter.com/streaming/overview/connecting (see Stalls and
# Reconnecting)
#
# and also see:
# http://docs.python-requests.org/en/latest/user/quickstart/#timeouts
""" Use the stream API to track tweets """
from __future__ import print_function

import errno
import gzip
import logging
import socket
import sys
import time
from argparse import ArgumentParser, FileType

import requests
from requests_oauthlib import OAuth1

try:
    import ujson as json
except ImportError:
    import json
# Endpoint of the streaming filter API and the format of our log records.
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
log_fmt = '%(asctime)s: %(threadName)s: %(levelname)s: %(message)s'

# Backoff strategies, keyed by name. Every duration is expressed in seconds.
# A backoff starts by sleeping `time`; each subsequent backoff either adds
# `step` (kind == 'linear') or multiplies by `factor` (kind == 'exponential').
# Once the sleep has reached `max`, a fatal error is raised instead.
backoff_params = {
    'tcp': dict(time=0, kind='linear', step=0.25, max=16),
    'http': dict(time=5, kind='exponential', factor=2, max=320),
    'http_420': dict(time=60, kind='exponential', factor=2, max=600),
}
class TwitterStreamError(Exception):
    """Fatal streaming error, raised when a backoff reaches its maximum."""
class TwitterStream(object):
    """
    Client for the filter endpoint of the Twitter Streaming API.

    Posts the filter parameters to the endpoint, appends every non-empty line
    received to the output, and reconnects with a backoff whenever the
    connection ends, stalls or fails.
    """

    def __init__(self, f, outpath, params):
        """
        Read the credentials and log the configuration.

        f: open file object with the OAuth credentials as JSON; the keys
            consumer_key, consumer_secret, access_token and
            access_token_secret are read when authenticating.
        outpath: output path, opened in append mode. '-' means standard
            output; a path ending in "gz" is written gzip-compressed.
        params: dict of request parameters sent with the POST. It must have
            the keys follow, track, locations, stall_warnings and delimited,
            since all of them are logged here.
        """
        self.outpath = outpath
        self.params = params
        self.credentials = json.load(f)
        self.outfp = None  # set by _append()
        self.client = None  # set by _authenticate()
        self._stall_timeout = 90
        self._backoff_sleep = None  # if not None, we are backing off
        self._backoff_strategy = None
        self._conn_timeout_sleep = .5
        self._counter = 0  # overall counter of processed tweets
        self._backoff_params = backoff_params
        logging.info("appending to: {}".format(self.outpath))
        logging.info("follow: {follow}".format(**params))
        logging.info("track: {track}".format(**params))
        logging.info("locations: {locations}".format(**params))
        logging.info("stall_warnings: {stall_warnings}".format(**params))
        logging.info("delimited: {delimited}".format(**params))

    def _append(self):
        """
        Open the output for appending and store it in self.outfp.

        The output is always a binary stream, because the lines read from the
        response are byte strings.
        """
        if self.outpath == '-':
            # Python 3 exposes the byte stream as sys.stdout.buffer; on
            # Python 2 sys.stdout itself accepts byte strings.
            self.outfp = getattr(sys.stdout, 'buffer', sys.stdout)
        elif self.outpath.endswith("gz"):
            self.outfp = gzip.GzipFile(filename=self.outpath, mode='ab')
        else:
            self.outfp = open(self.outpath, 'ab')

    def _close_output(self):
        """
        Release the output opened by _append(), if any.

        Standard output is only flushed, so that the interpreter can still use
        it while shutting down. A broken pipe is ignored; any other I/O error
        is logged instead of being silently discarded.
        """
        outfp = getattr(self, 'outfp', None)
        if outfp is None:
            return
        self.outfp = None
        std_streams = (sys.stdout, getattr(sys.stdout, 'buffer', None))
        try:
            if any(outfp is s for s in std_streams):
                outfp.flush()
            else:
                outfp.close()
        except IOError as e:
            if e.errno != errno.EPIPE:
                logging.warning("Error while closing output: {}".format(e))

    def _authenticate(self):
        """
        Create the requests session used to connect and store it in
        self.client, with OAuth1 authentication built from the credentials.
        """
        c = self.credentials
        oauth = OAuth1(client_key=c['consumer_key'],
                       client_secret=c['consumer_secret'],
                       resource_owner_key=c['access_token'],
                       resource_owner_secret=c['access_token_secret'],
                       signature_type='auth_header')
        self.client = requests.session()
        self.client.auth = oauth

    def _backoff(self, strategy):
        """
        Sleep according to the named backoff strategy.

        See https://dev.twitter.com/streaming/overview/connecting (Stalls and
        Reconnecting). A strategy defines a set of parameters for the backoff,
        including the initial time, the way it increases the sleep period
        (linear or exponential), and a maximum time after which it's better to
        just raise a fatal error.

        Raises ValueError if the strategy is unknown, and TwitterStreamError
        if the previous sleep had already reached the strategy's maximum.
        """
        try:
            params = self._backoff_params[strategy]
        except KeyError:
            raise ValueError("unknown strategy: {}".format(strategy))
        if self._backoff_sleep is None or self._backoff_strategy != strategy:
            # start with initial time if first backoff or if strategy has
            # changed
            self._backoff_sleep = params['time']
            self._backoff_strategy = strategy
        else:
            # continue with previous strategy
            if self._backoff_sleep >= params['max']:
                logging.error("Reached maximum backoff time. Raising fatal error!")
                raise TwitterStreamError(
                    "maximum {} backoff time reached".format(strategy))
            if params['kind'] == 'linear':
                self._backoff_sleep += params['step']
            else:
                self._backoff_sleep *= params['factor']
        logging.warning("Sleeping {:.2f}s as part of {} backoff.".format(
            self._backoff_sleep, params['kind']))
        time.sleep(self._backoff_sleep)

    def _reset_backoff(self):
        """Forget the current backoff, so that the next one starts afresh."""
        self._backoff_sleep = None
        self._backoff_strategy = None

    def _consume(self):
        """
        Open one connection and write out lines until the server ends it.

        Raises requests.HTTPError if the server answers with an error status,
        and lets any connection error propagate. The response is always closed
        before returning, so the caller can back off without holding the
        connection open.
        """
        response = self.client.post(url, data=self.params, stream=True,
                                    timeout=self._stall_timeout)
        try:
            # requests only raises HTTPError on request: without this call the
            # HTTP backoff strategies would never be used and the body of an
            # error response would end up in the output.
            response.raise_for_status()
            data_lines = 0  # includes keepalives
            for line in response.iter_lines():
                data_lines += 1
                if line:
                    self.outfp.write(line + b"\n")
                    self._counter += 1
                    if self._counter % 1000 == 0:
                        logging.info("{} tweets.".format(self._counter))
                if data_lines >= 8:
                    # reset backoff status if received at least 8 data
                    # lines (including keep-alive newlines). Stream
                    # seems to send at least 8 keepalives, regardless
                    # of whether authentication was successful or not.
                    logging.debug("Reset backoff")
                    self._reset_backoff()
                    data_lines = 0
        finally:
            response.close()

    def stream(self):
        """
        Stream to the output until the user interrupts or a fatal error occurs.

        Returns normally on KeyboardInterrupt. Any other unhandled exception
        (including the TwitterStreamError raised when a backoff reaches its
        maximum) propagates after the output and the session are released.
        """
        logging.info("Started streaming.")
        try:
            # The output and the session are created once and reused across
            # reconnections, so that no handle is leaked on each retry.
            self._append()
            self._authenticate()
            while True:
                try:
                    self._consume()
                    logging.warning("Backing off..")
                    self._backoff('tcp')
                except requests.exceptions.ConnectTimeout:
                    # wait just a (small) fixed amount of time and try to
                    # reconnect.
                    msg = "Timeout while trying to connect to server. Retrying in {}s.."
                    logging.warning(msg.format(self._conn_timeout_sleep))
                    time.sleep(self._conn_timeout_sleep)
                except requests.Timeout:
                    # catching requests.Timeout instead of requests.ReadTimeout
                    # because we are setting a timeout parameter in the POST
                    msg = "Server did not send any data for {}s. Backing off.."
                    logging.warning(msg.format(self._stall_timeout))
                    self._backoff('tcp')
                except requests.ConnectionError:
                    logging.warning("Reconnecting to stream endpoint...")
                    self._backoff('tcp')
                except requests.HTTPError as e:
                    # Must come before socket.error: on Python 3 socket.error
                    # is an alias of OSError, which HTTPError derives from.
                    if e.response.status_code == 420:
                        msg = "Got HTTP 420 Error. Backing off.."
                        logging.warning(msg)
                        self._backoff("http_420")
                    else:
                        msg = "Got HTTP Error. Backing off.."
                        logging.warning(msg)
                        self._backoff("http")
                except socket.error as e:
                    msg = "Socket error {}: {}. Reconnecting to stream endpoint..."
                    logging.warning(msg.format(e.errno, e.strerror))
                    self._backoff('tcp')
        except KeyboardInterrupt:
            logging.info("got ^C from user. Exit.")
            return
        finally:
            # runs on any exit, including fatal errors (such as the
            # TwitterStreamError we raise if backoff reaches maximum sleep
            # time)
            if getattr(self, 'client', None) is not None:
                self.client.close()
            self._close_output()
# Command line interface: two positionals (credentials file and output path)
# followed by the filter parameters and the script's own options.
parser = ArgumentParser(description=__doc__)
parser.add_argument(
    "key_file",
    type=FileType(),
    metavar="key",
    help="key JSON file",
)
parser.add_argument(
    "output_path",
    metavar="output",
    help="output file (append mode)",
)
parser.add_argument("-f", "--follow", help="comma-separated list of user IDs")
parser.add_argument("-t", "--track", help="keywords to track")
parser.add_argument("-l", "--locations", help="set of bounding boxes")
# The two flags below store the literal value sent with the request when they
# are given, and None otherwise.
parser.add_argument("-d", "--delimited", const="length", action="store_const")
parser.add_argument("-s", "--stall-warnings", const="true",
                    action="store_const")
parser.add_argument(
    '-D', '--debug',
    dest='logging_level',
    action='store_const',
    const=logging.DEBUG,
    default=logging.INFO,
    help='print debug messages',
)
if __name__ == '__main__':
    # Script entry point: parse the command line, set up logging and stream
    # until interrupted or until a fatal error occurs.
    args = parser.parse_args()
    logging.basicConfig(format=log_fmt, level=args.logging_level)
    if (args.follow is None) \
            and (args.track is None) \
            and (args.locations is None):
        parser.error("please specify at least one of follow/track/locations")
    # Every parsed option is forwarded as a request parameter, except the ones
    # that only configure this script. vars() is the public way to read the
    # namespace as a dict; it is copied so that args is left untouched.
    params = dict(vars(args))
    del params['key_file']
    del params['output_path']
    del params['logging_level']
    try:
        streamer = TwitterStream(args.key_file, args.output_path, params)
    finally:
        # the credentials are read by the constructor, so the file is no
        # longer needed once it returns
        args.key_file.close()
    streamer.stream()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment