@cat768
Forked from cbeddow/mapillary_jpg_download.py
Last active March 30, 2025 02:55
Download Mapillary images as JPGs
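Note: the script will not start without at least one Mapillary access token; load_tokens() below reads mapillary_access_tokens.txt or tokens.txt, one token per line, along these lines (illustrative placeholders, not real tokens):

MLY|1234567890123456|0123456789abcdef0123456789abcdef
MLY|6543210987654321|fedcba9876543210fedcba9876543210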
import mercantile
import json
import os
import time
import random
import asyncio
import aiohttp
import aiofiles
import logging
import shutil
import signal
import sys
import argparse
from tqdm import tqdm
from datetime import datetime

class MapillaryDownloader:
    """
    Advanced Mapillary image downloader with multi-threading, token rotation,
    progress tracking, and resumability.
    """

    def __init__(self):
        # Configuration variables
        self.tile_coverage = 'mly1_public'
        self.tile_layer = "image"
        self.zoom_level = 14  # Must be zoom level 14, where Mapillary image data is available

        # Default bounding box (Miami area) if none is provided
        self.default_bbox = [-80.13423442840576, 25.77376933762778, -80.1264238357544, 25.788608487732198]

        # Path configurations
        self.base_dir = "Images"
        self.temp_dir = os.path.join(self.base_dir, "temp")
        self.progress_file = os.path.join(self.base_dir, "progress.txt")
        self.stats_file = os.path.join(self.base_dir, "download_stats.json")
        self.token_files = ["mapillary_access_tokens.txt", "tokens.txt"]

        # Runtime variables
        self.access_tokens = []
        self.token_usage = {}
        self.token_cooldowns = {}
        self.downloaded_images = set()
        self.processed_tiles = set()
        self.current_tile_features = 0
        self.total_features_downloaded = 0
        self.start_time = None
        self.is_running = False

        # Concurrency settings (os.cpu_count() can return None, hence the fallback)
        self.max_workers = min(32, (os.cpu_count() or 4) * 2)
        self.connection_pool_size = min(100, self.max_workers * 4)
        self.semaphore = None  # Created in download_region(), once the event loop is running

        # Setup logging
        self.setup_logging()

    def setup_logging(self):
        """Configure logging with appropriate format and level."""
        # Ensure the base directory exists for the log file
        os.makedirs(self.base_dir, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(os.path.join(self.base_dir, "downloader.log")),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def load_tokens(self):
        """Load access tokens from token files with fallbacks."""
        for token_file in self.token_files:
            if os.path.exists(token_file):
                try:
                    with open(token_file, 'r') as f:
                        tokens = [line.strip() for line in f if line.strip()]
                    self.access_tokens.extend(tokens)
                    self.logger.info(f"Loaded {len(tokens)} tokens from {token_file}")
                except Exception as e:
                    self.logger.error(f"Error loading tokens from {token_file}: {e}")

        # Initialize token usage tracking
        for token in self.access_tokens:
            self.token_usage[token] = 0
            self.token_cooldowns[token] = 0

        if not self.access_tokens:
            self.logger.error("No access tokens found! Please provide at least one token.")
            raise ValueError("No access tokens available")

    def select_token(self):
        """Select the most optimal token based on usage and cooldown status."""
        available_tokens = [t for t in self.access_tokens if time.time() > self.token_cooldowns.get(t, 0)]

        if not available_tokens:
            # If all tokens are in cooldown, wait for the one with the earliest expiry
            next_available = min(self.token_cooldowns.values())
            cooldown_wait = max(0, next_available - time.time())
            self.logger.warning(f"All tokens are rate-limited. Waiting {cooldown_wait:.2f} seconds...")
            time.sleep(min(60, cooldown_wait))  # Wait, but not more than 60 seconds
            return self.select_token()  # Try again

        # Select the token with the lowest usage count
        selected_token = min(available_tokens, key=lambda t: self.token_usage[t])
        self.token_usage[selected_token] += 1
        return selected_token

    def mark_token_cooldown(self, token, seconds=60):
        """Mark a token as being in cooldown (rate limited)."""
        self.token_cooldowns[token] = time.time() + seconds
        self.logger.warning(f"Token put in cooldown for {seconds} seconds due to rate limiting")

    def ensure_directories(self):
        """Create the necessary directories if they don't exist."""
        os.makedirs(self.base_dir, exist_ok=True)
        os.makedirs(self.temp_dir, exist_ok=True)

    def load_progress(self):
        """Load progress data from previous runs."""
        if os.path.exists(self.progress_file):
            try:
                with open(self.progress_file, 'r') as f:
                    for line in f:
                        parts = line.strip().split(',')
                        if parts[0] == "tile" and len(parts) >= 4:
                            # Format: tile,x,y,z
                            self.processed_tiles.add((int(parts[1]), int(parts[2]), int(parts[3])))
                        elif parts[0] == "image" and len(parts) >= 2:
                            # Format: image,image_id
                            self.downloaded_images.add(parts[1])
                self.logger.info(f"Resumed progress: {len(self.processed_tiles)} tiles, {len(self.downloaded_images)} images")
            except Exception as e:
                self.logger.error(f"Error loading progress: {e}")

        # Load statistics
        if os.path.exists(self.stats_file):
            try:
                with open(self.stats_file, 'r') as f:
                    stats = json.load(f)
                self.total_features_downloaded = stats.get("total_features_downloaded", 0)
                self.logger.info(f"Loaded previous statistics: {self.total_features_downloaded} total features")
            except Exception as e:
                self.logger.error(f"Error loading statistics: {e}")

    def save_progress(self, tile=None, image_id=None):
        """Save progress information for resumability."""
        try:
            with open(self.progress_file, 'a') as f:
                if tile:
                    f.write(f"tile,{tile.x},{tile.y},{tile.z}\n")
                if image_id:
                    f.write(f"image,{image_id}\n")
        except Exception as e:
            self.logger.error(f"Error saving progress: {e}")

    def save_statistics(self):
        """Save download statistics to a JSON file."""
        try:
            elapsed = time.time() - self.start_time if self.start_time else 0
            stats = {
                "total_features_downloaded": self.total_features_downloaded,
                "processed_tiles": len(self.processed_tiles),
                "downloaded_images": len(self.downloaded_images),
                "elapsed_time": elapsed,
                "download_rate": self.total_features_downloaded / elapsed if elapsed > 0 else 0,
                "tokens_usage": self.token_usage,
                "timestamp": datetime.now().isoformat()
            }
            with open(self.stats_file, 'w') as f:
                json.dump(stats, f, indent=2)
        except Exception as e:
            self.logger.error(f"Error saving statistics: {e}")

    async def fetch_tile_data(self, session, tile, token):
        """Fetch vector tile data from the Mapillary API."""
        for attempt in range(3):  # Retry up to 3 times
            # Build the URL inside the loop so a rotated token takes effect on retry
            tile_url = f'https://tiles.mapillary.com/maps/vtp/{self.tile_coverage}/2/{tile.z}/{tile.x}/{tile.y}?access_token={token}'
            try:
                async with session.get(tile_url) as response:
                    if response.status == 200:
                        content = await response.read()
                        # Decoding the vector tile is CPU-bound, so run it in the
                        # default thread pool executor rather than blocking the event loop
                        loop = asyncio.get_running_loop()
                        data = await loop.run_in_executor(
                            None,
                            self.vt_bytes_to_geojson_wrapper, content, tile.x, tile.y, tile.z
                        )
                        return data
                    elif response.status == 429:  # Rate limiting
                        self.mark_token_cooldown(token, 60)
                        token = self.select_token()
                        self.logger.warning(f"Rate limited. Retrying with new token: attempt {attempt+1}/3")
                    else:
                        self.logger.warning(f"Tile request failed with status {response.status}. Attempt {attempt+1}/3")
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                self.logger.error(f"Error fetching tile {tile.x}/{tile.y}/{tile.z}: {str(e)}")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        self.logger.error(f"Failed to fetch tile {tile.x}/{tile.y}/{tile.z} after 3 attempts")
        return {"features": []}  # Return an empty feature list if all attempts fail

    def vt_bytes_to_geojson_wrapper(self, content, x, y, z):
        """Wrapper for vt_bytes_to_geojson to handle imports and errors."""
        try:
            # Import here to avoid potential import errors at module level
            from vt2geojson.tools import vt_bytes_to_geojson
            return vt_bytes_to_geojson(content, x, y, z, layer=self.tile_layer)
        except Exception as e:
            self.logger.error(f"Error converting tile to GeoJSON: {e}")
            return {"features": []}

    async def fetch_image_url(self, session, image_id, token):
        """Fetch the URL for a specific image ID from the Mapillary API."""
        url = f'https://graph.mapillary.com/{image_id}?fields=thumb_2048_url'
        headers = {'Authorization': f'OAuth {token}'}

        for attempt in range(3):  # Retry up to 3 times
            try:
                async with session.get(url, headers=headers) as response:
                    if response.status == 200:
                        data = await response.json()
                        return data.get('thumb_2048_url')
                    elif response.status == 429:  # Rate limiting
                        self.mark_token_cooldown(token, 60)
                        token = self.select_token()
                        self.logger.warning(f"Rate limited on image URL. Retrying with new token: attempt {attempt+1}/3")
                        headers = {'Authorization': f'OAuth {token}'}
                    else:
                        self.logger.warning(f"Image URL request failed with status {response.status}. Attempt {attempt+1}/3")
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                self.logger.error(f"Error fetching image URL for {image_id}: {str(e)}")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        self.logger.error(f"Failed to fetch image URL for {image_id} after 3 attempts")
        return None
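
    # Note: thumb_2048_url is one of several thumbnail fields exposed by the Graph
    # API (e.g. thumb_256_url, thumb_1024_url, thumb_original_url); changing the
    # requested field above should be all that is needed to change download size.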

    async def download_image(self, session, image_url, tile, sequence_id, image_id):
        """Download an image from its URL."""
        if image_id in self.downloaded_images:
            return True  # Skip if already downloaded

        # Create a unique tile ID for the directory structure
        tile_id = f"{tile.z}_{tile.x}_{tile.y}"

        # Create the tile directory if it doesn't exist
        tile_dir = os.path.join(self.base_dir, tile_id)
        os.makedirs(tile_dir, exist_ok=True)

        # Create the sequence directory inside the tile directory if it doesn't exist
        sequence_dir = os.path.join(tile_dir, sequence_id)
        os.makedirs(sequence_dir, exist_ok=True)

        # Prepare temporary and final paths
        temp_path = os.path.join(self.temp_dir, f"{image_id}.jpg.tmp")
        final_path = os.path.join(sequence_dir, f"{image_id}.jpg")

        if os.path.exists(final_path):
            self.downloaded_images.add(image_id)
            return True  # Skip if the file already exists

        for attempt in range(3):  # Retry up to 3 times
            try:
                async with session.get(image_url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        # Stream the download to a temporary file
                        async with aiofiles.open(temp_path, 'wb') as f:
                            async for chunk in response.content.iter_chunked(8192):
                                await f.write(chunk)
                        # Move the temp file to its final location (atomic operation)
                        shutil.move(temp_path, final_path)
                        # Update progress
                        self.downloaded_images.add(image_id)
                        self.save_progress(image_id=image_id)
                        self.total_features_downloaded += 1
                        return True
                    else:
                        self.logger.warning(f"Image download failed with status {response.status}. Attempt {attempt+1}/3")
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                self.logger.error(f"Error downloading image {image_id}: {str(e)}")
                if os.path.exists(temp_path):
                    try:
                        os.remove(temp_path)  # Clean up the partial download
                    except OSError:
                        pass
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        self.logger.error(f"Failed to download image {image_id} after 3 attempts")
        return False
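
    # The temp-then-move pattern above keeps truncated .jpg files out of the output
    # tree: shutil.move degenerates to an atomic os.rename when source and
    # destination share a filesystem, which normally holds here since temp_dir
    # lives under base_dir.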

    async def process_feature(self, session, feature, tile, west, south, east, north, pbar):
        """Process a single feature from the tile data."""
        # Get coordinates
        lng = feature['geometry']['coordinates'][0]
        lat = feature['geometry']['coordinates'][1]

        # Ensure the feature falls inside the bounding box
        if west < lng < east and south < lat < north:
            # Get feature properties
            sequence_id = feature['properties']['sequence_id']
            image_id = feature['properties']['id']

            # Skip if already downloaded
            if image_id in self.downloaded_images:
                return True

            # Use a semaphore to limit concurrent API calls
            async with self.semaphore:
                token = self.select_token()
                image_url = await self.fetch_image_url(session, image_id, token)
                if image_url:
                    success = await self.download_image(session, image_url, tile, sequence_id, image_id)
                    if success:
                        pbar.update(1)
                    return success
        return False

    async def process_tile(self, session, tile, bbox, overall_pbar, tiles_pbar):
        """Process a single tile, extracting and downloading all features."""
        west, south, east, north = bbox

        # Skip if already processed
        tile_key = (tile.x, tile.y, tile.z)
        if tile_key in self.processed_tiles:
            tiles_pbar.update(1)
            return 0

        # Get a token for this request
        token = self.select_token()

        # Fetch tile data
        tile_data = await self.fetch_tile_data(session, tile, token)
        features = tile_data.get('features', [])

        # Create a progress bar for this tile's features
        with tqdm(total=len(features), desc=f"Tile {tile.x}/{tile.y}", leave=False) as pbar:
            self.current_tile_features = len(features)

            # Create tasks for processing each feature
            tasks = [
                self.process_feature(session, feature, tile, west, south, east, north, pbar)
                for feature in features
            ]

            # Process all features concurrently
            if tasks:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                successful = sum(1 for r in results if r is True)
            else:
                successful = 0

        # Mark the tile as processed
        self.processed_tiles.add(tile_key)
        self.save_progress(tile=tile)
        tiles_pbar.update(1)
        return successful

    async def download_region(self, bbox=None):
        """Download all Mapillary images from a specified region."""
        # Create the concurrency-limiting semaphore here so it is bound to the
        # event loop started by asyncio.run() in run()
        self.semaphore = asyncio.Semaphore(self.max_workers)

        # Use the default bbox if none is provided
        if bbox is None:
            bbox = self.default_bbox
            self.logger.info(f"Using default bounding box (Miami area): {bbox}")

        west, south, east, north = bbox

        # Get the zoom-14 tiles that intersect the bounding box
        tiles = list(mercantile.tiles(west, south, east, north, self.zoom_level))
        self.logger.info(f"Found {len(tiles)} tiles intersecting the bounding box")

        # Initialize progress bars
        with tqdm(total=len(tiles), desc="Overall Progress", position=0) as overall_pbar, \
             tqdm(total=len(tiles), desc="Completed Tiles", position=1) as tiles_pbar:

            # Reflect already-processed tiles in the progress bar
            already_processed = sum(1 for tile in tiles if (tile.x, tile.y, tile.z) in self.processed_tiles)
            tiles_pbar.update(already_processed)

            # Calculate the remaining tiles
            remaining_tiles = [tile for tile in tiles if (tile.x, tile.y, tile.z) not in self.processed_tiles]

            # Setup the aiohttp session with connection pooling
            conn = aiohttp.TCPConnector(limit=self.connection_pool_size, ttl_dns_cache=300)
            timeout = aiohttp.ClientTimeout(total=3600)  # 1 hour timeout for the entire session

            async with aiohttp.ClientSession(connector=conn, timeout=timeout) as session:
                # Process tiles one by one so each is fully completed before moving on
                for tile in remaining_tiles:
                    if not self.is_running:
                        self.logger.info("Download stopped by user")
                        break

                    successful = await self.process_tile(session, tile, bbox, overall_pbar, tiles_pbar)
                    overall_pbar.update(1)

                    # Save statistics periodically
                    if random.random() < 0.1:  # ~10% chance to save stats on each tile
                        self.save_statistics()

        # Final statistics save
        self.save_statistics()
        self.logger.info(f"Download completed. Total features: {self.total_features_downloaded}")

    def signal_handler(self, sig, frame):
        """Handle interrupt signals for a clean exit."""
        self.is_running = False
        self.logger.info("Received interrupt signal. Exiting...")
        self.save_statistics()  # Save stats before exiting
        print("\nDownload terminated by user. Exiting...")
        sys.exit(0)  # Exit immediately

    def run(self, bbox=None):
        """Main entry point to run the downloader."""
        try:
            # Setup
            self.ensure_directories()
            self.load_tokens()
            self.load_progress()

            # Start the timer
            self.start_time = time.time()
            self.is_running = True

            # Setup the signal handler for immediate exit
            signal.signal(signal.SIGINT, self.signal_handler)

            # Run the main download process (the semaphore is created inside
            # download_region so it attaches to the loop asyncio.run() starts)
            try:
                asyncio.run(self.download_region(bbox))
            except KeyboardInterrupt:
                # In case the signal handler doesn't catch it
                self.logger.info("Download stopped by user")
                self.is_running = False
                self.save_statistics()
                sys.exit(0)

            # Final cleanup
            self.save_statistics()
            elapsed = time.time() - self.start_time
            self.logger.info(f"Download completed in {elapsed:.2f} seconds")
            self.logger.info(f"Downloaded {self.total_features_downloaded} images across {len(self.processed_tiles)} tiles")
        except Exception as e:
            self.logger.error(f"Error in main execution: {e}")
            import traceback
            self.logger.error(traceback.format_exc())
            return 1
        return 0

def parse_bbox(bbox_str):
    """Parse a bbox string into [west, south, east, north] format."""
    try:
        coords = [float(x.strip()) for x in bbox_str.split(',')]
        if len(coords) != 4:
            print("Invalid bounding box format. Expected format: west,south,east,north")
            return None
        return coords
    except ValueError:
        print("Error parsing bounding box. Please provide numeric values in the format: west,south,east,north")
        return None
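
# A quick sanity check of the expected behaviour:
#   parse_bbox("-80.134,25.774,-80.126,25.789") -> [-80.134, 25.774, -80.126, 25.789]
#   parse_bbox("not,a,box")                     -> None (prints an error message)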

if __name__ == "__main__":
    # Set up the argument parser
    parser = argparse.ArgumentParser(description='Download Mapillary images for a specified region.')

    # Add bounding box arguments with both long and short forms
    parser.add_argument('--boundingbox', '--bbox', '-b',
                        type=str,
                        help='Bounding box coordinates in the format: west,south,east,north')

    # Add an optional output directory argument
    parser.add_argument('--output', '-o',
                        type=str,
                        help='Output directory for downloaded images (default: Images)')

    # Parse the arguments
    args = parser.parse_args()

    # Initialize the downloader
    downloader = MapillaryDownloader()

    # Set the output directory if specified
    if args.output:
        downloader.base_dir = args.output
        downloader.temp_dir = os.path.join(downloader.base_dir, "temp")
        downloader.progress_file = os.path.join(downloader.base_dir, "progress.txt")
        downloader.stats_file = os.path.join(downloader.base_dir, "download_stats.json")

    # Parse the bounding box if provided
    bbox = None
    if args.boundingbox:
        bbox = parse_bbox(args.boundingbox)
        if bbox:
            print(f"Using bounding box: {bbox}")
    else:
        print("Using default bounding box.")

    # Run the downloader
    sys.exit(downloader.run(bbox))
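
A minimal invocation sketch, assuming the file keeps its forked name mapillary_jpg_download.py: drop one access token per line into mapillary_access_tokens.txt (or tokens.txt) beside the script, then run, for example:

python mapillary_jpg_download.py --bbox "-80.134,25.774,-80.126,25.789" -o Images
python mapillary_jpg_download.py    # no arguments: uses the default Miami-area bbox

Ctrl-C saves statistics before exiting, and a rerun resumes from progress.txt rather than re-downloading.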