# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @csabatini
# Last active August 29, 2015 14:00
# Show Gist options
#   - Save csabatini/11337678 to your computer and use it in GitHub Desktop.
# Save csabatini/11337678 to your computer and use it in GitHub Desktop.
from __future__ import print_function
from bs4 import BeautifulSoup
from urlparse import urlparse
from urlparse import parse_qs
import sys, logging
import mechanize
import time
import csv
import sqlite3
import requests
import json
# MapQuest geocoding endpoint; the '{appkey}' placeholder is filled in with
# str.format() when the request is made.
MAPQUEST_URL = 'http://www.mapquestapi.com/geocoding/v1/address?&key={appkey}'
# API key substituted into MAPQUEST_URL (placeholder value; supply a real key).
MAPQUEST_APP_KEY = 'MY_KEY_HERE'

# Module-level accumulators, both empty at import time: `zipcodes` is later
# reassigned from loadZipCodes() and `locations` is filled by parseData().
zipcodes, locations = [], []
def loadZipCodes(path='test.csv'):
    """Read the ``ZIPCode`` column of a CSV file into a list.

    Args:
        path: CSV file to read. Its first row is used as the header and must
            name a ``ZIPCode`` column. Defaults to ``'test.csv'``, the file
            this function previously had hard-coded.

    Returns:
        A list holding the ``ZIPCode`` value of every data row, in file
        order (empty when the file has no data rows).

    Raises:
        IOError: if the file cannot be opened.
        KeyError: if a data row has no ``ZIPCode`` field.
    """
    # The csv module wants a binary-mode file on Python 2 and a text-mode
    # file opened with newline='' on Python 3.
    if sys.version_info[0] < 3:
        csv_file = open(path, 'rb')
    else:
        csv_file = open(path, 'r', newline='')
    # The context manager closes the handle even if a row lookup fails;
    # previously the file was never closed.
    with csv_file:
        return [row["ZIPCode"] for row in csv.DictReader(csv_file)]
def submitForm(browser, zipcode, max_attempts=3):
    """Submit the ICEE locator form for one zip code and parse the reply.

    Args:
        browser: mechanize-style browser; it must provide open(),
            select_form(), a ``form`` mapping, submit() and response().
        zipcode: value written into the form's ``zip`` field.
        max_attempts: number of tries before giving up (default 3, the
            limit that used to be hard-coded).

    Returns:
        A BeautifulSoup document built from the response body of the first
        attempt that succeeds.

    Raises:
        SystemExit: with status 1 once every attempt has failed.
    """
    log = logging.getLogger()
    for attempt in range(1, max_attempts + 1):
        try:
            browser.open('http://www.icee.com/locationsICEE.asp')
            browser.select_form('form1')
            browser.form['zip'] = zipcode
            browser.submit()
            return BeautifulSoup(browser.response().read())
        except Exception as err:
            # `except Exception` instead of a bare `except:` so Ctrl-C and
            # SystemExit are no longer swallowed and retried; the reason for
            # each failed attempt is logged instead of being discarded.
            log.warning('Form submission for zip %s failed (attempt %d of %d): %s',
                        zipcode, attempt, max_attempts, err)
    # exit if submission fails max_attempts times
    log.error('Giving up on zip %s after %d failed attempts', zipcode, max_attempts)
    sys.exit(1)
def parseData(zip, list, response):
    """Collect store names and map links from one locator result page.

    Args:
        zip: zip code that produced ``response``. Unused; kept so existing
            callers keep working.
        list: list that is extended in place with one tab-separated
            ``name<TAB>url`` string per link found.
        response: parsed page exposing ``find_all()``; this script passes the
            BeautifulSoup document returned by submitForm().

    Returns:
        The same ``list`` object that was passed in, after appending.
    """
    # Every element carrying target="_blank" is treated as a store link.
    for tag in response.find_all(target="_blank"):
        url = tag.get('href')
        if url is None:
            # A matching tag without an href cannot be geocoded later, and
            # concatenating None used to raise TypeError and abort the run.
            continue
        # `tag.next` is the node right after the opening tag; its stripped
        # text is used as the store name.
        list.append(tag.next.strip() + '\t' + url)
    return list
def storeMapquestApiData(list, cursor):
    """Geocode each scraped location with MapQuest and insert it into SQLite.

    Args:
        list: iterable of tab-separated ``store<TAB>url`` strings as built by
            parseData(). The URL's query string is read for the ``address``,
            ``city``, ``state`` and ``zip`` parameters; a missing parameter
            is treated as an empty string.
        cursor: database cursor used to INSERT into the 8-column
            ``Locations`` table.

    Returns:
        None. Entries are skipped (with a message) when they contain no tab
        separator or when MapQuest does not return exactly one match.

    Raises:
        SystemExit: with status 1 if an INSERT fails with ``sqlite3.Error``.
    """
    for entry in list:
        # Split on the LAST tab so a tab inside the store name cannot shift
        # the URL; an entry without any tab used to raise IndexError.
        store, sep, url_text = entry.rpartition('\t')
        if not sep:
            print('Skipping malformed location entry: %s' % entry)
            continue

        # parse_qs maps each parameter to a list of values; joining turns a
        # missing parameter into '' instead of raising KeyError.
        query = parse_qs(urlparse(url_text).query)
        street = ''.join(query.get('address', []))
        city = ''.join(query.get('city', []))
        state = ''.join(query.get('state', []))
        postal_code = ''.join(query.get('zip', []))

        request_body = {'location': {'street': street, 'city': city,
                                     'state': state, 'postalCode': postal_code}}
        r = requests.post(MAPQUEST_URL.format(appkey=MAPQUEST_APP_KEY),
                          data=json.dumps(request_body))
        data = json.loads(r.content)

        # Only an unambiguous answer is stored. The previous `> 1` check let
        # an empty result through and then crashed on `[0]`.
        results = data.get('results') or [{}]
        matches = results[0].get('locations') or []
        if len(matches) != 1:
            print('Skipping %s: expected 1 geocoding match, got %d'
                  % (store, len(matches)))
            continue

        lat_lng = matches[0]['latLng']
        record = (store, street, city, state, postal_code,
                  lat_lng['lat'], lat_lng['lng'], url_text)
        try:
            cursor.execute('INSERT INTO Locations VALUES (?,?,?,?,?,?,?,?)', record)
        except sqlite3.Error as e:
            # `as e` replaces the Python-2-only `except X, e` spelling.
            print('Error %s:' % e.args[0])
            sys.exit(1)
        # Pause after each stored record before the next request
        # (NOTE(review): presumably API rate limiting; the placement is
        # inferred because the source had lost its indentation).
        time.sleep(1)
# set up logging: INFO-level messages go through the root logger, which also
# appends them to Log.log
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger()
logger.addHandler(logging.FileHandler('Log.log', 'a'))
# Route this module's print() calls through the logger; the rebinding is only
# legal because of `from __future__ import print_function`.
print = logger.info

# db initialize
db = sqlite3.connect('geodata.db')
try:
    csr = db.cursor()
    # IF NOT EXISTS lets the script be rerun against an existing geodata.db
    # instead of failing with "table Locations already exists"; rows from a
    # rerun are added to whatever the table already holds.
    csr.execute('CREATE TABLE IF NOT EXISTS Locations(Store TEXT, Street TEXT, City TEXT, State TEXT, Zip INT, Lat REAL, Long REAL, Url TEXT)')

    # initialize browser
    br = mechanize.Browser()
    # disable robots.txt
    br.set_handle_robots(False)
    # add user agent string
    br.addheaders = [('User-agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:24.0) Gecko/20100101 Firefox/24.0')]

    # load zip codes from csv
    zipcodes = loadZipCodes()

    # submit and parse response for each zip, pausing briefly between requests
    for zipcode in zipcodes:
        response = submitForm(br, zipcode)
        locations = parseData(zipcode, locations, response)
        time.sleep(0.2)

    # geocode every collected location and insert it into the database
    storeMapquestApiData(locations, csr)

    # save only after everything succeeded
    db.commit()
finally:
    # close the connection even when scraping or geocoding aborted
    db.close()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment