from bs4 import BeautifulSoup
import requests
import re
import csv
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
""" | |
example_url : https://www.amazon.com/s/ref=nb_sb_noss?url=search-alias%3Dbeauty&field-keywords=lipstick&page=1 | |
Returns total number of recommended_items for a single searched item | |
<Every page has 48 search results> | |
Installation | |
1- Download chromedriver_mac64.zip from http://chromedriver.storage.googleapis.com/index.html?path=2.24/ | |
2- Unzip and place in the same folder with code and note down the path. | |
3- Repace CHROME_WEBDRIVER_PATH with Path on your computer for chromedriver | |
How to Run (python 3) | |
$ python -m venv amazonvenv | |
$ source amazonvenv\Scripts\activate | |
$ pip install requests | |
$ pip install beautifulsoup4 | |
$ pip install -U selenium | |
$ pip install html5lib | |
$ python crawling.py | |
""" | |
BASE_URL = "https://www.amazon.com/s/ref=nb_sb_noss_2"
KEYWORD = "lipstick"
headers = {'User-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36'}
CHROME_WEBDRIVER_PATH = '/Users/hassanabid/Documents/hassan/GDE_code/web_crawling_amazon/chromedriver'
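# Note: webdriver.Chrome(CHROME_WEBDRIVER_PATH), used in initiate_webdriver() below,
# follows the Selenium 3 calling convention that matches the chromedriver 2.24 setup
# described above. If you run this under Selenium 4+, the executable path is passed
# through a Service object instead. A minimal sketch, assuming selenium>=4 is installed:
#
#   from selenium.webdriver.chrome.service import Service
#   driver = webdriver.Chrome(service=Service(CHROME_WEBDRIVER_PATH))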
START_PAGE = 1
END_PAGE = 2

def main():
    payload_main = {'url': "search-alias%3Dbeauty", 'field-keywords': KEYWORD, "page": 1}
    r_amazon = requests.get(BASE_URL, params=payload_main, headers=headers)
    soup = BeautifulSoup(r_amazon.text, 'html5lib')
    count_raw = soup.select("#s-result-count")
    if count_raw:
        result = re.search(r'[\d]+.[\d]+.*[\d]+.[\d]+', count_raw[0].text)
        if result:
            total_pages_re = re.search(r'[\d]+,[\d]+', result.group())
            total_pages_count = int(total_pages_re.group().replace(",", "")) / 48
            print("Total_Page_Count : {}".format(int(total_pages_count)))
            for i in range(START_PAGE, END_PAGE + 1):
                item_links_raw = get_next_page(i)
                items_soup, items_links = get_items_soup(total_pages_count, item_links_raw)
                find_recommended_items(items_soup, items_links, i)
    else:
        print("nothing found for {}".format(r_amazon.url))

def get_next_page(page_no):
    payload_main = {'url': "search-alias%3Dbeauty", 'field-keywords': KEYWORD, "page": page_no}
    r_amazon = requests.get(BASE_URL, params=payload_main, headers=headers)
    soup = BeautifulSoup(r_amazon.text, 'html5lib')
    count_raw = soup.select("#s-result-count")
    item_links_raw = soup.select("#resultsCol .a-row .a-spacing-top-mini > a")
    print("Page No : {} - len(item_links) : {}".format(page_no, len(item_links_raw)))
    return item_links_raw

def get_items_soup(total_pages_count, item_links_raw):
    items_soup = []
    valid_item_links = []
    for item in item_links_raw:
        single_item_link = item["href"]
        try:
            r_single_item = requests.get(single_item_link, headers=headers)
            soup = BeautifulSoup(r_single_item.text, "html5lib")
            items_soup.append(soup)
            print("fetched item no. {}".format(len(items_soup)))
            valid_item_links.append(single_item_link)
        except Exception:
            print("couldn't fetch {}".format(single_item_link))
    return items_soup, valid_item_links

def find_recommended_items(items_soup, item_links, page_no):
    key_items = 0
    titles = []
    brands = []
    prices = []
    asins = []
    recommended_items = {}
    for index, soup in enumerate(items_soup):
        main_title, brand, price, main_asin = find_title_brand_price(soup)
        if not main_title:
            # move this below and add a case when title is not found. (empty [] in dict)
            print("title not found for item :{} - link: {}".format(key_items, item_links[index]))
            continue
        else:
            key_items = index + 1
            titles.append(main_title)
            brands.append(brand)
            prices.append(price)
            asins.append(main_asin)
            recommended_items_raw = soup.select("#purchase-sims-feature .a-carousel-viewport li a")
            print("fetching recommended_items (count: {}) for {}".format(len(recommended_items_raw), key_items))
            rec_items_href = []
            rec_items_titles = []
            rec_items_brands = []
            rec_items_prices = []
            rec_items_asins = []
            recommended_items_raw_extended = initiate_webdriver(item_links[index], recommended_items_raw)
            for item in recommended_items_raw_extended:
                try:
                    # print("item : {}".format(item["href"]))
                    if "product-reviews" not in item["href"]:
                        rec_items_href.append(item["href"])
                except KeyError:
                    print("no link found")
            print("found {} valid links in rec_items for {}".format(len(rec_items_href), key_items))
            # keep at most 30 links so each row fits the 30 rec_item column groups
            # produced by create_rec_items_header()
            rec_items_href = rec_items_href[0:30]
            print("fetch title, brand, price and asin for rec items")
            for single_href in rec_items_href:
                try:
                    r_single_rec_item = requests.get("https://www.amazon.com{}".format(single_href), headers=headers)
                    soup = BeautifulSoup(r_single_rec_item.text, "html5lib")
                    title, brand, price, asin = find_title_brand_price(soup)
                    if title and brand:
                        if title not in rec_items_titles:
                            # print("title : {}".format(len(title)))
                            rec_items_titles.append(title)
                            rec_items_brands.append(brand)
                            rec_items_prices.append(price)
                            rec_items_asins.append(asin)
                except Exception:
                    print("couldn't fetch single_href :{}".format(single_href))
            items_dict = {"titles": rec_items_titles, "brands": rec_items_brands, "prices": rec_items_prices, "asins": rec_items_asins}
            recommended_items[main_asin] = items_dict
            print("recorded recommended items for key:{} - items recorded so far:{}".format(main_asin, len(recommended_items)))
    # write to csv
    writecsv(titles, brands, prices, asins, recommended_items, page_no)

def find_title_brand_price(soup):
    title = None
    brand = None
    price = None
    asin = None
    title_raw = soup.select("#productTitle")
    brand_raw = soup.select("#brand")
    price_raw = soup.select("#priceblock_ourprice")
    asin_raw = soup.select("#detail-bullets")
    max_recs = soup.select("#purchase-sims-feature span.a-carousel-page-max")
    if not (title_raw or brand_raw):
        return title, brand, price, asin
    else:
        try:
            title = title_raw[0].string.strip()
            brand = brand_raw[0].string.strip()
            if not asin_raw:
                asin_raw = soup.select("#productDetails_detailBullets_sections1 .a-size-base")
                asin = asin_raw[1].string.strip()
            else:
                asin = get_asin(asin_raw)
            if not price_raw:
                price_raw = soup.select("#priceblock_saleprice")
            price = price_raw[0].string.strip()
            return title, brand, price, asin
        except Exception:
            return title, brand, price, asin

def get_asin(soup_raw):
    asin_find = re.search(r'.*ASIN:.*[\w]{10}', soup_raw[0].text)
    if asin_find:
        # ASIN: B01HRNEHRE
        return asin_find.group(0).split(" ")[1]
    else:
        print("asin not found :(")
        return None
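# Illustrative example: if the detail-bullets text contains the fragment
# "ASIN: B01HRNEHRE" (the sample value from the comment above), get_asin()
# matches that fragment and returns "B01HRNEHRE"; if no ASIN-like token is
# present it returns None.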

def writecsv(titles, brands, prices, asins, rec_items, page_no):
    print("start writing {} rec_items to csv".format(len(rec_items)))
    rec_items_name = ["rec_item{}_name".format(i) for i in range(1, 81)]
    rec_items_brands = ["rec_item{}_brand".format(i) for i in range(1, 81)]
    rec_items_price = ["rec_item{}_price".format(i) for i in range(1, 81)]
    rec_items_asin = ["rec_item{}_asin".format(i) for i in range(1, 81)]
    rec_items_fieldnames = create_rec_items_header(rec_items_name, rec_items_brands, rec_items_price, rec_items_asin)
    fieldnames = ['item_num', 'item_name', 'item_brand', 'item_price', 'item_asin']
    fieldnames.extend(rec_items_fieldnames)
    for key in rec_items:
        print("key : {}".format(key))
    with open('amazon_' + str(page_no) + '.csv', 'a', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for index_main, title in enumerate(titles):
            rec_items_one = rec_items.get(asins[index_main])
            rec_items_titles_list = rec_items_one.get("titles")
            rec_items_brands_list = rec_items_one.get("brands")
            rec_items_prices_list = rec_items_one.get("prices")
            rec_items_asins_list = rec_items_one.get("asins")
            row = {'item_num': str(index_main + 1), 'item_name': title, 'item_brand': brands[index_main], 'item_price': prices[index_main], 'item_asin': asins[index_main]}
            for index, rec_title in enumerate(rec_items_titles_list):
                row["rec_item{}_name".format(index + 1)] = rec_title
                row["rec_item{}_brand".format(index + 1)] = rec_items_brands_list[index]
                row["rec_item{}_price".format(index + 1)] = rec_items_prices_list[index]
                row["rec_item{}_asin".format(index + 1)] = rec_items_asins_list[index]
            print("writing row : {}".format(index_main + 1))
            writer.writerow(row)

def create_rec_items_header(names, brands, prices, asins):
    result = []
    for i in range(0, 30):
        result.append(names[i])
        result.append(brands[i])
        result.append(prices[i])
        result.append(asins[i])
    return result
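# For illustration, the interleaved CSV header produced above starts:
#   rec_item1_name, rec_item1_brand, rec_item1_price, rec_item1_asin,
#   rec_item2_name, rec_item2_brand, ...
# i.e. four columns per recommended item, for the first 30 recommended items.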

def initiate_webdriver(href, item_raw_links):
    print("initiate_webdriver for : {}".format(href))
    result = []
    result.extend(item_raw_links)
    driver = webdriver.Chrome(CHROME_WEBDRIVER_PATH)
    driver.implicitly_wait(10)
    driver.get(href)
    print("wait...... fetching data")
    driver.execute_script("window.scrollTo(0, 1200);")
    time.sleep(2)
    next_button = None
    element = None
    second_case = False
    try:
        next_button = driver.find_element_by_css_selector("#purchase-sims-feature a.a-carousel-goto-nextpage")
    except Exception:
        print("next_button not found; scrolling to 3000px and trying the #day0-sims-feature carousel")
        second_case = True
        driver.execute_script("window.scrollTo(0, 3000);")
        time.sleep(2)
        next_button = driver.find_element_by_css_selector("#day0-sims-feature a.a-carousel-goto-nextpage")
    print("next_button_raw_soup : {}".format(next_button))
    for i in range(6, 80, 5):
        if len(result) > 95:
            print("collected enough rec item links ({}), stopping".format(len(result)))
            break
        try:
            print("click executed for {}".format(i))
            next_button.click()
            time.sleep(3)
            html = driver.page_source
            soup = BeautifulSoup(html, "html5lib")
            if second_case:
                recommended_items_raw = soup.select("#day0-sims-feature .a-carousel-viewport li a")
            else:
                recommended_items_raw = soup.select("#purchase-sims-feature .a-carousel-viewport li a")
            print("fetched recommended_items using webdriver: {}".format(len(recommended_items_raw)))
            result.extend(recommended_items_raw)
            time.sleep(2)
        except Exception:
            print("next_button click failed - trying again")
            time.sleep(3)
            try:
                next_button = driver.find_element_by_css_selector("#purchase-sims-feature a.a-carousel-goto-nextpage")
            except Exception:
                second_case = True
                driver.execute_script("window.scrollTo(0, 3000);")
                time.sleep(2)
                next_button = driver.find_element_by_css_selector("#day0-sims-feature a.a-carousel-goto-nextpage")
    driver.quit()
    print("initiate_webdriver results : {}".format(len(result)))
    return result

if __name__ == '__main__':
    main()