Created
October 16, 2025 08:11
-
-
Save aldoborrero/29b257b52937c4030cb38086e74527b4 to your computer and use it in GitHub Desktop.
Nixified OCI ARM grabber
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env nix-shell | |
| #!nix-shell -i python3 -p python3 python3Packages.oci python3Packages.requests python3Packages.rich | |
| """ | |
| Oracle Cloud ARM Instance Creator | |
| Continuously attempts to create an ARM instance in Oracle Cloud Free Tier | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import random | |
| import sys | |
| import time | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| import oci | |
| import requests | |
| from rich.console import Console | |
| from rich.panel import Panel | |
| from rich.rule import Rule | |
| from rich.table import Table | |
| # Always use consistent console settings | |
| console = Console(force_terminal=True, force_interactive=False) | |
| class OracleInstanceCreator: | |
| def __init__(self, args): | |
| # Multi-instance support - detect shape type | |
| self.is_arm = args.shape == "VM.Standard.A1.Flex" | |
| self.is_micro = args.shape == "VM.Standard.E2.1.Micro" | |
| self.instance_name = args.name | |
| self.shape = args.shape | |
| self.ocpus = args.ocpus | |
| self.memory_gb = args.memory | |
| self.boot_volume_size_gb = args.boot_volume | |
| self.retry_delay = args.delay | |
| self.profile = args.profile | |
| self.ssh_key_file = args.ssh_key | |
| self.dry_run = args.dry_run | |
| self.assign_public_ip = args.public_ip | |
| # Statistics tracking | |
| self.start_time = datetime.now() | |
| self.attempts_per_ad = {} | |
| self.last_error_per_ad = {} | |
| # Load OCI configuration | |
| self.load_config() | |
| # Initialize OCI client | |
| self.setup_oci_client() | |
| # Will be set during run() | |
| self.image_id = None | |
| self.subnet_id = None | |
| # Multi-instance support | |
| self.max_instances = args.max_instances if self.is_micro else 1 | |
| def load_config(self): | |
| """Load configuration from OCI config file""" | |
| try: | |
| # Load OCI config | |
| config = oci.config.from_file(profile_name=self.profile) | |
| self.tenancy_ocid = config["tenancy"] | |
| self.compartment_id = config["tenancy"] # Use tenancy as compartment | |
| self.user_ocid = config["user"] | |
| self.fingerprint = config["fingerprint"] | |
| self.region = config["region"] | |
| # Read private key from file | |
| key_file = Path(config["key_file"]).expanduser() | |
| if not key_file.exists(): | |
| console.print(f"[red]Error: Key file not found: {key_file}[/red]") | |
| sys.exit(1) | |
| self.private_key = key_file.read_text() | |
| # Load SSH public key - must be specified | |
| if not self.ssh_key_file: | |
| console.print("[red]Error: SSH public key must be specified with --ssh-key[/red]") | |
| console.print("[yellow]Example: oci-arm --ssh-key ~/.ssh/mimas.pub[/yellow]") | |
| sys.exit(1) | |
| ssh_key_path = Path(self.ssh_key_file).expanduser() | |
| if not ssh_key_path.exists(): | |
| # Auto-generate SSH key if missing | |
| console.print(f"[yellow]SSH key not found at {ssh_key_path}[/yellow]") | |
| if args.auto_gen_ssh: | |
| self.generate_ssh_key_pair(ssh_key_path) | |
| console.print(f"[green]Generated new SSH key pair at {ssh_key_path}[/green]") | |
| else: | |
| console.print(f"[red]Error: SSH key file not found: {ssh_key_path}[/red]") | |
| console.print("[yellow]Use --auto-gen-ssh to automatically generate a key pair[/yellow]") | |
| sys.exit(1) | |
| self.ssh_public_key = ssh_key_path.read_text().strip() | |
| console.print(f"[green]Using SSH key: {ssh_key_path}[/green]") | |
| console.print(f"[green]Loaded OCI config profile: {self.profile}[/green]") | |
| console.print(f"[cyan]Region: {self.region}[/cyan]") | |
| except Exception as e: | |
| console.print(f"[red]Error loading OCI config: {e}[/red]") | |
| console.print("[yellow]Make sure ~/.oci/config is properly configured[/yellow]") | |
| sys.exit(1) | |
| # Notification settings | |
| self.ntfy_topic = os.getenv("NTFY_TOPIC", "oracle-arm") | |
| # Image discovery settings | |
| self.os_name = args.os_name | |
| self.os_version = args.os_version | |
| self.save_images = args.save_images | |
| def setup_oci_client(self): | |
| """Setup OCI SDK clients""" | |
| config = { | |
| "user": self.user_ocid, | |
| "key_content": self.private_key, | |
| "fingerprint": self.fingerprint, | |
| "tenancy": self.tenancy_ocid, | |
| "region": self.region, | |
| } | |
| self.compute_client = oci.core.ComputeClient(config) | |
| self.network_client = oci.core.VirtualNetworkClient(config) | |
| self.identity_client = oci.identity.IdentityClient(config) | |
| def get_availability_domains(self) -> List[str]: | |
| """Get all availability domains""" | |
| with console.status("[blue]Getting availability domains...[/blue]"): | |
| ads = self.identity_client.list_availability_domains( | |
| compartment_id=self.compartment_id | |
| ) | |
| ad_names = [ad.name for ad in ads.data] | |
| # Initialize attempt counters | |
| for ad in ad_names: | |
| self.attempts_per_ad[ad] = 0 | |
| self.last_error_per_ad[ad] = "Not attempted" | |
| return ad_names | |
| def _list_images(self, os_name: str = None, os_version: str = None, limit: int = 20): | |
| """Helper to list images with common parameters""" | |
| return self.compute_client.list_images( | |
| compartment_id=self.compartment_id, | |
| operating_system=os_name, | |
| operating_system_version=os_version, | |
| shape=self.shape, | |
| sort_by="TIMECREATED", | |
| sort_order="DESC", | |
| limit=limit, | |
| ) | |
| def _categorize_error(self, error_msg: str) -> Tuple[str, str]: | |
| """Categorize error message and return display text and color""" | |
| if "Out of capacity" in error_msg: | |
| return "Out of capacity", "red" | |
| elif "Limit exceeded" in error_msg: | |
| return "Limit exceeded", "yellow" | |
| elif "Rate limited" in error_msg: | |
| return "Rate limited (429)", "yellow" | |
| else: | |
| return error_msg[:30], "red" | |
| def _get_instance_public_ip(self, instance_id: str) -> Optional[str]: | |
| """Get public IP for an instance""" | |
| try: | |
| vnics = self.compute_client.list_vnic_attachments( | |
| compartment_id=self.compartment_id, | |
| instance_id=instance_id | |
| ) | |
| if vnics.data: | |
| vnic = self.network_client.get_vnic(vnics.data[0].vnic_id) | |
| return vnic.data.public_ip | |
| except: | |
| pass | |
| return None | |
| def generate_ssh_key_pair(self, public_key_path: Path) -> None: | |
| """Generate SSH key pair if it doesn't exist""" | |
| try: | |
| # Try to use paramiko if available | |
| import paramiko | |
| private_key_path = public_key_path.with_suffix('') | |
| # Generate RSA key | |
| key = paramiko.RSAKey.generate(2048) | |
| # Save private key | |
| key.write_private_key_file(str(private_key_path)) | |
| # Save public key | |
| public_key_str = f"ssh-rsa {key.get_base64()} {public_key_path.stem}_auto_generated" | |
| public_key_path.write_text(public_key_str) | |
| # Set appropriate permissions | |
| private_key_path.chmod(0o600) | |
| public_key_path.chmod(0o644) | |
| console.print(f"[green]Generated SSH keypair:[/green]") | |
| console.print(f" Private: {private_key_path}") | |
| console.print(f" Public: {public_key_path}") | |
| except ImportError: | |
| # Fallback to ssh-keygen command | |
| console.print("[yellow]paramiko not available, using ssh-keygen[/yellow]") | |
| private_key_path = public_key_path.with_suffix('') | |
| import subprocess | |
| result = subprocess.run( | |
| [ | |
| "ssh-keygen", | |
| "-t", "rsa", | |
| "-b", "2048", | |
| "-f", str(private_key_path), | |
| "-N", "", # No passphrase | |
| "-C", f"{public_key_path.stem}_auto_generated" | |
| ], | |
| capture_output=True, | |
| text=True | |
| ) | |
| if result.returncode != 0: | |
| raise Exception(f"Failed to generate SSH key: {result.stderr}") | |
| # ssh-keygen creates the .pub file automatically | |
| if not public_key_path.exists(): | |
| # Move the generated .pub file if needed | |
| generated_pub = private_key_path.with_suffix('.pub') | |
| if generated_pub.exists(): | |
| generated_pub.rename(public_key_path) | |
| def discover_images(self) -> Tuple[str, List[Dict]]: | |
| """Discover available images by OS name and version""" | |
| with console.status(f"[blue]Discovering {self.os_name} {self.os_version} images...[/blue]"): | |
| # Get all available images for the shape | |
| images = self._list_images(self.os_name, self.os_version if self.os_version else None) | |
| if not images.data and self.os_version: | |
| # Try without version if no results | |
| console.print(f"[yellow]No {self.os_name} {self.os_version} images found, trying all versions...[/yellow]") | |
| images = self._list_images(self.os_name) | |
| if not images.data: | |
| # List all available OS options | |
| console.print(f"[yellow]No {self.os_name} images found. Listing all available images...[/yellow]") | |
| all_images = self._list_images(limit=50) | |
| # Group by OS | |
| os_groups = {} | |
| for img in all_images.data: | |
| os_key = f"{img.operating_system} {img.operating_system_version}" | |
| if os_key not in os_groups: | |
| os_groups[os_key] = [] | |
| os_groups[os_key].append(img) | |
| # Display available OS options | |
| table = Table(title="Available Operating Systems") | |
| table.add_column("OS", style="cyan") | |
| table.add_column("Version", style="yellow") | |
| table.add_column("Count", style="green") | |
| for os_key, imgs in sorted(os_groups.items()): | |
| parts = os_key.split(' ', 1) | |
| os_name = parts[0] if parts else os_key | |
| os_ver = parts[1] if len(parts) > 1 else "" | |
| table.add_row(os_name, os_ver, str(len(imgs))) | |
| console.print(table) | |
| # Use the first available image as fallback | |
| if all_images.data: | |
| images = all_images | |
| console.print(f"\n[yellow]Using first available image as fallback[/yellow]") | |
| # Process images for export | |
| image_list = [] | |
| for img in images.data: | |
| image_info = { | |
| "id": img.id, | |
| "display_name": img.display_name, | |
| "operating_system": img.operating_system, | |
| "operating_system_version": img.operating_system_version, | |
| "size_in_gbs": getattr(img, 'size_in_gbs', 'N/A'), | |
| "time_created": str(img.time_created), | |
| "lifecycle_state": img.lifecycle_state, | |
| } | |
| image_list.append(image_info) | |
| # Save to JSON if requested | |
| if self.save_images and image_list: | |
| images_file = Path(f"images_{self.shape.replace('.', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json") | |
| with open(images_file, 'w') as f: | |
| json.dump(image_list, f, indent=2) | |
| console.print(f"[green]Saved {len(image_list)} images to {images_file}[/green]") | |
| if not images.data: | |
| raise Exception(f"No images found for shape {self.shape}") | |
| # Use the most recent image | |
| selected_image = images.data[0] | |
| console.print(f"[green]Selected image: {selected_image.display_name}[/green]") | |
| console.print(f"[dim] OS: {selected_image.operating_system} {selected_image.operating_system_version}[/dim]") | |
| console.print(f"[dim] ID: {selected_image.id}[/dim]") | |
| return selected_image.id, image_list | |
| def get_arm_image_id(self) -> str: | |
| """Get the latest image for the shape (with auto-discovery)""" | |
| if self.os_name: | |
| # Use image discovery by OS name/version | |
| image_id, _ = self.discover_images() | |
| return image_id | |
| # Fallback to original logic for Oracle Linux | |
| image_type = "x86" if self.is_micro else "ARM" | |
| with console.status(f"[blue]Getting latest Oracle Linux {image_type} image...[/blue]"): | |
| # First try with Oracle Linux 9 | |
| images = self._list_images("Oracle Linux", "9", limit=5) | |
| if not images.data: | |
| console.print(f"[yellow]No Oracle Linux 9 images found, trying Oracle Linux 8...[/yellow]") | |
| images = self._list_images("Oracle Linux", "8", limit=5) | |
| if not images.data: | |
| raise Exception(f"No Oracle Linux images found for shape {self.shape}") | |
| selected_image = images.data[0] | |
| console.print(f"[dim]Selected: {selected_image.display_name}[/dim]") | |
| return selected_image.id | |
| def get_subnet_id(self) -> Optional[str]: | |
| """Get subnet ID from existing VCN""" | |
| with console.status("[blue]Looking for existing subnet...[/blue]"): | |
| try: | |
| # List VCNs | |
| vcns = self.network_client.list_vcns(compartment_id=self.compartment_id) | |
| console.print(f"[dim]Found {len(vcns.data)} VCNs in compartment[/dim]") | |
| for vcn in vcns.data: | |
| console.print(f"[dim]Checking VCN: {vcn.display_name} ({vcn.id[:20]}...)[/dim]") | |
| if "nixos" in vcn.display_name.lower() or "vcn" in vcn.display_name.lower(): | |
| # Found VCN, now get subnet | |
| subnets = self.network_client.list_subnets( | |
| compartment_id=self.compartment_id, vcn_id=vcn.id | |
| ) | |
| if subnets.data: | |
| subnet = subnets.data[0] | |
| console.print(f"[green]Found subnet: {subnet.display_name}[/green]") | |
| console.print(f"[dim]Subnet ID: {subnet.id}[/dim]") | |
| console.print(f"[dim]Subnet State: {subnet.lifecycle_state}[/dim]") | |
| # Verify subnet is active | |
| if subnet.lifecycle_state != "AVAILABLE": | |
| console.print(f"[yellow]Warning: Subnet is {subnet.lifecycle_state}, not AVAILABLE[/yellow]") | |
| return subnet.id | |
| console.print("[yellow]No existing subnet found. Please create network resources first.[/yellow]") | |
| console.print("[yellow]Looking for ANY subnet as fallback...[/yellow]") | |
| # Try to find ANY subnet as fallback | |
| all_subnets = self.network_client.list_subnets(compartment_id=self.compartment_id) | |
| if all_subnets.data: | |
| subnet = all_subnets.data[0] | |
| console.print(f"[yellow]Using first available subnet: {subnet.display_name}[/yellow]") | |
| return subnet.id | |
| except Exception as e: | |
| console.print(f"[red]Error getting subnet: {e}[/red]") | |
| return None | |
| def create_instance(self, availability_domain: str) -> Optional[str]: | |
| """Attempt to create an instance""" | |
| self.attempts_per_ad[availability_domain] += 1 | |
| if self.dry_run: | |
| console.print(f"[yellow][DRY RUN] Would create instance in {availability_domain}[/yellow]") | |
| console.print(f" Shape: {self.shape}") | |
| console.print(f" OCPUs: {self.ocpus}") | |
| console.print(f" Memory: {self.memory_gb}GB") | |
| console.print(f" Boot Volume: {self.boot_volume_size_gb}GB") | |
| self.last_error_per_ad[availability_domain] = "Dry run - not attempted" | |
| return None | |
| instance_details = oci.core.models.LaunchInstanceDetails( | |
| availability_domain=availability_domain, | |
| compartment_id=self.compartment_id, | |
| shape=self.shape, | |
| shape_config=oci.core.models.LaunchInstanceShapeConfigDetails( | |
| ocpus=self.ocpus, memory_in_gbs=self.memory_gb | |
| ), | |
| display_name=self.instance_name, | |
| image_id=self.image_id, | |
| subnet_id=self.subnet_id, | |
| metadata={"ssh_authorized_keys": self.ssh_public_key}, | |
| source_details=oci.core.models.InstanceSourceViaImageDetails( | |
| image_id=self.image_id, boot_volume_size_in_gbs=self.boot_volume_size_gb | |
| ), | |
| create_vnic_details=oci.core.models.CreateVnicDetails( | |
| subnet_id=self.subnet_id, | |
| assign_public_ip=self.assign_public_ip, | |
| assign_private_dns_record=True, | |
| display_name="primary-vnic", | |
| ), | |
| # Add availability config for instance recovery | |
| availability_config=oci.core.models.LaunchInstanceAvailabilityConfigDetails( | |
| recovery_action="RESTORE_INSTANCE" | |
| ), | |
| # Enable legacy IMDS endpoints for compatibility | |
| instance_options=oci.core.models.InstanceOptions( | |
| are_legacy_imds_endpoints_disabled=False | |
| ), | |
| # Enable monitoring | |
| agent_config=oci.core.models.LaunchInstanceAgentConfigDetails( | |
| is_monitoring_disabled=False, | |
| is_management_disabled=False, | |
| plugins_config=[ | |
| oci.core.models.InstanceAgentPluginConfigDetails( | |
| name="Compute Instance Monitoring", | |
| desired_state="ENABLED" | |
| ) | |
| ] | |
| ), | |
| ) | |
| try: | |
| response = self.compute_client.launch_instance( | |
| launch_instance_details=instance_details | |
| ) | |
| instance_id = response.data.id | |
| console.print(f"[green]✓ Instance created! ID: {instance_id}[/green]") | |
| # Wait for running state | |
| console.print("[blue]Waiting for instance to be RUNNING...[/blue]") | |
| get_instance_response = oci.wait_until( | |
| self.compute_client, | |
| self.compute_client.get_instance(instance_id), | |
| "lifecycle_state", | |
| "RUNNING", | |
| max_wait_seconds=3600, | |
| ) | |
| return instance_id | |
| except oci.exceptions.ServiceError as e: | |
| if "Out of host capacity" in str(e): | |
| self.last_error_per_ad[availability_domain] = "Out of capacity" | |
| elif "LimitExceeded" in str(e) or e.code == "LimitExceeded": | |
| # Special handling for LimitExceeded - check if instance was created anyway | |
| self.last_error_per_ad[availability_domain] = "Limit exceeded - checking if created" | |
| console.print(f" [yellow]⚠ Limit exceeded, checking if instance was created anyway...[/yellow]") | |
| # Wait a moment and check for the instance | |
| time.sleep(2) | |
| existing_count = self.check_existing_instances() | |
| if existing_count > 0 or existing_count == -1: | |
| console.print(f" [green]✓ Instance was created despite limit error![/green]") | |
| # Find the newly created instance | |
| instances = self.compute_client.list_instances( | |
| compartment_id=self.compartment_id | |
| ) | |
| for instance in instances.data: | |
| if instance.display_name == self.instance_name and instance.lifecycle_state != "TERMINATED": | |
| return instance.id | |
| else: | |
| self.last_error_per_ad[availability_domain] = "Limit exceeded" | |
| elif "NotAuthenticated" in str(e) or e.status == 401: | |
| self.last_error_per_ad[availability_domain] = "Auth failed" | |
| console.print("[red]Authentication failed. Check OCI config.[/red]") | |
| sys.exit(1) | |
| elif e.status == 404: | |
| # HTTP 404 - Resource not found | |
| self.last_error_per_ad[availability_domain] = "Resource not found (404)" | |
| # Show error details only once | |
| if self.attempts_per_ad[availability_domain] == 1: | |
| console.print(f"\n[red]HTTP 404 Error Details:[/red]") | |
| console.print(f"[red]Full error: {e.message}[/red]") | |
| console.print(f"\n[yellow]Debug Information:[/yellow]") | |
| console.print(f" Subnet ID: {self.subnet_id}") | |
| console.print(f" Image ID: {self.image_id}") | |
| console.print(f" Compartment: {self.compartment_id}") | |
| console.print(f" Shape: {self.shape}") | |
| console.print(f" AD: {availability_domain}") | |
| elif "TooManyRequests" in str(e): | |
| self.last_error_per_ad[availability_domain] = "Rate limited (429)" | |
| elif "InternalError" in str(e): | |
| self.last_error_per_ad[availability_domain] = "Internal error" | |
| elif "InvalidParameter" in str(e): | |
| self.last_error_per_ad[availability_domain] = f"Invalid param: {str(e.message)[:30]}" | |
| else: | |
| # Extract HTTP status code if available | |
| if hasattr(e, 'status') and e.status: | |
| self.last_error_per_ad[availability_domain] = f"HTTP {e.status}: {str(e.message)[:40]}" | |
| else: | |
| self.last_error_per_ad[availability_domain] = str(e.message)[:50] | |
| return None | |
| except Exception as e: | |
| self.last_error_per_ad[availability_domain] = f"Error: {str(e)[:50]}" | |
| return None | |
| def send_notification(self, instance_id: str, public_ip: str): | |
| """Send success notification via ntfy.sh""" | |
| message = ( | |
| f"🎉 Oracle ARM instance created!\n" | |
| f"Instance: {self.instance_name}\n" | |
| f"ID: {instance_id}\n" | |
| f"Public IP: {public_ip}\n" | |
| f"SSH: ssh root@{public_ip}" | |
| ) | |
| # Send to ntfy.sh | |
| try: | |
| requests.post( | |
| f"https://ntfy.sh/{self.ntfy_topic}", | |
| data=message.encode('utf-8'), | |
| headers={ | |
| "Title": "Oracle ARM Instance Ready", | |
| "Priority": "high", | |
| "Tags": "tada,cloud" | |
| } | |
| ) | |
| console.print(f"[green]Notification sent to ntfy.sh/{self.ntfy_topic}[/green]") | |
| except: | |
| pass | |
| def check_existing_instances(self) -> int: | |
| """Check existing instances and return count of matching shape""" | |
| with console.status("[blue]Checking for existing instances...[/blue]"): | |
| instances = self.compute_client.list_instances( | |
| compartment_id=self.compartment_id | |
| ) | |
| # Count instances of the same shape | |
| shape_instances = [] | |
| for instance in instances.data: | |
| if instance.shape == self.shape and instance.lifecycle_state not in ["TERMINATED", "TERMINATING"]: | |
| shape_instances.append(instance) | |
| if shape_instances: | |
| console.print(f"\n[cyan]Found {len(shape_instances)} existing {self.shape} instance(s):[/cyan]") | |
| for instance in shape_instances: | |
| console.print(f" • {instance.display_name} - {instance.lifecycle_state}") | |
| if instance.lifecycle_state == "RUNNING": | |
| # Get public IP | |
| public_ip = self._get_instance_public_ip(instance.id) | |
| if public_ip: | |
| console.print(f" Public IP: {public_ip}") | |
| # For micro instances, check if we can create more | |
| if self.is_micro: | |
| if len(shape_instances) >= self.max_instances: | |
| console.print(f"[yellow]Already have {len(shape_instances)}/{self.max_instances} micro instances[/yellow]") | |
| return len(shape_instances) | |
| else: | |
| console.print(f"[green]Have {len(shape_instances)}/{self.max_instances} micro instances, can create {self.max_instances - len(shape_instances)} more[/green]") | |
| return len(shape_instances) | |
| # For ARM, check if instance with the desired name exists | |
| for instance in instances.data: | |
| if instance.display_name == self.instance_name and instance.lifecycle_state != "TERMINATED": | |
| console.print(f"[yellow]Instance '{self.instance_name}' already exists![/yellow]") | |
| return -1 # Signal that specific instance exists | |
| return len(shape_instances) | |
| def run(self): | |
| """Main loop to create instance""" | |
| # Show welcome banner | |
| if self.is_arm: | |
| title = "🚀 Oracle Cloud ARM Instance Creator" | |
| elif self.is_micro: | |
| title = "🖥️ Oracle Cloud Micro Instance Creator" | |
| else: | |
| title = "☁️ Oracle Cloud Instance Creator" | |
| border_style = "green" | |
| console.print(Rule(title, style="bold blue")) | |
| config_text = ( | |
| f"[cyan]Instance:[/cyan] {self.instance_name}\n" | |
| f"[cyan]Shape:[/cyan] {self.shape} ({self.ocpus} OCPUs, {self.memory_gb}GB RAM)\n" | |
| f"[cyan]Boot Volume:[/cyan] {self.boot_volume_size_gb}GB\n" | |
| f"[cyan]Region:[/cyan] {self.region}\n" | |
| f"[cyan]Profile:[/cyan] {self.profile}\n" | |
| ) | |
| if self.is_micro: | |
| config_text += f"[cyan]Max Instances:[/cyan] {self.max_instances}\n" | |
| if self.os_name: | |
| config_text += f"[cyan]OS:[/cyan] {self.os_name} {self.os_version or 'latest'}\n" | |
| config_text += f"[cyan]Dry Run:[/cyan] {'Yes' if self.dry_run else 'No'}" | |
| console.print( | |
| Panel.fit( | |
| config_text, | |
| title="Configuration", | |
| border_style=border_style, | |
| ) | |
| ) | |
| # Check for existing instances | |
| if not self.dry_run: | |
| existing_count = self.check_existing_instances() | |
| # For micro instances, check if we've reached the limit | |
| if self.is_micro and existing_count >= self.max_instances: | |
| console.print(f"[yellow]Already have maximum {self.max_instances} micro instances[/yellow]") | |
| return | |
| # For other instances, check if specific instance exists | |
| if not self.is_micro and existing_count == -1: | |
| console.print("[green]Instance already exists, no need to create.[/green]") | |
| return | |
| # Get configuration | |
| self.subnet_id = self.get_subnet_id() | |
| if not self.subnet_id: | |
| console.print("[red]Error: No subnet found. Please create network resources first.[/red]") | |
| sys.exit(1) | |
| try: | |
| self.image_id = self.get_arm_image_id() | |
| console.print(f"[cyan]Image ID:[/cyan] {self.image_id}") | |
| # Verify image exists | |
| try: | |
| image = self.compute_client.get_image(self.image_id) | |
| console.print(f"[dim]Image: {image.data.display_name}[/dim]") | |
| console.print(f"[dim]Image State: {image.data.lifecycle_state}[/dim]") | |
| except Exception as e: | |
| console.print(f"[yellow]Warning: Could not verify image: {e}[/yellow]") | |
| except Exception as e: | |
| console.print(f"[red]Error getting image: {e}[/red]") | |
| sys.exit(1) | |
| # Get availability domains | |
| ads = self.get_availability_domains() | |
| console.print(f"[cyan]Found {len(ads)} availability domains[/cyan]") | |
| # Main loop with consistent logging style | |
| round_num = 0 | |
| total_attempts = 0 | |
| start_time = datetime.now() | |
| try: | |
| while True: | |
| round_num += 1 | |
| elapsed = datetime.now() - start_time | |
| round_results = {} # Store results for this round | |
| # Round header | |
| console.print(f"\n[bold blue]╔{'═' * 58}╗[/bold blue]") | |
| console.print(f"[bold blue]║ ROUND {round_num:3d} | Elapsed: {str(elapsed).split('.')[0]:>10s} | Total Attempts: {total_attempts:4d} ║[/bold blue]") | |
| console.print(f"[bold blue]╚{'═' * 58}╝[/bold blue]") | |
| # Try each AD in this round | |
| console.print("\n[cyan]Checking Availability Domains:[/cyan]") | |
| for i, ad in enumerate(ads): | |
| ad_short = ad.split(":")[1] if ":" in ad else ad | |
| console.print(f" [{i+1}/{len(ads)}] {ad_short}...", end="") | |
| # For micro instances, check if we need to create more | |
| if self.is_micro: | |
| current_count = self.check_existing_instances() | |
| if current_count >= self.max_instances: | |
| console.print(f" [yellow]✗ Already have {self.max_instances} instances[/yellow]") | |
| instance_id = "limit_reached" | |
| break | |
| # Try to create instance | |
| total_attempts += 1 | |
| instance_id = self.create_instance(ad) | |
| # Store and display result | |
| if instance_id: | |
| round_results[ad_short] = ("SUCCESS", "green") | |
| console.print(f" [green]✓ SUCCESS[/green]") | |
| break | |
| else: | |
| error_msg = self.last_error_per_ad[ad] | |
| status, color = self._categorize_error(error_msg) | |
| round_results[ad_short] = (status, color) | |
| console.print(f" [{color}]✗ {status}[/{color}]") | |
| # Minimal delay with jitter between ADs | |
| if i < len(ads) - 1 and not instance_id: | |
| time.sleep(0.1 + random.random() * 0.4) | |
| # Round summary | |
| console.print("\n[cyan]Round Summary:[/cyan]") | |
| console.print(f" • Attempts this round: {len(round_results)}") | |
| console.print(f" • Total attempts: {total_attempts}") | |
| console.print(f" • Time elapsed: {str(elapsed).split('.')[0]}") | |
| # Show cumulative statistics | |
| capacity_count = sum(1 for r in round_results.values() if "Out of capacity" in r[0]) | |
| if capacity_count == len(round_results): | |
| console.print(f" • Status: [yellow]All {len(round_results)} ADs out of capacity[/yellow]") | |
| elif "SUCCESS" in [r[0] for r in round_results.values()]: | |
| console.print(f" • Status: [green]Instance created successfully![/green]") | |
| else: | |
| console.print(f" • Status: [red]{capacity_count}/{len(round_results)} ADs out of capacity[/red]") | |
| if instance_id: | |
| if instance_id == "limit_reached": | |
| console.print(f"\n[yellow]Instance limit reached, stopping.[/yellow]") | |
| else: | |
| console.print(f"\n[green]🎉 Instance creation successful in round {round_num}![/green]") | |
| break | |
| # Wait before next round | |
| wait_time = 0.1 + random.random() * 1.9 | |
| console.print(f"\n[dim]Waiting {wait_time:.1f}s before next round...[/dim]") | |
| time.sleep(wait_time) | |
| # Handle success (common for both interactive and non-interactive) | |
| if instance_id and instance_id != "limit_reached": | |
| # Get instance details | |
| public_ip = self._get_instance_public_ip(instance_id) or "none" | |
| console.print(Rule("✨ SUCCESS", style="bold green")) | |
| console.print( | |
| Panel.fit( | |
| f"[green]Instance created successfully![/green]\n\n" | |
| f"[cyan]Instance ID:[/cyan] {instance_id}\n" | |
| f"[cyan]Public IP:[/cyan] {public_ip}\n" | |
| f"[cyan]SSH:[/cyan] ssh root@{public_ip}", | |
| title="Instance Details", | |
| border_style="green", | |
| ) | |
| ) | |
| # Send notification | |
| self.send_notification(instance_id, public_ip) | |
| return | |
| except Exception as e: | |
| console.print(f"[red]Error in main loop: {e}[/red]") | |
| import traceback | |
| traceback.print_exc() | |
| def parse_args(): | |
| """Parse command line arguments""" | |
| parser = argparse.ArgumentParser( | |
| description="Oracle Cloud ARM Instance Creator - Continuously attempts to create an ARM instance", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| Examples: | |
| # Use default settings (4 OCPUs, 24GB RAM ARM instance) | |
| %(prog)s --ssh-key ~/.ssh/mimas.pub | |
| # Create micro instances (x86 free tier) | |
| %(prog)s --ssh-key ~/.ssh/mimas.pub --shape VM.Standard.E2.1.Micro --max-instances 2 | |
| # Use custom OS with image discovery | |
| %(prog)s --ssh-key ~/.ssh/mimas.pub --os-name Ubuntu --os-version 22.04 --save-images | |
| # Auto-generate SSH key if missing | |
| %(prog)s --ssh-key ~/.ssh/mimas.pub --auto-gen-ssh | |
| # Use minimal ARM resources | |
| %(prog)s --ssh-key ~/.ssh/mimas.pub --ocpus 1 --memory 6 | |
| # Dry run to test configuration | |
| %(prog)s --ssh-key ~/.ssh/mimas.pub --dry-run | |
| Environment Variables: | |
| NTFY_TOPIC - ntfy.sh topic for notifications (default: oracle-arm) | |
| """, | |
| ) | |
| # Instance configuration | |
| parser.add_argument( | |
| "--name", | |
| default="mimas", | |
| help="Instance name (default: mimas)", | |
| ) | |
| parser.add_argument( | |
| "--shape", | |
| default="VM.Standard.A1.Flex", | |
| choices=["VM.Standard.A1.Flex", "VM.Standard.E2.1.Micro"], | |
| help="Instance shape (default: VM.Standard.A1.Flex)", | |
| ) | |
| parser.add_argument( | |
| "--ocpus", | |
| type=int, | |
| default=4, | |
| help="Number of OCPUs (default: 4, max free tier ARM: 4, micro: 1)", | |
| ) | |
| parser.add_argument( | |
| "--memory", | |
| type=int, | |
| default=24, | |
| help="Memory in GB (default: 24, max free tier ARM: 24, micro: 1)", | |
| ) | |
| parser.add_argument( | |
| "--boot-volume", | |
| type=int, | |
| default=200, | |
| help="Boot volume size in GB (default: 200, maximum free tier)", | |
| ) | |
| parser.add_argument( | |
| "--max-instances", | |
| type=int, | |
| default=2, | |
| help="Maximum number of instances to create (for E2.1.Micro only, default: 2)", | |
| ) | |
| # OCI configuration | |
| parser.add_argument( | |
| "--profile", | |
| default=os.getenv("OCI_CLI_PROFILE", "DEFAULT"), | |
| help="OCI config profile to use (default: DEFAULT or $OCI_CLI_PROFILE)", | |
| ) | |
| parser.add_argument( | |
| "--ssh-key", | |
| required=True, | |
| help="Path to SSH public key file (required)", | |
| ) | |
| parser.add_argument( | |
| "--auto-gen-ssh", | |
| action="store_true", | |
| help="Automatically generate SSH key pair if not found", | |
| ) | |
| # Image discovery options | |
| parser.add_argument( | |
| "--os-name", | |
| help="Operating system name for image discovery (e.g., 'Oracle Linux', 'Ubuntu')", | |
| ) | |
| parser.add_argument( | |
| "--os-version", | |
| help="Operating system version (e.g., '9', '22.04')", | |
| ) | |
| parser.add_argument( | |
| "--save-images", | |
| action="store_true", | |
| help="Save discovered images to JSON file", | |
| ) | |
| # Retry configuration | |
| parser.add_argument( | |
| "--delay", | |
| type=int, | |
| default=5, | |
| help="Delay in seconds between retry rounds (default: 5)", | |
| ) | |
| # Other options | |
| parser.add_argument( | |
| "--public-ip", | |
| action="store_true", | |
| default=True, | |
| help="Assign public IP to instance (default: True)", | |
| ) | |
| parser.add_argument( | |
| "--no-public-ip", | |
| dest="public_ip", | |
| action="store_false", | |
| help="Do not assign public IP to instance", | |
| ) | |
| parser.add_argument( | |
| "--dry-run", | |
| action="store_true", | |
| help="Perform a dry run without actually creating instances", | |
| ) | |
| parser.add_argument( | |
| "--version", | |
| action="version", | |
| version="%(prog)s 1.0.0", | |
| ) | |
| args = parser.parse_args() | |
| # Validate arguments based on shape | |
| if args.shape == "VM.Standard.A1.Flex": | |
| if args.ocpus < 1 or args.ocpus > 4: | |
| parser.error("OCPUs must be between 1 and 4 for ARM free tier") | |
| if args.memory < 1 or args.memory > 24: | |
| parser.error("Memory must be between 1 and 24 GB for ARM free tier") | |
| if args.memory < args.ocpus * 1: | |
| parser.error("Memory must be at least 1GB per OCPU") | |
| if args.memory > args.ocpus * 8: | |
| parser.error("Memory cannot exceed 8GB per OCPU") | |
| elif args.shape == "VM.Standard.E2.1.Micro": | |
| # Force micro instance settings | |
| args.ocpus = 1 | |
| args.memory = 1 | |
| if args.max_instances < 1 or args.max_instances > 2: | |
| parser.error("max-instances must be 1 or 2 for micro instances") | |
| return args | |
| if __name__ == "__main__": | |
| try: | |
| args = parse_args() | |
| creator = OracleInstanceCreator(args) | |
| creator.run() | |
| except KeyboardInterrupt: | |
| console.print("\n[yellow]Interrupted by user[/yellow]") | |
| sys.exit(0) | |
| except Exception as e: | |
| console.print(f"[red]Fatal error: {e}[/red]") | |
| import traceback | |
| if os.getenv("DEBUG"): | |
| traceback.print_exc() | |
| sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment