Last active
July 19, 2024 06:39
-
-
Save blakev/a6bbe3b5a861d64c6e36 to your computer and use it in GitHub Desktop.
Command line application to search KSL Classifieds
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import argparse | |
import concurrent.futures | |
import string | |
from collections import namedtuple | |
from urllib.request import urlopen | |
from urllib.parse import urlencode | |
from bs4 import BeautifulSoup | |
# One scraped classified ad: title, location, posting age, and integer price.
Listing = namedtuple('Listing', ['title', 'city', 'state', 'age', 'price'])
class KSL(object):
    """Thin client for searching KSL classifieds.

    Builds query-string URLs for the classifieds endpoint, fetches each
    result page on a small thread pool, and scrapes listings out of the
    returned HTML.
    """

    URL = 'http://ksl.com/?nid=231'
    # Default query-string parameters applied to every search (only where
    # the caller has not supplied a value of their own).
    URL_QS = {
        'sold': 0,             # do not list sold items
        'nocache': 1,          # don't cache results, FRESH!
        'viewNumResults': 20,  # maximum results per "page"
        'sort': 1,             # newest first
    }

    def __init__(self):
        # Shared pool for concurrent page fetches; reused across searches.
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def __fix_query(self, q):
        # Split a comma-separated query string into stripped search terms.
        return [item.strip() for item in q.split(',')]

    def __do_request(self, args):
        # Fetch one URL. `args` is (query, url) or (query, url, timeout).
        if len(args) == 2:
            query, url = args
            timeout = 5  # seconds
        else:
            query, url, timeout = args
        return (query, urlopen(url, timeout=timeout).read())

    def search(self, query, **etc):
        """Yield (term, raw_html_bytes) for each comma-separated term.

        Bug fix: the original wrapped ``self.thread_pool`` in a ``with``
        statement, which shut the executor down on exit and made every
        subsequent ``search()`` call raise RuntimeError.  The pool is now
        used directly so the instance stays reusable.
        """
        yield from self.thread_pool.map(
            self.__do_request, self.build_qs(query, **etc))

    def find_elements(self, html):
        """Yield a Listing for every usable ad box in one page of HTML."""
        soup = BeautifulSoup(html, 'html.parser')
        for ad_box in soup.find_all('div', class_='adBox'):
            links = ad_box.find_all('a', class_='listlink')
            # get the listing title...
            if not links:
                continue
            # ...and clean it up: strip punctuation, drop short filler words
            title = links[0].text.strip(string.punctuation)
            title = ' '.join(t.capitalize() for t in title.split() if len(t) > 3)
            # get the price; ignore prices that "don't exist" (dashed out)
            price_box = ad_box.find('div', class_='priceBox')
            if not price_box or price_box.text.count('-') >= 6:
                continue
            price = price_box.find('span').text.strip('$')
            price = price.replace(',', '').strip()
            # NOTE(review): assumes the page reports the amount in cents,
            # hence the /100 -- confirm against live markup.
            price = int(float(price) / 100)
            # get the location: strip non-ascii, then split "City, ST ..."
            ad_time = ad_box.find('div', class_='adTime')
            location = ad_time.find('span').text.encode('ascii', 'ignore')
            location = location.decode('utf-8').split(',')
            # city and state, cleaning up the city's inconsistent casing
            city, state = location[0].strip(','), location[-1][:2].upper()
            city = ' '.join(p.capitalize() for p in city.split())
            # age of the posting: the text after the '|' separator
            lifespan = ad_time.text.encode('ascii', 'ignore')
            lifespan = lifespan.split(b'|')[-1].strip().decode('ascii')
            yield Listing(title, city, state, lifespan, price)

    def build_qs(self, query, **etc):
        """Yield (term, url) pairs, one per comma-separated search term.

        Recognised keyword args: min_price, max_price, zipcode, distance.
        Any other keyword is passed straight through as a query parameter.

        Bug fix: defaults from URL_QS are now applied with ``setdefault``
        instead of ``update``, so caller-supplied values (e.g. ``sort``)
        are no longer clobbered by the defaults.
        """
        for term in self.__fix_query(query):
            # ensure we always have a minimum price of at least $0
            minp = max(0, int(etc.get('min_price', None) or 0))
            maxp = etc.get('max_price', None)
            if maxp is not None:
                maxp = max(0, int(maxp))
                # if we have both bounds, make sure the lower one is `minp`
                minp, maxp = sorted([minp, maxp])
            qs = {
                'min_price': minp,
                'max_price': maxp,
                'zip': etc.get('zipcode', None),
                'distance': etc.get('distance', None),
                'search': term,
            }
            # fill in any additional parameters that were passed
            # but not explicitly handled above
            for key, value in etc.items():
                qs.setdefault(key.lower(), value)
            # apply defaults only where the caller made no choice
            for key, value in self.URL_QS.items():
                qs.setdefault(key, value)
            # make all `None` values blank for our querystring
            for key, value in qs.items():
                if value is None:
                    qs[key] = ''
            yield (term, '{}&{}'.format(self.URL, urlencode(qs)))
def listing(id):
    """Fetch a single KSL listing by its id (placeholder, not implemented)."""
    pass
def main(args):
    """Run every search described by `args` and pretty-print the results.

    `args` is the argparse namespace as a dict; 'query' is popped out and
    the remaining keys are forwarded to KSL.search() as keyword arguments.
    """
    if args.get('query') is None:
        return
    # create the thin client object
    ksl = KSL()
    for query, data in ksl.search(args.pop('query'), **args):
        for index, result in enumerate(ksl.find_elements(data)):
            # print the section header once, before the first result
            if index == 0:
                print('==== {}'.format(query))
            # Bug fix: the ellipsis used to be added only for len >= 35, so
            # a 34-character title was truncated to 33 chars with no marker.
            title = result.title[:33] + ('..' if len(result.title) > 33 else '')
            line = '{0: >2}. {2: >7} - {1: <35} : {3: <23} {4} - {5: <8}'.format(
                index + 1,
                title,
                '${}'.format(result.price),
                result.city,
                result.state,
                result.age,
            )
            print(line)
        # blank line between query groups
        print()
if __name__ == '__main__':
    # build the command-line interface
    parser = argparse.ArgumentParser(
        description='ksl - command line utility to query KSL classifieds')
    parser.add_argument('query', action='store', default=None)
    parser.add_argument('-m', '--min-price', action='store', default=0, dest='min_price')
    parser.add_argument('-M', '--max-price', action='store', default=None, dest='max_price')
    parser.add_argument('-z', '--zip-code', action='store', default=None, dest='zipcode')
    parser.add_argument('-d', '--distance', action='store', default=None, dest='distance')
    # do eeeeet
    main(vars(parser.parse_args()))
How can we update ads or update stock on the KSL site?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for the well-written script!
Looks like KSL's servers have changed their formatting since you wrote this. I've forked and updated it. I made a few other changes (including to the results), so if you want to update just the parsing, grab the code in the
find_elements
function. Oh, and I changed tabs to spaces, which makes the diff useless -- sorry! Edit: I have now made a repository for a script that will notify the user of new matches to the query. Thanks for the base class! https://github.com/jdanders/ksl-classifieds-notifier