Created
June 11, 2025 12:05
-
-
Save koi-tattoo/1decd7f52f55ddb902e8a9eb78f8c587 to your computer and use it in GitHub Desktop.
LLM-generated script for using docker-compose.yml files with Apple's container tool.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Apple Container Compose | |
A script that mimics docker-compose but uses Apple's Container tool. | |
This script parses docker-compose.yml files and generates equivalent | |
Apple Container commands to build and run services. | |
Usage: | |
python apple-container-compose.py up [--compose-file docker-compose.yml] | |
python apple-container-compose.py down | |
python apple-container-compose.py build [service] | |
python apple-container-compose.py logs [service] | |
python apple-container-compose.py ps | |
python apple-container-compose.py urls | |
python apple-container-compose.py port-forward | |
python apple-container-compose.py stop-port-forward | |
""" | |
import argparse
import ipaddress
import json
import os
import shlex
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Any, Union, Tuple
try: | |
import yaml # type: ignore | |
except ImportError: | |
print("β PyYAML is required. Install with: pip install PyYAML") | |
sys.exit(1) | |
class AppleContainerCompose:
    """Translate a docker-compose style file into Apple ``container`` CLI calls.

    The class reads the ``services``, ``networks`` and ``volumes`` sections of
    a compose file, builds and runs one container per service through the
    ``container`` command, and publishes ports on the host by spawning
    ``socat`` processes whose PIDs are persisted in a small JSON file in the
    current working directory.
    """

    # Paths probed inside a container by ``logs``; entries containing ``*``
    # are resolved with ``find`` inside the container.
    _LOG_LOCATIONS = (
        '/var/log/nginx/access.log',
        '/var/log/nginx/error.log',
        '/var/log/apache2/access.log',
        '/var/log/apache2/error.log',
        '/app/logs/*.log',
        '/logs/*.log',
        '/tmp/*.log',
    )

    def __init__(self, compose_file: str = "docker-compose.yml"):
        """Load ``compose_file`` and any port-forward state left by earlier runs.

        Raises:
            FileNotFoundError: if the compose file does not exist.
            ValueError: if the compose file cannot be parsed.
        """
        self.compose_file = compose_file
        # Project name is the current directory name, lower-cased, without
        # dashes or underscores.
        self.project_name = Path.cwd().name.lower().replace("-", "").replace("_", "")
        self.services: Dict[str, Dict[str, Any]] = {}
        self.containers: Dict[str, str] = {}
        self.networks: Dict[str, Any] = {}
        self.volumes: Dict[str, Any] = {}
        self.compose_data: Dict[str, Any] = {}
        self.port_forward_pids: Dict[str, int] = {}
        self.port_forward_file = f".{self.project_name}_port_forwards.json"
        # Load and parse compose file
        self._load_compose_file()
        # Pick up forwards started by a previous invocation; otherwise the
        # next save would overwrite the file and orphan those socat processes.
        self._load_port_forward_pids()

    def _load_compose_file(self) -> None:
        """Load and parse the docker-compose.yml file.

        Raises:
            FileNotFoundError: if ``self.compose_file`` does not exist.
            ValueError: if the YAML is invalid or its root is not a mapping.
        """
        if not os.path.exists(self.compose_file):
            raise FileNotFoundError(f"Compose file '{self.compose_file}' not found")
        with open(self.compose_file, 'r', encoding='utf-8') as f:
            try:
                data = yaml.safe_load(f)
            except yaml.YAMLError as e:
                raise ValueError(f"Error parsing compose file: {e}") from e
        if data is None:
            data = {}
        if not isinstance(data, dict):
            raise ValueError("Error parsing compose file: top-level content must be a mapping")
        self.compose_data = data
        # ``services:`` with no value parses to None; normalise to empty dicts
        # so later ``.items()`` / ``in`` checks do not crash.
        self.services = data.get('services') or {}
        self.networks = data.get('networks') or {}
        self.volumes = data.get('volumes') or {}

    def _run_command(self, cmd: List[str], capture_output: bool = False, show_command: bool = True) -> "subprocess.CompletedProcess[str]":
        """Run ``cmd`` (an argv list, no shell) and return the completed process.

        When ``show_command`` is true the command line is echoed first.
        """
        if show_command:
            print(f"β {' '.join(cmd)}")
        return subprocess.run(cmd, capture_output=capture_output, text=True)

    def _check_socat_available(self) -> bool:
        """Check if socat is available on the system."""
        return shutil.which('socat') is not None

    def _install_socat_if_needed(self) -> bool:
        """Install socat through Homebrew if it is missing; return True when usable."""
        if self._check_socat_available():
            return True
        print("π¦ socat not found. Installing via Homebrew...")
        try:
            # Check if brew is available
            if shutil.which('brew') is None:
                print("β Homebrew not found. Please install socat manually:")
                print(" brew install socat")
                return False
            # Install socat
            result = subprocess.run(['brew', 'install', 'socat'], capture_output=True, text=True)
            if result.returncode == 0:
                print("β socat installed successfully")
                return True
            print(f"β Failed to install socat: {result.stderr}")
            return False
        except Exception as e:
            # Best effort: any failure here just means no port forwarding.
            print(f"β Error installing socat: {e}")
            return False

    def _save_port_forward_pids(self) -> None:
        """Save port forward PIDs to file for cleanup."""
        try:
            with open(self.port_forward_file, 'w') as f:
                json.dump(self.port_forward_pids, f)
        except Exception as e:
            print(f"β οΈ Warning: Could not save port forward PIDs: {e}")

    def _load_port_forward_pids(self) -> None:
        """Load port forward PIDs from file.

        A missing file leaves the in-memory mapping untouched. An unreadable
        or malformed file (not a JSON object of integer PIDs) prints a warning
        and resets the mapping to empty.
        """
        try:
            if os.path.exists(self.port_forward_file):
                with open(self.port_forward_file, 'r') as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    raise ValueError("expected a JSON object mapping forward keys to PIDs")
                self.port_forward_pids = {str(key): int(pid) for key, pid in loaded.items()}
        except (OSError, ValueError, TypeError) as e:
            print(f"β οΈ Warning: Could not load port forward PIDs: {e}")
            self.port_forward_pids = {}

    def _get_container_name(self, service_name: str) -> str:
        """Generate container name for a service."""
        return f"{self.project_name}_{service_name}"

    def _get_image_name(self, service_name: str, service_config: Dict[str, Any]) -> str:
        """Return the configured image, or a generated tag for built services.

        Raises:
            ValueError: if the service has neither ``image`` nor ``build``.
        """
        if 'image' in service_config:
            return str(service_config['image'])
        if 'build' in service_config:
            return f"{self.project_name}_{service_name}:latest"
        raise ValueError(f"Service '{service_name}' has no image or build configuration")

    def _build_dockerfile_for_service(self, service_name: str, service_config: Dict[str, Any]) -> Optional[str]:
        """Return the Dockerfile path for a service, or None if it does not exist.

        ``build`` may be a plain context path or a mapping with ``context``
        and ``dockerfile`` keys.
        """
        build_config = service_config.get('build')
        if not build_config:
            return None
        if isinstance(build_config, str):
            # Simple string context
            dockerfile_path = os.path.join(build_config, 'Dockerfile')
        elif isinstance(build_config, dict):
            # Complex build configuration
            context = str(build_config.get('context', '.'))
            dockerfile = str(build_config.get('dockerfile', 'Dockerfile'))
            dockerfile_path = os.path.join(context, dockerfile)
        else:
            return None
        return dockerfile_path if os.path.exists(dockerfile_path) else None

    def _convert_environment_vars(self, env_config: Union[List[str], Dict[str, Any]]) -> List[str]:
        """Convert a compose ``environment`` section into ``--env`` arguments.

        List items are passed through unchanged (``KEY=value`` or bare ``KEY``).
        Mapping entries become ``KEY=value``, or bare ``KEY`` when the value
        is None. Any other input yields an empty list.
        """
        env_vars: List[str] = []
        if isinstance(env_config, list):
            for item in env_config:
                env_vars.extend(['--env', str(item)])
        elif isinstance(env_config, dict):
            for key, value in env_config.items():
                entry = str(key) if value is None else f"{key}={value}"
                env_vars.extend(['--env', entry])
        return env_vars

    @staticmethod
    def _is_host_path(source: str) -> bool:
        """Return True when a volume source is a filesystem path, not a volume name."""
        return source in ('.', '..', '~') or source.startswith(('./', '../', '/', '~/'))

    @staticmethod
    def _resolve_host_path(source: str) -> str:
        """Expand ``~`` and make relative paths absolute; absolute paths pass through."""
        if source.startswith('~'):
            return os.path.expanduser(source)
        if source.startswith('.'):
            return os.path.abspath(source)
        return source

    def _convert_volumes(self, volumes_config: List[Any]) -> List[str]:
        """Convert compose volume entries into ``--volume`` arguments.

        Only bind mounts are emitted; named and anonymous volumes are skipped
        with a warning. Any third ``:`` field of the short syntax (for example
        an access mode) is ignored.
        """
        volume_args: List[str] = []
        for volume in volumes_config or []:
            if isinstance(volume, str):
                if ':' not in volume:
                    # Anonymous volume
                    print(f" β οΈ Anonymous volume '{volume}' not supported, skipping...")
                    continue
                parts = volume.split(':')
                source, target = parts[0], parts[1]
                if self._is_host_path(source):
                    volume_args.extend(['--volume', f"{self._resolve_host_path(source)}:{target}"])
                else:
                    # Named volume - Apple Container doesn't support these directly
                    print(f" β οΈ Named volume '{source}' not supported, skipping...")
            elif isinstance(volume, dict):
                # Long form volume configuration
                volume_type = volume.get('type', 'bind')
                source_val = volume.get('source')
                target_val = volume.get('target')
                if volume_type == 'bind' and source_val and target_val:
                    source = self._resolve_host_path(str(source_val))
                    volume_args.extend(['--volume', f"{source}:{target_val}"])
                else:
                    print(f" β οΈ Volume type '{volume_type}' not fully supported, skipping...")
        return volume_args

    def _parse_port_mapping(self, port_config: Any) -> List[Tuple[str, str]]:
        """Parse one compose port entry into ``(host_port, container_port)`` tuples.

        Accepts ``"container"``, ``"host:container"``, ``"ip:host:container"``
        (the IP is discarded), a bare integer, or the long mapping form with
        ``target`` / ``published``. A trailing ``/protocol`` on string entries
        is dropped. Unrecognised shapes yield an empty list.
        """
        port_mappings: List[Tuple[str, str]] = []
        if isinstance(port_config, str):
            # "8080:80/tcp" -> "8080:80"; the suffix is not a port number.
            spec = port_config.strip().split('/', 1)[0]
            if not spec:
                return port_mappings
            parts = spec.split(':')
            if len(parts) == 1:
                # Just container port, use same for host
                port_mappings.append((spec, spec))
            elif len(parts) == 2:
                # host:container
                port_mappings.append((parts[0], parts[1]))
            elif len(parts) == 3:
                # host_ip:host_port:container_port
                port_mappings.append((parts[1], parts[2]))
        elif isinstance(port_config, int):
            port_str = str(port_config)
            port_mappings.append((port_str, port_str))
        elif isinstance(port_config, dict):
            # Long form port configuration
            target = port_config.get('target')
            if target not in (None, ''):
                published = port_config.get('published')
                if published in (None, ''):
                    published = target
                port_mappings.append((str(published), str(target)))
        return port_mappings

    def _convert_ports(self, ports_config: List[Any]) -> List[str]:
        """Report port mappings; always returns an empty argument list.

        Ports are not passed to ``container run``; they are published later
        through socat forwards.
        """
        port_args: List[str] = []
        for port in ports_config or []:
            for host_port, container_port in self._parse_port_mapping(port):
                print(f" π Port mapping {host_port}:{container_port} noted for socat forwarding")
        return port_args

    def _list_container_rows(self) -> Optional[List[List[str]]]:
        """Return the whitespace-split data rows of ``container ls``.

        The header line is dropped. Returns None when the command fails.
        """
        result = self._run_command(['container', 'ls'], capture_output=True, show_command=False)
        if result.returncode != 0:
            return None
        lines = result.stdout.strip().split('\n')
        return [line.split() for line in lines[1:]]

    @staticmethod
    def _last_ipv4(tokens: List[str]) -> Optional[str]:
        """Return the last token that is an IPv4 address, without any ``/prefix``."""
        found: Optional[str] = None
        for token in tokens:
            host, slash, prefix = token.partition('/')
            # Only a numeric suffix is a prefix length; "10.0.0.5/image"
            # is an image reference, not an address.
            if slash and not prefix.isdigit():
                continue
            try:
                ipaddress.IPv4Address(host)
            except ValueError:
                continue
            found = host
        return found

    @classmethod
    def _ip_from_rows(cls, rows: List[List[str]], container_name: str) -> Optional[str]:
        """Find the IPv4 address on the row containing exactly ``container_name``."""
        for row in rows:
            if len(row) < 2 or container_name not in row:
                continue
            ip = cls._last_ipv4(row)
            if ip:
                return ip
        return None

    def _get_container_ip(self, container_name: str) -> Optional[str]:
        """Get the IP address of a running container, or None if unavailable."""
        rows = self._list_container_rows()
        if rows is None:
            return None
        return self._ip_from_rows(rows, container_name)

    def _start_port_forward(self, host_port: str, container_ip: str, container_port: str, service_name: str) -> Optional[int]:
        """Start a socat TCP forward and return its PID, or None on failure.

        An existing tracked forward for the same service and host port is
        reused when its process is still alive.
        """
        if not self._install_socat_if_needed():
            return None
        forward_key = f"{service_name}_{host_port}"
        # Check if port forward already exists
        if forward_key in self.port_forward_pids:
            pid = self.port_forward_pids[forward_key]
            try:
                # Signal 0 only checks that the process exists.
                os.kill(pid, 0)
                print(f" β Port forward {host_port}β{container_ip}:{container_port} already running (PID: {pid})")
                return pid
            except OSError:
                # Process doesn't exist, remove from tracking
                del self.port_forward_pids[forward_key]
        print(f" π Starting port forward {host_port}β{container_ip}:{container_port}")
        try:
            cmd = [
                'socat',
                f'TCP-LISTEN:{host_port},fork,reuseaddr',
                f'TCP:{container_ip}:{container_port}',
            ]
            # A new session gives socat its own process group, so the whole
            # group (including forked children) can be signalled on stop.
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True,
            )
            # Give it a moment to start
            time.sleep(0.5)
            # Still running means the listen socket was bound.
            if process.poll() is None:
                self.port_forward_pids[forward_key] = process.pid
                self._save_port_forward_pids()
                print(f" β Port forward started (PID: {process.pid})")
                return process.pid
            print(f" β Port forward failed to start")
            return None
        except Exception as e:
            print(f" β Error starting port forward: {e}")
            return None

    @staticmethod
    def _terminate_process_group(pid: int, grace: float) -> None:
        """SIGTERM the process group of ``pid``, wait ``grace`` seconds, then SIGKILL.

        Raises:
            OSError: if the process does not exist when the SIGTERM is sent.
        """
        os.killpg(os.getpgid(pid), signal.SIGTERM)
        time.sleep(grace)
        try:
            os.kill(pid, 0)
            os.killpg(os.getpgid(pid), signal.SIGKILL)
        except OSError:
            pass  # Process already dead

    def _stop_port_forward(self, service_name: str, host_port: str) -> bool:
        """Stop one tracked port forward; return True only if it was running."""
        forward_key = f"{service_name}_{host_port}"
        if forward_key not in self.port_forward_pids:
            return False
        pid = self.port_forward_pids[forward_key]
        try:
            self._terminate_process_group(pid, 0.5)
            del self.port_forward_pids[forward_key]
            self._save_port_forward_pids()
            print(f" π Stopped port forward for {service_name}:{host_port}")
            return True
        except OSError:
            # Process doesn't exist: drop the stale entry.
            del self.port_forward_pids[forward_key]
            self._save_port_forward_pids()
            return False
        except Exception as e:
            print(f" β Error stopping port forward: {e}")
            return False

    def _stop_all_port_forwards(self) -> None:
        """Stop all port forwards for this project and remove the PID file."""
        self._load_port_forward_pids()
        if not self.port_forward_pids:
            print("No port forwards to stop")
            return
        print("π Stopping all port forwards...")
        for forward_key, pid in list(self.port_forward_pids.items()):
            # Keys are "<service>_<host_port>"; split on the LAST underscore
            # so service names that contain underscores stay intact.
            service_name, _, host_port = forward_key.rpartition('_')
            try:
                self._terminate_process_group(pid, 0.1)
                print(f" β Stopped {service_name}:{host_port}")
            except OSError:
                # Process doesn't exist; nothing to stop.
                pass
            except Exception as e:
                print(f" β οΈ Error stopping PID {pid}: {e}")
        self.port_forward_pids.clear()
        self._save_port_forward_pids()
        # Clean up the PID file
        try:
            if os.path.exists(self.port_forward_file):
                os.remove(self.port_forward_file)
        except OSError:
            pass

    def _setup_port_forwards_for_service(self, service_name: str, service_config: Dict[str, Any]) -> None:
        """Set up port forwards for a service if it has port mappings."""
        if 'ports' not in service_config:
            return
        container_name = self._get_container_name(service_name)
        container_ip = self._get_container_ip(container_name)
        if not container_ip:
            print(f" β οΈ Could not get IP for container {container_name}")
            return
        print(f" π Setting up port forwards for {service_name} (IP: {container_ip})")
        for port in service_config['ports'] or []:
            for host_port, container_port in self._parse_port_mapping(port):
                self._start_port_forward(host_port, container_ip, container_port, service_name)

    def _start_container_system(self) -> bool:
        """Run ``container system start``; return False and report on failure."""
        print("π Starting Apple Container system...")
        result = self._run_command(['container', 'system', 'start'], capture_output=True)
        if result.returncode != 0:
            print(f"β Failed to start container system: {result.stderr}")
            return False
        return True

    def _build_service(self, service_name: str, service_config: Dict[str, Any]) -> None:
        """Build the image for a service that has a ``build`` section.

        Does nothing when there is no build configuration, and skips the
        build with a warning when the Dockerfile cannot be found.
        """
        build_config = service_config.get('build')
        if not build_config:
            return
        print(f"π¨ Building service '{service_name}'...")
        image_name = self._get_image_name(service_name, service_config)
        dockerfile_path = self._build_dockerfile_for_service(service_name, service_config)
        if not dockerfile_path:
            print(f" β οΈ No Dockerfile found for service '{service_name}', skipping build")
            return
        # The build context is the directory the Dockerfile was resolved
        # against: the ``context`` key, or the plain string itself.
        if isinstance(build_config, dict):
            context = str(build_config.get('context', '.'))
        else:
            context = str(build_config)
        build_cmd = [
            'container', 'build',
            '--tag', image_name,
            '--file', dockerfile_path,
            context,
        ]
        result = self._run_command(build_cmd)
        if result.returncode != 0:
            print(f"β Failed to build service '{service_name}'")
        else:
            print(f"β Successfully built service '{service_name}'")

    def _run_service(self, service_name: str, service_config: Dict[str, Any]) -> None:
        """Start a detached container for a service and report how to reach it.

        Raises:
            ValueError: if the service has no image/build configuration or a
                string ``command`` cannot be tokenised.
        """
        print(f"π Starting service '{service_name}'...")
        container_name = self._get_container_name(service_name)
        image_name = self._get_image_name(service_name, service_config)
        # Base run command
        run_cmd = [
            'container', 'run',
            '--name', container_name,
            '--detach',
            '--rm',
        ]
        # Add environment variables
        if 'environment' in service_config:
            run_cmd.extend(self._convert_environment_vars(service_config['environment']))
        # Add env_file entries that exist on disk
        if 'env_file' in service_config:
            env_files = service_config['env_file'] or []
            if isinstance(env_files, str):
                env_files = [env_files]
            for env_file in env_files:
                if os.path.exists(str(env_file)):
                    run_cmd.extend(['--env-file', str(env_file)])
        # Add volumes
        if 'volumes' in service_config:
            run_cmd.extend(self._convert_volumes(service_config['volumes']))
        # Handle ports (just for information)
        if 'ports' in service_config:
            self._convert_ports(service_config['ports'])
        # Add working directory
        if 'working_dir' in service_config:
            run_cmd.extend(['--workdir', str(service_config['working_dir'])])
        # Add image name
        run_cmd.append(image_name)
        # Add command if specified
        command = service_config.get('command')
        if command is not None:
            if isinstance(command, list):
                run_cmd.extend(str(c) for c in command)
            else:
                # Shell-style tokenising keeps quoted arguments together.
                run_cmd.extend(shlex.split(str(command)))
        # Run the container
        result = self._run_command(run_cmd)
        if result.returncode != 0:
            print(f"β Failed to start service '{service_name}'")
            return
        print(f"β Successfully started service '{service_name}'")
        # Get container IP
        time.sleep(2)  # Wait for container to fully start
        container_ip = self._get_container_ip(container_name)
        if not container_ip:
            return
        print(f" π Container IP: {container_ip}")
        mappings = [
            mapping
            for port in (service_config.get('ports') or [])
            for mapping in self._parse_port_mapping(port)
        ]
        if mappings:
            first_port = mappings[0][1]
            print(f" π Access service at: http://{container_ip}:{first_port}")

    def build(self, service_name: Optional[str] = None) -> None:
        """Build one named service, or every service with a ``build`` section."""
        if not self._start_container_system():
            return
        if service_name:
            if service_name not in self.services:
                print(f"β Service '{service_name}' not found")
                return
            self._build_service(service_name, self.services[service_name])
            return
        # Build all services that have build configuration
        for name, config in self.services.items():
            if 'build' in config:
                self._build_service(name, config)

    def up(self, services: Optional[List[str]] = None) -> None:
        """Build, start and port-forward the given services (default: all)."""
        if not self._start_container_system():
            return
        services_to_start = services if services else list(self.services.keys())
        print(f"π Starting services: {', '.join(services_to_start)}")
        # Build images first if needed
        known_services: List[str] = []
        for service_name in services_to_start:
            if service_name not in self.services:
                print(f"β Service '{service_name}' not found in compose file")
                continue
            known_services.append(service_name)
            print(f"\nπ¦ Preparing {service_name}...")
            self._build_service(service_name, self.services[service_name])
        # Start containers
        for service_name in known_services:
            print(f"\nπ Starting {service_name}...")
            self._run_service(service_name, self.services[service_name])
        print("\nβ All services started!")
        # Wait a moment for containers to be fully up
        print("β³ Waiting for containers to initialize...")
        time.sleep(3)
        # Set up port forwards
        print("\nπ Setting up port forwards...")
        for service_name in known_services:
            self._setup_port_forwards_for_service(service_name, self.services[service_name])
        print("\nπ Services are ready! Use 'python3 apple-container-compose.py urls' to see access information")

    def down(self) -> None:
        """Stop and remove all containers for this project."""
        print(f"π Stopping project '{self.project_name}'...")
        # First stop all port forwards
        self._stop_all_port_forwards()
        # Get list of containers for this project
        rows = self._list_container_rows()
        if rows is None:
            print("β Failed to list containers")
            return
        prefix = f"{self.project_name}_"
        # The first column of each row is taken as the container name.
        project_containers = [row[0] for row in rows if row and row[0].startswith(prefix)]
        if not project_containers:
            print("No containers found for this project")
            return
        # Stop containers
        for container_name in project_containers:
            print(f"π Stopping {container_name}...")
            result = self._run_command(['container', 'stop', container_name], capture_output=True)
            if result.returncode == 0:
                print(f" β Stopped")
            else:
                print(f" β οΈ Warning: {result.stderr.strip()}")
        # Remove containers
        for container_name in project_containers:
            print(f"ποΈ Removing {container_name}...")
            result = self._run_command(['container', 'rm', container_name], capture_output=True)
            if result.returncode == 0:
                print(f" β Removed")
            else:
                print(f" β οΈ Warning: {result.stderr.strip()}")
        print("β Project stopped and cleaned up")

    def ps(self) -> None:
        """List containers (equivalent to docker-compose ps)."""
        print(f"π Containers for project '{self.project_name}':")
        result = self._run_command(['container', 'ls'], capture_output=True)
        if result.returncode != 0:
            print("β Failed to list containers")
            return
        lines = result.stdout.strip().split('\n')
        if len(lines) <= 1:
            print("No containers running")
            return
        # Print header
        print(lines[0])
        prefix = f"{self.project_name}_"
        found_containers = False
        for line in lines[1:]:
            parts = line.split()
            if len(parts) < 2:  # Skip malformed lines
                continue
            # The column layout may vary, so any column may carry the name.
            if any(part.startswith(prefix) for part in parts):
                print(line)
                found_containers = True
        if not found_containers:
            print(f"No containers found for project '{self.project_name}'")

    def logs(self, service_name: Optional[str] = None, follow: bool = False, tail: int = 100) -> None:
        """Show diagnostic output for a service by running commands in its container.

        Prints the process list, the last ``tail`` lines of log files found in
        common locations, information about PID 1, and the working directory
        listing. With ``follow`` it then runs ``watch ps aux`` in the container
        until interrupted. Without ``service_name`` it prints a 10-line
        overview for every service.
        """
        if not service_name:
            print("π Service logs overview:")
            for name in self.services:
                print(f"\n--- {name} ---")
                self.logs(name, follow=False, tail=10)
            return

        container_name = self._get_container_name(service_name)
        print(f"π Logs for service '{service_name}' (container: {container_name}):")
        # Check if container is running
        rows = self._list_container_rows()
        if rows is None:
            print("β Failed to list containers")
            return
        if not any(container_name in row for row in rows):
            print(f"β Container '{container_name}' is not running")
            return

        def run_in_container(*argv: str) -> "subprocess.CompletedProcess[str]":
            """Run ``argv`` inside the container, quietly capturing output."""
            return self._run_command(
                ['container', 'exec', container_name, *argv],
                capture_output=True,
                show_command=False,
            )

        def show_tail(path: str) -> bool:
            """Print the last ``tail`` lines of ``path``; return True on success."""
            tail_result = run_in_container('tail', '-n', str(tail), path)
            if tail_result.returncode != 0:
                return False
            print(tail_result.stdout)
            return True

        # Try different logging approaches
        print(f"β Attempting to retrieve logs from container '{container_name}'...")
        # Method 1: process list
        print("\n--- Recent Process Activity ---")
        ps_result = run_in_container('ps', 'aux')
        if ps_result.returncode == 0:
            print(ps_result.stdout)
        else:
            print("Could not retrieve process list")
        # Method 2: common log locations
        print("\n--- Application Logs ---")
        found_logs = False
        for log_path in self._LOG_LOCATIONS:
            if '*' in log_path:
                # Use find for wildcard paths
                find_result = run_in_container(
                    'find', os.path.dirname(log_path),
                    '-name', os.path.basename(log_path), '-type', 'f',
                )
                if find_result.returncode == 0 and find_result.stdout.strip():
                    for found_file in find_result.stdout.strip().split('\n'):
                        if found_file.strip():
                            print(f"\nπ Found log file: {found_file}")
                            if show_tail(found_file):
                                found_logs = True
            else:
                # Check if file exists and tail it
                if run_in_container('test', '-f', log_path).returncode == 0:
                    print(f"\nπ Log file: {log_path}")
                    if show_tail(log_path):
                        found_logs = True
        # Method 3: the main process
        print("\n--- Main Process Info ---")
        main_proc_result = run_in_container('ps', '-o', 'pid,ppid,cmd', '-p', '1')
        if main_proc_result.returncode == 0:
            print(main_proc_result.stdout)
        # Method 4: working directory and its contents
        print("\n--- Environment Info ---")
        env_result = run_in_container('pwd')
        if env_result.returncode == 0:
            print(f"Working Directory: {env_result.stdout.strip()}")
        ls_result = run_in_container('ls', '-la')
        if ls_result.returncode == 0:
            print("Directory Contents:")
            print(ls_result.stdout)
        if not found_logs:
            print("No application logs found in common locations")
        # Follow mode
        if follow:
            print(f"\nποΈ Following logs for '{service_name}' (Ctrl+C to stop)...")
            try:
                # No log stream is followed; this just monitors processes.
                self._run_command(['container', 'exec', container_name, 'watch', '-n', '2', 'ps', 'aux'])
            except KeyboardInterrupt:
                print("\nπ Stopped following logs")

    @staticmethod
    def _print_access_hint(ip: str, host_port: str, container_port: str, forward_active: bool) -> None:
        """Print a protocol-specific connection hint for well-known container ports."""
        host_part = f"localhost:{host_port}" if forward_active else f"{ip}:{container_port}"
        if container_port == "80":
            url = f"http://localhost:{host_port}" if forward_active else f"http://{ip}"
            print(f" π Web: {url}")
        elif container_port == "443":
            url = f"https://localhost:{host_port}" if forward_active else f"https://{ip}"
            print(f" π HTTPS: {url}")
        elif container_port in ("3000", "8000", "8080"):
            print(f" π§ Dev Server: http://{host_part}")
        elif container_port == "5432":
            print(f" ποΈ PostgreSQL: postgresql://user:pass@{host_part}/db")
        elif container_port == "3306":
            print(f" ποΈ MySQL: mysql://user:pass@{host_part}/db")
        elif container_port == "6379":
            print(f" π΄ Redis: redis://{host_part}")
        elif container_port == "27017":
            print(f" π MongoDB: mongodb://{host_part}")

    def urls(self) -> None:
        """Show service URLs and access information."""
        print(f"π Service URLs for project '{self.project_name}':")
        print("=" * 60)
        # Load existing port forwards
        self._load_port_forward_pids()
        # Get running containers with IPs
        rows = self._list_container_rows()
        if rows is None:
            print("β Failed to list containers")
            return
        # Match each service by its exact container name. A prefix match
        # would also hit the generated image tag "<project>_<service>:latest".
        container_ips: Dict[str, str] = {}
        for service_name in self.services:
            container_name = self._get_container_name(service_name)
            ip = self._ip_from_rows(rows, container_name)
            if ip:
                container_ips[container_name] = ip
        if not container_ips:
            print("No running containers found for this project")
            return
        # Show service information
        for service_name, service_config in self.services.items():
            container_name = self._get_container_name(service_name)
            if container_name not in container_ips:
                print(f"\nβ {service_name} (not running)")
                continue
            ip = container_ips[container_name]
            print(f"\nπ {service_name}")
            print(f" Container: {container_name}")
            print(f" IP: {ip}")
            # Show ports and forwarding status
            if 'ports' in service_config:
                print(f" Ports:")
                for port in service_config['ports'] or []:
                    for host_port, container_port in self._parse_port_mapping(port):
                        # A forward counts as active when it is tracked.
                        forward_active = f"{service_name}_{host_port}" in self.port_forward_pids
                        forward_status = "π’ FORWARDED" if forward_active else "π΄ NOT FORWARDED"
                        print(f" β’ {container_port} (host:{host_port}) {forward_status}")
                        print(f" β Direct: http://{ip}:{container_port}")
                        if forward_active:
                            print(f" β Forwarded: http://localhost:{host_port}")
                        # Add common service-specific access hints
                        self._print_access_hint(ip, host_port, str(container_port), forward_active)
            else:
                print(f" No exposed ports")
            print(f" β SSH/Exec: container exec {container_name} /bin/sh")
            # Show environment hints
            env_vars = service_config.get('environment')
            if isinstance(env_vars, dict):
                keywords = ('port', 'host', 'url', 'endpoint')
                interesting_env = {
                    k: v for k, v in env_vars.items()
                    if any(keyword in k.lower() for keyword in keywords)
                }
                if interesting_env:
                    print(f" Environment:")
                    for k, v in interesting_env.items():
                        print(f" β’ {k}={v}")
        print(f"\nπ‘ Quick commands:")
        print(f" β’ List containers: container ls")
        print(f" β’ Connect to service: container exec <container_name> /bin/sh")
        print(f" β’ View logs: python3 apple-container-compose.py logs <service_name>")
        print(f" β’ Setup port forwards: python3 apple-container-compose.py port-forward")
        print(f" β’ Stop port forwards: python3 apple-container-compose.py stop-port-forward")
        print(f" β’ Stop all: python3 apple-container-compose.py down")

    def port_forward(self) -> None:
        """Set up port forwards for all services."""
        if not self._start_container_system():
            return
        print("π Setting up port forwards for all services...")
        for service_name, service_config in self.services.items():
            self._setup_port_forwards_for_service(service_name, service_config)
        print("β Port forwards set up successfully")

    def stop_port_forward(self) -> None:
        """Stop all port forwards."""
        self._stop_all_port_forwards()
        print("π Stopped all port forwards")
def main() -> None:
    """Parse the command line and dispatch to the matching compose action.

    Exits with status 1 when the compose file is missing or invalid, or when
    the user interrupts the command.
    """
    parser = argparse.ArgumentParser(description='Apple Container Compose - Docker Compose alternative using Apple Container')
    parser.add_argument('command', choices=['up', 'down', 'build', 'ps', 'logs', 'urls', 'port-forward', 'stop-port-forward'], help='Command to execute')
    parser.add_argument('service', nargs='?', help='Service name (for build/logs commands)')
    parser.add_argument('--compose-file', '-f', default='docker-compose.yml', help='Compose file path')
    parser.add_argument('--services', nargs='*', help='Specific services to start (for up command)')
    parser.add_argument('--follow', action='store_true', help='Follow log output (for logs command)')
    parser.add_argument('--tail', type=int, default=100, help='Number of lines to show from end of logs (default: 100)')
    args = parser.parse_args()
    try:
        compose = AppleContainerCompose(args.compose_file)
        if args.command == 'up':
            # ``up web`` used to ignore the positional service and start
            # everything; honour it when --services was not given.
            selected = args.services
            if not selected and args.service:
                selected = [args.service]
            compose.up(selected)
        elif args.command == 'down':
            compose.down()
        elif args.command == 'build':
            compose.build(args.service)
        elif args.command == 'ps':
            compose.ps()
        elif args.command == 'logs':
            compose.logs(args.service, follow=args.follow, tail=args.tail)
        elif args.command == 'urls':
            compose.urls()
        elif args.command == 'port-forward':
            compose.port_forward()
        elif args.command == 'stop-port-forward':
            compose.stop_port_forward()
    except (FileNotFoundError, ValueError) as e:
        print(f"β {e}")
        sys.exit(1)
    except KeyboardInterrupt:
        print("\nπ Interrupted by user")
        sys.exit(1)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Kudo on this one π