Skip to content

Instantly share code, notes, and snippets.

@thejhh
Created April 15, 2025 21:31
Show Gist options
  • Save thejhh/f1e408dcbcc3b4330da05e2ef5adac9e to your computer and use it in GitHub Desktop.
Save thejhh/f1e408dcbcc3b4330da05e2ef5adac9e to your computer and use it in GitHub Desktop.
Code to detect where sky meets land
import cv2
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import json
from scipy.ndimage import gaussian_filter1d
import logging
import sys
import traceback
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
def smooth_boundary(boundary, sigma=3):
"""Apply Gaussian smoothing to the boundary."""
try:
logger.debug(f"Smoothing boundary with sigma={sigma}")
smoothed = gaussian_filter1d(boundary, sigma=sigma)
logger.debug(f"Boundary range before smoothing: {min(boundary):.1f} to {max(boundary):.1f}")
logger.debug(f"Boundary range after smoothing: {min(smoothed):.1f} to {max(smoothed):.1f}")
return smoothed
except Exception as e:
logger.error(f"Error in smooth_boundary: {str(e)}")
logger.error(traceback.format_exc())
raise
def limit_jumps(boundary, max_jump=50):
"""Limit the maximum jump between adjacent columns."""
limited = boundary.copy()
for x in range(1, len(boundary)):
if abs(boundary[x] - boundary[x-1]) > max_jump:
direction = np.sign(boundary[x] - boundary[x-1])
limited[x] = boundary[x-1] + direction * max_jump
return limited
def remove_column_outliers(boundary, threshold=30):
"""Remove outlier columns that deviate too much from their neighbors."""
filtered = boundary.copy()
for x in range(1, len(boundary)-1):
median_neighbor = np.median([boundary[x-1], boundary[x], boundary[x+1]])
if abs(boundary[x] - median_neighbor) > threshold:
filtered[x] = median_neighbor
return filtered
def verify_sky_region(gray, hsv, x, y, patch_size=5):
"""Verify if the region above the boundary looks like sky."""
height = gray.shape[0]
# Get patches above and below the boundary
sky_patch_y = max(0, y - patch_size)
land_patch_y = min(height - 1, y + patch_size)
sky_patch_gray = gray[sky_patch_y:y, x]
land_patch_gray = gray[y:land_patch_y, x]
sky_patch_hsv = hsv[sky_patch_y:y, x]
# Check brightness (sky should be brighter)
sky_mean = np.mean(sky_patch_gray) if len(sky_patch_gray) > 0 else 0
land_mean = np.mean(land_patch_gray) if len(land_patch_gray) > 0 else 255
# Check color (sky should be blue-ish or gray)
sky_hue = np.mean(sky_patch_hsv[:, 0]) if len(sky_patch_hsv) > 0 else 0
sky_sat = np.mean(sky_patch_hsv[:, 1]) if len(sky_patch_hsv) > 0 else 0
# More lenient brightness check
is_brighter = sky_mean > land_mean + 5
# Broader hue range for blue sky
is_blue = 80 <= sky_hue <= 160
# More lenient saturation check for gray sky
is_gray = sky_sat < 80
# Return True if either brightness difference is significant OR color looks like sky
return is_brighter or (is_blue or is_gray)
def find_horizon_line(edges, gray, hsv, min_width=100):
"""Find the most prominent horizontal line in the edge image."""
# Use a smaller kernel for morphological opening
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2,2))
opened_edges = cv2.morphologyEx(edges, cv2.MORPH_OPEN, kernel)
# Find connected components
num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(opened_edges)
# Filter components by width, height, and area
valid_components = []
min_width_pixels = 0.5 * edges.shape[1] # Lowered to 50% of image width
min_area = 50 # Lowered minimum area
for i in range(1, num_labels): # Skip background (0)
width = stats[i, cv2.CC_STAT_WIDTH]
height = stats[i, cv2.CC_STAT_HEIGHT]
area = stats[i, cv2.CC_STAT_AREA]
# Look for wide, relatively flat components with sufficient area
if (width > min_width_pixels and
height < 15 and # Slightly increased height tolerance
area > min_area):
valid_components.append(i)
if not valid_components:
return None
# Find the component with the highest average Y position (closest to top)
# that also passes the sky verification
best_component = None
best_y = float('inf')
for i in valid_components:
y = int(centroids[i][1])
x = int(centroids[i][0])
# Verify this looks like a sky-land boundary
if verify_sky_region(gray, hsv, x, y):
if y < best_y:
best_y = y
best_component = i
if best_component is None:
return None
# Extract the Y coordinates of the best component
component_mask = (labels == best_component)
y_coords = np.where(component_mask)[0]
x_coords = np.where(component_mask)[1]
# For each X position, find the highest Y (closest to top)
unique_x = np.unique(x_coords)
boundary = np.zeros(edges.shape[1])
for x in unique_x:
mask = x_coords == x
if np.any(mask):
boundary[x] = np.min(y_coords[mask])
return boundary
def analyze_field_image(image_path):
"""Analyze a single field image to find the sky boundary."""
try:
logger.info(f"Processing image: {image_path}")
# Read the image
img = cv2.imread(str(image_path))
if img is None:
raise ValueError(f"Could not read image: {image_path}")
height, width = img.shape[:2]
logger.debug(f"Image dimensions: {width}x{height}")
# Convert to grayscale and HSV
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
# Apply median filter to remove small bright spots
median = cv2.medianBlur(gray, 5)
# Apply Gaussian blur to reduce noise
blurred = cv2.GaussianBlur(median, (5, 5), 0)
# Calculate vertical gradient (keep it signed)
sobel_y = cv2.Sobel(blurred, cv2.CV_64F, 0, 1, ksize=3)
# Convert to 8-bit for edge detection
sobel_8u = cv2.normalize(sobel_y, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
# Apply Canny edge detection
edges = cv2.Canny(sobel_8u, 50, 150)
# Find the most prominent horizontal line
boundary = find_horizon_line(edges, gray, hsv)
if boundary is None:
logger.warning("No prominent horizon line found, using gradient-based approach")
# Fallback to gradient-based approach
boundaries = []
prev_y = height // 2
# Expanded search range
search_start = height // 5 # Start higher up
search_end = 4 * height // 5 # End lower down
for x in range(width):
column = sobel_y[search_start:search_end, x]
y_local = np.argmin(column) # largest negative slope
y = y_local + search_start
# Verify this looks like a sky-land boundary
if not verify_sky_region(gray, hsv, x, y):
y = prev_y # Use previous value if not sky-like
# Ensure the boundary doesn't jump too much
if abs(y - prev_y) > 50:
y = prev_y + np.sign(y - prev_y) * 50
boundaries.append(y)
prev_y = y
boundary = np.array(boundaries)
# Smooth the boundary
boundary = smooth_boundary(boundary, sigma=5)
logger.info(f"Successfully processed {image_path}")
return boundary
except Exception as e:
logger.error(f"Error processing image {image_path}: {str(e)}")
logger.error(traceback.format_exc())
raise
def main():
try:
logger.info("Starting field boundary analysis")
# Get all field images
field_dir = Path('/home/jhh/git/hangovergames/flowers/src/assets/fields')
if not field_dir.exists():
raise FileNotFoundError(f"Field directory not found: {field_dir}")
field_images = list(field_dir.glob('*.png'))
if not field_images:
raise FileNotFoundError(f"No PNG images found in {field_dir}")
logger.info(f"Found {len(field_images)} field images")
# Initialize array to store all boundaries
all_boundaries = []
# Analyze each image
for img_path in field_images:
try:
boundary = analyze_field_image(img_path)
all_boundaries.append(boundary)
except Exception as e:
logger.error(f"Skipping {img_path} due to error: {str(e)}")
continue
if not all_boundaries:
raise ValueError("No valid boundaries were generated from any images")
# Convert to numpy array for easier processing
all_boundaries = np.array(all_boundaries)
# Use 90th percentile instead of max to avoid outliers
percentile_boundary = np.percentile(all_boundaries, 90, axis=0)
# Smooth the boundary
smoothed_boundary = smooth_boundary(percentile_boundary, sigma=7)
# Remove column outliers
filtered_boundary = remove_column_outliers(smoothed_boundary)
# Limit jumps between columns
final_boundary = limit_jumps(filtered_boundary)
logger.info(f"Final boundary range: {min(final_boundary):.1f} to {max(final_boundary):.1f}")
# Save the boundary to a JSON file
output_path = Path('/home/jhh/git/hangovergames/flowers/public/boundary.json')
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w') as f:
json.dump(final_boundary.tolist(), f)
logger.info(f"Successfully saved boundary to {output_path}")
# Visualize the boundary
plt.figure(figsize=(10, 6))
plt.plot(final_boundary)
plt.title('Robust Sky Boundary')
plt.xlabel('X Position')
plt.ylabel('Y Position')
plt.grid(True)
plt.savefig('boundary_visualization.png')
logger.info("Saved boundary visualization to boundary_visualization.png")
except Exception as e:
logger.error(f"Error in main: {str(e)}")
logger.error(traceback.format_exc())
sys.exit(1)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment