Last active
April 27, 2018 08:55
-
-
Save nbraud/900f5ee5718e3277d113b242b6fd71ea to your computer and use it in GitHub Desktop.
Tweet dump
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/keys.py | |
__pycache__/ | |
*.state | |
*.xz |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import cbor, lzma, tweepy, sys | |
def dump_user(api, user, since=None): | |
user_id = api.get_user(user).id | |
if since: | |
return tweepy.Cursor(api.user_timeline, | |
id=user_id, since_id=since).items() | |
else: | |
return tweepy.Cursor(api.user_timeline, id=user_id).items() | |
def process(state, out, tweet, flush=False): | |
from util import debug | |
debug('process', tweet) | |
cbor.dump(tweet._json, out) | |
user = tweet.user.screen_name | |
if state[user]: | |
state[user] = max(state[user], tweet.id) | |
else: | |
state[user] = tweet.id | |
if flush: | |
out.flush() | |
def argparser(): | |
import argparse | |
from util import OverwriteablePath | |
parser = argparse.ArgumentParser( | |
description="Dump statuses from Tweeter." | |
) | |
parser.add_argument('--state', '-s', | |
help='State file (to support resumption)', | |
type=OverwriteablePath) | |
parser.add_argument('--follow', '-f', | |
action='store_true', | |
help='Wait for new tweets.') | |
parser.add_argument('--output', '-o', | |
default=sys.stdout.buffer, | |
help='Output file/stream (default: stdout)', | |
type=argparse.FileType('ab')) | |
parser.add_argument('user', nargs='*', | |
help='User(s) to follow') | |
return parser | |
if __name__ == "__main__": | |
from keys import consumer, access | |
from atomicwrites import atomic_write | |
import os.path | |
auth = tweepy.OAuthHandler(*consumer) | |
auth.set_access_token(*access) | |
api = tweepy.API(auth) | |
args = argparser().parse_args() | |
state = { user: None for user in args.user } | |
if args.state and os.path.exists(args.state): | |
with open(args.state, 'rb') as f: | |
state.update(cbor.load(f)) | |
with lzma.open(args.output, "a", check=lzma.CHECK_SHA256) as out: | |
try: | |
for user, last_id in state.items(): | |
for tweet in dump_user(api, user, since=last_id): | |
process(state, out, tweet) | |
if args.follow: | |
from util import StreamTweets, debug | |
from sys import stderr | |
out.flush() | |
print('Done fetching old tweets, now streaming.', | |
file=stderr, flush=True) | |
for tweet in StreamTweets(api, state.keys()): | |
debug('main/for:', tweet) | |
process(state, out, tweet, flush=True) | |
finally: | |
if args.state: | |
with atomic_write(args.state, mode='wb', overwrite=True) as f: | |
cbor.dump(state, f) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
consumer = ("consumer token", | |
"consumer secret") | |
access = ("OAuth access token", | |
"OAuth secret") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import cbor, lzma, sys | |
def cbors(input): | |
try: | |
while True: | |
yield cbor.load(input) | |
except EOFError: | |
return | |
def argparser(): | |
import argparse | |
from jsonpath_rw import jsonpath as jp | |
from jsonpath_rw import parse as jparse | |
parser = argparse.ArgumentParser( | |
description="Extract data from Twitter dumps." | |
) | |
parser.add_argument('--path', '-p', | |
default=jp.Fields('text'), | |
help='The JSONpath to extract (default: text)', | |
type=jparse) | |
parser.add_argument('file', nargs='*', | |
default=[sys.stdin.buffer], | |
help='Compressed file(s) to process (default: stdin)', | |
type=argparse.FileType('rb')) | |
return parser | |
if __name__ == "__main__": | |
args = argparser().parse_args() | |
for in_file in args.file: | |
with lzma.open(in_file, "r") as inp: | |
for i, tweet in enumerate(cbors(inp)): | |
print(in_file.name, i, | |
[ '{0.full_path}: {0.value}'.format(match) | |
for match in args.path.find(tweet) | |
] | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh -e | |
if [ "$#" -ne 1 ]; then | |
echo "Usage: $0 twitter_handle" >&2 | |
exit 1 | |
fi | |
./dump.py -s "$1.state" "$1" | \ | |
tee -a "$1.xz" | \ | |
./load.py |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def OverwriteablePath(path): | |
import os | |
from argparse import ArgumentError | |
path = os.path.abspath(path) | |
dir = os.path.dirname(path) | |
if os.path.exists(path): | |
if not os.access(path, os.R_OK | os.W_OK): | |
raise ArgumentError("File cannot be read or written to.") | |
if not os.path.exists(dir): | |
raise ArgumentError("Parent directory does not exist.") | |
if not os.access(dir, os.R_OK | os.W_OK | os.X_OK): | |
raise ArgumentError("Parent directory not accessible.") | |
return path | |
def StreamTweets(api, follows): | |
import tweepy | |
from queue import Queue | |
q = Queue() | |
follows = list(map(lambda user: api.get_user(user).id_str, follows)) | |
class Producer(tweepy.StreamListener): | |
def on_status(self, status): | |
debug('on_status:', status) | |
q.put(status) | |
def on_error(self, status): | |
debug('on_error:', status) | |
stream = tweepy.Stream(api.auth, listener=Producer()) | |
debug('stream:', stream) | |
debug('follow:', follows) | |
stream.filter(follow=follows, async=True) | |
while True: | |
debug('q.get()') | |
yield q.get() | |
import os | |
if 'DEBUG' in os.environ: | |
def debug(txt, obj=None): | |
from tweepy import Status | |
from sys import stderr | |
if isinstance(obj, Status): | |
print(txt, obj.created_at, obj.text, file=stderr, flush=True) | |
elif obj is not None: | |
print(txt, obj, file=stderr, flush=True) | |
else: | |
print(txt, file=stderr, flush=True) | |
else: | |
def debug(txt, obj=None): | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment