Last active
October 10, 2016 05:01
-
-
Save surajkapoor/7862057 to your computer and use it in GitHub Desktop.
The crawler below is configured to pull data from 'http://www.whippingpost.com/collections/all', returning scraped data in the form of nested dictionaries with the product URL as keys, and product data (price, image url, description) as corresponding values. I've displayed an example of the format below. I would like to rewrite the program so th…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Data is returned in the following format...
{
    'status': True,
    'date': 'December 08, 2013, 13:10:41',
    'data':
        {
        u'http://www.whippingpost.com/collections/all/products/arizona-arena-bifold-picker-s-wallet':
            {
            'price': u'$37.00', 'product': u'Arizona Arena Bifold Picker\u2019s Wallet',
            'image': u'https://cdn.shopify.com/s/files/1/0043/9252/products/wp-az-pickers-bifold-wallet-1_large.jpg?v=1353262691',
            'description': u'4 slots for cards Compartment for bills Pocket for guitar pick Lined in pigskin 100% full grain leather'},
        ...
        }}
'''
import sys | |
import urllib | |
import re | |
import requests | |
import tldextract | |
import datetime | |
from bs4 import BeautifulSoup | |
from urlparse import urljoin | |
# Registry mapping a bare domain name (e.g. 'whippingpost') to the
# Crawler subclass that knows how to scrape that site.
processors = {}


def process_url(url):
    """Dispatch *url* to the crawler registered for its domain.

    On success returns {'status': True, 'date': <human-readable stamp>,
    'data': <nested product dict from crawl_procedure()>}; when no
    crawler is registered for the domain, returns
    {'status': False, 'Error': 'URL not recognized'}.
    """
    extract = tldextract.extract(url)
    # Single lookup instead of the original `in` test followed by a
    # second indexing of the same dict.
    crawler_cls = processors.get(extract.domain)
    if crawler_cls is None:
        return {'status': False, 'Error': "URL not recognized"}
    # Only stamp the time when we are actually going to crawl.
    stamp = datetime.datetime.now()
    return {
        'status': True,
        'date': stamp.strftime("%B %d, %Y, %H:%M:%S"),
        'data': crawler_cls(url=url).crawl_procedure(),
    }
class Crawler(object): | |
def __init__(self, url = None, domain = None): | |
self.visited = [] | |
self.URLS = [] | |
self.url = url | |
self.domain = domain | |
self.request = requests.get(self.url) | |
self.soup = BeautifulSoup(self.request.text) | |
def filter_product_urls(self, link): | |
return None | |
def get_product(self): | |
return None | |
def get_image(self): | |
return None | |
def get_price(self): | |
return None | |
def get_description(self): | |
return None | |
def crawl_procedure(self): | |
self.products = {} | |
self.URLS.append(self.url) | |
while len(self.URLS) > 0: | |
try: | |
self.html = requests.get(self.URLS[0]).content | |
except: | |
print self.URLS[0] | |
self.links = self.soup.find_all('a', {'href' : True}) | |
self.URLS.pop(0) | |
print len(self.URLS) | |
for self.link in self.links: | |
self.link = self.link.get('href') | |
self.product_link = self.filter_product_urls(self.link) | |
if self.product_link and self.product_link not in self.visited: | |
self.request = requests.get(self.product_link) | |
self.soup = BeautifulSoup(self.request.text) | |
self.product = self.get_product(self.soup) | |
self.image = self.get_image(self.soup) | |
self.price = self.get_price(self.soup) | |
self.description = self.get_description(self.soup) | |
self.products[self.product_link] = {'product':self.product, 'image':self.image, 'price':self.price, 'description': self.description} | |
self.URLS.append(self.product_link) | |
self.visited.append(self.product_link) | |
return self.products | |
class WhippingPost(Crawler):
    """Crawler for http://www.whippingpost.com/ product pages."""

    # Any absolute URL under /collections/all/products/ is a product
    # page.  FIX: dots are escaped — the original pattern's bare '.'
    # matched any character.
    _PRODUCT_RE = re.compile(
        r'http://www\.whippingpost\.com/collections/all/products/\S+')

    def filter_product_urls(self, url):
        """Return the absolute product URL for *url*, or False."""
        absolute_link = urljoin('http://www.whippingpost.com/', url)
        match = self._PRODUCT_RE.search(absolute_link)
        return match.group() if match else False

    def _meta_content(self, link, prop):
        """Return the content of the OpenGraph <meta> tag *prop*, or None.

        FIX: tolerates a missing tag; the original indexed the result
        of find() directly and raised TypeError on pages without it.
        """
        tag = link.find('meta', {'property': prop})
        if tag is None:
            return None
        return tag['content'] or None

    def get_product(self, link):
        """Product title from the og:title meta tag."""
        return self._meta_content(link, 'og:title')

    def get_image(self, link):
        """Absolute https image URL from the og:image meta tag."""
        content = self._meta_content(link, 'og:image')
        # og:image is protocol-relative on this site; prefix the scheme.
        return 'https:' + content if content else None

    def get_description(self, link):
        """Product description from the og:description meta tag."""
        return self._meta_content(link, 'og:description')

    def get_price(self, link):
        """Displayed price text from the current_price span, or None."""
        tag = link.find('span', {'class': 'current_price'})
        if tag is None:
            return None
        return tag.text.strip() or None
processors['whippingpost'] = WhippingPost | |
def main(): | |
url = str((sys.argv)[1]) | |
if url and len(sys.argv) == 2: | |
print process_url(url) | |
else: | |
print "error" | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment