Created
July 8, 2024 23:50
-
-
Save 0187773933/149402485d1ab5ad922b643b4c5458ad to your computer and use it in GitHub Desktop.
Google Places Search
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from pprint import pprint | |
import json | |
import csv | |
import folium | |
import time | |
from folium.plugins import MarkerCluster | |
API_KEY = 'asdf' | |
def write_json( file_path , python_object ): | |
with open( file_path , 'w', encoding='utf-8' ) as f: | |
json.dump( python_object , f , ensure_ascii=False , indent=4 ) | |
def read_json( file_path ): | |
with open( file_path ) as f: | |
return json.load( f ) | |
def write_csv( filename , results): | |
# fieldnames = ['id', 'nationalPhoneNumber', 'internationalPhoneNumber', 'formattedAddress', 'latitude', 'longitude', 'rating', 'websiteUri', 'userRatingCount', 'displayName'] | |
fieldnames = [ 'Name' , 'Phone Number' , 'Address' , 'Rating' , 'Total Reviews' , 'Website' ] | |
with open( filename , mode='w', newline='', encoding='utf-8') as csvfile: | |
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | |
writer.writeheader() | |
for result in results: | |
row = { | |
'displayName': result['displayName']['text'] , | |
'nationalPhoneNumber': result.get('nationalPhoneNumber'), | |
'formattedAddress': result.get('formattedAddress'), | |
'rating': result.get('rating'), | |
'userRatingCount': result.get('userRatingCount'), | |
'websiteUri': result.get('websiteUri'), | |
} | |
writer.writerow(row) | |
def create_map(map_filename , data ): | |
# Filter out entries without location data | |
valid_data = [place for place in data if 'location' in place and 'latitude' in place['location'] and 'longitude' in place['location']] | |
if not valid_data: | |
print("No valid location data found.") | |
return | |
# Initialize the map centered around the first valid result | |
first_location = valid_data[0]['location'] | |
folium_map = folium.Map(location=[first_location['latitude'], first_location['longitude']], zoom_start=13) | |
# Add marker cluster to the map | |
marker_cluster = MarkerCluster().add_to(folium_map) | |
# Add each place to the map with a marker and popup (excluding review text) | |
for place in valid_data: | |
location = place['location'] | |
name = place.get('displayName', {}).get('text', 'No name') | |
address = place.get('formattedAddress', 'No address') | |
rating = place.get('rating', 'No rating') | |
user_ratings_count = place.get('userRatingCount', 'No ratings count') | |
popup_content = f""" | |
<strong>{name}</strong><br> | |
Address: {address}<br> | |
Rating: {rating}<br> | |
User Ratings Count: {user_ratings_count} | |
""" | |
folium.Marker( | |
location=[location['latitude'], location['longitude']], | |
popup=folium.Popup(popup_content, max_width=300), | |
icon=folium.Icon(icon='info-sign') | |
).add_to(marker_cluster) | |
# Save the map to an HTML file | |
folium_map.save(map_filename) | |
def nearby_search(location, radius, max_results, included_types): | |
url = "https://maps.googleapis.com/v1/places:searchNearby" | |
params = { | |
"locationRestriction.circle.center.latitude": location['latitude'], | |
"locationRestriction.circle.center.longitude": location['longitude'], | |
"locationRestriction.circle.radius": radius, | |
"maxResultCount": max_results, | |
"includedTypes": included_types | |
} | |
headers = { | |
"Content-Type": "application/json", | |
"X-Goog-Api-Key": API_KEY, | |
"X-Goog-FieldMask": "places.displayName" | |
} | |
response = requests.post(url, json=params, headers=headers) | |
response.raise_for_status() | |
return response.json() | |
def place_details_search(place_id): | |
url = f"https://places.googleapis.com/v1/places/{place_id}" | |
headers = { | |
"Content-Type": "application/json", | |
"X-Goog-Api-Key": API_KEY, | |
"X-Goog-FieldMask": "id,displayName" | |
} | |
response = requests.get(url, headers=headers) | |
response.raise_for_status() | |
return response.json() | |
def text_search_new(text_query, location, radius=50000.0): | |
url = "https://places.googleapis.com/v1/places:searchText" | |
headers = { | |
"Content-Type": "application/json", | |
"X-Goog-Api-Key": API_KEY, | |
"X-Goog-FieldMask": "nextPageToken,places.reviews,places.id,places.rating,places.userRatingCount,places.nationalPhoneNumber,places.internationalPhoneNumber,places.websiteUri,places.displayName,places.formattedAddress,places.priceLevel,places.location" | |
} | |
data = { | |
"textQuery": text_query, | |
"pageSize": 20, | |
"rankPreference": "RELEVANCE", | |
"locationBias": { | |
"circle": { | |
"center": { | |
"latitude": location['latitude'], | |
"longitude": location['longitude'] | |
}, | |
"radius": radius | |
} | |
} | |
} | |
all_places = [] | |
next_page_token = None | |
page_token = None | |
while True: | |
if next_page_token: | |
data["pageToken"] = page_token | |
response = requests.post(url, json=data, headers=headers) | |
response.raise_for_status() | |
result = response.json() | |
places = result.get('places', []) | |
all_places.extend(places) | |
# Debug statements | |
if len( places ) == 0: | |
break | |
print(f"Retrieved {len(places)} places, total so far: {len(all_places)}") | |
next_page_token = result.get('nextPageToken') | |
if next_page_token: | |
if next_page_token != page_token: | |
page_token = next_page_token | |
print(f"Next page token: {next_page_token}") | |
time.sleep( 1 ) | |
else: | |
print("No more pages.") | |
break | |
else: | |
print("No more pages.") | |
break | |
return all_places | |
# https://developers.google.com/maps/documentation/places/web-service/supported_types | |
def text_search(text_query, location, radius=5000, rankby="prominence"): | |
url = "https://maps.googleapis.com/maps/api/place/nearbysearch/json" | |
query = { | |
"keyword": text_query, | |
"location": f"{location['latitude']},{location['longitude']}", | |
"radius": radius, | |
"rankby": rankby, | |
"key": API_KEY, | |
} | |
headers = { | |
"Content-Type": "application/json", | |
} | |
all_results = [] | |
while True: | |
response = requests.get(url, params=query, headers=headers) | |
response.raise_for_status() | |
results = response.json() | |
all_results.extend(results.get('results', [])) | |
print(f"Total results so far: {len(all_results)}") | |
next_page_token = results.get('next_page_token') | |
if not next_page_token or len(all_results) >= 60: | |
break | |
print(f"Next page token: {next_page_token}") | |
query['pagetoken'] = next_page_token | |
print("Waiting for next page token to become valid...") | |
time.sleep(2) # Required delay to let the next_page_token become valid | |
return all_results | |
def filter_unique_ids( results ): | |
seen_ids = set() | |
unique = [] | |
for result in results: | |
if result["id"] not in seen_ids: | |
unique.append( result ) | |
seen_ids.add( result[ "id" ] ) | |
return unique | |
if __name__ == "__main__": | |
search_term = "endodontist general anesthesia" | |
save_file_name = "test-3" | |
cities = [ | |
{ "name": "Huber Heights, Ohio" , "location": { 'latitude': 39.890783 , 'longitude': -84.095164 } } , | |
{ "name": "Cincinnati, Ohio" , "location": { 'latitude': 39.140772 , 'longitude': -84.504651 } } , | |
{ "name": "Columbus, Ohio" , "location": { 'latitude': 39.967962 , 'longitude': -83.004995 } } , | |
{ "name": "Cleveland, Ohio" , "location": { 'latitude': 41.485341 , 'longitude': -81.683451 } } , | |
{ "name": "Toledo, Ohio" , "location": { 'latitude': 41.647074 , 'longitude': -83.522599 } } , | |
] | |
results = [] | |
for city in cities: | |
results.extend( text_search_new( f"{search_term} in {city['name']}" , city['location'] , radius=50000.0 ) ) | |
results = filter_unique_ids( results ) | |
sorted_results = sorted( results , key=lambda x: ( -x.get( 'rating' , 0 ) , -x.get( 'userRatingCount' , 0 ) ) ) | |
pprint( sorted_results ) | |
print( len( sorted_results ) ) | |
write_json( f"{save_file_name}.json" , sorted_results ) | |
write_csv( f"{save_file_name}.csv" , sorted_results ) | |
create_map( f"{save_file_name}.html" , sorted_results ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment