Skip to content

Instantly share code, notes, and snippets.

@blakev
Last active July 19, 2024 06:39
Show Gist options
  • Save blakev/a6bbe3b5a861d64c6e36 to your computer and use it in GitHub Desktop.
Save blakev/a6bbe3b5a861d64c6e36 to your computer and use it in GitHub Desktop.
Command line application to search KSL Classifieds
import re
import argparse
import concurrent.futures
import string
from collections import namedtuple
from urllib.request import urlopen
from urllib.parse import urlencode
from bs4 import BeautifulSoup
# One parsed classified-ad result row.
Listing = namedtuple('Listing', ['title', 'city', 'state', 'age', 'price'])
class KSL(object):
    """Thin client for querying KSL classifieds and parsing the results."""

    URL = 'http://ksl.com/?nid=231'
    URL_QS = {
        'sold': 0,             # do not list sold items
        'nocache': 1,          # don't cache results, FRESH!
        'viewNumResults': 20,  # maximum results per "page"
        'sort': 1              # newest first
    }

    def __init__(self):
        # shared pool so several search terms can be fetched concurrently
        self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def __fix_query(self, q):
        # split a comma-separated query string into clean search terms
        return [item.strip() for item in q.split(',')]

    def __do_request(self, args):
        # args is (query, url) or (query, url, timeout); returns (query, raw html)
        if len(args) == 2:
            query, url = args
            timeout = 5  # seconds
        else:
            query, url, timeout = args
        return (query, urlopen(url, timeout=timeout).read(), )

    def search(self, query, **etc):
        """Yield (term, raw_html) for every comma-separated term in *query*.

        BUG FIX: the original used ``with self.thread_pool as ex``, which
        shut the executor down once this generator was exhausted, so any
        later call to search() on the same instance raised RuntimeError.
        Using the pool directly keeps the KSL object reusable.
        """
        yield from self.thread_pool.map(
            self.__do_request, self.build_qs(query, **etc))

    def find_elements(self, html):
        """Parse one results page and yield Listing tuples."""
        soup = BeautifulSoup(html, 'html.parser')
        for ad_box in soup.find_all('div', class_='adBox'):
            links = ad_box.find_all('a', class_='listlink')
            # skip ads without a title link
            if not links:
                continue
            # clean up the title: strip punctuation, keep words > 3 chars
            title = links[0].text.strip(string.punctuation)
            title = [t.capitalize() for t in title.split() if len(t) > 3]
            title = ' '.join(title)
            # ignore prices that "don't exist" (rendered as runs of dashes)
            price_box = ad_box.find('div', class_='priceBox')
            if not price_box or price_box.text.count('-') >= 6:
                continue
            price = price_box.find('span').text.strip('$')
            price = price.replace(',', '').strip()
            # NOTE(review): the page appears to embed the price with an
            # extra factor of 100 (cents without a decimal point), hence
            # the divide-by-100 — confirm against the live markup
            price = int(float(price) / 100)
            # the "City, ST | age" string lives in the adTime div
            ad_time = ad_box.find('div', class_='adTime')
            location = ad_time.find('span').text.encode('ascii', 'ignore')
            location = location.decode('utf-8')
            location = location.split(',')
            # city and state, cleaning the city from messy formatting
            city, state = location[0].strip(','), location[-1][:2].upper()
            city = ' '.join([p.capitalize() for p in city.split()])
            # age of the posting: the text after the '|' separator
            lifespan = ad_time.text
            lifespan = lifespan.encode('ascii', 'ignore').split(b'|')[-1].strip()
            lifespan = lifespan.decode('ascii')
            yield Listing(title, city, state, lifespan, price)

    def build_qs(self, query, **etc):
        """Yield (term, url) pairs, one per comma-separated search term.

        Recognized keyword options: min_price, max_price, zipcode,
        distance; anything else in *etc* is passed through verbatim.
        """
        # NOTE: loop variable renamed from the original, which rebound the
        # `query` parameter itself
        for term in self.__fix_query(query):
            # ensure we always have a minimum price of at least $0
            minp = etc.get('min_price', None) or 0
            minp = max(0, int(minp))
            maxp = etc.get('max_price', None)
            if maxp is not None:
                maxp = max(0, int(maxp))
                # if we have both a minimum and maximum price, make sure
                # the lower value ends up in `minp`
                minp, maxp = sorted([minp, maxp])
            qs = {
                'min_price': minp,
                'max_price': maxp,
                'zip': etc.get('zipcode', None),
                'distance': etc.get('distance', None),
                'search': term
            }
            # apply defaults (these intentionally win over user values)
            qs.update(self.URL_QS)
            # fill in any additional parameters that were passed,
            # but not explicitly handled above
            for k, value in etc.items():
                qs.setdefault(k.lower(), value)
            # make all `None` values blank for our querystring
            for k, value in qs.items():
                if value is None:
                    qs[k] = ''
            # encode and attach to the base URL
            yield (term, '{}&{}'.format(self.URL, urlencode(qs)), )
def listing(id):
    """Placeholder for fetching a single KSL listing by its id (not implemented)."""
    return None
def main(args):
    """Run the search described by *args* and pretty-print each result."""
    # nothing to do without a query
    if args.get('query') is None:
        return
    # build the thin client
    ksl = KSL()
    # fan out one request per search term and render the parsed listings
    for term, page in ksl.search(args.pop('query'), **args):
        for position, item in enumerate(ksl.find_elements(page)):
            # print the group header once, before the first result
            if position == 0:
                print('==== {}'.format(term))
            shown_title = item.title[:33] + ('..' if len(item.title) >= 35 else '')
            line = '{0: >2}. {2: >7} - {1: <35} : {3: <23} {4} - {5: <8}'.format(
                position + 1,
                shown_title,
                '${}'.format(item.price),
                item.city,
                item.state,
                item.age
            )
            print(line)
        print()
if __name__ == '__main__':
    # command-line interface: one positional query plus optional filters
    parser = argparse.ArgumentParser(
        description='ksl - command line utility to query KSL classifieds'
    )
    parser.add_argument('query', default=None)
    parser.add_argument('-m', '--min-price', dest='min_price', default=0)
    parser.add_argument('-M', '--max-price', dest='max_price', default=None)
    parser.add_argument('-z', '--zip-code', dest='zipcode', default=None)
    parser.add_argument('-d', '--distance', dest='distance', default=None)
    # do eeeeet
    main(vars(parser.parse_args()))
@jdanders
Copy link

jdanders commented Feb 17, 2017

Thanks for the well-written script!

Looks like KSL servers have changed formatting since you wrote this. I've forked and updated it. I made a few other changes (including to the results), so if you want to update just the parsing, just grab the stuff in the find_elements function. Oh, and I changed tabs to spaces which makes the diff useless -- sorry!

Edit: I now made a repository for a script that will notify the user of new matches to the query. Thanks for the base class! https://github.com/jdanders/ksl-classifieds-notifier

@tejastank
Copy link

How can we update ads or update stock listings on the KSL site?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment