Skip to content

Instantly share code, notes, and snippets.

@aldoborrero
Created October 16, 2025 08:11
Show Gist options
  • Save aldoborrero/29b257b52937c4030cb38086e74527b4 to your computer and use it in GitHub Desktop.
Save aldoborrero/29b257b52937c4030cb38086e74527b4 to your computer and use it in GitHub Desktop.
Nixified OCI ARM grabber
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p python3 python3Packages.oci python3Packages.requests python3Packages.rich
"""
Oracle Cloud ARM Instance Creator
Continuously attempts to create an ARM instance in Oracle Cloud Free Tier
"""
import argparse
import json
import os
import random
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import oci
import requests
from rich.console import Console
from rich.panel import Panel
from rich.rule import Rule
from rich.table import Table
# Always use consistent console settings
console = Console(force_terminal=True, force_interactive=False)
class OracleInstanceCreator:
def __init__(self, args):
# Multi-instance support - detect shape type
self.is_arm = args.shape == "VM.Standard.A1.Flex"
self.is_micro = args.shape == "VM.Standard.E2.1.Micro"
self.instance_name = args.name
self.shape = args.shape
self.ocpus = args.ocpus
self.memory_gb = args.memory
self.boot_volume_size_gb = args.boot_volume
self.retry_delay = args.delay
self.profile = args.profile
self.ssh_key_file = args.ssh_key
self.dry_run = args.dry_run
self.assign_public_ip = args.public_ip
# Statistics tracking
self.start_time = datetime.now()
self.attempts_per_ad = {}
self.last_error_per_ad = {}
# Load OCI configuration
self.load_config()
# Initialize OCI client
self.setup_oci_client()
# Will be set during run()
self.image_id = None
self.subnet_id = None
# Multi-instance support
self.max_instances = args.max_instances if self.is_micro else 1
def load_config(self):
"""Load configuration from OCI config file"""
try:
# Load OCI config
config = oci.config.from_file(profile_name=self.profile)
self.tenancy_ocid = config["tenancy"]
self.compartment_id = config["tenancy"] # Use tenancy as compartment
self.user_ocid = config["user"]
self.fingerprint = config["fingerprint"]
self.region = config["region"]
# Read private key from file
key_file = Path(config["key_file"]).expanduser()
if not key_file.exists():
console.print(f"[red]Error: Key file not found: {key_file}[/red]")
sys.exit(1)
self.private_key = key_file.read_text()
# Load SSH public key - must be specified
if not self.ssh_key_file:
console.print("[red]Error: SSH public key must be specified with --ssh-key[/red]")
console.print("[yellow]Example: oci-arm --ssh-key ~/.ssh/mimas.pub[/yellow]")
sys.exit(1)
ssh_key_path = Path(self.ssh_key_file).expanduser()
if not ssh_key_path.exists():
# Auto-generate SSH key if missing
console.print(f"[yellow]SSH key not found at {ssh_key_path}[/yellow]")
if args.auto_gen_ssh:
self.generate_ssh_key_pair(ssh_key_path)
console.print(f"[green]Generated new SSH key pair at {ssh_key_path}[/green]")
else:
console.print(f"[red]Error: SSH key file not found: {ssh_key_path}[/red]")
console.print("[yellow]Use --auto-gen-ssh to automatically generate a key pair[/yellow]")
sys.exit(1)
self.ssh_public_key = ssh_key_path.read_text().strip()
console.print(f"[green]Using SSH key: {ssh_key_path}[/green]")
console.print(f"[green]Loaded OCI config profile: {self.profile}[/green]")
console.print(f"[cyan]Region: {self.region}[/cyan]")
except Exception as e:
console.print(f"[red]Error loading OCI config: {e}[/red]")
console.print("[yellow]Make sure ~/.oci/config is properly configured[/yellow]")
sys.exit(1)
# Notification settings
self.ntfy_topic = os.getenv("NTFY_TOPIC", "oracle-arm")
# Image discovery settings
self.os_name = args.os_name
self.os_version = args.os_version
self.save_images = args.save_images
def setup_oci_client(self):
"""Setup OCI SDK clients"""
config = {
"user": self.user_ocid,
"key_content": self.private_key,
"fingerprint": self.fingerprint,
"tenancy": self.tenancy_ocid,
"region": self.region,
}
self.compute_client = oci.core.ComputeClient(config)
self.network_client = oci.core.VirtualNetworkClient(config)
self.identity_client = oci.identity.IdentityClient(config)
def get_availability_domains(self) -> List[str]:
"""Get all availability domains"""
with console.status("[blue]Getting availability domains...[/blue]"):
ads = self.identity_client.list_availability_domains(
compartment_id=self.compartment_id
)
ad_names = [ad.name for ad in ads.data]
# Initialize attempt counters
for ad in ad_names:
self.attempts_per_ad[ad] = 0
self.last_error_per_ad[ad] = "Not attempted"
return ad_names
def _list_images(self, os_name: str = None, os_version: str = None, limit: int = 20):
"""Helper to list images with common parameters"""
return self.compute_client.list_images(
compartment_id=self.compartment_id,
operating_system=os_name,
operating_system_version=os_version,
shape=self.shape,
sort_by="TIMECREATED",
sort_order="DESC",
limit=limit,
)
def _categorize_error(self, error_msg: str) -> Tuple[str, str]:
"""Categorize error message and return display text and color"""
if "Out of capacity" in error_msg:
return "Out of capacity", "red"
elif "Limit exceeded" in error_msg:
return "Limit exceeded", "yellow"
elif "Rate limited" in error_msg:
return "Rate limited (429)", "yellow"
else:
return error_msg[:30], "red"
def _get_instance_public_ip(self, instance_id: str) -> Optional[str]:
"""Get public IP for an instance"""
try:
vnics = self.compute_client.list_vnic_attachments(
compartment_id=self.compartment_id,
instance_id=instance_id
)
if vnics.data:
vnic = self.network_client.get_vnic(vnics.data[0].vnic_id)
return vnic.data.public_ip
except:
pass
return None
def generate_ssh_key_pair(self, public_key_path: Path) -> None:
"""Generate SSH key pair if it doesn't exist"""
try:
# Try to use paramiko if available
import paramiko
private_key_path = public_key_path.with_suffix('')
# Generate RSA key
key = paramiko.RSAKey.generate(2048)
# Save private key
key.write_private_key_file(str(private_key_path))
# Save public key
public_key_str = f"ssh-rsa {key.get_base64()} {public_key_path.stem}_auto_generated"
public_key_path.write_text(public_key_str)
# Set appropriate permissions
private_key_path.chmod(0o600)
public_key_path.chmod(0o644)
console.print(f"[green]Generated SSH keypair:[/green]")
console.print(f" Private: {private_key_path}")
console.print(f" Public: {public_key_path}")
except ImportError:
# Fallback to ssh-keygen command
console.print("[yellow]paramiko not available, using ssh-keygen[/yellow]")
private_key_path = public_key_path.with_suffix('')
import subprocess
result = subprocess.run(
[
"ssh-keygen",
"-t", "rsa",
"-b", "2048",
"-f", str(private_key_path),
"-N", "", # No passphrase
"-C", f"{public_key_path.stem}_auto_generated"
],
capture_output=True,
text=True
)
if result.returncode != 0:
raise Exception(f"Failed to generate SSH key: {result.stderr}")
# ssh-keygen creates the .pub file automatically
if not public_key_path.exists():
# Move the generated .pub file if needed
generated_pub = private_key_path.with_suffix('.pub')
if generated_pub.exists():
generated_pub.rename(public_key_path)
def discover_images(self) -> Tuple[str, List[Dict]]:
"""Discover available images by OS name and version"""
with console.status(f"[blue]Discovering {self.os_name} {self.os_version} images...[/blue]"):
# Get all available images for the shape
images = self._list_images(self.os_name, self.os_version if self.os_version else None)
if not images.data and self.os_version:
# Try without version if no results
console.print(f"[yellow]No {self.os_name} {self.os_version} images found, trying all versions...[/yellow]")
images = self._list_images(self.os_name)
if not images.data:
# List all available OS options
console.print(f"[yellow]No {self.os_name} images found. Listing all available images...[/yellow]")
all_images = self._list_images(limit=50)
# Group by OS
os_groups = {}
for img in all_images.data:
os_key = f"{img.operating_system} {img.operating_system_version}"
if os_key not in os_groups:
os_groups[os_key] = []
os_groups[os_key].append(img)
# Display available OS options
table = Table(title="Available Operating Systems")
table.add_column("OS", style="cyan")
table.add_column("Version", style="yellow")
table.add_column("Count", style="green")
for os_key, imgs in sorted(os_groups.items()):
parts = os_key.split(' ', 1)
os_name = parts[0] if parts else os_key
os_ver = parts[1] if len(parts) > 1 else ""
table.add_row(os_name, os_ver, str(len(imgs)))
console.print(table)
# Use the first available image as fallback
if all_images.data:
images = all_images
console.print(f"\n[yellow]Using first available image as fallback[/yellow]")
# Process images for export
image_list = []
for img in images.data:
image_info = {
"id": img.id,
"display_name": img.display_name,
"operating_system": img.operating_system,
"operating_system_version": img.operating_system_version,
"size_in_gbs": getattr(img, 'size_in_gbs', 'N/A'),
"time_created": str(img.time_created),
"lifecycle_state": img.lifecycle_state,
}
image_list.append(image_info)
# Save to JSON if requested
if self.save_images and image_list:
images_file = Path(f"images_{self.shape.replace('.', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
with open(images_file, 'w') as f:
json.dump(image_list, f, indent=2)
console.print(f"[green]Saved {len(image_list)} images to {images_file}[/green]")
if not images.data:
raise Exception(f"No images found for shape {self.shape}")
# Use the most recent image
selected_image = images.data[0]
console.print(f"[green]Selected image: {selected_image.display_name}[/green]")
console.print(f"[dim] OS: {selected_image.operating_system} {selected_image.operating_system_version}[/dim]")
console.print(f"[dim] ID: {selected_image.id}[/dim]")
return selected_image.id, image_list
def get_arm_image_id(self) -> str:
"""Get the latest image for the shape (with auto-discovery)"""
if self.os_name:
# Use image discovery by OS name/version
image_id, _ = self.discover_images()
return image_id
# Fallback to original logic for Oracle Linux
image_type = "x86" if self.is_micro else "ARM"
with console.status(f"[blue]Getting latest Oracle Linux {image_type} image...[/blue]"):
# First try with Oracle Linux 9
images = self._list_images("Oracle Linux", "9", limit=5)
if not images.data:
console.print(f"[yellow]No Oracle Linux 9 images found, trying Oracle Linux 8...[/yellow]")
images = self._list_images("Oracle Linux", "8", limit=5)
if not images.data:
raise Exception(f"No Oracle Linux images found for shape {self.shape}")
selected_image = images.data[0]
console.print(f"[dim]Selected: {selected_image.display_name}[/dim]")
return selected_image.id
def get_subnet_id(self) -> Optional[str]:
"""Get subnet ID from existing VCN"""
with console.status("[blue]Looking for existing subnet...[/blue]"):
try:
# List VCNs
vcns = self.network_client.list_vcns(compartment_id=self.compartment_id)
console.print(f"[dim]Found {len(vcns.data)} VCNs in compartment[/dim]")
for vcn in vcns.data:
console.print(f"[dim]Checking VCN: {vcn.display_name} ({vcn.id[:20]}...)[/dim]")
if "nixos" in vcn.display_name.lower() or "vcn" in vcn.display_name.lower():
# Found VCN, now get subnet
subnets = self.network_client.list_subnets(
compartment_id=self.compartment_id, vcn_id=vcn.id
)
if subnets.data:
subnet = subnets.data[0]
console.print(f"[green]Found subnet: {subnet.display_name}[/green]")
console.print(f"[dim]Subnet ID: {subnet.id}[/dim]")
console.print(f"[dim]Subnet State: {subnet.lifecycle_state}[/dim]")
# Verify subnet is active
if subnet.lifecycle_state != "AVAILABLE":
console.print(f"[yellow]Warning: Subnet is {subnet.lifecycle_state}, not AVAILABLE[/yellow]")
return subnet.id
console.print("[yellow]No existing subnet found. Please create network resources first.[/yellow]")
console.print("[yellow]Looking for ANY subnet as fallback...[/yellow]")
# Try to find ANY subnet as fallback
all_subnets = self.network_client.list_subnets(compartment_id=self.compartment_id)
if all_subnets.data:
subnet = all_subnets.data[0]
console.print(f"[yellow]Using first available subnet: {subnet.display_name}[/yellow]")
return subnet.id
except Exception as e:
console.print(f"[red]Error getting subnet: {e}[/red]")
return None
def create_instance(self, availability_domain: str) -> Optional[str]:
"""Attempt to create an instance"""
self.attempts_per_ad[availability_domain] += 1
if self.dry_run:
console.print(f"[yellow][DRY RUN] Would create instance in {availability_domain}[/yellow]")
console.print(f" Shape: {self.shape}")
console.print(f" OCPUs: {self.ocpus}")
console.print(f" Memory: {self.memory_gb}GB")
console.print(f" Boot Volume: {self.boot_volume_size_gb}GB")
self.last_error_per_ad[availability_domain] = "Dry run - not attempted"
return None
instance_details = oci.core.models.LaunchInstanceDetails(
availability_domain=availability_domain,
compartment_id=self.compartment_id,
shape=self.shape,
shape_config=oci.core.models.LaunchInstanceShapeConfigDetails(
ocpus=self.ocpus, memory_in_gbs=self.memory_gb
),
display_name=self.instance_name,
image_id=self.image_id,
subnet_id=self.subnet_id,
metadata={"ssh_authorized_keys": self.ssh_public_key},
source_details=oci.core.models.InstanceSourceViaImageDetails(
image_id=self.image_id, boot_volume_size_in_gbs=self.boot_volume_size_gb
),
create_vnic_details=oci.core.models.CreateVnicDetails(
subnet_id=self.subnet_id,
assign_public_ip=self.assign_public_ip,
assign_private_dns_record=True,
display_name="primary-vnic",
),
# Add availability config for instance recovery
availability_config=oci.core.models.LaunchInstanceAvailabilityConfigDetails(
recovery_action="RESTORE_INSTANCE"
),
# Enable legacy IMDS endpoints for compatibility
instance_options=oci.core.models.InstanceOptions(
are_legacy_imds_endpoints_disabled=False
),
# Enable monitoring
agent_config=oci.core.models.LaunchInstanceAgentConfigDetails(
is_monitoring_disabled=False,
is_management_disabled=False,
plugins_config=[
oci.core.models.InstanceAgentPluginConfigDetails(
name="Compute Instance Monitoring",
desired_state="ENABLED"
)
]
),
)
try:
response = self.compute_client.launch_instance(
launch_instance_details=instance_details
)
instance_id = response.data.id
console.print(f"[green]✓ Instance created! ID: {instance_id}[/green]")
# Wait for running state
console.print("[blue]Waiting for instance to be RUNNING...[/blue]")
get_instance_response = oci.wait_until(
self.compute_client,
self.compute_client.get_instance(instance_id),
"lifecycle_state",
"RUNNING",
max_wait_seconds=3600,
)
return instance_id
except oci.exceptions.ServiceError as e:
if "Out of host capacity" in str(e):
self.last_error_per_ad[availability_domain] = "Out of capacity"
elif "LimitExceeded" in str(e) or e.code == "LimitExceeded":
# Special handling for LimitExceeded - check if instance was created anyway
self.last_error_per_ad[availability_domain] = "Limit exceeded - checking if created"
console.print(f" [yellow]⚠ Limit exceeded, checking if instance was created anyway...[/yellow]")
# Wait a moment and check for the instance
time.sleep(2)
existing_count = self.check_existing_instances()
if existing_count > 0 or existing_count == -1:
console.print(f" [green]✓ Instance was created despite limit error![/green]")
# Find the newly created instance
instances = self.compute_client.list_instances(
compartment_id=self.compartment_id
)
for instance in instances.data:
if instance.display_name == self.instance_name and instance.lifecycle_state != "TERMINATED":
return instance.id
else:
self.last_error_per_ad[availability_domain] = "Limit exceeded"
elif "NotAuthenticated" in str(e) or e.status == 401:
self.last_error_per_ad[availability_domain] = "Auth failed"
console.print("[red]Authentication failed. Check OCI config.[/red]")
sys.exit(1)
elif e.status == 404:
# HTTP 404 - Resource not found
self.last_error_per_ad[availability_domain] = "Resource not found (404)"
# Show error details only once
if self.attempts_per_ad[availability_domain] == 1:
console.print(f"\n[red]HTTP 404 Error Details:[/red]")
console.print(f"[red]Full error: {e.message}[/red]")
console.print(f"\n[yellow]Debug Information:[/yellow]")
console.print(f" Subnet ID: {self.subnet_id}")
console.print(f" Image ID: {self.image_id}")
console.print(f" Compartment: {self.compartment_id}")
console.print(f" Shape: {self.shape}")
console.print(f" AD: {availability_domain}")
elif "TooManyRequests" in str(e):
self.last_error_per_ad[availability_domain] = "Rate limited (429)"
elif "InternalError" in str(e):
self.last_error_per_ad[availability_domain] = "Internal error"
elif "InvalidParameter" in str(e):
self.last_error_per_ad[availability_domain] = f"Invalid param: {str(e.message)[:30]}"
else:
# Extract HTTP status code if available
if hasattr(e, 'status') and e.status:
self.last_error_per_ad[availability_domain] = f"HTTP {e.status}: {str(e.message)[:40]}"
else:
self.last_error_per_ad[availability_domain] = str(e.message)[:50]
return None
except Exception as e:
self.last_error_per_ad[availability_domain] = f"Error: {str(e)[:50]}"
return None
def send_notification(self, instance_id: str, public_ip: str):
"""Send success notification via ntfy.sh"""
message = (
f"🎉 Oracle ARM instance created!\n"
f"Instance: {self.instance_name}\n"
f"ID: {instance_id}\n"
f"Public IP: {public_ip}\n"
f"SSH: ssh root@{public_ip}"
)
# Send to ntfy.sh
try:
requests.post(
f"https://ntfy.sh/{self.ntfy_topic}",
data=message.encode('utf-8'),
headers={
"Title": "Oracle ARM Instance Ready",
"Priority": "high",
"Tags": "tada,cloud"
}
)
console.print(f"[green]Notification sent to ntfy.sh/{self.ntfy_topic}[/green]")
except:
pass
def check_existing_instances(self) -> int:
"""Check existing instances and return count of matching shape"""
with console.status("[blue]Checking for existing instances...[/blue]"):
instances = self.compute_client.list_instances(
compartment_id=self.compartment_id
)
# Count instances of the same shape
shape_instances = []
for instance in instances.data:
if instance.shape == self.shape and instance.lifecycle_state not in ["TERMINATED", "TERMINATING"]:
shape_instances.append(instance)
if shape_instances:
console.print(f"\n[cyan]Found {len(shape_instances)} existing {self.shape} instance(s):[/cyan]")
for instance in shape_instances:
console.print(f" • {instance.display_name} - {instance.lifecycle_state}")
if instance.lifecycle_state == "RUNNING":
# Get public IP
public_ip = self._get_instance_public_ip(instance.id)
if public_ip:
console.print(f" Public IP: {public_ip}")
# For micro instances, check if we can create more
if self.is_micro:
if len(shape_instances) >= self.max_instances:
console.print(f"[yellow]Already have {len(shape_instances)}/{self.max_instances} micro instances[/yellow]")
return len(shape_instances)
else:
console.print(f"[green]Have {len(shape_instances)}/{self.max_instances} micro instances, can create {self.max_instances - len(shape_instances)} more[/green]")
return len(shape_instances)
# For ARM, check if instance with the desired name exists
for instance in instances.data:
if instance.display_name == self.instance_name and instance.lifecycle_state != "TERMINATED":
console.print(f"[yellow]Instance '{self.instance_name}' already exists![/yellow]")
return -1 # Signal that specific instance exists
return len(shape_instances)
def run(self):
"""Main loop to create instance"""
# Show welcome banner
if self.is_arm:
title = "🚀 Oracle Cloud ARM Instance Creator"
elif self.is_micro:
title = "🖥️ Oracle Cloud Micro Instance Creator"
else:
title = "☁️ Oracle Cloud Instance Creator"
border_style = "green"
console.print(Rule(title, style="bold blue"))
config_text = (
f"[cyan]Instance:[/cyan] {self.instance_name}\n"
f"[cyan]Shape:[/cyan] {self.shape} ({self.ocpus} OCPUs, {self.memory_gb}GB RAM)\n"
f"[cyan]Boot Volume:[/cyan] {self.boot_volume_size_gb}GB\n"
f"[cyan]Region:[/cyan] {self.region}\n"
f"[cyan]Profile:[/cyan] {self.profile}\n"
)
if self.is_micro:
config_text += f"[cyan]Max Instances:[/cyan] {self.max_instances}\n"
if self.os_name:
config_text += f"[cyan]OS:[/cyan] {self.os_name} {self.os_version or 'latest'}\n"
config_text += f"[cyan]Dry Run:[/cyan] {'Yes' if self.dry_run else 'No'}"
console.print(
Panel.fit(
config_text,
title="Configuration",
border_style=border_style,
)
)
# Check for existing instances
if not self.dry_run:
existing_count = self.check_existing_instances()
# For micro instances, check if we've reached the limit
if self.is_micro and existing_count >= self.max_instances:
console.print(f"[yellow]Already have maximum {self.max_instances} micro instances[/yellow]")
return
# For other instances, check if specific instance exists
if not self.is_micro and existing_count == -1:
console.print("[green]Instance already exists, no need to create.[/green]")
return
# Get configuration
self.subnet_id = self.get_subnet_id()
if not self.subnet_id:
console.print("[red]Error: No subnet found. Please create network resources first.[/red]")
sys.exit(1)
try:
self.image_id = self.get_arm_image_id()
console.print(f"[cyan]Image ID:[/cyan] {self.image_id}")
# Verify image exists
try:
image = self.compute_client.get_image(self.image_id)
console.print(f"[dim]Image: {image.data.display_name}[/dim]")
console.print(f"[dim]Image State: {image.data.lifecycle_state}[/dim]")
except Exception as e:
console.print(f"[yellow]Warning: Could not verify image: {e}[/yellow]")
except Exception as e:
console.print(f"[red]Error getting image: {e}[/red]")
sys.exit(1)
# Get availability domains
ads = self.get_availability_domains()
console.print(f"[cyan]Found {len(ads)} availability domains[/cyan]")
# Main loop with consistent logging style
round_num = 0
total_attempts = 0
start_time = datetime.now()
try:
while True:
round_num += 1
elapsed = datetime.now() - start_time
round_results = {} # Store results for this round
# Round header
console.print(f"\n[bold blue]╔{'═' * 58}╗[/bold blue]")
console.print(f"[bold blue]║ ROUND {round_num:3d} | Elapsed: {str(elapsed).split('.')[0]:>10s} | Total Attempts: {total_attempts:4d} ║[/bold blue]")
console.print(f"[bold blue]╚{'═' * 58}╝[/bold blue]")
# Try each AD in this round
console.print("\n[cyan]Checking Availability Domains:[/cyan]")
for i, ad in enumerate(ads):
ad_short = ad.split(":")[1] if ":" in ad else ad
console.print(f" [{i+1}/{len(ads)}] {ad_short}...", end="")
# For micro instances, check if we need to create more
if self.is_micro:
current_count = self.check_existing_instances()
if current_count >= self.max_instances:
console.print(f" [yellow]✗ Already have {self.max_instances} instances[/yellow]")
instance_id = "limit_reached"
break
# Try to create instance
total_attempts += 1
instance_id = self.create_instance(ad)
# Store and display result
if instance_id:
round_results[ad_short] = ("SUCCESS", "green")
console.print(f" [green]✓ SUCCESS[/green]")
break
else:
error_msg = self.last_error_per_ad[ad]
status, color = self._categorize_error(error_msg)
round_results[ad_short] = (status, color)
console.print(f" [{color}]✗ {status}[/{color}]")
# Minimal delay with jitter between ADs
if i < len(ads) - 1 and not instance_id:
time.sleep(0.1 + random.random() * 0.4)
# Round summary
console.print("\n[cyan]Round Summary:[/cyan]")
console.print(f" • Attempts this round: {len(round_results)}")
console.print(f" • Total attempts: {total_attempts}")
console.print(f" • Time elapsed: {str(elapsed).split('.')[0]}")
# Show cumulative statistics
capacity_count = sum(1 for r in round_results.values() if "Out of capacity" in r[0])
if capacity_count == len(round_results):
console.print(f" • Status: [yellow]All {len(round_results)} ADs out of capacity[/yellow]")
elif "SUCCESS" in [r[0] for r in round_results.values()]:
console.print(f" • Status: [green]Instance created successfully![/green]")
else:
console.print(f" • Status: [red]{capacity_count}/{len(round_results)} ADs out of capacity[/red]")
if instance_id:
if instance_id == "limit_reached":
console.print(f"\n[yellow]Instance limit reached, stopping.[/yellow]")
else:
console.print(f"\n[green]🎉 Instance creation successful in round {round_num}![/green]")
break
# Wait before next round
wait_time = 0.1 + random.random() * 1.9
console.print(f"\n[dim]Waiting {wait_time:.1f}s before next round...[/dim]")
time.sleep(wait_time)
# Handle success (common for both interactive and non-interactive)
if instance_id and instance_id != "limit_reached":
# Get instance details
public_ip = self._get_instance_public_ip(instance_id) or "none"
console.print(Rule("✨ SUCCESS", style="bold green"))
console.print(
Panel.fit(
f"[green]Instance created successfully![/green]\n\n"
f"[cyan]Instance ID:[/cyan] {instance_id}\n"
f"[cyan]Public IP:[/cyan] {public_ip}\n"
f"[cyan]SSH:[/cyan] ssh root@{public_ip}",
title="Instance Details",
border_style="green",
)
)
# Send notification
self.send_notification(instance_id, public_ip)
return
except Exception as e:
console.print(f"[red]Error in main loop: {e}[/red]")
import traceback
traceback.print_exc()
def parse_args():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(
description="Oracle Cloud ARM Instance Creator - Continuously attempts to create an ARM instance",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Use default settings (4 OCPUs, 24GB RAM ARM instance)
%(prog)s --ssh-key ~/.ssh/mimas.pub
# Create micro instances (x86 free tier)
%(prog)s --ssh-key ~/.ssh/mimas.pub --shape VM.Standard.E2.1.Micro --max-instances 2
# Use custom OS with image discovery
%(prog)s --ssh-key ~/.ssh/mimas.pub --os-name Ubuntu --os-version 22.04 --save-images
# Auto-generate SSH key if missing
%(prog)s --ssh-key ~/.ssh/mimas.pub --auto-gen-ssh
# Use minimal ARM resources
%(prog)s --ssh-key ~/.ssh/mimas.pub --ocpus 1 --memory 6
# Dry run to test configuration
%(prog)s --ssh-key ~/.ssh/mimas.pub --dry-run
Environment Variables:
NTFY_TOPIC - ntfy.sh topic for notifications (default: oracle-arm)
""",
)
# Instance configuration
parser.add_argument(
"--name",
default="mimas",
help="Instance name (default: mimas)",
)
parser.add_argument(
"--shape",
default="VM.Standard.A1.Flex",
choices=["VM.Standard.A1.Flex", "VM.Standard.E2.1.Micro"],
help="Instance shape (default: VM.Standard.A1.Flex)",
)
parser.add_argument(
"--ocpus",
type=int,
default=4,
help="Number of OCPUs (default: 4, max free tier ARM: 4, micro: 1)",
)
parser.add_argument(
"--memory",
type=int,
default=24,
help="Memory in GB (default: 24, max free tier ARM: 24, micro: 1)",
)
parser.add_argument(
"--boot-volume",
type=int,
default=200,
help="Boot volume size in GB (default: 200, maximum free tier)",
)
parser.add_argument(
"--max-instances",
type=int,
default=2,
help="Maximum number of instances to create (for E2.1.Micro only, default: 2)",
)
# OCI configuration
parser.add_argument(
"--profile",
default=os.getenv("OCI_CLI_PROFILE", "DEFAULT"),
help="OCI config profile to use (default: DEFAULT or $OCI_CLI_PROFILE)",
)
parser.add_argument(
"--ssh-key",
required=True,
help="Path to SSH public key file (required)",
)
parser.add_argument(
"--auto-gen-ssh",
action="store_true",
help="Automatically generate SSH key pair if not found",
)
# Image discovery options
parser.add_argument(
"--os-name",
help="Operating system name for image discovery (e.g., 'Oracle Linux', 'Ubuntu')",
)
parser.add_argument(
"--os-version",
help="Operating system version (e.g., '9', '22.04')",
)
parser.add_argument(
"--save-images",
action="store_true",
help="Save discovered images to JSON file",
)
# Retry configuration
parser.add_argument(
"--delay",
type=int,
default=5,
help="Delay in seconds between retry rounds (default: 5)",
)
# Other options
parser.add_argument(
"--public-ip",
action="store_true",
default=True,
help="Assign public IP to instance (default: True)",
)
parser.add_argument(
"--no-public-ip",
dest="public_ip",
action="store_false",
help="Do not assign public IP to instance",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Perform a dry run without actually creating instances",
)
parser.add_argument(
"--version",
action="version",
version="%(prog)s 1.0.0",
)
args = parser.parse_args()
# Validate arguments based on shape
if args.shape == "VM.Standard.A1.Flex":
if args.ocpus < 1 or args.ocpus > 4:
parser.error("OCPUs must be between 1 and 4 for ARM free tier")
if args.memory < 1 or args.memory > 24:
parser.error("Memory must be between 1 and 24 GB for ARM free tier")
if args.memory < args.ocpus * 1:
parser.error("Memory must be at least 1GB per OCPU")
if args.memory > args.ocpus * 8:
parser.error("Memory cannot exceed 8GB per OCPU")
elif args.shape == "VM.Standard.E2.1.Micro":
# Force micro instance settings
args.ocpus = 1
args.memory = 1
if args.max_instances < 1 or args.max_instances > 2:
parser.error("max-instances must be 1 or 2 for micro instances")
return args
if __name__ == "__main__":
try:
args = parse_args()
creator = OracleInstanceCreator(args)
creator.run()
except KeyboardInterrupt:
console.print("\n[yellow]Interrupted by user[/yellow]")
sys.exit(0)
except Exception as e:
console.print(f"[red]Fatal error: {e}[/red]")
import traceback
if os.getenv("DEBUG"):
traceback.print_exc()
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment