@gfhuertac
Last active August 7, 2020 20:50
# python modules
import sqlite3
from collections import namedtuple

# 3rd party modules
from bs4 import BeautifulSoup

# named tuple... we do not need a class for this
Product = namedtuple('Product', ['product_id', 'category', 'name', 'price', 'discount'])
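# For illustration only (the values below are made up, not taken from the site):
# a Product behaves like a tuple but exposes its fields by name, e.g.
#   p = Product('ABC123', 'Electronics', 'Headphones', 19990, 70)
#   p.price      # -> 19990
#   p.discount   # -> 70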
"""
Class that defines functions for a generic crawler.
"""
class Crawler(object):
def _init_database(self, name):
# step 1. connect to the sqlite database that contains the list of products
connection = sqlite3.connect(name)
# step 2. create the table products
try:
connection.execute('''CREATE TABLE products (product_id text, name text, category text, price integer, discount integer)''')
except:
pass
# step 3. returns the connection
return connection
    def _visit_or_return(self, url_or_element):
        # step 0. check whether the parameter is a URL (str) or an already-parsed element
        if isinstance(url_or_element, str):
            # step 1. if the parameter is a str, retrieve the page; raise an exception if an error happens
            response = self.session.get(url_or_element)
            response.raise_for_status()
            # step 2. parse the page and return the parsed tree
            html_content = response.text
            return BeautifulSoup(html_content, features='html.parser')
        else:
            # step 1. the parameter is already a parsed element, so return it unchanged
            return url_or_element

    def _visit_and_extract(self, url_or_element, tag, *args, **kwargs):
        # fetch (or reuse) the parsed page, then return all matching tags
        parsed_site = self._visit_or_return(url_or_element)
        return parsed_site.find_all(tag, *args, **kwargs)

    def _visit_and_extract_single(self, url_or_element, tag, *args, **kwargs):
        # fetch (or reuse) the parsed page, then return the first matching tag (or None)
        parsed_site = self._visit_or_return(url_or_element)
        return parsed_site.find(tag, *args, **kwargs)
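# A minimal sketch of how a subclass is meant to use these helpers; the class
# name, URL and CSS classes below are hypothetical placeholders, not part of
# the original gist:
#
#   import requests
#
#   class ExampleCrawler(Crawler):
#       def __init__(self):
#           self.session = requests.Session()                 # required by _visit_or_return
#           self.connection = self._init_database('example.db')
#
#       def crawl(self):
#           # a str argument is downloaded and parsed...
#           rows = self._visit_and_extract('https://example.com/products', 'tr', {'class': 'row'})
#           for row in rows:
#               # ...while an already-parsed element is searched directly
#               name = self._visit_and_extract_single(row, 'td', {'class': 'name'}).text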
# python modules
import re
import sqlite3

# 3rd party modules
import requests

# local modules
from crawler import Crawler, Product

# constants
PRICE_REGEX = r'((?:\d+\.?)+)'
DISCOUNT_REGEX = r'_(\d+)\.jpg'
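# For reference, the sample strings below are assumptions about the site's
# markup (not taken from the gist), but they show what the regexes extract:
#   re.findall(PRICE_REGEX, '$ 12.990')[0]        # -> '12.990'  (thousands separator stripped later)
#   re.findall(DISCOUNT_REGEX, 'dcto_70.jpg')[0]  # -> '70'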
"""
Class to extract the list of products (and discount) from the travelclub.cl/duty site.
This is useful if you want to check the products that are on sale at the site.
"""
class TravelDutyCrawler(Crawler):
"""
The constructor for the Crawler class.
Args:
- self (TravelDutyCrawler): the instance of the class
"""
def __init__(self):
# step 1. initialize the session we are going to use with the crawler.
self.session = requests.Session()
self.connection = self._init_database('travelclub.db')
    def crawl(self):
        # step 0. clear any previous contents of the products table.
        self.connection.execute('DELETE FROM products')
        root_url = 'https://www.travelclub.cl/tienda/home.asp'
        # step 1. retrieve the category links from the home page.
        categories = self._visit_and_extract(root_url, 'a', {'class': 'op_sub_menu2'}, href=True)
        # step 2. traverse each category link and build the URL of its product listing.
        # The href is assumed to be a javascript call of the form fn('<indice>','<categoria>'),
        # so the two quoted arguments are extracted from between the parentheses.
        categories_urls = dict()
        for category in categories:
            category_name = category.text
            href = category['href']
            indice, categoria = href[href.index('(') + 1:href.index(')')].split(',')
            url = f'https://www.travelclub.cl/tienda/categoriaproducto.asp?idcat={categoria[1:-1]}&p_categoria=categoriaproductoproducto&p_indice={indice[1:-1]}&p_subindice='
            categories_urls[category_name] = url
        # step 3. visit each category and extract all of its products.
        products_attributes = dict()
        for category_name, category_url in categories_urls.items():
            products = self._visit_and_extract(category_url, 'td', {'class': 'box-prod'})
            for product in products:
                product_name_td = self._visit_and_extract_single(product, 'td', {'class': 'det-prod'})
                product_name = product_name_td.text.strip()
                sku_td = self._visit_and_extract_single(product, 'td', {'class': 'sku-prod'})
                sku = ''.join(self._visit_and_extract_single(sku_td, 'span').text.split())
                price_td = self._visit_and_extract_single(product, 'td', {'class': 'price-prod'})
                price = int(re.findall(PRICE_REGEX, self._visit_and_extract_single(price_td, 'span').text)[0].replace('.', ''))
                discount_td = self._visit_and_extract_single(product, 'span', {'class': 'txt-descrip-prod'})
                try:
                    discount = int(re.findall(DISCOUNT_REGEX, self._visit_and_extract_single(discount_td, 'img')['src'])[0])
                except (AttributeError, TypeError, IndexError):
                    # no discount image found for this product
                    discount = 100
                products_attributes[sku] = [sku, category_name, product_name, price, discount]
        # step 4. persist the extracted products.
        for product_attributes in products_attributes.values():
            self.connection.execute('INSERT INTO products (product_id, category, name, price, discount) VALUES (?, ?, ?, ?, ?)', product_attributes)
        self.connection.commit()
    def retrieve(self, min_discount=0, max_discount=100, min_price=0, max_price=10000000):
        # query the products that fall inside the requested discount and price ranges.
        stmt = self.connection.execute(
            'SELECT product_id, category, name, price, discount FROM products '
            'WHERE discount >= ? AND discount <= ? AND price >= ? AND price <= ? '
            'ORDER BY discount DESC, price DESC',
            [min_discount, max_discount, min_price, max_price])
        products = stmt.fetchall()
        return [Product(*p) for p in products]

    def close(self):
        self.connection.close()
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Crawl the travel duty website and find products fulfilling a set of criteria.')
    parser.add_argument('--refresh', dest='refresh', action='store_true', help='refresh the database contents, otherwise use the current values')
    parser.add_argument('--min_discount', nargs='?', type=int, dest='min_discount', const=90, default=90, help='the min discount allowed (defaults to 90)')
    parser.add_argument('--max_discount', nargs='?', type=int, dest='max_discount', const=100, default=100, help='the max discount allowed (defaults to 100)')
    parser.add_argument('--min_price', nargs='?', type=int, dest='min_price', const=0, default=0, help='the min price allowed (defaults to 0)')
    parser.add_argument('--max_price', nargs='?', type=int, dest='max_price', const=10000000, default=10000000, help='the max price allowed (defaults to 10,000,000)')
    args = parser.parse_args()

    crawler = TravelDutyCrawler()
    if args.refresh:
        crawler.crawl()
    products = crawler.retrieve(args.min_discount, args.max_discount, args.min_price, args.max_price)
    for product in products:
        print(product.category, product.name, product.price, product.discount)
    crawler.close()
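# Besides the command-line entry point above, the crawler can be driven
# programmatically; a minimal sketch (the discount and price thresholds are
# arbitrary illustrative values, and the gist does not name this second file,
# so importing it elsewhere would need a hypothetical module name):
#
#   crawler = TravelDutyCrawler()
#   crawler.crawl()   # populate travelclub.db with the current products
#   for product in crawler.retrieve(min_discount=70, max_price=20000):
#       print(product.category, product.name, product.price, product.discount)
#   crawler.close()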