Skip to content

Instantly share code, notes, and snippets.

@rawnly
Created January 3, 2019 16:55
Show Gist options
  • Save rawnly/d2c64099aeb5f1b02b74850247512110 to your computer and use it in GitHub Desktop.
Save rawnly/d2c64099aeb5f1b02b74850247512110 to your computer and use it in GitHub Desktop.
Scrape an Amazon product page and save the data.
# -*- coding: utf-8 -*-
import re
from requests import session, get
from bs4 import BeautifulSoup
from contextlib import closing
from requests.exceptions import RequestException
from tinydb import TinyDB, Query
# Open the TinyDB store kept in "products.json".
db = TinyDB("products.json")
# Announce how many records the store currently holds.
print "You're monitoring {0} products:".format(len(db.all()))
# List every record's "id", numbered by its position in db.all() (from 0).
for index, item in enumerate(db.all()):
print "{1}) #{0}".format(item["id"], index)
# Bare Python 2 print statement: writes an empty line.
# NOTE(review): indentation was lost in this copy, so it is unclear whether
# this runs once after the loop or once per listed item -- confirm.
print
def scrape(url, timeout=10):
    """
    Attempts to get the content at `url` by making an HTTP GET request.

    Parameters:
        url: address of the page to download.
        timeout: seconds to wait on the server before giving up
            (default 10). Without a timeout a stalled connection would
            block the script indefinitely.

    Returns the raw response content when `is_good_response` accepts the
    response, otherwise None. Any `RequestException` (timeouts included)
    is reported on stdout and also yields None.
    """
    try:
        # closing() releases the streamed connection even on early return.
        with closing(get(url, stream=True, timeout=timeout)) as resp:
            if is_good_response(resp):
                return resp.content
            return None
    except RequestException as e:
        print('Error during requests to {0} : {1}'.format(url, str(e)))
        return None
def is_good_response(resp):
    """
    Returns True if the response seems to be HTML, False otherwise.

    `resp` must expose a `status_code` attribute and a `headers` mapping.
    The response counts as HTML when the status code is 200 and the
    Content-Type header contains "html" (compared case-insensitively).
    A response without a Content-Type header is reported as not HTML
    instead of raising.
    """
    content_type = resp.headers.get('Content-Type')
    if content_type is None:
        # No header at all: nothing identifies the body as HTML.
        return False
    return resp.status_code == 200 and 'html' in content_type.lower()
def price_to_number(price):
    """
    Convert a price label such as "EUR 1.299,00" into a float.

    The "EUR" marker and all spaces are removed. When the text contains a
    decimal comma it is treated as Italian notation: dots are thousands
    separators and are dropped before the comma becomes the decimal point.
    Text without a comma is converted as-is, so "19.5" stays 19.5.

    Raises ValueError when what remains is not a valid number.
    """
    cleaned = price.replace("EUR", "").replace(" ", "").strip()
    if "," in cleaned:
        # "1.299,00" -> "1299,00": grouping dots would otherwise leave two
        # decimal points and make float() fail.
        cleaned = cleaned.replace(".", "")
    return float(cleaned.replace(",", "."))
def getProduct(product_id):
    """
    Download the amazon.it page of `product_id` and extract its details.

    Returns a dict with the keys:
        "id":      the product id that was passed in
        "title":   UTF-8 encoded, stripped text of #productTitle, or None
        "price":   number parsed from #priceblock_ourprice, or None
        "used":    number parsed from #usedPrice, or "Not available"
        "isPrime": True when an element with class "a-icon-prime" exists

    Raises RuntimeError when the page could not be downloaded.
    """
    raw_html = scrape("https://amazon.it/dp/{0}".format(product_id))
    if raw_html is None:
        # scrape() reports every failure as None, which is not parseable
        # markup; stop here with a message that names the product.
        raise RuntimeError(
            "Could not download the page of product #{0}".format(product_id))

    html = BeautifulSoup(raw_html, 'html.parser')
    title = html.select_one("#productTitle")
    price = html.select_one("#priceblock_ourprice")
    prime_badge = html.select_one(".a-icon-prime")
    used_price = html.select_one("#usedPrice")

    return {
        "id": product_id,
        "title": title.text.encode('utf-8').strip() if title is not None else None,
        "price": price_to_number(price.text) if price is not None else None,
        "used": price_to_number(used_price.text) if used_price is not None else "Not available",
        "isPrime": prime_badge is not None,
    }
class Product:
    """
    Handle on the stored record(s) of one product id inside a TinyDB file.

    Attributes:
        id: product id this handle refers to.
        db: TinyDB instance opened on the path given to the constructor.
        Product: TinyDB Query object used to build conditions on "id".
        data: first stored record whose "id" equals `id`, or an empty
            list when none exists. It is read once in the constructor;
            update() and delete() do not refresh it.
    """
    id = ""
    db = None

    def __init__(self, id, db="products.json"):
        """
        Open the database file `db` and load the record stored for `id`.
        """
        self.id = id
        self.db = TinyDB(db)
        self.Product = Query()
        # Search once. Taking the first match whenever one exists keeps
        # `data` a single record even if duplicates were stored, which
        # agrees with exists() answering True in that situation.
        matches = self._matches()
        self.data = matches[0] if matches else matches

    def _matches(self):
        """Return every stored record whose "id" equals this product id."""
        return self.db.search(self.Product.id == self.id)

    def exists(self):
        """Return True when at least one record is stored for this id."""
        return len(self._matches()) >= 1

    def update(self, product):
        """Write the fields of `product` into every record with this id."""
        self.db.update(product, self.Product.id == self.id)

    def delete(self):
        """Remove every record stored for this id."""
        self.db.remove(self.Product.id == self.id)
def main():
    """
    Interactive entry point: ask for a product id, scrape it and store it.

    The prompt repeats until the input, with "#" and spaces removed, is
    exactly 10 characters long. The scraped details are printed together
    with the difference to the previously stored price, then the record
    is inserted (new product) or updated (known product).

    The difference stays 0.0 for new products and whenever the current or
    the stored price is missing, instead of failing on the subtraction.
    """
    product_id = ""
    # The empty start value never passes the check, so the user is always
    # prompted at least once.
    while len(product_id) != 10:
        product_id = raw_input("Product ID: ").replace(
            "#", "").replace(" ", "").strip()

    product = getProduct(product_id)
    p = Product(product_id)
    price_now = product["price"]
    # Nothing is written before the final branch, so one lookup serves
    # both the price comparison and the insert/update decision.
    already_tracked = p.exists()
    diff = 0.0

    print("")
    print("")
    print("====================")
    print("")
    print("")

    if already_tracked:
        record = p.data
        if isinstance(record, list):
            # Several stored records share this id; compare with the first.
            record = record[0]
        registered_price = record.get("price")
        # Either price is None when the page (now or earlier) had no
        # price element; subtracting None would raise.
        if price_now is not None and registered_price is not None:
            diff = price_now - registered_price

    print("-> {0}".format(product["title"]))
    print("")
    print("• Price: {0} € | Last diff: {1} €".format(price_now, diff))
    print("• Used Price: {0}".format(product["used"]))
    print("")

    if product["isPrime"]:
        print("[!!!] Available with Amazon Prime")

    print("")
    print("")
    print("====================")
    print("")
    print("")

    if already_tracked:
        p.update(product)
        print("Product #{0} updated!".format(product_id))
    else:
        p.db.insert(product)
        print("#{0} | New product created!".format(product_id))
# Start the interactive flow. There is no `__main__` guard around this call,
# so it runs whenever the module is executed or imported.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment