Skip to content

Instantly share code, notes, and snippets.

@dfm
Created November 30, 2012 21:18
Show Gist options
  • Select an option

  • Save dfm/4178703 to your computer and use it in GitHub Desktop.

Select an option

Save dfm/4178703 to your computer and use it in GitHub Desktop.
Sometimes I do evil things...
#!/usr/bin/env python
from __future__ import print_function
import os
import re
import time
import requests
# Captures the text between <resumptionToken ...> and </resumptionToken>.
# The pattern deliberately has no leading/trailing ".*": with search() they
# are unnecessary and only cause backtracking on large responses.  "[^>]*"
# also lets the opening tag's attributes wrap onto several lines.
resume_re = re.compile(r"<resumptionToken[^>]*>(.*?)</resumptionToken>")
# Base URL that get() posts its ListRecords requests to.
url = "http://export.arxiv.org/oai2"
def get(basepath=u".", max_tries=10):
    """
    Get all the listings from the ArXiv.

    Pages through the ``ListRecords`` results served at the module-level
    ``url``.  Each successful response body is saved as
    ``raw-<NNNNNNNN>.xml`` inside ``basepath`` and the resumption token found
    in it is used to request the next page, until no token is left.

    :param basepath:
        Directory the numbered XML files are written to.  It is not created
        here.

    :param max_tries:
        Number of consecutive 503 responses after which the harvest is
        abandoned.

    :raises requests.exceptions.HTTPError:
        For any response whose status is neither 200 nor 503.

    """
    req = {u"verb": "ListRecords",
           u"metadataPrefix": u"arXivRaw"}
    # Seconds to wait between pages; also the fallback when a 503 carries no
    # usable Retry-After value.
    pause = 20
    failures = 0
    count = 0
    while True:
        # Send the request.
        r = requests.post(url, data=req)
        code = r.status_code

        if code == 503:
            # Asked to retry.  Check the limit first so we never sleep just
            # to give up afterwards.
            failures += 1
            if failures >= max_tries:
                print(u"Failed too many times...")
                break
            # The header may be absent or hold an HTTP date instead of a
            # number of seconds; fall back to the regular pause then.
            try:
                to = max(int(r.headers.get("retry-after", "")), 0)
            except ValueError:
                to = pause
            print(u"Got 503. Retrying after {0:d} seconds.".format(to))
            time.sleep(to)
            continue

        if code != 200:
            # 4xx/5xx raise here...
            r.raise_for_status()
            # ...anything else would otherwise be re-requested forever
            # without a pause, so fail loudly instead.
            err = requests.exceptions.HTTPError(
                u"Unexpected status code {0}".format(code))
            err.response = r
            raise err

        failures = 0

        # Write to file.  The raw bytes are saved so the result does not
        # depend on the platform's default text encoding.
        count += 1
        fn = os.path.join(basepath, u"raw-{0:08d}.xml".format(count))
        print(u"Writing to: {0}".format(fn))
        with open(fn, "wb") as f:
            f.write(r.content)

        # Look for a resumption token.
        match = resume_re.search(r.text)
        if match is None:
            break
        token = match.groups()[0]

        # If there isn't one, we're all done.
        if token == "":
            print(u"All done.")
            break

        print(u"Resumption token: {0}.".format(token))

        # If there is a resumption token, rebuild the request.
        req = {u"verb": u"ListRecords",
               u"resumptionToken": token}

        # Pause so as not to get banned.
        print(u"Sleeping for {0:d} seconds so as not to get banned."
              .format(pause))
        time.sleep(pause)
if __name__ == "__main__":
    import sys

    # The optional first command-line argument is the output directory;
    # without it the current directory is used.
    target_dir = u"." if len(sys.argv) == 1 else sys.argv[1]
    get(basepath=target_dir)
#!/usr/bin/env python
from __future__ import print_function
import time
import json
import requests
from requests.auth import OAuth1
# Streaming endpoint that monitor() posts its "track" filter to.
# NOTE(review): the path is versioned ("/1/") -- confirm it is still served.
url = u"https://stream.twitter.com/1/statuses/filter.json"
# OAuth1 credentials, passed in this order to OAuth1() inside monitor().
# The values below appear to be placeholders; supply real ones before running.
client_key = u"<REDACTED>"
client_secret = u"<REDACTED>"
user_key = u"<REDACTED>"
user_secret = u"<REDACTED>"
def _save_tweet(line, outdir):
    """Save one raw stream line as ``<outdir>/<id_str>.json``.

    Returns ``True`` when a file was written and ``False`` when the line was
    skipped because it is not valid JSON or carries no ``id_str`` field.
    """
    if isinstance(line, bytes):
        raw = line
    else:
        raw = line.encode("utf-8")
    try:
        # Decode explicitly so this also works where json.loads rejects bytes.
        # UnicodeDecodeError is a ValueError, so one handler covers both.
        message = json.loads(raw.decode("utf-8"))
    except ValueError:
        print("skipping undecodable line.")
        return False
    # Anything without an id cannot be given a file name; skip it instead of
    # letting a KeyError tear down the whole stream.
    if not isinstance(message, dict) or "id_str" not in message:
        return False
    fn = "{0}/{1}.json".format(outdir, message["id_str"])
    # Binary mode: the line is stored byte-for-byte as it was received.
    with open(fn, "wb") as f:
        f.write(raw)
    return True


def monitor(kw):
    """
    Stream statuses matching ``kw`` and save each as ``tweets/<id_str>.json``.

    Posts ``kw`` as the ``track`` field to the module-level ``url`` using
    OAuth1 credentials, then reconnects forever, sleeping ``wait`` seconds
    between attempts:

    * connection error: ``wait`` grows by 0.25 s, capped at 16 s;
    * status 200: ``wait`` is reset and lines are saved until the stream ends;
    * status 420: ``wait`` starts at 60 s and doubles each time;
    * status 401/403/404/500: ``wait`` starts at 5 s and doubles, capped at
      320 s;
    * any other status: ``raise_for_status()`` is called, so other 4xx/5xx
      codes raise; a non-error status backs off like the previous case.

    The loop ends when the user presses Ctrl-C.

    :param kw:
        Comma-separated keywords to track.

    """
    import os

    outdir = "tweets"
    # Create the output directory up front; otherwise every write fails and
    # the loop would reconnect over and over without saving anything.
    if not os.path.isdir(outdir):
        os.makedirs(outdir)

    wait = 0
    auth = OAuth1(client_key, client_secret, user_key, user_secret)
    while 1:
        try:
            try:
                # NOTE(review): prefetch=False is presumably what keeps the
                # body streaming -- confirm against the installed requests.
                r = requests.post(url, data={"track": kw}, auth=auth,
                                  prefetch=False, timeout=90)
            except requests.exceptions.ConnectionError:
                print("request failed.")
                wait = min(wait + 0.25, 16)
            else:
                code = r.status_code
                print("{0} returned: {1}".format(url, code))
                if code == 200:
                    wait = 0
                    try:
                        for line in r.iter_lines():
                            if line:
                                _save_tweet(line, outdir)
                    except requests.exceptions.Timeout:
                        print("request timed out.")
                    except Exception as e:
                        print("failed with {0}".format(e))
                        # Do not reconnect instantly after an unexpected
                        # failure; use the same step as a connection error.
                        wait = min(wait + 0.25, 16)
                elif code == 420:
                    wait = 60 if wait == 0 else wait * 2
                elif code in [401, 403, 404, 500]:
                    wait = 5 if wait == 0 else min(wait * 2, 320)
                else:
                    r.raise_for_status()
                    # Not an error status, yet not a stream either: back off
                    # instead of retrying immediately.
                    wait = 5 if wait == 0 else min(wait * 2, 320)
            # Sleeping inside the try lets Ctrl-C during the back-off exit
            # cleanly as well.
            time.sleep(wait)
        except KeyboardInterrupt:
            print("Exiting.")
            break
if __name__ == "__main__":
    import sys

    # Every command-line argument is a keyword to track (joined with commas);
    # with no arguments the single keyword "arxiv" is used.
    keywords = u",".join(sys.argv[1:]) if len(sys.argv) > 1 else u"arxiv"
    monitor(keywords)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment