# python modules
import sqlite3
from collections import namedtuple

# 3rd party modules
from bs4 import BeautifulSoup

# named tuple... we do not need a class for this
Product = namedtuple('Product', ['product_id', 'category', 'name', 'price', 'discount'])


class Crawler(object):
    """
    Class that defines functions for a generic crawler.
    """

    def _init_database(self, name):
        # step 1. connect to the sqlite database that contains the list of products
        connection = sqlite3.connect(name)
        # step 2. create the products table if it does not exist yet
        try:
            connection.execute('''CREATE TABLE products (product_id text, name text, category text, price integer, discount integer)''')
        except sqlite3.OperationalError:
            # the table already exists; keep its current contents
            pass
        # step 3. return the connection
        return connection

    def _visit_or_return(self, url_or_element):
        # step 0. check if the parameter is a str (a URL) or an already parsed element
        if isinstance(url_or_element, str):
            # step 1. if the parameter is a str, retrieve the site. Raises an exception if an error happens.
            response = self.session.get(url_or_element)
            response.raise_for_status()
            # step 2. parse the site so tags can be extracted from it
            html_content = response.text
            return BeautifulSoup(html_content, features='html.parser')
        else:
            # step 1. it is already a parsed element, so return it unchanged
            return url_or_element

    def _visit_and_extract(self, url_or_element, tag, *args, **kwargs):
        # fetch (or reuse) the parsed page and return every matching tag
        parsed_site = self._visit_or_return(url_or_element)
        return parsed_site.find_all(tag, *args, **kwargs)

    def _visit_and_extract_single(self, url_or_element, tag, *args, **kwargs):
        # fetch (or reuse) the parsed page and return only the first matching tag
        parsed_site = self._visit_or_return(url_or_element)
        return parsed_site.find(tag, *args, **kwargs)
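As a minimal sketch of how these helpers compose (the subclass name and the markup it looks for are hypothetical, not part of the gist), a subclass only needs to expose a requests.Session as self.session before calling _visit_and_extract:

# Hypothetical example, not part of the gist: list the link texts found on a page.
import requests

class LinkListingCrawler(Crawler):
    def __init__(self):
        # the base class helpers expect an HTTP session under `self.session`
        self.session = requests.Session()

    def link_texts(self, url):
        # downloads and parses the page, then runs find_all('a', href=True) on it
        anchors = self._visit_and_extract(url, 'a', href=True)
        return [anchor.text.strip() for anchor in anchors]

The second file of the gist is the real subclass: a crawler for the travelclub.cl duty-free store.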
# python modules
import re
import sqlite3

# 3rd party modules
import requests

# local modules
from crawler import Crawler, Product

# constants
PRICE_REGEX = r'((?:\d+\.?)+)'
DISCOUNT_REGEX = r'_(\d+)\.jpg'


class TravelDutyCrawler(Crawler):
    """
    Class to extract the list of products (and discounts) from the travelclub.cl/duty site.
    This is useful if you want to check the products that are on sale at the site.
    """

    def __init__(self):
        """
        The constructor for the TravelDutyCrawler class.
        Args:
        - self (TravelDutyCrawler): the instance of the class
        """
        # step 1. initialize the session we are going to use with the crawler
        self.session = requests.Session()
        # step 2. open (and if necessary create) the local product database
        self.connection = self._init_database('travelclub.db')

    def crawl(self):
        # start from a clean slate so the table only holds the latest crawl
        self.connection.execute('DELETE FROM products')
        root_url = 'https://www.travelclub.cl/tienda/home.asp'
        # step 1. retrieve the category links from the home page
        categories = self._visit_and_extract(root_url, 'a', {'class': 'op_sub_menu2'}, href=True)
        # step 2. traverse each category link and build the URL of its product listing
        categories_urls = dict()
        for category in categories:
            category_name = category.text
            href = category['href']
            # the href embeds a call like fn('indice','categoria'); pull out both quoted arguments
            indice, categoria = href[href.index('(') + 1:href.index(')')].split(',')
            url = f'https://www.travelclub.cl/tienda/categoriaproducto.asp?idcat={categoria[1:-1]}&p_categoria=categoriaproductoproducto&p_indice={indice[1:-1]}&p_subindice='
            categories_urls[category_name] = url
        # step 3. visit each category and extract all of its products
        products_attributes = dict()
        for category_name, category_url in categories_urls.items():
            products = self._visit_and_extract(category_url, 'td', {'class': 'box-prod'})
            for product in products:
                product_name_td = self._visit_and_extract_single(product, 'td', {'class': 'det-prod'})
                product_name = product_name_td.text.strip()
                sku_td = self._visit_and_extract_single(product, 'td', {'class': 'sku-prod'})
                sku = ''.join(self._visit_and_extract_single(sku_td, 'span').text.split())
                price_td = self._visit_and_extract_single(product, 'td', {'class': 'price-prod'})
                price = int(re.findall(PRICE_REGEX, self._visit_and_extract_single(price_td, 'span').text)[0].replace('.', ''))
                # the discount is encoded in the file name of the badge image, e.g. ..._30.jpg
                discount_td = self._visit_and_extract_single(product, 'span', {'class': 'txt-descrip-prod'})
                try:
                    discount = int(re.findall(DISCOUNT_REGEX, self._visit_and_extract_single(discount_td, 'img')['src'])[0])
                except (AttributeError, TypeError, KeyError, IndexError):
                    # no (or unparsable) discount badge; keep the original default of 100
                    discount = 100
                products_attributes[sku] = [sku, category_name, product_name, price, discount]
        # step 4. store every product found in the database
        for product_attributes in products_attributes.values():
            self.connection.execute('INSERT INTO products (product_id, category, name, price, discount) VALUES (?, ?, ?, ?, ?)', product_attributes)
        self.connection.commit()

    def retrieve(self, min_discount=0, max_discount=100, min_price=0, max_price=10000000):
        # query the local database, not the site, so this is cheap to call repeatedly
        stmt = self.connection.execute('SELECT product_id, category, name, price, discount FROM products WHERE discount >= ? AND discount <= ? AND price >= ? AND price <= ? ORDER BY discount DESC, price DESC', [min_discount, max_discount, min_price, max_price])
        products = stmt.fetchall()
        return [Product(*p) for p in products]

    def close(self):
        self.connection.close()


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Crawl the travel duty website and find products that meet the given criteria.')
    parser.add_argument('--refresh', dest='refresh', action='store_true', help='Refresh the database contents, otherwise use current values.')
    parser.add_argument('--min_discount', nargs='?', type=int, dest='min_discount', const=90, default=90, help='the min discount allowed (defaults to 90)')
    parser.add_argument('--max_discount', nargs='?', type=int, dest='max_discount', const=100, default=100, help='the max discount allowed (defaults to 100)')
    parser.add_argument('--min_price', nargs='?', type=int, dest='min_price', const=0, default=0, help='the min price allowed (defaults to 0)')
    parser.add_argument('--max_price', nargs='?', type=int, dest='max_price', const=10000000, default=10000000, help='the max price allowed (defaults to 10,000,000)')
    args = parser.parse_args()
    crawler = TravelDutyCrawler()
    if args.refresh:
        crawler.crawl()
    products = crawler.retrieve(args.min_discount, args.max_discount, args.min_price, args.max_price)
    for product in products:
        print(product.category, product.name, product.price, product.discount)
    crawler.close()
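Assuming this second file is saved alongside crawler.py, the class can also be driven programmatically instead of through the argparse entry point; the filter value below is only illustrative:

# Illustrative usage only: refresh the local database once, then query it.
crawler = TravelDutyCrawler()
crawler.crawl()                               # scrape the site and repopulate travelclub.db
bargains = crawler.retrieve(min_discount=50)  # keep only products with at least a 50% discount
for product in bargains:
    print(f'{product.name}: {product.price} ({product.discount}% off)')
crawler.close()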