Created
June 29, 2022 10:21
-
-
Save ndamulelonemakh/b314437b8908fe5168b767535d9b7a89 to your computer and use it in GitHub Desktop.
Get auto formatted address from google maps
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Given a file containing a list of places, get the well formatted address containing the country, zip code, iso names etc. | |
| using the Google Maps API | |
| """ | |
| import os | |
| import json | |
| import logging | |
| import traceback | |
| import requests | |
| import pickle | |
| import pycountry | |
# Route DEBUG-and-above log records to a file (relative path, so it lands in
# the current working directory).
logging.basicConfig(level=logging.DEBUG, filename='locations_retrive.log')
log = logging.getLogger(__name__)
# Directory prefix joined onto the input/output file names used in this
# module; the empty string leaves them as bare relative names.
PROCESSED_DIR = ''
# In-memory cache of Places API responses, keyed by the query string.
_PlacesAPICache = {}
| # region Helper methods | |
def pickle_result(obj, filename):
    """Serialize ``obj`` into ``filename`` using the highest pickle protocol.

    The file is opened in binary write mode, so an existing file at that
    path is overwritten.
    """
    with open(filename, mode='wb') as sink:
        writer = pickle.Pickler(sink, protocol=pickle.HIGHEST_PROTOCOL)
        writer.dump(obj)
def get_unique_values(my_dict):
    """Return the distinct values of ``my_dict`` in first-seen order.

    Values are de-duplicated by using them as dictionary keys, so they must
    be hashable. The original and de-duplicated sizes are written to the
    debug log.
    """
    distinct = dict.fromkeys(my_dict.values(), 1)
    log.debug('Original dict size={}, Unique dict size={}'.format(len(my_dict), len(distinct)))
    return list(distinct)
def try_country_search(key):
    """Fuzzy-match ``key`` against the pycountry country database.

    Args:
        key: Free-text country name or code to look up.

    Returns:
        ``(True, matches)`` where ``matches`` is whatever
        ``pycountry.countries.search_fuzzy`` returned, or ``(False, key)``
        when nothing matched or the lookup failed unexpectedly.
    """
    try:
        match = pycountry.countries.search_fuzzy(key)
    except LookupError:
        # Expected "no such country" outcome; not worth a traceback.
        return False, key
    except Exception:
        # Best-effort lookup: report the failure and carry on. ``Exception``
        # (not a bare ``except``) lets Ctrl-C / SystemExit still stop the run.
        traceback.print_exc()
        return False, key
    return True, match
def try_city_search(key):
    """Look ``key`` up in the pycountry subdivisions database.

    Args:
        key: Free-text subdivision name or code to look up.

    Returns:
        ``(True, match)`` where ``match`` is whatever
        ``pycountry.subdivisions.lookup`` returned, or ``(False, key)`` when
        nothing matched or the lookup failed unexpectedly.
    """
    try:
        match = pycountry.subdivisions.lookup(key)
    except LookupError:
        # Expected "not found" outcome; not worth a traceback.
        return False, key
    except Exception:
        # Best-effort lookup: report the failure and carry on. ``Exception``
        # (not a bare ``except``) lets Ctrl-C / SystemExit still stop the run.
        traceback.print_exc()
        return False, key
    return True, match
def try_google_places_search(key):
    """Resolve a free-text place to a country via the Google Places Text Search API.

    Responses with status ``OK`` are stored in the module-level
    ``_PlacesAPICache`` (keyed by ``key``) so a repeated query does not hit
    the network again. Each time a newly added entry brings the cache size
    to a multiple of 100, the cache is pickled to ``places_api_cache.pickle``.

    The API key is read from the ``API_KEY`` environment variable.

    Args:
        key: The query string sent to the API.

    Returns:
        ``(True, country)`` where ``country`` is the first match returned by
        ``try_country_search`` for the last comma-separated component of the
        top result's formatted address; ``(False, key)`` in every other case
        (non-OK status, no country match, or any error).
    """
    # Bound up front so the "not ok" message below can never hit an
    # unbound local when the response came from the cache.
    response = None
    try:
        place = _PlacesAPICache.get(key)
        fetched = place is None
        if fetched:
            response = requests.get(
                'https://maps.googleapis.com/maps/api/place/textsearch/json',
                params={
                    'query': key,
                    'key': os.environ['API_KEY'],
                    # The value must be the bare comma-separated list; the
                    # parameter name is supplied by the dict key.
                    'fields': 'formatted_address,types,name,geometry,place_id',
                },
                # Without a timeout a stalled connection blocks the run forever.
                timeout=30,
            )
            place = json.loads(response.text)

        if place['status'] != 'OK':
            print("Google maps response not ok", response.text if response is not None else "")
            return False, key

        # Hoping the first result is the one!
        place_address = place['results'][0]['formatted_address']
        # The country is the last comma-separated component; strip the
        # space that follows the comma before matching on it.
        country = place_address.split(',')[-1].strip()

        if fetched:
            # Only new entries are stored, so a cache hit never re-triggers
            # the periodic dump below.
            _PlacesAPICache[key] = place
            if len(_PlacesAPICache) % 100 == 0:
                log.info('_Plcaces Cache size is {}. Need to pickle'.format(len(_PlacesAPICache)))
                pickle_result(_PlacesAPICache, 'places_api_cache.pickle')

        found, countries = try_country_search(country)
        if found:
            return found, countries[0]  # trusting that google gave us the right country
        return found, key
    except LookupError:
        # Covers KeyError / IndexError from an unexpected response shape
        # (and a missing API_KEY environment variable).
        print("No result for {} after all attempts".format(key))
        return False, key
    except Exception:
        # Network errors, timeouts, malformed JSON, ...: report and move on.
        traceback.print_exc()
        return False, key
| # endregion | |
def _save_country_database(database):
    """Pickle ``database`` (values converted with ``str``) into PROCESSED_DIR."""
    _to_pickle = {k: str(v) for k, v in database.items()}
    pickle_result(_to_pickle, os.path.join(PROCESSED_DIR, 'users_country_db01.pickle'))


def get_standard_locations(unique_locations_list):
    """Map each raw location string to a country and persist the mapping.

    For every location the comma-separated parts are tried one by one with
    ``try_country_search``; a part is accepted only when it yields exactly
    one match. If no part is accepted, the whole location string (dots
    removed) is sent to ``try_google_places_search`` as a last resort.

    The mapping is pickled to ``users_country_db01.pickle`` under
    ``PROCESSED_DIR`` every time a newly resolved location brings its size
    to a multiple of 10, and once more after the last location so the final
    partial batch is not lost.

    Args:
        unique_locations_list: Iterable of location strings. ``None``,
            ``'unknown'`` and blank strings are skipped.

    Returns:
        Dict mapping each resolved location string to its country record.
    """
    skipped = []
    users_country_database = {}

    for location in unique_locations_list:
        if location in (None, 'unknown') or not location.strip():
            skipped.append(location)
            print("Skipping", location, "becuase it has an empty value")
            continue

        location_parts = [str(lc.strip()) for lc in location.split(",")]
        print("Processing user=", location, "'s location(s)=", location_parts)

        resolved = False
        for loc_part in location_parts:
            if not loc_part:
                print("Ignore empty part")
                continue
            sucess, country_info = try_country_search(loc_part.lower())
            # On failure country_info is the query string, not a result
            # list, so its length is not a result count.
            result_count = len(country_info) if sucess else 0
            print("\tSearch (country) by", loc_part, "returned=", result_count, "country info results")
            if sucess and result_count == 1:
                users_country_database[location] = country_info[0]
                resolved = True
                break

        if not resolved:  # Last resort, try google api
            sucess, country_info = try_google_places_search(location.strip().replace(".", ""))
            print("\tSearch (places api) by", location, "returned=", country_info, "country info results")
            if sucess:
                users_country_database[location] = country_info
                resolved = True

        if not resolved:
            skipped.append(location)
            print("Skiped", location, "beacuase after 3 attampts, no linekd country was found")
            continue

        # Checkpoint only right after a record was added; otherwise the
        # same snapshot would be rewritten for every skipped location while
        # the size sits on a multiple of 10.
        if len(users_country_database) % 10 == 0:
            log.info('10 records reached, pickling...')
            _save_country_database(users_country_database)

    # Final save so records added since the last checkpoint are kept.
    _save_country_database(users_country_database)

    print("Found", len(users_country_database), "Country info / ", len(unique_locations_list))
    print("Skipped", len(skipped))
    return users_country_database
def main():
    """Load the user-id/location file and resolve every distinct location.

    Reads ``users_id_location_map.json`` from ``PROCESSED_DIR``. The file is
    parsed as a JSON list whose elements are themselves JSON-encoded
    documents, each providing an ``id`` and a ``location`` key. The distinct
    location values are then passed to ``get_standard_locations``.
    """
    users_id_location_map_file = os.path.join(PROCESSED_DIR, 'users_id_location_map.json')
    log.info('Reading raw locations file in %s', users_id_location_map_file)
    # Explicit encoding: the platform default is not UTF-8 everywhere, and
    # location names routinely contain non-ASCII characters.
    with open(users_id_location_map_file, 'r', encoding='utf-8') as jlcm:
        encoded_records = json.load(jlcm)
    # Each list element is decoded a second time to get the actual record.
    users_location = [json.loads(loc) for loc in encoded_records]
    users_location_lookup = {d['id']: d['location'] for d in users_location}
    unique_locations = get_unique_values(users_location_lookup)
    get_standard_locations(unique_locations)
    log.info('Location collect completed ok.')


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment