Created
July 17, 2023 10:47
-
-
Save lobstrio/b5d7cfd94361c681708a55eb51198ff7 to your computer and use it in GitHub Desktop.
🍕 Collect all listings from Yelp from a search URL e.g. https://www.yelp.fr/search?find_desc=Pizza&find_loc=marseille — phones included!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import csv | |
from lxml import html | |
import argparse | |
import time | |
class YelpSearchScraper:
    """Scrape business listings (name, URL, rating, reviews, price, categories,
    neighborhood) from Yelp search-results pages and write them to CSV."""

    def iter_listings(self, url):
        """Fetch one search-results page and extract its business listings.

        :param url: full Yelp search URL for a single results page
        :return: list of per-business dicts, or None if the fetch failed
        """
        # NOTE(review): no custom User-Agent header is sent — Yelp may block
        # the default requests UA; confirm against live responses.
        # timeout prevents an indefinite hang on an unresponsive server.
        response = requests.get(url, timeout=30)
        if response.status_code != 200:
            print("Error: Failed to fetch the URL")
            return None
        # Debug artifact: keep a copy of the last fetched page for inspection.
        # Explicit utf-8 so non-ASCII pages don't crash on narrow locale encodings.
        with open('response.html', 'w', encoding='utf-8') as f:
            f.write(response.text)
        tree = html.fromstring(response.content)
        scraped_data = []
        # Each result card is a div carrying these three utility classes.
        businesses = tree.xpath('//div[contains(@class, "container__09f24__mpR8_") and contains(@class, "hoverable__09f24__wQ_on") and contains(@class, "border-color--default__09f24__NPAKY")]')
        for business in businesses:
            data = {}
            name_element = business.xpath('.//h3[contains(@class, "css-1agk4wl")]/span/a')
            if name_element:
                data['Name'] = name_element[0].text.strip()
                # hrefs are site-relative; prefix the host to get a usable URL.
                data['URL'] = "https://www.yelp.com" + name_element[0].get('href')
            rating_element = business.xpath('.//div[contains(@aria-label, "star rating")]')
            if rating_element:
                rating_value = rating_element[0].get('aria-label').split()[0]
                # Ad/slideshow widgets also match the aria-label; skip them.
                if rating_value != 'Slideshow':
                    data['Rating'] = float(rating_value)
                else:
                    data['Rating'] = None
            reviews_element = business.xpath('.//span[contains(@class, "css-chan6m")]')
            if reviews_element:
                reviews_text = reviews_element[0].text
                if reviews_text:
                    reviews_text = reviews_text.strip().split()[0]
                    if reviews_text.isnumeric():
                        data['Reviews'] = int(reviews_text)
                    else:
                        data['Reviews'] = None
            price_element = business.xpath('.//span[contains(@class, "priceRange__09f24__mmOuH")]')
            if price_element:
                data['Price Range'] = price_element[0].text.strip()
            # ok getting proper xpath
            categories_element = business.xpath('.//span[contains(@class, "css-11bijt4")]')
            if categories_element:
                data['Categories'] = ", ".join([c.text for c in categories_element])
            neighborhood_element = business.xpath('.//p[@class="css-dzq7l1"]/span[contains(@class, "css-chan6m")]')
            if neighborhood_element:
                neighborhood_text = neighborhood_element[0].text
                if neighborhood_text:
                    data['Neighborhood'] = neighborhood_text.strip()
            # Skip cards where nothing could be extracted (was `assert data`,
            # which aborted the whole run and is stripped under `python -O`).
            if data:
                scraped_data.append(data)
        return scraped_data

    def save_to_csv(self, data, filename):
        """Write the scraped rows to a CSV file.

        :param data: list of per-business dicts (possibly heterogeneous keys)
        :param filename: destination CSV path
        """
        if not data:
            # Guard: data[0] below would raise IndexError on an empty list.
            print("No data to write")
            return
        # Union of keys across all rows, preserving first-seen order: rows can
        # have differing fields (e.g. a missing price range), and taking only
        # data[0].keys() silently dropped columns absent from the first row.
        keys = []
        for row in data:
            for key in row:
                if key not in keys:
                    keys.append(key)
        # utf-8-sig so Excel detects the encoding via the BOM.
        with open(filename, 'w', newline='', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=keys, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(data)
        print("Success! \nData written to CSV file:", filename)

    def scrape_results(self, search_url, max_page):
        """Scrape pages 1..max_page of a Yelp search and merge the results.

        :param search_url: base Yelp search URL (query string already present)
        :param max_page: last page number to visit, inclusive
        :return: flat list of all scraped business dicts
        """
        all_results = []
        # range(1, max_page + 1) visits max_page pages inclusive (was
        # range(1, max_page), an off-by-one that skipped the last page).
        for page in range(1, max_page + 1):
            # Yelp paginates with a 0-based `start` offset, 10 results per page.
            page_url = search_url + f'&start={(page-1)*10}'
            print(f"Scraping Page {page}")
            results = self.iter_listings(page_url)
            if results:
                all_results.extend(results)
            time.sleep(2)  # polite delay between page fetches
        return all_results
def main():
    """Parse CLI options, run the Yelp scraper, and save results to CSV."""
    started = time.perf_counter()

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--search-url', '-u',
        type=str, required=False,
        help='Yelp search URL',
        default='https://www.yelp.com/search?find_desc=Burgers&find_loc=London',
    )
    parser.add_argument(
        '--max-page', '-p',
        type=int, required=False,
        help='Max page to visit',
        default=5,
    )
    args = parser.parse_args()

    search_url, max_page = args.search_url, args.max_page
    assert all([search_url, max_page])

    scraper = YelpSearchScraper()
    results = scraper.scrape_results(search_url, max_page)
    if not results:
        print("No results to save to CSV")
    else:
        scraper.save_to_csv(results, 'yelp_search_results.csv')

    elapsed = time.perf_counter() - started
    elapsed_formatted = "{:.2f}".format(elapsed)
    print("Elapsed time:", elapsed_formatted, "seconds")
    print('''~~ success
 _       _         _        
| |     | |       | |       
| | ___ | |__  ___| |_ _ __ 
| |/ _ \\| '_ \\/ __| __/| '__|
| | (_) | |_) \\__ \\ |_ | |  
|_|\\___/|_.__/|___/\\__||_|  
''')
# Script entry point: only run the scraper when executed directly.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment