Created
April 15, 2025 21:31
-
-
Save thejhh/f1e408dcbcc3b4330da05e2ef5adac9e to your computer and use it in GitHub Desktop.
Code to detect where sky meets land
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from pathlib import Path | |
import json | |
from scipy.ndimage import gaussian_filter1d | |
import logging | |
import sys | |
import traceback | |
# Set up logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.StreamHandler(sys.stdout) | |
] | |
) | |
logger = logging.getLogger(__name__) | |
def smooth_boundary(boundary, sigma=3): | |
"""Apply Gaussian smoothing to the boundary.""" | |
try: | |
logger.debug(f"Smoothing boundary with sigma={sigma}") | |
smoothed = gaussian_filter1d(boundary, sigma=sigma) | |
logger.debug(f"Boundary range before smoothing: {min(boundary):.1f} to {max(boundary):.1f}") | |
logger.debug(f"Boundary range after smoothing: {min(smoothed):.1f} to {max(smoothed):.1f}") | |
return smoothed | |
except Exception as e: | |
logger.error(f"Error in smooth_boundary: {str(e)}") | |
logger.error(traceback.format_exc()) | |
raise | |
def limit_jumps(boundary, max_jump=50): | |
"""Limit the maximum jump between adjacent columns.""" | |
limited = boundary.copy() | |
for x in range(1, len(boundary)): | |
if abs(boundary[x] - boundary[x-1]) > max_jump: | |
direction = np.sign(boundary[x] - boundary[x-1]) | |
limited[x] = boundary[x-1] + direction * max_jump | |
return limited | |
def remove_column_outliers(boundary, threshold=30): | |
"""Remove outlier columns that deviate too much from their neighbors.""" | |
filtered = boundary.copy() | |
for x in range(1, len(boundary)-1): | |
median_neighbor = np.median([boundary[x-1], boundary[x], boundary[x+1]]) | |
if abs(boundary[x] - median_neighbor) > threshold: | |
filtered[x] = median_neighbor | |
return filtered | |
def verify_sky_region(gray, hsv, x, y, patch_size=5): | |
"""Verify if the region above the boundary looks like sky.""" | |
height = gray.shape[0] | |
# Get patches above and below the boundary | |
sky_patch_y = max(0, y - patch_size) | |
land_patch_y = min(height - 1, y + patch_size) | |
sky_patch_gray = gray[sky_patch_y:y, x] | |
land_patch_gray = gray[y:land_patch_y, x] | |
sky_patch_hsv = hsv[sky_patch_y:y, x] | |
# Check brightness (sky should be brighter) | |
sky_mean = np.mean(sky_patch_gray) if len(sky_patch_gray) > 0 else 0 | |
land_mean = np.mean(land_patch_gray) if len(land_patch_gray) > 0 else 255 | |
# Check color (sky should be blue-ish or gray) | |
sky_hue = np.mean(sky_patch_hsv[:, 0]) if len(sky_patch_hsv) > 0 else 0 | |
sky_sat = np.mean(sky_patch_hsv[:, 1]) if len(sky_patch_hsv) > 0 else 0 | |
# More lenient brightness check | |
is_brighter = sky_mean > land_mean + 5 | |
# Broader hue range for blue sky | |
is_blue = 80 <= sky_hue <= 160 | |
# More lenient saturation check for gray sky | |
is_gray = sky_sat < 80 | |
# Return True if either brightness difference is significant OR color looks like sky | |
return is_brighter or (is_blue or is_gray) | |
def find_horizon_line(edges, gray, hsv, min_width=100): | |
"""Find the most prominent horizontal line in the edge image.""" | |
# Use a smaller kernel for morphological opening | |
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2,2)) | |
opened_edges = cv2.morphologyEx(edges, cv2.MORPH_OPEN, kernel) | |
# Find connected components | |
num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(opened_edges) | |
# Filter components by width, height, and area | |
valid_components = [] | |
min_width_pixels = 0.5 * edges.shape[1] # Lowered to 50% of image width | |
min_area = 50 # Lowered minimum area | |
for i in range(1, num_labels): # Skip background (0) | |
width = stats[i, cv2.CC_STAT_WIDTH] | |
height = stats[i, cv2.CC_STAT_HEIGHT] | |
area = stats[i, cv2.CC_STAT_AREA] | |
# Look for wide, relatively flat components with sufficient area | |
if (width > min_width_pixels and | |
height < 15 and # Slightly increased height tolerance | |
area > min_area): | |
valid_components.append(i) | |
if not valid_components: | |
return None | |
# Find the component with the highest average Y position (closest to top) | |
# that also passes the sky verification | |
best_component = None | |
best_y = float('inf') | |
for i in valid_components: | |
y = int(centroids[i][1]) | |
x = int(centroids[i][0]) | |
# Verify this looks like a sky-land boundary | |
if verify_sky_region(gray, hsv, x, y): | |
if y < best_y: | |
best_y = y | |
best_component = i | |
if best_component is None: | |
return None | |
# Extract the Y coordinates of the best component | |
component_mask = (labels == best_component) | |
y_coords = np.where(component_mask)[0] | |
x_coords = np.where(component_mask)[1] | |
# For each X position, find the highest Y (closest to top) | |
unique_x = np.unique(x_coords) | |
boundary = np.zeros(edges.shape[1]) | |
for x in unique_x: | |
mask = x_coords == x | |
if np.any(mask): | |
boundary[x] = np.min(y_coords[mask]) | |
return boundary | |
def analyze_field_image(image_path): | |
"""Analyze a single field image to find the sky boundary.""" | |
try: | |
logger.info(f"Processing image: {image_path}") | |
# Read the image | |
img = cv2.imread(str(image_path)) | |
if img is None: | |
raise ValueError(f"Could not read image: {image_path}") | |
height, width = img.shape[:2] | |
logger.debug(f"Image dimensions: {width}x{height}") | |
# Convert to grayscale and HSV | |
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV) | |
# Apply median filter to remove small bright spots | |
median = cv2.medianBlur(gray, 5) | |
# Apply Gaussian blur to reduce noise | |
blurred = cv2.GaussianBlur(median, (5, 5), 0) | |
# Calculate vertical gradient (keep it signed) | |
sobel_y = cv2.Sobel(blurred, cv2.CV_64F, 0, 1, ksize=3) | |
# Convert to 8-bit for edge detection | |
sobel_8u = cv2.normalize(sobel_y, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U) | |
# Apply Canny edge detection | |
edges = cv2.Canny(sobel_8u, 50, 150) | |
# Find the most prominent horizontal line | |
boundary = find_horizon_line(edges, gray, hsv) | |
if boundary is None: | |
logger.warning("No prominent horizon line found, using gradient-based approach") | |
# Fallback to gradient-based approach | |
boundaries = [] | |
prev_y = height // 2 | |
# Expanded search range | |
search_start = height // 5 # Start higher up | |
search_end = 4 * height // 5 # End lower down | |
for x in range(width): | |
column = sobel_y[search_start:search_end, x] | |
y_local = np.argmin(column) # largest negative slope | |
y = y_local + search_start | |
# Verify this looks like a sky-land boundary | |
if not verify_sky_region(gray, hsv, x, y): | |
y = prev_y # Use previous value if not sky-like | |
# Ensure the boundary doesn't jump too much | |
if abs(y - prev_y) > 50: | |
y = prev_y + np.sign(y - prev_y) * 50 | |
boundaries.append(y) | |
prev_y = y | |
boundary = np.array(boundaries) | |
# Smooth the boundary | |
boundary = smooth_boundary(boundary, sigma=5) | |
logger.info(f"Successfully processed {image_path}") | |
return boundary | |
except Exception as e: | |
logger.error(f"Error processing image {image_path}: {str(e)}") | |
logger.error(traceback.format_exc()) | |
raise | |
def main(): | |
try: | |
logger.info("Starting field boundary analysis") | |
# Get all field images | |
field_dir = Path('/home/jhh/git/hangovergames/flowers/src/assets/fields') | |
if not field_dir.exists(): | |
raise FileNotFoundError(f"Field directory not found: {field_dir}") | |
field_images = list(field_dir.glob('*.png')) | |
if not field_images: | |
raise FileNotFoundError(f"No PNG images found in {field_dir}") | |
logger.info(f"Found {len(field_images)} field images") | |
# Initialize array to store all boundaries | |
all_boundaries = [] | |
# Analyze each image | |
for img_path in field_images: | |
try: | |
boundary = analyze_field_image(img_path) | |
all_boundaries.append(boundary) | |
except Exception as e: | |
logger.error(f"Skipping {img_path} due to error: {str(e)}") | |
continue | |
if not all_boundaries: | |
raise ValueError("No valid boundaries were generated from any images") | |
# Convert to numpy array for easier processing | |
all_boundaries = np.array(all_boundaries) | |
# Use 90th percentile instead of max to avoid outliers | |
percentile_boundary = np.percentile(all_boundaries, 90, axis=0) | |
# Smooth the boundary | |
smoothed_boundary = smooth_boundary(percentile_boundary, sigma=7) | |
# Remove column outliers | |
filtered_boundary = remove_column_outliers(smoothed_boundary) | |
# Limit jumps between columns | |
final_boundary = limit_jumps(filtered_boundary) | |
logger.info(f"Final boundary range: {min(final_boundary):.1f} to {max(final_boundary):.1f}") | |
# Save the boundary to a JSON file | |
output_path = Path('/home/jhh/git/hangovergames/flowers/public/boundary.json') | |
output_path.parent.mkdir(parents=True, exist_ok=True) | |
with open(output_path, 'w') as f: | |
json.dump(final_boundary.tolist(), f) | |
logger.info(f"Successfully saved boundary to {output_path}") | |
# Visualize the boundary | |
plt.figure(figsize=(10, 6)) | |
plt.plot(final_boundary) | |
plt.title('Robust Sky Boundary') | |
plt.xlabel('X Position') | |
plt.ylabel('Y Position') | |
plt.grid(True) | |
plt.savefig('boundary_visualization.png') | |
logger.info("Saved boundary visualization to boundary_visualization.png") | |
except Exception as e: | |
logger.error(f"Error in main: {str(e)}") | |
logger.error(traceback.format_exc()) | |
sys.exit(1) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment