Created February 12, 2025 09:18
import time
import pandas as pd
import requests

# Constants
API_URL = "https://nominatim.openstreetmap.org/search"
HEADERS = {"User-Agent": "MyPythonApp/1.0 ([email protected])"}
CSV_FILE = "neighhbourdata.csv"
MAX_ROWS_TO_PROCESS = 10
API_DELAY = 5  # Delay in seconds to avoid hitting API rate limits

# Function to clean and format strings
def clean_string(text):
    """
    Cleans the input string based on specific rules:
    1. Trims whitespace.
    2. If it contains "Köyü", returns "Mahalle".
    3. Removes parentheses and anything after them.
    """
    text = text.strip()
    if "Köyü" in text:
        return "Mahalle"
    if "(" in text:
        text = text.split("(")[0].strip()
    return text

# Function to format the query for the OpenStreetMap API
def format_query(city_name, district_name, neighbourhood):
    """
    Formats the query by cleaning the city, district, and neighbourhood names and combining them.
    """
    city_name = clean_string(city_name)
    district_name = clean_string(district_name)
    neighbourhood = clean_string(neighbourhood)
    return f'{city_name}, {district_name}, {neighbourhood}'

# Function to fetch latitude and longitude from the OpenStreetMap API
def fetch_lat_long(query):
    """
    Fetches latitude and longitude from the OpenStreetMap API for the given query.
    Returns a tuple (lat, lon, is_empty) where is_empty is True if no data is found.
    If multiple results are returned, it prioritizes the result with osm_type "node".
    """
    # Pass the query via params so requests URL-encodes the spaces, commas and Turkish characters
    response = requests.get(API_URL, params={"q": query, "format": "json"}, headers=HEADERS, timeout=30)
    if response.status_code == 200:
        data = response.json()
        if data:  # If data is not empty
            # Prioritize osm_type "node"
            node_data = next((item for item in data if item.get("osm_type") == "node"), None)
            if node_data:
                return node_data["lat"], node_data["lon"], False
            # If no "node" type is found, return the first result
            return data[0]["lat"], data[0]["lon"], False
        return "", "", True  # Return empty values and is_empty=True
    print(f"Failed to fetch data for: {query}")
    return "", "", True  # Return empty values and is_empty=True

# Function to process a single row of the DataFrame
def process_row(df, index, row):
    """
    Processes a single row of the DataFrame by fetching latitude and longitude from the API
    and updating the DataFrame with the results.
    """
    query = format_query(row["CityName"], row["DistrictName"], row["neighbourhood"])
    print(f"Processing row {index}: {query}")
    lat, lon, is_empty = fetch_lat_long(query)

    # Update the DataFrame with the results
    df.at[index, 'Lat'] = lat
    df.at[index, 'Long'] = lon
    df.at[index, 'Read'] = True  # Mark the row as read
    df.at[index, 'URL'] = f'{API_URL}?q={query}&format=json'
    df.at[index, 'Emoty'] = is_empty

    if not is_empty:
        print(f"Processed row {index}: {query} -> Lat: {lat}, Lon: {lon}")
    else:
        print(f"No data found for row {index}: {query}")

# Main function to process the DataFrame
def main():
    # Read the CSV file
    df = pd.read_csv(CSV_FILE)

    # Counter to keep track of the number of rows processed
    counter = 0

    # Iterate over the rows of the DataFrame
    for index, row in df.iterrows():
        print(f"Processing row {counter}")
        if row["Read"] == True:
            print("Row already processed (True line)")
            continue  # Skip already processed rows
        if row["Read"] == False:
            print("Row not processed yet (False line)")
        if counter >= MAX_ROWS_TO_PROCESS:  # Stop after processing the specified number of rows
            print(f"Stopping after processing {MAX_ROWS_TO_PROCESS} rows.")
            break

        # Process the row
        process_row(df, index, row)
        counter += 1

        # Add a delay to avoid hitting API rate limits
        time.sleep(API_DELAY)

    # Save the updated DataFrame to a new CSV file (or overwrite the existing one)
    df.to_csv(CSV_FILE, index=False)

    # Print the updated DataFrame
    print("\nUpdated DataFrame:")
    print(df)


# Run the main function
if __name__ == "__main__":
    main()
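
For reference, a minimal sketch of the input CSV this script assumes. The column names (CityName, DistrictName, neighbourhood, Read, plus the Lat, Long, URL and Emoty columns the script fills in) are inferred from the code above; the city, district and neighbourhood values below are illustrative placeholders, not data from the original file:

CityName,DistrictName,neighbourhood,Read,Lat,Long,URL,Emoty
İstanbul,Kadıköy,Caferağa Mahallesi,False,,,,
Ankara,Çankaya,Kızılay Mahallesi,False,,,,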