Download Mapillary images as JPGs
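Given a token file (mapillary_access_tokens.txt or tokens.txt, one access token per line, in the working directory), the script can be run from the command line, e.g. `python mapillary_downloader.py --bbox west,south,east,north -o Images`, or driven programmatically. A minimal sketch of programmatic use follows; the filename mapillary_downloader.py is an assumption for illustration, not part of the gist:

# Minimal programmatic usage sketch. Assumes this gist was saved as
# mapillary_downloader.py (hypothetical name) and that a token file
# (mapillary_access_tokens.txt or tokens.txt) exists with one token per line.
from mapillary_downloader import MapillaryDownloader

downloader = MapillaryDownloader()
# [west, south, east, north] -- these coordinates are illustrative only
exit_code = downloader.run(bbox=[-80.134, 25.774, -80.126, 25.789])
print(f"Downloader finished with exit code {exit_code}")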
import mercantile
import requests
import json
import os
import time
import random
import asyncio
import aiohttp
import aiofiles
import threading
import concurrent.futures
import queue
import psutil
import logging
import shutil
import signal
import sys
import argparse
from tqdm import tqdm
from datetime import datetime
from typing import Dict, List, Set, Tuple, Optional


class MapillaryDownloader:
    """
    Advanced Mapillary image downloader with multi-threading, token rotation,
    progress tracking, and resumability.
    """

    def __init__(self):
        # Configuration variables
        self.tile_coverage = 'mly1_public'
        self.tile_layer = "image"
        self.zoom_level = 14  # Must be zoom level 14, where Mapillary image data is available

        # Default bounding box if none is provided
        self.default_bbox = [-80.13423442840576, 25.77376933762778, -80.1264238357544, 25.788608487732198]

        # Path configurations
        self.base_dir = "Images"
        self.temp_dir = os.path.join(self.base_dir, "temp")
        self.progress_file = os.path.join(self.base_dir, "progress.txt")
        self.stats_file = os.path.join(self.base_dir, "download_stats.json")
        self.token_files = ["mapillary_access_tokens.txt", "tokens.txt"]

        # Runtime variables
        self.access_tokens = []
        self.token_usage = {}
        self.token_cooldowns = {}
        self.downloaded_images = set()
        self.processed_tiles = set()
        self.current_tile_features = 0
        self.total_features_downloaded = 0
        self.start_time = None
        self.is_running = False

        # Concurrency settings (os.cpu_count() can return None, so fall back to 4)
        self.max_workers = min(32, (os.cpu_count() or 4) * 2)
        self.connection_pool_size = min(100, self.max_workers * 4)
        self.semaphore = None  # Will be initialized in run()

        # Setup logging
        self.setup_logging()

    def setup_logging(self):
        """Configure logging with appropriate format and level."""
        # Ensure the base directory exists for the log file
        os.makedirs(self.base_dir, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(os.path.join(self.base_dir, "downloader.log")),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def load_tokens(self):
        """Load access tokens from token files with fallbacks."""
        for token_file in self.token_files:
            if os.path.exists(token_file):
                try:
                    with open(token_file, 'r') as f:
                        tokens = [line.strip() for line in f if line.strip()]
                        self.access_tokens.extend(tokens)
                        self.logger.info(f"Loaded {len(tokens)} tokens from {token_file}")
                except Exception as e:
                    self.logger.error(f"Error loading tokens from {token_file}: {e}")

        # Initialize token usage tracking
        for token in self.access_tokens:
            self.token_usage[token] = 0
            self.token_cooldowns[token] = 0

        if not self.access_tokens:
            self.logger.error("No access tokens found! Please provide at least one token.")
            raise ValueError("No access tokens available")

    def select_token(self):
        """Select the most optimal token based on usage and cooldown status."""
        available_tokens = [t for t in self.access_tokens if time.time() > self.token_cooldowns.get(t, 0)]
        if not available_tokens:
            # If all tokens are in cooldown, wait until the earliest one expires
            next_available = min(self.token_cooldowns.values())
            cooldown_wait = next_available - time.time()
            self.logger.warning(f"All tokens are rate-limited. Waiting {cooldown_wait:.2f} seconds...")
            time.sleep(min(60, cooldown_wait))  # Wait, but no more than 60 seconds
            return self.select_token()  # Try again

        # Select the token with the lowest usage
        selected_token = min(available_tokens, key=lambda t: self.token_usage[t])
        self.token_usage[selected_token] += 1
        return selected_token

    def mark_token_cooldown(self, token, seconds=60):
        """Mark a token as being in cooldown (rate limited)."""
        self.token_cooldowns[token] = time.time() + seconds
        self.logger.warning(f"Token put in cooldown for {seconds} seconds due to rate limiting")

    def ensure_directories(self):
        """Create necessary directories if they don't exist."""
        os.makedirs(self.base_dir, exist_ok=True)
        os.makedirs(self.temp_dir, exist_ok=True)

    def load_progress(self):
        """Load progress data from previous runs."""
        if os.path.exists(self.progress_file):
            try:
                with open(self.progress_file, 'r') as f:
                    for line in f:
                        parts = line.strip().split(',')
                        # Guard each format separately so malformed lines can't raise IndexError
                        if parts[0] == "tile" and len(parts) >= 4:
                            # Format: tile,x,y,z
                            self.processed_tiles.add((int(parts[1]), int(parts[2]), int(parts[3])))
                        elif parts[0] == "image" and len(parts) >= 2:
                            # Format: image,image_id
                            self.downloaded_images.add(parts[1])
                self.logger.info(f"Resumed progress: {len(self.processed_tiles)} tiles, {len(self.downloaded_images)} images")
            except Exception as e:
                self.logger.error(f"Error loading progress: {e}")

        # Load statistics
        if os.path.exists(self.stats_file):
            try:
                with open(self.stats_file, 'r') as f:
                    stats = json.load(f)
                    self.total_features_downloaded = stats.get("total_features_downloaded", 0)
                self.logger.info(f"Loaded previous statistics: {self.total_features_downloaded} total features")
            except Exception as e:
                self.logger.error(f"Error loading statistics: {e}")

    def save_progress(self, tile=None, image_id=None):
        """Save progress information for resumability."""
        try:
            with open(self.progress_file, 'a') as f:
                if tile:
                    f.write(f"tile,{tile.x},{tile.y},{tile.z}\n")
                if image_id:
                    f.write(f"image,{image_id}\n")
        except Exception as e:
            self.logger.error(f"Error saving progress: {e}")

    def save_statistics(self):
        """Save download statistics to a JSON file."""
        try:
            elapsed = time.time() - self.start_time if self.start_time else 0
            stats = {
                "total_features_downloaded": self.total_features_downloaded,
                "processed_tiles": len(self.processed_tiles),
                "downloaded_images": len(self.downloaded_images),
                "elapsed_time": elapsed,
                "download_rate": self.total_features_downloaded / elapsed if elapsed > 0 else 0,
                "tokens_usage": self.token_usage,
                "timestamp": datetime.now().isoformat()
            }
            with open(self.stats_file, 'w') as f:
                json.dump(stats, f, indent=2)
        except Exception as e:
            self.logger.error(f"Error saving statistics: {e}")

    async def fetch_tile_data(self, session, tile, token):
        """Fetch vector tile data from the Mapillary API."""
        for attempt in range(3):  # Retry up to 3 times
            # Rebuild the URL each attempt so a rotated token actually takes effect
            tile_url = f'https://tiles.mapillary.com/maps/vtp/{self.tile_coverage}/2/{tile.z}/{tile.x}/{tile.y}?access_token={token}'
            try:
                async with session.get(tile_url) as response:
                    if response.status == 200:
                        content = await response.read()
                        # Decoding the vector tile is CPU-bound, so run it in a thread pool
                        loop = asyncio.get_event_loop()
                        with concurrent.futures.ThreadPoolExecutor() as pool:
                            data = await loop.run_in_executor(
                                pool,
                                lambda: self.vt_bytes_to_geojson_wrapper(content, tile.x, tile.y, tile.z)
                            )
                        return data
                    elif response.status == 429:  # Rate limited: cool down this token and rotate
                        self.mark_token_cooldown(token, 60)
                        token = self.select_token()
                        self.logger.warning(f"Rate limited. Retrying with new token: attempt {attempt+1}/3")
                    else:
                        self.logger.warning(f"Tile request failed with status {response.status}. Attempt {attempt+1}/3")
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                self.logger.error(f"Error fetching tile {tile.x}/{tile.y}/{tile.z}: {str(e)}")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        self.logger.error(f"Failed to fetch tile {tile.x}/{tile.y}/{tile.z} after 3 attempts")
        return {"features": []}  # Return an empty feature list if all attempts fail

    def vt_bytes_to_geojson_wrapper(self, content, x, y, z):
        """Wrapper for vt_bytes_to_geojson to handle imports and errors."""
        try:
            # Import here to avoid potential import errors at module level
            from vt2geojson.tools import vt_bytes_to_geojson
            return vt_bytes_to_geojson(content, x, y, z, layer=self.tile_layer)
        except Exception as e:
            self.logger.error(f"Error converting tile to GeoJSON: {e}")
            return {"features": []}

    async def fetch_image_url(self, session, image_id, token):
        """Fetch the URL for a specific image ID from the Mapillary API."""
        url = f'https://graph.mapillary.com/{image_id}?fields=thumb_2048_url'
        headers = {'Authorization': f'OAuth {token}'}
        for attempt in range(3):  # Retry up to 3 times
            try:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data.get('thumb_2048_url')
                    elif response.status == 429:  # Rate limited: cool down this token and rotate
                        self.mark_token_cooldown(token, 60)
                        token = self.select_token()
                        headers = {'Authorization': f'OAuth {token}'}
                        self.logger.warning(f"Rate limited on image URL. Retrying with new token: attempt {attempt+1}/3")
                    else:
                        self.logger.warning(f"Image URL request failed with status {response.status}. Attempt {attempt+1}/3")
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                self.logger.error(f"Error fetching image URL for {image_id}: {str(e)}")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        self.logger.error(f"Failed to fetch image URL for {image_id} after 3 attempts")
        return None

    async def download_image(self, session, image_url, tile, sequence_id, image_id):
        """Download an image from its URL."""
        if image_id in self.downloaded_images:
            return True  # Skip if already downloaded

        # Create a unique tile ID for the directory structure
        tile_id = f"{tile.z}_{tile.x}_{tile.y}"

        # Create the tile directory if it doesn't exist
        tile_dir = os.path.join(self.base_dir, tile_id)
        os.makedirs(tile_dir, exist_ok=True)

        # Create the sequence directory inside the tile directory if it doesn't exist
        sequence_dir = os.path.join(tile_dir, sequence_id)
        os.makedirs(sequence_dir, exist_ok=True)

        # Prepare temporary and final paths
        temp_path = os.path.join(self.temp_dir, f"{image_id}.jpg.tmp")
        final_path = os.path.join(sequence_dir, f"{image_id}.jpg")

        if os.path.exists(final_path):
            self.downloaded_images.add(image_id)
            return True  # Skip if the file already exists

        for attempt in range(3):  # Retry up to 3 times
            try:
                async with session.get(image_url, timeout=30) as response:
                    if response.status == 200:
                        # Stream the download to a temporary file
                        async with aiofiles.open(temp_path, 'wb') as f:
                            async for chunk in response.content.iter_chunked(8192):
                                await f.write(chunk)
                        # Move the temp file to its final location (atomic on the same filesystem)
                        shutil.move(temp_path, final_path)
                        # Update progress
                        self.downloaded_images.add(image_id)
                        self.save_progress(image_id=image_id)
                        self.total_features_downloaded += 1
                        return True
                    else:
                        self.logger.warning(f"Image download failed with status {response.status}. Attempt {attempt+1}/3")
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                self.logger.error(f"Error downloading image {image_id}: {str(e)}")
                if os.path.exists(temp_path):
                    try:
                        os.remove(temp_path)  # Clean up the partial download
                    except OSError:
                        pass
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        self.logger.error(f"Failed to download image {image_id} after 3 attempts")
        return False

    async def process_feature(self, session, feature, tile, west, south, east, north, pbar):
        """Process a single feature from the tile data."""
        # Get coordinates
        lng = feature['geometry']['coordinates'][0]
        lat = feature['geometry']['coordinates'][1]

        # Ensure the feature falls inside the bounding box
        if not (west < lng < east and south < lat < north):
            return False

        # Get feature properties
        sequence_id = feature['properties']['sequence_id']
        image_id = feature['properties']['id']

        # Skip if already downloaded
        if image_id in self.downloaded_images:
            return True

        # Use the semaphore to limit concurrent API calls
        async with self.semaphore:
            token = self.select_token()
            image_url = await self.fetch_image_url(session, image_id, token)
            if image_url:
                success = await self.download_image(session, image_url, tile, sequence_id, image_id)
                if success:
                    pbar.update(1)
                return success
        return False

    async def process_tile(self, session, tile, bbox, overall_pbar, tiles_pbar):
        """Process a single tile, extracting and downloading all of its features."""
        west, south, east, north = bbox

        # Skip if already processed
        tile_key = (tile.x, tile.y, tile.z)
        if tile_key in self.processed_tiles:
            tiles_pbar.update(1)
            return 0

        # Get a token for this request
        token = self.select_token()

        # Fetch tile data
        tile_data = await self.fetch_tile_data(session, tile, token)
        features = tile_data.get('features', [])

        # Create a progress bar for this tile's features
        with tqdm(total=len(features), desc=f"Tile {tile.x}/{tile.y}", leave=False) as pbar:
            self.current_tile_features = len(features)

            # Create a task for each feature
            tasks = [
                self.process_feature(session, feature, tile, west, south, east, north, pbar)
                for feature in features
            ]

            # Process all features concurrently
            if tasks:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                successful = sum(1 for r in results if r is True)
            else:
                successful = 0

        # Mark the tile as processed
        self.processed_tiles.add(tile_key)
        self.save_progress(tile=tile)
        tiles_pbar.update(1)
        return successful

    async def download_region(self, bbox=None):
        """Download all Mapillary images from a specified region."""
        # Use the default bbox if none is provided
        if bbox is None:
            bbox = self.default_bbox
            self.logger.info(f"Using default bounding box for an area in Miami: {bbox}")

        west, south, east, north = bbox

        # Get the tiles that intersect the bounding box
        tiles = list(mercantile.tiles(west, south, east, north, self.zoom_level))
        self.logger.info(f"Found {len(tiles)} tiles intersecting the bounding box")

        # Initialize progress bars
        with tqdm(total=len(tiles), desc="Overall Progress", position=0) as overall_pbar, \
             tqdm(total=len(tiles), desc="Completed Tiles", position=1) as tiles_pbar:

            # Reflect already-processed tiles in the progress bar
            already_processed = sum(1 for tile in tiles if (tile.x, tile.y, tile.z) in self.processed_tiles)
            tiles_pbar.update(already_processed)

            # Calculate the remaining tiles
            remaining_tiles = [tile for tile in tiles if (tile.x, tile.y, tile.z) not in self.processed_tiles]

            # Set up an aiohttp session with connection pooling
            conn = aiohttp.TCPConnector(limit=self.connection_pool_size, ttl_dns_cache=300)
            timeout = aiohttp.ClientTimeout(total=3600)  # 1-hour timeout for the entire session
            async with aiohttp.ClientSession(connector=conn, timeout=timeout) as session:
                # Process tiles one by one to ensure full completion before moving on
                for tile in remaining_tiles:
                    if not self.is_running:
                        self.logger.info("Download stopped by user")
                        break

                    await self.process_tile(session, tile, bbox, overall_pbar, tiles_pbar)
                    overall_pbar.update(1)

                    # Save statistics periodically
                    if random.random() < 0.1:  # ~10% chance to save stats on each tile
                        self.save_statistics()

        # Final statistics save
        self.save_statistics()
        self.logger.info(f"Download completed. Total features: {self.total_features_downloaded}")

    def signal_handler(self, sig, frame):
        """Handle interrupt signals for a clean exit."""
        self.is_running = False
        self.logger.info("Received interrupt signal. Exiting...")
        self.save_statistics()  # Save stats before exiting
        print("\nDownload terminated by user. Exiting...")
        sys.exit(0)  # Exit immediately

    def run(self, bbox=None):
        """Main entry point to run the downloader."""
        try:
            # Setup
            self.ensure_directories()
            self.load_tokens()
            self.load_progress()

            # Configure the semaphore for concurrency limiting
            self.semaphore = asyncio.Semaphore(self.max_workers)

            # Start the timer
            self.start_time = time.time()
            self.is_running = True

            # Set up the signal handler for immediate exit
            signal.signal(signal.SIGINT, self.signal_handler)

            # Run the main download process
            try:
                loop = asyncio.get_event_loop()
                loop.run_until_complete(self.download_region(bbox))
            except KeyboardInterrupt:
                # In case the signal handler doesn't catch it
                self.logger.info("Download stopped by user")
                self.is_running = False
                self.save_statistics()
                sys.exit(0)

            # Final cleanup
            self.save_statistics()
            elapsed = time.time() - self.start_time
            self.logger.info(f"Download completed in {elapsed:.2f} seconds")
            self.logger.info(f"Downloaded {self.total_features_downloaded} images across {len(self.processed_tiles)} tiles")
        except Exception as e:
            self.logger.error(f"Error in main execution: {e}")
            import traceback
            self.logger.error(traceback.format_exc())
            return 1
        return 0


def parse_bbox(bbox_str):
    """Parse a bbox string into [west, south, east, north] format."""
    try:
        coords = [float(x.strip()) for x in bbox_str.split(',')]
        if len(coords) != 4:
            print("Invalid bounding box format. Expected format: west,south,east,north")
            return None
        return coords
    except ValueError:
        print("Error parsing bounding box. Please provide numeric values in the format: west,south,east,north")
        return None


if __name__ == "__main__":
    # Set up the argument parser
    parser = argparse.ArgumentParser(description='Download Mapillary images for a specified region.')

    # Add bounding box arguments with both long and short forms
    parser.add_argument('--boundingbox', '--bbox', '-b',
                        type=str,
                        help='Bounding box coordinates in format: west,south,east,north')

    # Add an optional output directory argument
    parser.add_argument('--output', '-o',
                        type=str,
                        help='Output directory for downloaded images (default: Images)')

    # Parse the arguments
    args = parser.parse_args()

    # Initialize the downloader
    downloader = MapillaryDownloader()

    # Set the output directory if specified
    if args.output:
        downloader.base_dir = args.output
        downloader.temp_dir = os.path.join(downloader.base_dir, "temp")
        downloader.progress_file = os.path.join(downloader.base_dir, "progress.txt")
        downloader.stats_file = os.path.join(downloader.base_dir, "download_stats.json")

    # Parse the bounding box if provided
    bbox = None
    if args.boundingbox:
        bbox = parse_bbox(args.boundingbox)
    if bbox:
        print(f"Using bounding box: {bbox}")
    else:
        print("Using default bounding box.")

    # Run the downloader
    sys.exit(downloader.run(bbox))
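
For reference, the third-party imports above are mercantile, requests, aiohttp, aiofiles, psutil, tqdm, and vt2geojson (the last imported lazily inside vt_bytes_to_geojson_wrapper). Assuming each PyPI package name matches its import name, a typical install command would be:

pip install mercantile requests aiohttp aiofiles psutil tqdm vt2geojson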