-
-
Save jdanders/3afd7d22f2c68caeddb5ac2ed1240bab to your computer and use it in GitHub Desktop.
Command line application to search KSL Classifieds
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import argparse | |
import concurrent.futures | |
import string | |
from collections import namedtuple | |
from urllib.request import urlopen | |
from urllib.parse import urlencode, urljoin | |
from bs4 import BeautifulSoup | |
# Immutable record for a single classified-ad search result.
Listing = namedtuple(
    'Listing',
    ['title', 'city', 'state', 'age', 'price', 'link', 'description'],
)
class KSL(object):
    """Thin client for the KSL classifieds search page.

    Builds query-string URLs, fetches result pages concurrently on a small
    thread pool, and scrapes ``Listing`` records out of the returned HTML.
    """

    # Base search endpoint; an encoded query string is appended to it.
    URL = 'http://ksl.com/classifieds/search?'
    # Defaults merged into every query string.
    URL_QS = {
        'sold': 0,             # do not list sold items
        'nocache': 1,          # don't cache results, FRESH!
        'viewNumResults': 20,  # maximum results per "page"
        'sort': 0,             # newest first
    }

    def __init__(self):
        # One shared pool so repeated searches on the same instance
        # reuse worker threads.
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def __fix_query(self, q):
        """Split a comma-separated query into clean individual search terms."""
        return [item.strip() for item in q.split(',')]

    def __do_request(self, args):
        """Fetch one search URL and return ``(query, raw_html_bytes)``.

        ``args`` is ``(query, url)`` or ``(query, url, timeout_seconds)``.
        """
        if len(args) == 2:
            query, url = args
            timeout = 5  # seconds
        else:
            query, url, timeout = args
        # BUGFIX: close the HTTP response instead of leaking the socket.
        with urlopen(url, timeout=timeout) as response:
            return (query, response.read(), )

    def search(self, query, **etc):
        """Yield ``(term, html)`` pairs for every term in the comma-list query.

        Extra keyword arguments are forwarded to ``build_qs``.
        """
        # BUGFIX: the original wrapped the executor in ``with self.thread_pool``,
        # which shuts the pool down once the first search generator is
        # exhausted and made every KSL instance single-use.  Map directly.
        yield from self.thread_pool.map(
            self.__do_request, self.build_qs(query, **etc))

    def find_elements(self, html):
        """Parse one results page; yield a ``Listing`` per non-featured ad.

        Ads missing a link, a usable price, or their detail/description
        markup are skipped instead of crashing the scrape (robustness fix:
        the original raised AttributeError on absent sub-elements).
        """
        soup = BeautifulSoup(html, 'html.parser')
        for ad_box in soup.find_all('div', class_='listing'):
            # Featured ads are promoted noise; skip them.
            if 'featured' in ad_box.attrs['class']:
                continue
            links = ad_box.find_all('a', class_='link')
            if not links:
                continue
            # Clean the title: strip punctuation, drop short filler words,
            # and Capitalize Each Word.
            title = links[0].text.strip(string.punctuation)
            title = ' '.join(t.capitalize() for t in title.split() if len(t) > 3)
            link = urljoin(self.URL, links[0].get('href'))
            # Ignore prices that "don't exist" (rendered as runs of dashes).
            price_box = ad_box.find('h3', class_='price')
            if not price_box or price_box.text.count('-') >= 6:
                continue
            price = price_box.text.strip()
            # Location and posting age live in the detail line; guard
            # against listings with missing markup.
            ad_detail = ad_box.find('div', class_='listing-detail-line')
            if ad_detail is None:
                continue
            address = ad_detail.find('span', class_='address')
            if address is None:
                continue
            # Drop non-ASCII artifacts, then split "City, ST".
            location = address.text.encode('ascii', 'ignore').decode('ascii')
            parts = location.split(',')
            city = ' '.join(p.capitalize() for p in parts[0].strip().split())
            state = parts[-1].strip().upper()
            # Age of the posting: keep only the time portion after the '|'.
            time_box = ad_detail.find('span', class_='timeOnSite')
            if time_box is None:
                continue
            lifespan = time_box.text.strip().encode('ascii', 'ignore')
            lifespan = lifespan.split(b'|')[-1].strip().decode('ascii')
            # Description text, minus its embedded "more" link (if any).
            desc_box = ad_box.find('div', class_='description-text')
            if desc_box is None:
                continue
            more_link = desc_box.find('a')
            description = desc_box.text
            if more_link is not None:
                description = description.replace(more_link.text, '')
            description = description.strip()
            yield Listing(title, city, state, lifespan, price, link, description)

    def build_qs(self, query, **etc):
        """Yield ``(term, full_search_url)`` for each comma-separated term.

        Recognized options in ``etc``: ``min_price``, ``max_price``,
        ``zipcode``, ``distance``; anything else is passed straight through
        into the query string.
        """
        for term in self.__fix_query(query):
            # Always have a minimum price of at least $0.
            minp = max(0, int(etc.get('min_price', None) or 0))
            maxp = etc.get('max_price', None)
            if maxp is not None:
                maxp = max(0, int(maxp))
                # With both bounds present, make sure the lower one is minp.
                minp, maxp = sorted([minp, maxp])
            qs = {
                'priceFrom': minp,
                'priceTo': maxp,
                'zip': etc.get('zipcode', None),
                'miles': etc.get('distance', None),
                'keyword': term,
            }
            # Apply the fixed defaults (sold / nocache / page size / sort).
            qs.update(self.URL_QS)
            # Fill in any additional parameters that were passed but not
            # explicitly handled above.
            for key, value in etc.items():
                qs.setdefault(key.lower(), value)
            # Query strings can't carry None; use empty strings instead.
            for key, value in qs.items():
                if value is None:
                    qs[key] = ''
            yield (term, '{}&{}'.format(self.URL, urlencode(qs)), )
def listing(id):
    # Stub: presumably intended to fetch a single classified ad by its id —
    # never implemented and never called in this file.
    # NOTE(review): the parameter name shadows the builtin `id`; confirm the
    # intended contract before wiring this up.
    pass
def main(args):
    """Run the classifieds search described by ``args`` and print results.

    ``args`` is a plain dict (from ``vars(parse_args())``); ``query`` is
    popped out and everything else is forwarded as search options.
    """
    if args.get('query') is None:
        return
    # Create the thin client object.
    ksl = KSL()
    # Fetch each result page and pretty-print every listing found on it.
    for search_term, page_html in ksl.search(args.pop('query'), **args):
        for rank, ad in enumerate(ksl.find_elements(page_html), start=1):
            summary = (
                f'{rank}. {ad.price} - {ad.title} : {ad.city} {ad.state} - {ad.age}\n'
                f' {ad.link}\n {ad.description}\n'
            )
            print(summary)
if __name__ == '__main__':
    # Command-line front end: one positional search query plus optional
    # price / location filters, all handed to main() as a dict.
    parser = argparse.ArgumentParser(
        description='ksl - command line utility to query KSL classifieds',
    )
    parser.add_argument('query', action='store', default=None)
    parser.add_argument('-m', '--min-price', action='store', default=0,
                        dest='min_price')
    parser.add_argument('-M', '--max-price', action='store', default=None,
                        dest='max_price')
    parser.add_argument('-z', '--zip-code', action='store', default=None,
                        dest='zipcode')
    parser.add_argument('-d', '--distance', action='store', default=None,
                        dest='distance')
    # do it
    main(vars(parser.parse_args()))
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.