Created
March 3, 2025 06:17
-
-
Save spinningcat/2993b39b35869a81af13b3c5ff14b01b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import pandas as pd | |
import requests | |
# Constants | |
# Nominatim search endpoint used for every geocoding request.
API_URL = "https://nominatim.openstreetmap.org/search"
# Headers sent with each request.
# NOTE(review): the contact address below looks like an e-mail obfuscation
# placeholder rather than a real address - replace it with a reachable
# contact before running against the public Nominatim service.
HEADERS = {"User-Agent": "MyPythonApp/1.0 ([email protected])"}
# CSV file that is both the input and the (in-place) output of the script.
CSV_FILE = "neighbourdata.csv"
# Upper bound on the number of rows geocoded in a single run.
MAX_ROWS_TO_PROCESS = 70000
API_DELAY = 1  # Delay in seconds to avoid hitting API rate limits
# Function to fetch latitude and longitude from the OpenStreetMap API | |
def fetch_lat_long(query):
    """
    Fetch latitude and longitude for ``query`` from the OpenStreetMap search API.

    Args:
        query: Free-text search string, sent as the ``q`` request parameter.

    Returns:
        A tuple ``(lat, lon, is_empty)``. When a result is found, ``lat`` and
        ``lon`` are taken unchanged from the API response and ``is_empty`` is
        False. If several results are returned, the first one whose
        ``osm_type`` is ``"node"`` is preferred; otherwise the first result
        is used. When nothing usable is available (no results, a non-200
        status, a network error, or an unparsable body) the tuple is
        ``("", "", True)``, so callers can always unpack three values.
    """
    request_timeout = 30  # seconds; without it a stalled connection hangs forever
    error_backoff = 10  # seconds to pause after a failed request before moving on

    # Let requests build the query string so that spaces, '&', '#', and
    # non-ASCII characters in the query are percent-encoded instead of
    # corrupting the URL.
    params = {"q": query, "format": "json"}

    try:
        response = requests.get(
            API_URL, params=params, headers=HEADERS, timeout=request_timeout
        )
        if response.status_code != 200:
            print(f"Failed to fetch data for: {query}")
            return "", "", True  # Return empty values and is_empty=True
        data = response.json()
    except (requests.RequestException, ValueError) as err:
        # RequestException covers connection/timeout problems; ValueError
        # covers a response body that is not valid JSON.
        print(f"Request error for: {query} ({err})")
        time.sleep(error_backoff)
        return "", "", True

    # Only a non-empty list of result objects is usable.
    if not isinstance(data, list):
        return "", "", True
    results = [item for item in data if isinstance(item, dict)]
    if not results:
        return "", "", True

    # Prioritize osm_type "node"; fall back to the first result otherwise.
    chosen = next(
        (item for item in results if item.get("osm_type") == "node"),
        results[0],
    )
    lat = chosen.get("lat")
    lon = chosen.get("lon")
    if lat is None or lon is None:
        return "", "", True
    return lat, lon, False
# Function to process a single row of the DataFrame | |
def process_row(df, index, row):
    """
    Geocode a single DataFrame row and record the outcome in ``df``.

    The candidate query columns are tried in order; the first one for which
    ``fetch_lat_long`` reports a result wins and its coordinates are written
    to the row. Columns that are missing or hold no text are skipped without
    issuing a request.

    Args:
        df: DataFrame to update in place.
        index: Index label of the row inside ``df``.
        row: The row's values (as yielded by ``DataFrame.iterrows``).

    Side effects:
        On success sets ``Lat``, ``Long``, ``URL``, ``Read=True`` and
        ``Emoty=False`` on the row. If no key yields a result, sets
        ``Read=True`` and ``Emoty=True``.
    """
    # NOTE: 'Emoty' is presumably a typo for 'Empty'; the spelling is kept
    # because it is the column name this script writes to the CSV.
    keys = ['key', 'keywithoutbelde', 'keywithoutkoyu', 'keywithoutmahalle', 'keywithoutmah', 'keywithoutkoy']
    request_sent = False

    for key_name in keys:
        query = row.get(key_name)

        # An empty CSV cell is read back as NaN; formatting it into the
        # request would search for the literal text "nan" and could store
        # coordinates of an unrelated place. Skip such keys instead.
        if query is None or pd.isna(query) or not str(query).strip():
            print(f"Skipping {key_name} for row {index}: no query text")
            continue

        # Space out consecutive attempts for the same row so that the
        # fallback keys do not fire back-to-back requests at the API.
        if request_sent:
            time.sleep(API_DELAY)
        request_sent = True

        print(f"Processing row {index} with {key_name}: {query}")
        lat, lon, is_empty = fetch_lat_long(query)
        if is_empty:
            continue

        # Update the DataFrame with the results
        df.at[index, 'Lat'] = lat
        df.at[index, 'Long'] = lon
        df.at[index, 'Read'] = True  # Mark the row as read
        df.at[index, 'URL'] = f'{API_URL}?q={query}&format=json'
        df.at[index, 'Emoty'] = False
        print(f"Processed row {index} with {key_name}: {query} -> Lat: {lat}, Lon: {lon}")
        return  # Exit the function if a result is found

    # If all keys return empty results
    df.at[index, 'Read'] = True
    df.at[index, 'Emoty'] = True
    print(f"No data found for row {index} with any key.")
# Main function to process the DataFrame | |
def main():
    """
    Geocode every unprocessed row of ``CSV_FILE`` and write the results back.

    Rows whose ``Read`` value is True are skipped. After each processed row
    the whole DataFrame is written back to ``CSV_FILE``, so an interrupted
    run can be restarted and continues with the rows not yet marked as read.
    At most ``MAX_ROWS_TO_PROCESS`` rows are processed per run, with a pause
    of ``API_DELAY`` seconds after each one.
    """
    # Read the CSV file
    df = pd.read_csv(CSV_FILE)

    # Result columns written by an earlier run are parsed back as float64 or
    # bool (or as all-NaN float64 when still empty). Writing strings such as
    # the URL or the API's coordinate text into those columns is an
    # incompatible-dtype assignment, so hold them as generic objects.
    for column in ("Lat", "Long", "Read", "URL", "Emoty"):
        if column in df.columns:
            df[column] = df[column].astype(object)

    # Counter to keep track of the number of rows processed
    counter = 0

    # Iterate over the rows of the DataFrame
    for index, row in df.iterrows():
        print(f"Processing row {counter}")

        # Compare with == (not truthiness): a missing value is NaN, which is
        # truthy but must count as "not processed yet".
        if row.get("Read", False) == True:  # noqa: E712
            print("Row already processed (True line)")
            continue  # Skip already processed rows
        print("Row not processed yet (False line)")

        if counter >= MAX_ROWS_TO_PROCESS:  # Stop after processing the specified number of rows
            print(f"Stopping after processing {MAX_ROWS_TO_PROCESS} rows.")
            break

        # Process the row
        process_row(df, index, row)
        counter += 1

        # Save the updated DataFrame to the CSV file after each row is
        # processed, so progress survives an interruption.
        df.to_csv(CSV_FILE, index=False)
        print(f"Saved row {index} to {CSV_FILE}")

        # Add a delay to avoid hitting API rate limits
        time.sleep(API_DELAY)

    # Print the updated DataFrame
    print("\nFinal DataFrame:")
    print(df)


# Run the main function
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment