Checks for new GTFS feeds, then downloads and validates them. Fetches feeds for SEPTA, NYC MTA, NJ TRANSIT, CTTRANSIT, Delaware First State, NJ/NY PATH, and PATCO. Dependencies: Python requests, BeautifulSoup, git, and Google's transit feed validator. NJ TRANSIT developer login credentials required to download from that agency. Cannot check for …
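Since verify() shells out to feedvalidator.py, that script from Google's transitfeed package needs to be on the PATH. As a quick orientation, a minimal driver (a sketch mirroring the runner script at the bottom of this gist; the download directory path is only illustrative) looks like:

#!/usr/bin/python
from FeedFetcher import FeedFetcher

# '/tmp/gtfs' is an example download directory, not one the gist assumes
fetcher = FeedFetcher(ddir='/tmp/gtfs', get_nj=False)
fetcher.fetch()       # download, validate, and log all feeds
fetcher.verify_all()  # optionally re-validate everything in the log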
#!/usr/bin/python
import requests, os, pickle, datetime, zipfile, subprocess, csv
from bs4 import BeautifulSoup


class FeedFetcher():
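    """Fetch GTFS zips for several agencies, validate them, and log timestamps."""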
    def __init__(self, ddir=os.getcwd(), get_nj=True, nj_username='', nj_pass=''):
        self.ddir = ddir
        self.get_nj = get_nj  # whether to fetch from NJ TRANSIT or not
        self.tc = {}  # time checks for GTFS fetches
        self.new_use = []  # list of new feeds successfully downloaded and validated
        self.nj_payload = {'userName': nj_username, 'password': nj_pass}
        if not nj_username or not nj_pass:
            self.get_nj = False
            print("Cannot fetch from NJ TRANSIT without nj_username and nj_pass specified.")
    def verify(self, file_name):
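        """Run Google's feedvalidator.py on a downloaded zip; return True if usable."""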
        # file_name is local to download directory
        f = os.path.join(self.ddir, file_name)
        if not os.path.isfile(f):
            print("File " + f + " not found; cannot verify it.")
            return False
        print("Validating feed in " + file_name + "...")
        try:
            p = subprocess.Popen(['feedvalidator.py', '--output=CONSOLE',
                                  '-m', '-n', f], stdout=subprocess.PIPE)
            out = p.communicate()
            res = out[0].split('\n')
            errct = res[-2]  # output line with count of errors/warnings
            if errct.find('error') > -1:
                print("Feed validator found errors in " + file_name + ": " + errct + ".")
                return False
            elif out[0].find('this feed is in the future,') > -1:
                print("Feed validator found GTFS not in service until future for " +
                      file_name + ".")
                return False
            else:
                if errct.find('successfully') > -1:
                    print("Feed " + file_name + " looks great: " + errct + ".")
                else:
                    # have warnings
                    print("Feed " + file_name + " looks ok: " + errct[7:] + ".")
                return True
        except Exception:
            print("Failed to run feed validator on GTFS " + file_name + ".")
            return False
        print("How did we get here? In GTFS validation for " + file_name + ".")
        return False  # shouldn't get here
    def verify_all(self):
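        """Re-validate every feed recorded in the pickled time-check log."""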
        # just run verify on all feeds in log file
        time_check = os.path.join(self.ddir, 'time_checks.p')
        if os.path.isfile(time_check):
            tcf = open(time_check, 'rb')
            self.tc = pickle.load(tcf)
            tcf.close()
            print("Loaded time check file.")
            if 'last_check' in self.tc:
                last_check = self.tc['last_check']
                print("Last check: ")
                print(last_check)
                timedelt = datetime.datetime.now() - last_check
                print("Time since last check: ")
                print(timedelt)
            else:
                print("Couldn't find last check time in log file; that's odd.")
            gtfs = self.tc
            gtfs.pop('last_check', None)  # drop the timestamp; everything left is a feed
            for g in gtfs:
                if g.endswith('.zip'):
                    if g.startswith('septa'):
                        self.verify('google_bus.zip')
                        self.verify('google_rail.zip')
                    else:
                        self.verify(g)
                else:
                    print("What is " + g + "? That doesn't look like a GTFS file name.")
            print("All done verifying!")
        else:
            print("No log file found! Can't verify GTFS.")
    def check_header_newer(self, url, file_name):
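        """Compare a URL's Last-Modified header to the timestamp logged for file_name."""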
        # return 1 if newer file available to download;
        # return 0 if info missing;
        # return -1 if current file is most recent.
        if file_name in self.tc:
            last_info = self.tc.get(file_name)
            hdr = requests.head(url)
            hdr = hdr.headers
            if hdr.get('last-modified'):
                last_mod = hdr.get('last-modified')
                if last_mod == last_info:
                    print("No new download available for " + file_name + ".")
                    return -1
                else:
                    print("New download available for " + file_name + ".")
                    print("Last downloaded: " + last_info + ".")
                    print("New download posted: " + last_mod + ".")
                    return 1
            else:
                print("No last-modified header set for " + file_name + " download link.")
                return 0
        else:
            print("Time check entry for " + file_name + " not found.")
            return 0
        # shouldn't happen
        print("How did we get here? Failed checking header info.")
        return 0
    def get_stream(self, url, file_name, do_stream=True, session=None, do_verify=True):
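        """Download url to file_name in the download directory.

        Streams by chunk when do_stream is True, reuses session if given,
        and runs GTFS validation unless do_verify is False.
        """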
        if self.check_header_newer(url, file_name) == -1:
            return False
        # file_name is local to download directory
        f = os.path.join(self.ddir, file_name)
        print("Getting file " + f + "...")
        if not session:
            stream = requests.get(url, stream=do_stream)
        else:
            stream = session.get(url, stream=do_stream)
        if stream.ok:
            stream_file = open(f, 'wb')
            if do_stream:
                for chunk in stream.iter_content():
                    stream_file.write(chunk)
            else:
                stream_file.write(stream.content)
            stream_file.close()
            info = os.stat(f)
            if info.st_size < 10000:
                # file smaller than 10K may not be a GTFS; just warn
                print('Warning:')
                print("Download for " + f + " is only " + str(info.st_size) + " bytes.")
                print("It may not be a valid GTFS.")
            if not zipfile.is_zipfile(f):
                print("BAD DOWNLOAD FOR " + f + ".")
                print("Download for " + f + " is not a zip file.")
                return False
            if stream.headers.get('last-modified'):
                self.tc[file_name] = stream.headers.get('last-modified')
            else:
                # format like last-modified header
                self.tc[file_name] = datetime.datetime.utcnow(
                    ).strftime("%a, %d %b %Y %H:%M:%S GMT")
            print("Download completed successfully.")
            # verify download
            if do_verify:
                if self.verify(file_name):
                    print("GTFS verification succeeded.")
                    self.new_use.append(file_name)
                    return True
                else:
                    print("GTFS verification failed.")
                    return False
            else:
                print("Skipping GTFS verification in get_stream.")
                # not adding to new_use here; do elsewhere
                return True
        else:
            print("DOWNLOAD FAILED FOR " + f + ".")
            return False
    def fetch(self):
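        """Fetch each agency's feed in turn, then write the log and the new_use.csv manifest."""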
        # pickled log of last times downloaded
        time_check = os.path.join(self.ddir, 'time_checks.p')
        if os.path.isfile(time_check):
            tcf = open(time_check, 'rb')
            self.tc = pickle.load(tcf)
            tcf.close()
            print("Loaded time check file.")
            if 'last_check' in self.tc:
                last_check = self.tc['last_check']
                print("Last check: ")
                print(last_check)
                timedelt = datetime.datetime.now() - last_check
                print("Time since last check: ")
                print(timedelt)
        else:
            print("Will create new time check file.")
        self.tc['last_check'] = datetime.datetime.now()
        ######### NJ TRANSIT SESSION ######################
        if self.get_nj:
            nj_login_url = 'https://www.njtransit.com/mt/mt_servlet.srv?hdnPageAction=MTDevLoginSubmitTo'
            s = requests.Session()
            s.post(nj_login_url, data=self.nj_payload)
            nj_rail_url = 'https://www.njtransit.com/mt/mt_servlet.srv?hdnPageAction=MTDevResourceDownloadTo&Category=rail'
            nj_bus_url = 'https://www.njtransit.com/mt/mt_servlet.srv?hdnPageAction=MTDevResourceDownloadTo&Category=bus'
            self.get_stream(nj_rail_url, 'nj_rail.zip', session=s)
            self.get_stream(nj_bus_url, 'nj_bus.zip', session=s)
            s.close()
        else:
            print("Skipping NJ data fetch.")
        ########## END NJ TRANSIT SESSION ##################
        ### SEPTA #####
        # get html page and check last updated date
        # (Last-Modified header not present)
        get_septa = True
        septa_file = 'septa.zip'
        page = requests.get('http://www2.septa.org/developer/')
        print("Checking last SEPTA update time...")
        # fall back to the current time if the page can't be fetched
        last_mod = datetime.datetime.now().strftime("%a, %d %b %Y %H:%M:%S")
        if page.ok:
            bs = BeautifulSoup(page.text, 'html.parser')
            last_mod = bs.find('div', 'col_content').find('p').text
            print(last_mod)
            last_mod = last_mod[14:]  # trim the leading label, keeping the date string
            if septa_file in self.tc:
                got_last = self.tc.get(septa_file)
                if got_last == last_mod:
                    print("No new download available for SEPTA.")
                    get_septa = False
                else:
                    print("New SEPTA download available.")
                    print("Latest update: " + last_mod + ".")
                    print("Previous update: " + got_last + ".")
            else:
                print("No previous SEPTA download found.")
        else:
            print("Failed to get SEPTA download info page.")
        if get_septa:
            if self.get_stream('http://www2.septa.org/developer/download.php',
                               septa_file, do_verify=False):
                # extract bus and rail GTFS files from downloaded zip
                septa_path = os.path.join(self.ddir, septa_file)
                z = zipfile.ZipFile(septa_path)
                if len(z.namelist()) == 2:
                    z.extractall(path=self.ddir)
                    self.tc[septa_file] = last_mod
                    z.close()
                    # verify extracted GTFS zips
                    if (os.path.isfile(os.path.join(self.ddir, 'google_bus.zip')) and
                            os.path.isfile(os.path.join(self.ddir, 'google_rail.zip'))):
                        if self.verify('google_bus.zip'):
                            self.new_use.append('google_bus.zip')
                            if self.verify('google_rail.zip'):
                                self.new_use.append('google_rail.zip')
                                print("SEPTA GTFS files passed verification.")
                            else:
                                print("SEPTA rail GTFS verification failed.")
                        else:
                            print("SEPTA bus GTFS verification failed.")
                    else:
                        print("Could not find SEPTA GTFS files with expected names.")
                else:
                    print("Unexpected contents in SEPTA zip file download:")
                    print(z.namelist())
                    print('Expected two files, but got %d.' % len(z.namelist()))
                    print("Not extracting SEPTA zip.\n")
                    z.close()
                # delete download file once the two GTFS zips in it are extracted
                os.remove(septa_path)
        ###############
        ####### CT Transit ########
        # feeds for Hartford, New Haven, Stamford, Waterbury, New Britain, Meriden,
        # and Shore Line East. Shore Line East has its own URL.
        ct_suffixes = {'Hartford': 'ha', 'New Haven': 'nh', 'Stamford': 'stam',
                       'Waterbury': 'wat', 'New Britain': 'nb', 'Meriden': 'me'}
        for ct in ct_suffixes:
            print(ct)
            url = 'http://www.cttransit.com/uploads_GTFS/google%s_transit.zip' % ct_suffixes[ct]
            filename = 'ct_%s.zip' % ct_suffixes[ct]
            self.get_stream(url, filename, do_stream=True)
        print('Shore Line East')
        self.get_stream('http://www.shorelineeast.com/google_transit.zip',
                        'ct_shoreline_east.zip', do_stream=True)
        ###########################
        ######## MTA ##############
        nyc_base_url = 'http://web.mta.info/developers/data/'
        nyc_sub_url = '%snyct/subway/google_transit.zip' % nyc_base_url
        bronx_bus_url = '%snyct/bus/google_transit_bronx.zip' % nyc_base_url
        brooklyn_bus_url = '%snyct/bus/google_transit_brooklyn.zip' % nyc_base_url
        manhattan_bus_url = '%snyct/bus/google_transit_manhattan.zip' % nyc_base_url
        queens_bus_url = '%snyct/bus/google_transit_queens.zip' % nyc_base_url
        staten_bus_url = '%snyct/bus/google_transit_staten_island.zip' % nyc_base_url
        lirr_url = '%slirr/google_transit.zip' % nyc_base_url
        metro_north_url = '%smnr/google_transit.zip' % nyc_base_url
        busco_url = '%sbusco/google_transit.zip' % nyc_base_url
        nyc_info = {'nyc_subway.zip': nyc_sub_url, 'bronx.zip': bronx_bus_url,
                    'brooklyn.zip': brooklyn_bus_url, 'manhattan.zip': manhattan_bus_url,
                    'queens.zip': queens_bus_url, 'staten.zip': staten_bus_url,
                    'lirr.zip': lirr_url, 'metro_north.zip': metro_north_url,
                    'busco.zip': busco_url}
        for nyc in nyc_info:
            self.get_stream(nyc_info[nyc], nyc, do_stream=False)
        ###########################
        #### PATH NJ/NY ##########
        path_url = 'http://trilliumtransit.com/transit_feeds/path-nj-us/gtfs.zip'
        self.get_stream(path_url, 'path.zip')
        ############################
        #### DELAWARE FIRST STATE ######
        de_url = 'http://dartfirststate.com/information/routes/gtfs_data/dartfirststate_de_us.zip'
        self.get_stream(de_url, 'dart.zip')
        ################################
        ###########################
        # check PATCO repo for updates
        got_repo = False
        just_cloned = False
        patco_filename = 'patco.zip'
        start_dir = os.getcwd()
        try:
            # changing into the directory doubles as an existence check
            os.chdir('patco-gtfs')
            got_repo = True
        except OSError:
            print("Could not find patco-gtfs repository directory. Checking out now...")
        if not got_repo:
            try:
                subprocess.check_call(['git', 'clone',
                                       'https://github.com/flibbertigibbet/patco-gtfs.git'])
                os.chdir('patco-gtfs')
                got_repo = just_cloned = True
            except Exception:
                got_repo = just_cloned = False
                print("Failed to check out patco-gtfs repository. Not getting PATCO GTFS.")
        try:
            behind = ''
            if got_repo and not just_cloned:
                subprocess.check_call(['git', 'fetch'])
                # check if ahead of remote
                # p = subprocess.Popen(['git', 'rev-list', 'HEAD@{upstream}..HEAD'],
                #                      stdout=subprocess.PIPE)
                # ahead = p.communicate()
                # if ahead:
                #     print("Changes on local patco-gtfs repo not pushed upstream.")
                #     print("Not getting PATCO GTFS.")
                # else:
                # number of commits behind remote
                p = subprocess.Popen(['git', 'rev-list', 'HEAD..HEAD@{upstream}'],
                                     stdout=subprocess.PIPE)
                behind = p.communicate()[0]
                if behind:
                    print("New changes found for patco-gtfs repo. Merging...")
                    subprocess.check_call(['git', 'pull'])
                else:
                    print("No new changes found for patco-gtfs repository.")
            if just_cloned or behind:
                print("Building new PATCO GTFS...")
                subprocess.check_call(['python', 'make_calendar_dates.py'])
                subprocess.check_call(['python', 'make_fare_rules.py'])
                subprocess.check_call(['python', 'make_trips_stops.py'])
                os.chdir('gtfs_files')
                # zip the .txt files into a GTFS feed in the download directory,
                # where verify() expects to find it
                with zipfile.ZipFile(os.path.join(self.ddir, patco_filename),
                                     mode='w') as z:
                    for path, dirs, files in os.walk(os.getcwd()):
                        for f in files:
                            if f.endswith('.txt'):
                                z.write(os.path.join(path, f), arcname=f)
                print("New PATCO GTFS written. Validating...")
                if self.verify(patco_filename):
                    print("New PATCO GTFS is valid.")
                    # format like last-modified header
                    self.tc[patco_filename] = datetime.datetime.utcnow(
                        ).strftime("%a, %d %b %Y %H:%M:%S GMT")
                    self.new_use.append(patco_filename)
                else:
                    print("New PATCO GTFS is invalid.")
        except Exception:
            print("Encountered an error while fetching/creating PATCO GTFS data.")
        os.chdir(start_dir)  # back to where we were
        ###########################
print("Downloading finished. Writing time check file...") | |
tcf = open(time_check, 'wb') | |
pickle.dump(self.tc, tcf) | |
tcf.close() | |
print("Time check file written.") | |
print("Writing 'new_use.csv', file of validated new downloads...") | |
nu = open('new_use.csv', 'wb') | |
nuw = csv.writer(nu) | |
for n in self.new_use: | |
print("Got new GTFS " + n) | |
nuw.writerow([n]) | |
nu.close() | |
print("Done writing 'new_use.csv'.") | |
print("All done!") | |
############################### |
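new_use.csv holds one validated file name per row, so a downstream job can pick up just the feeds that changed. A minimal reader (a sketch, not part of the gist) might be:

import csv

with open('new_use.csv') as manifest:
    new_feeds = [row[0] for row in csv.reader(manifest) if row]
print(new_feeds)  # e.g. ['nj_rail.zip', 'google_bus.zip']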
#!/usr/bin/env python
from FeedFetcher import FeedFetcher

username = 'NJ_TRANSIT_DEVELOPER_USERNAME'
password = 'NJ_TRANSIT_DEVELOPER_PASSWORD'

# change get_nj to True when an email alert is received for a new download
f = FeedFetcher(get_nj=False, nj_username=username, nj_pass=password)
f.fetch()
I've put this in a repo: https://github.com/flibbertigibbet/gtfs-feed-fetcher